# Source code for pymialsrtk.pipelines.anatomical.abstract

# Copyright © 2016-2023 Medical Image Analysis Laboratory, University Hospital
# Center and University of Lausanne (UNIL-CHUV), Switzerland
#
#  This software is distributed under the open-source license Modified BSD.

"""Abstract base class for the anatomical pipeline."""

import abc
import os
import shutil
from datetime import datetime
from nipype.info import __version__ as __nipype_version__
from nipype import logging as nipype_logging

# Import the implemented interface from pymialsrtk
from pymialsrtk.bids.utils import write_bids_derivative_description

# Get pymialsrtk version
from pymialsrtk.info import __version__


class AbstractAnatomicalPipeline:
    """Class used to represent the workflow of the anatomical pipeline.

    Note that this class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` marker on :meth:`create_workflow` documents the
    intent but is not enforced by Python at instantiation time.

    Attributes
    -----------
    m_bids_dir : string
        BIDS root directory (required)

    m_output_dir : string
        Output derivatives directory (required)

    m_subject : string
        Subject ID (in the form ``sub-XX``)

    m_wf : nipype.pipeline.Workflow
        Nipype workflow of the reconstruction pipeline

    m_sr_id : string
        ID of the reconstruction useful to distinguish when multiple
        reconstructions with different order of stacks are run on
        the same subject

    m_session : string
        Session ID if applicable (in the form ``ses-YY``)

    m_stacks : list(int)
        List of stack to be used in the reconstruction.
        The specified order is kept if `skip_stacks_ordering` is True.

    m_masks_derivatives_dir : string
        directory basename in BIDS directory derivatives where to search
        for masks (optional)

    m_use_manual_masks : bool
        True when `m_masks_derivatives_dir` is not None

    m_masks_desc : string
        Mask description; only kept when `m_use_manual_masks` is True

    m_run_type : string
        Run type label used to name the workflow directory and the
        copied graph / log files

    m_nipype_number_of_cores : int
        Number of processes given to the Nipype ``MultiProc`` plugin.
        The workflow is run serially when it is None or not greater than 1.

    m_sub_ses : string
        ``<subject>`` or ``<subject>_<session>`` when a session is set

    m_sub_path : string
        ``<subject>`` or ``<subject>/<session>`` when a session is set

    m_wf_base_dir : string
        Nipype working directory of this run (created by the constructor)

    m_final_res_dir : string
        ``pymialsrtk-<version>`` derivatives directory of the subject/session

    m_do_nlm_denoising : bool
        Whether the NLM denoising preprocessing should be performed prior to
        motion estimation. (default is False)

    m_skip_stacks_ordering : bool (optional)
        Whether the automatic stacks ordering should be skipped.
        (default is False)

    Examples
    --------
    >>> from pymialsrtk.pipelines.anatomical.srr import AnatomicalPipeline
    >>> # Create a new instance
    >>> pipeline = AnatomicalPipeline(bids_dir='/path/to/bids_dir',
    ...                               output_dir='/path/to/output_dir',
    ...                               subject='sub-01',
    ...                               p_stacks=[1,3,2,0],
    ...                               sr_id=1,
    ...                               session=None,
    ...                               paramTV={"deltat_TV": "0.001",
    ...                                        "lambda_TV": "0.75",
    ...                                        "num_primal_dual_loops": "20"},
    ...                               masks_derivatives_dir="/custom/mask_dir",
    ...                               masks_desc=None,
    ...                               p_dict_custom_interfaces=None)
    >>> # Create the super resolution Nipype workflow
    >>> pipeline.create_workflow()
    >>> # Execute the workflow
    >>> res = pipeline.run(number_of_cores=1) # doctest: +SKIP

    """

    m_pipeline_name = None
    m_run_start_time = None
    m_run_end_time = None
    m_run_elapsed_time = None

    m_bids_dir = None
    m_output_dir = None
    m_subject = None
    m_ga = None
    m_wf = None
    m_sr_id = None
    m_session = None
    m_stacks = None
    m_run_type = None

    m_masks_derivatives_dir = None
    m_use_manual_masks = False
    m_masks_desc = None

    m_verbose = None
    m_openmp_number_of_cores = None
    m_nipype_number_of_cores = None

    m_sub_ses = None
    m_sub_path = None
    m_wf_base_dir = None
    m_final_res_dir = None

    def __init__(
        self,
        p_bids_dir,
        p_output_dir,
        p_subject,
        p_ga=None,
        p_stacks=None,
        p_sr_id=1,
        p_session=None,
        p_masks_derivatives_dir=None,
        p_masks_desc=None,
        p_dict_custom_interfaces=None,
        p_verbose=None,
        p_openmp_number_of_cores=None,
        p_nipype_number_of_cores=None,
        p_run_type=None,
    ):
        """Constructor of AbstractAnatomicalPipeline class instance.

        Stores the processing parameters, derives the subject/session
        identifiers and output paths, creates the Nipype working directory
        and removes any ``pypeline.log`` left there by a previous run.

        Parameters
        ----------
        p_bids_dir : string
            BIDS root directory

        p_output_dir : string
            Output derivatives directory

        p_subject : string
            Subject ID (in the form ``sub-XX``)

        p_ga
            Stored as ``m_ga`` (presumably the gestational age -- to be
            confirmed against the callers)

        p_stacks : list(int)
            List of stacks to be used in the reconstruction

        p_sr_id : int
            ID of the reconstruction (default is 1)

        p_session : string
            Session ID if applicable (in the form ``ses-YY``)

        p_masks_derivatives_dir : string
            Directory of custom brain masks. Manual masks are used
            if and only if this is not None.

        p_masks_desc : string
            Mask description; ignored when no masks directory is given

        p_dict_custom_interfaces : dict
            Accepted for interface compatibility; not used by this
            constructor

        p_verbose : bool
            Stored as ``m_verbose``

        p_openmp_number_of_cores : int
            Stored as ``m_openmp_number_of_cores``

        p_nipype_number_of_cores : int
            Number of processes used by :meth:`run`

        p_run_type : string
            Run type label used in directory and file names
        """
        # BIDS processing parameters
        self.m_bids_dir = p_bids_dir
        self.m_output_dir = p_output_dir
        self.m_subject = p_subject
        self.m_ga = p_ga
        self.m_sr_id = p_sr_id
        self.m_session = p_session
        self.m_stacks = p_stacks
        self.m_run_type = p_run_type

        self.m_verbose = p_verbose
        self.m_openmp_number_of_cores = p_openmp_number_of_cores
        self.m_nipype_number_of_cores = p_nipype_number_of_cores

        # Use manual/custom brain masks
        # If masks directory is not specified use the
        # automated brain extraction method.
        self.m_masks_derivatives_dir = p_masks_derivatives_dir
        self.m_use_manual_masks = self.m_masks_derivatives_dir is not None
        self.m_masks_desc = p_masks_desc if self.m_use_manual_masks else None

        self.m_sub_ses = self.m_subject
        self.m_sub_path = self.m_subject
        if self.m_session is not None:
            self.m_sub_ses = "".join([self.m_sub_ses, "_", self.m_session])
            self.m_sub_path = os.path.join(self.m_subject, self.m_session)

        self.m_wf_base_dir = os.path.join(
            self.m_output_dir,
            "-".join(["nipype", __nipype_version__]),
            self.m_sub_path,
            f"{self.m_run_type}-{self.m_sr_id}",
        )

        self.m_final_res_dir = os.path.join(
            self.m_output_dir,
            "-".join(["pymialsrtk", __version__]),
            self.m_sub_path,
        )

        os.makedirs(self.m_wf_base_dir, exist_ok=True)
        print("Process directory: {}".format(self.m_wf_base_dir))

        # Initialization (Not sure we can control the name of nipype log)
        # Remove the log of a previous run so the new one starts empty.
        stale_log = os.path.join(self.m_wf_base_dir, "pypeline.log")
        if os.path.isfile(stale_log):
            os.unlink(stale_log)

    @abc.abstractmethod
    def create_workflow(self):
        """Create the Nipype workflow of the super-resolution pipeline.

        It is composed of a succession of Nodes and their corresponding
        parameters, where the output of node i goes to the input of node i+1.

        The more specific definition given in each node implementing
        the method.
        """

    @staticmethod
    def _format_elapsed_time(duration):
        """Format a ``datetime.timedelta`` as ``"<M> min. and <S> s."``."""
        minutes, seconds = divmod(duration.total_seconds(), 60)
        return f"{int(minutes)} min. and {int(seconds)} s."

    @staticmethod
    def _copy_file_with_parents(src, dst, logger):
        """Copy `src` to `dst`, creating the parent directories of `dst`."""
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        logger.info(f"\t > Copy {src} to {dst}...")
        shutil.copy(src=src, dst=dst)

    def run(self, memory=None, logger=None):
        """Execute the workflow of the super-resolution reconstruction pipeline.

        Nipype execution engine will take care of the management
        and execution of all processing steps involved in the
        super-resolution reconstruction pipeline. Note that the complete
        execution graph is saved as a PNG image to support transparency
        on the whole processing.

        The workflow graph and the ``pypeline.log`` execution log are copied
        to the ``figures/`` and ``logs/`` folders of the subject (and
        session) inside the ``pymialsrtk-<version>`` derivatives directory.

        Parameters
        ----------
        memory : int
            Maximal memory used by the workflow. When not None and positive,
            it is passed as ``memory_gb`` to the ``MultiProc`` plugin; it has
            no effect when the workflow is run serially.

        logger : logging.Logger
            Logger used for the information messages. The
            ``nipype.interface`` logger is used when it is not given.

        Returns
        -------
        res
            The value returned by the ``run`` method of ``self.m_wf``.
        """
        # Use nipype.interface logger to print some information messages
        iflogger = logger or nipype_logging.getLogger("nipype.interface")

        iflogger.info("**** Workflow graph creation ****")
        self.m_wf.write_graph(
            dotfilename="graph.dot",
            graph2use="colored",
            format="png",
            simple_form=True,
        )

        # String formatting for saving
        subject_str = f"{self.m_subject}"
        dst_base = os.path.join(
            self.m_output_dir,
            "-".join(["pymialsrtk", __version__]),
            self.m_subject,
        )
        if self.m_session is not None:
            subject_str += f"_{self.m_session}"
            dst_base = os.path.join(dst_base, self.m_session)
        file_prefix = f"{subject_str}_{self.m_run_type}-SR_id-{self.m_sr_id}_"

        # Copy and rename the generated "graph.png" image
        self._copy_file_with_parents(
            src=os.path.join(self.m_wf.base_dir, self.m_wf.name, "graph.png"),
            dst=os.path.join(
                dst_base,
                "figures",
                file_prefix + "desc-processing_graph.png",
            ),
            logger=iflogger,
        )

        # Create dictionary of arguments passed to plugin_args
        n_procs = self.m_nipype_number_of_cores
        args_dict = {
            "raise_insufficient": False,
            "n_procs": n_procs,
        }
        if (memory is not None) and (memory > 0):
            args_dict["memory_gb"] = memory

        iflogger.info("**** Processing ****")
        # datetime object containing current start date and time
        start = datetime.now()
        self.m_run_start_time = start.strftime("%B %d, %Y / %H:%M:%S")
        print(f" Start date / time : {self.m_run_start_time}")

        # Execute the workflow.
        # The constructor defaults the number of cores to None, and
        # `None > 1` raises a TypeError, so None means serial execution.
        if n_procs is not None and n_procs > 1:
            res = self.m_wf.run(plugin="MultiProc", plugin_args=args_dict)
        else:
            res = self.m_wf.run()

        # Copy and rename the workflow execution log
        self._copy_file_with_parents(
            src=os.path.join(self.m_wf.base_dir, "pypeline.log"),
            dst=os.path.join(dst_base, "logs", file_prefix + "log.txt"),
            logger=iflogger,
        )

        # datetime object containing current end date and time
        end = datetime.now()
        self.m_run_end_time = end.strftime("%B %d, %Y / %H:%M:%S")
        print(f" End date / time : {self.m_run_end_time}")

        # Compute elapsed running time in minutes and seconds
        self.m_run_elapsed_time = self._format_elapsed_time(end - start)
        # Report the elapsed time itself (not the end time again)
        print(f" Elapsed time: {self.m_run_elapsed_time}")

        iflogger.info("**** Write dataset derivatives description ****")
        for toolbox in ["pymialsrtk", "nipype"]:
            write_bids_derivative_description(
                bids_dir=self.m_bids_dir,
                deriv_dir=self.m_output_dir,
                pipeline_name=toolbox,
            )

        return res