Source code for pymialsrtk.pipelines.anatomical.preprocessing

# Copyright © 2016-2023 Medical Image Analysis Laboratory, University Hospital
# Center and University of Lausanne (UNIL-CHUV), Switzerland
#
#  This software is distributed under the open-source license Modified BSD.

"""Module for the preprocessing pipeline."""

import os

from nipype import config
from nipype import logging as nipype_logging
from nipype.pipeline import engine as pe

import pymialsrtk.interfaces.utils as utils

# Import the implemented interface from pymialsrtk
from pymialsrtk.workflows.input_stage import create_input_stage
import pymialsrtk.workflows.preproc_stage as preproc_stage
from pymialsrtk.workflows.output_stage import create_preproc_output_stage
from pymialsrtk.workflows.registration_stage import create_registration_stage
from .abstract import AbstractAnatomicalPipeline


[docs]class PreprocessingPipeline(AbstractAnatomicalPipeline): """Class used to represent the workflow of the Preprocessing pipeline. Attributes ----------- m_bids_dir : string BIDS root directory (required) m_output_dir : string Output derivatives directory (required) m_subject : string Subject ID (in the form ``sub-XX``) m_wf : nipype.pipeline.Workflow Nipype workflow of the preprocessing pipeline m_sr_id : string ID of the preprocessing useful to distinguish when multiple preprocessing with different order of stacks are run on the same subject m_session : string Session ID if applicable (in the form ``ses-YY``) m_stacks : list(int) List of stack to be used in the preprocessing. The specified order is kept if `skip_stacks_ordering` is True. m_masks_derivatives_dir : string directory basename in BIDS directory derivatives where to search for masks (optional) m_do_nlm_denoising : bool Whether the NLM denoising preprocessing should be performed prior to motion estimation. (default is False) m_skip_stacks_ordering : bool (optional) Whether the automatic stacks ordering should be skipped. (default is False) Examples -------- >>> from pymialsrtk.pipelines.anatomical.srr import PreprocessingPipeline >>> # Create a new instance >>> pipeline = PreprocessingPipeline(bids_dir='/path/to/bids_dir', output_dir='/path/to/output_dir', subject='sub-01', p_stacks=[1,3,2,0], sr_id=1, session=None, paramTV={deltatTV = "0.001", lambdaTV = "0.75", num_primal_dual_loops = "20"}, masks_derivatives_dir="/custom/mask_dir", masks_desc=None, p_dict_custom_interfaces=None) >>> # Create the super resolution Nipype workflow >>> pipeline.create_workflow() >>> # Execute the workflow >>> res = pipeline.run(number_of_cores=1) # doctest: +SKIP """ m_pipeline_name = "preproc_pipeline" # Custom interfaces options m_do_nlm_denoising = None m_skip_stacks_ordering = None m_do_registration = None m_skip_svr = None def __init__( self, p_bids_dir, p_output_dir, p_subject, p_ga=None, p_stacks=None, p_sr_id=1, p_session=None, p_masks_derivatives_dir=None, p_masks_desc=None, p_dict_custom_interfaces=None, p_verbose=None, p_openmp_number_of_cores=None, p_nipype_number_of_cores=None, ): """Constructor of PreprocessingPipeline class instance.""" super().__init__( p_bids_dir, p_output_dir, p_subject, p_ga, p_stacks, p_sr_id, p_session, p_masks_derivatives_dir, p_masks_desc, p_dict_custom_interfaces, p_verbose, p_openmp_number_of_cores, p_nipype_number_of_cores, "pre", ) if p_dict_custom_interfaces is not None: self.m_skip_preprocessing = ( p_dict_custom_interfaces["skip_preprocessing"] if "skip_preprocessing" in p_dict_custom_interfaces.keys() else False ) self.m_do_nlm_denoising = ( p_dict_custom_interfaces["do_nlm_denoising"] if "do_nlm_denoising" in p_dict_custom_interfaces.keys() else False ) self.m_skip_stacks_ordering = ( p_dict_custom_interfaces["skip_stacks_ordering"] if ( (self.m_stacks is not None) and ( "skip_stacks_ordering" in p_dict_custom_interfaces.keys() ) ) else False ) self.m_do_registration = ( p_dict_custom_interfaces["preproc_do_registration"] if "preproc_do_registration" in p_dict_custom_interfaces.keys() else False ) self.m_skip_svr = ( p_dict_custom_interfaces["skip_svr"] if "skip_svr" in p_dict_custom_interfaces.keys() else False ) self.check_parameters_integrity(p_dict_custom_interfaces) else: self.m_skip_preprocessing = False self.m_do_nlm_denoising = False self.m_skip_stacks_ordering = False self.m_do_registration = False self.m_skip_svr = False if self.m_skip_preprocessing: if self.m_do_nlm_denoising: raise RuntimeError( "`do_nlm denoising` is incompatible with `skip_preprocessing`." )
[docs] def check_parameters_integrity(self, p_dict_custom_interfaces): """Check parameters integrity. This checks whether the custom interfaces dictionary contains only keys that are used in preprocessing, and raises an exception if it doesn't. Parameters ---------- p_dict_custom_interfaces : dict dictionary of custom inferfaces for a given subject that is to be processed. """ forbidden_keys = [ "do_refine_hr_mask", "do_reconstruct_labels", "do_anat_orientation" "do_multi_parameters", "do_srr_assessment", ] for k in forbidden_keys: if ( k in p_dict_custom_interfaces.keys() and p_dict_custom_interfaces[k] ): raise RuntimeError( f"{k} should not be enabled " f"when run_type=preprocessing." )
[docs] def create_workflow(self): """Create the Niype workflow of the super-resolution pipeline. It is composed of a succession of Nodes and their corresponding parameters, where the output of node i goes to the input of node i+1. """ self.m_wf = pe.Workflow( name=self.m_pipeline_name, base_dir=self.m_wf_base_dir ) self.m_wf.config["logging"] = { "log_directory": os.path.join(self.m_wf_base_dir), "log_to_file": True, } self.m_wf.config["execution"] = { "remove_unnecessary_outputs": True, "stop_on_first_crash": True, "stop_on_first_rerun": True, "crashfile_format": "txt", "use_relative_paths": True, "write_provenance": False, } config.update_config(self.m_wf.config) # Update nypipe logging with config nipype_logging.update_logging(config) # config.enable_provenance() input_stage = create_input_stage( p_bids_dir=self.m_bids_dir, p_sub_ses=self.m_sub_ses, p_sub_path=self.m_sub_path, p_use_manual_masks=self.m_use_manual_masks, p_masks_desc=self.m_masks_desc, p_masks_derivatives_dir=self.m_masks_derivatives_dir, p_labels_derivatives_dir=None, p_skip_stacks_ordering=self.m_skip_stacks_ordering, p_do_reconstruct_labels=False, p_stacks=self.m_stacks, p_do_srr_assessment=False, p_verbose=self.m_verbose, name="input_mgmt_stage", ) preprocessing_stage = preproc_stage.create_preproc_stage( p_skip_preprocessing=self.m_skip_preprocessing, p_do_nlm_denoising=self.m_do_nlm_denoising, p_do_reconstruct_labels=False, p_verbose=self.m_verbose, ) preproc_mgmt_stage = create_preproc_output_stage( self.m_sub_ses, self.m_sr_id, self.m_run_type, self.m_use_manual_masks, p_do_nlm_denoising=self.m_do_nlm_denoising, p_do_registration=self.m_do_registration, name="preproc_mgmt_stage", ) preproc_mgmt_stage.inputs.inputnode.final_res_dir = ( self.m_final_res_dir ) if self.m_do_registration: registration_stage = create_registration_stage( p_do_nlm_denoising=self.m_do_nlm_denoising, p_skip_svr=self.m_skip_svr, p_sub_ses=self.m_sub_ses, p_verbose=self.m_verbose, ) # Build workflow : connections of the nodes # Nodes ready : Linking now self.m_wf.connect( input_stage, "outputnode.t2ws_filtered", preprocessing_stage, "inputnode.input_images", ) self.m_wf.connect( input_stage, "outputnode.masks_filtered", preprocessing_stage, "inputnode.input_masks", ) if self.m_do_registration: if self.m_do_nlm_denoising: self.m_wf.connect( preprocessing_stage, ("outputnode.output_images_nlm", utils.sort_ascending), registration_stage, "inputnode.input_images_nlm", ) self.m_wf.connect( preprocessing_stage, ("outputnode.output_images", utils.sort_ascending), registration_stage, "inputnode.input_images", ) self.m_wf.connect( preprocessing_stage, ("outputnode.output_masks", utils.sort_ascending), registration_stage, "inputnode.input_masks", ) self.m_wf.connect( input_stage, "outputnode.stacks_order", registration_stage, "inputnode.stacks_order", ) self.m_wf.connect( registration_stage, "outputnode.output_sdi", preproc_mgmt_stage, "inputnode.input_sdi", ) self.m_wf.connect( registration_stage, "outputnode.output_transforms", preproc_mgmt_stage, "inputnode.input_transforms", ) else: self.m_wf.connect( preprocessing_stage, ("outputnode.output_images", utils.sort_ascending), preproc_mgmt_stage, "inputnode.input_images", ) self.m_wf.connect( input_stage, "outputnode.stacks_order", preproc_mgmt_stage, "inputnode.stacks_order", ) self.m_wf.connect( preprocessing_stage, "outputnode.output_masks", preproc_mgmt_stage, "inputnode.input_masks", ) if self.m_do_nlm_denoising: self.m_wf.connect( preprocessing_stage, ("outputnode.output_images_nlm", utils.sort_ascending), preproc_mgmt_stage, "inputnode.input_images_nlm", ) if not self.m_skip_stacks_ordering: self.m_wf.connect( input_stage, "outputnode.report_image", preproc_mgmt_stage, "inputnode.report_image", ) self.m_wf.connect( input_stage, "outputnode.motion_tsv", preproc_mgmt_stage, "inputnode.motion_tsv", )