.. _pipeline:

Overview
========

In the BIDS preprocessing package, we use a pipeline structure to organize the
processing steps in a way that is customizable and extensible. A simple example
is shown in :numref:`pipeline-figure`.

.. _pipeline-figure:

.. figure:: /_images/simple_pipeline.svg
    :align: center
    :width: 100%

    **Simple pipeline structure**

    A :class:`.DataLoader` is used to load data from a source (e.g. files, data
    generated on the fly) and structure it in a Python :class:`dict`. This
    :class:`dict` is then passed to a :class:`.Pipeline`, which will
    sequentially run all its :class:`.PipelineStep` s on the data. After all
    steps have been completed, the :class:`.Pipeline` will return the final
    :class:`dict` to the user.

The relevant classes here are the :class:`.DataLoader`, :class:`.PipelineStep`
and :class:`.Pipeline`.

Concept
-------

The goal we try to accomplish is to preprocess brain imaging data in an
efficient but extensible manner. Most of the time, this means that we should:

1. Find and read data into a format that is easy to work with
   (:class:`.DataLoader`)
2. Perform one or multiple preprocessing steps (e.g. bandpass filtering,
   motion correction, etc., :class:`.Pipeline`)
3. (Optional) Save the results to disk with a :class:`.PipelineStep`
   (:class:`.Saver`)
4. Return the preprocessed data to the user

You are free to define multiple :class:`.DataLoader`, :class:`.Pipeline` and
:class:`.Saver` steps for more complex pipelines. See
:numref:`pipeline-figure-multiple` for an example.

.. note::

    The :class:`.Pipeline` is a subclass of :class:`.PipelineStep`, so a
    :class:`.Pipeline` can be used as a :class:`.PipelineStep` in another
    :class:`.Pipeline`.

:class:`.Pipeline` s can be defined in code (i.e. in a Python script), or in a
configuration file (e.g. YAML). For more information about configuration
files, see :ref:`configuration`.

.. _pipeline-figure-multiple:

.. figure:: /_images/multiple_pipeline.svg
    :align: center
    :width: 100%

    **More elaborate pipeline**

    Multiple :class:`.DataLoader` s and :class:`.Pipeline` s can be run
    sequentially.

The ``data_dict``
^^^^^^^^^^^^^^^^^

To be able to pass both data and metadata between steps, we use a simple
Python :class:`dict` object (commonly called the ``data_dict`` in the
documentation and code). Practically, each :class:`.PipelineStep` is just a
functor (i.e. a class that implements the ``__call__`` method) that takes such
a :class:`dict` as input and returns a new :class:`dict` with preprocessed
data as output.

.. note::

    This also makes it possible for a step to use data generated by steps that
    came multiple steps before it in the pipeline.

.. danger::

    This can waste a lot of memory, as each step will have to keep the data
    from some previous steps in memory. This is not a problem for small
    datasets, but may be a problem for larger datasets. It is possible to
    overwrite or delete the data from previous steps if you will not need it
    again in a later step.

Convenient classes
------------------

In the :mod:`brain_pipe` module, a lot of :class:`.PipelineStep` s are already
defined for you. Additionally, there is a :class:`.DefaultPipeline` class that
can be used to easily create a pipeline without worrying about the specific
implementation.

Additional considerations
-------------------------

The :class:`.Pipeline` works on a datapoint-by-datapoint basis, i.e. it will
receive a datapoint from the :class:`.DataLoader` and run all steps
sequentially on it before starting with the next datapoint.

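To make this concrete, the sequential behaviour described above boils down to
something like the following sketch (illustrative pseudocode, not the actual
implementation; ``data_loader`` and ``pipeline_steps`` are placeholder names):

.. code-block:: python

    # Illustrative sketch of the sequential, datapoint-by-datapoint flow.
    # The real Pipeline implementation may differ.
    results = []
    for data_dict in data_loader:       # one datapoint at a time
        for step in pipeline_steps:     # each step: dict in, dict out
            data_dict = step(data_dict)
        results.append(data_dict)
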
It is possible to parallelize this process using modules like
:mod:`multiprocessing`, allowing multiple datapoints to be processed at the
same time. A default implementation of this is provided in
:mod:`bids_preprocessing.utils.multiprocessing`.

.. danger::

    Most multiprocessing libraries require the data in the initial
    ``data_dict`` and the data stored in the
    :class:`.PipelineStep`/:class:`.Pipeline` to be picklable. For the default
    :mod:`pickle` module, this means that the use of non-picklable objects
    (e.g. ``lambda`` functions) will probably cause an error.

.. warning::

    While multiprocessing can speed up processing immensely, it comes at the
    cost of higher RAM usage. As most brain imaging data is already quite
    large, the bottleneck for most pipelines is the RAM usage. When exceeding
    the RAM limit, the operating system will start to use the hard drive as
    temporary storage space. This is called "swapping" and will slow down the
    processing.

To be able to easily add, remove or relocate different
:class:`.PipelineStep` s in your pipeline, it is recommended to keep your
steps as modular as possible. This means that each step should only do one
thing, and that the steps should be as independent as possible.

.. note::

    When contributing to the package, please try to keep this in mind. This
    will make it easier for others to use your code, and will make it easier
    to maintain the code in the future. For your own personal project, though,
    it is completely up to you how you want to structure your pipeline. If you
    want to have a single step that does everything, that is fine by me ;).

The :class:`.Runner` class can be used to run multiple :class:`.Pipeline` s
sequentially.

Writing your own :class:`.PipelineStep`
----------------------------------------

Following the recommendations above, you can easily write your own
:class:`.PipelineStep` by subclassing it and implementing the ``__call__``
method. This method should take a single :class:`dict` (the ``data_dict``) as
input, and return a single :class:`dict` as output.

.. note::

    Strictly speaking, the subclassing isn't necessary, but it is recommended
    as some helper functions are provided in the :class:`.PipelineStep` class.
    In theory, any function that takes a single :class:`dict` as input and
    returns a single :class:`dict` as output can be used in lieu of a
    :class:`.PipelineStep`.

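As an illustration, a plain function like the one below already satisfies this
interface (a minimal sketch; the function name and the ``data`` key are only
examples, not part of the package):

.. code-block:: python

    def double_data(data_dict: dict) -> dict:
        """A bare dict-in, dict-out function that can stand in for a step."""
        data_dict["data"] = [2 * x for x in data_dict["data"]]
        return data_dict
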
Small Example
-------------

We will show in a toy example how to create a simple :class:`.DataLoader` and
a :class:`.DefaultPipeline` with 2 steps. The :class:`.DataLoader` will
generate some data. The first step will add a bit of data, and the second one
will remove the mean from the data.

Let's first import the necessary base classes:

>>> from brain_pipe.pipeline.base import PipelineStep
>>> from brain_pipe.pipeline.default import DefaultPipeline
>>> from brain_pipe.pipeline.data_loader import DataLoader

Now we can create our steps:

>>> class RangeDataLoader(DataLoader):
...     def __iter__(self):
...         yield {'data': list(range(9))}

>>> class AddData(PipelineStep):
...     def __call__(self, data_dict: dict):
...         data_dict['data'].append(9)
...         return data_dict

>>> class RemoveMean(PipelineStep):
...     def __call__(self, data_dict: dict):
...         mean = sum(data_dict['data']) / len(data_dict['data'])
...         data_dict['data'] = [x - mean for x in data_dict['data']]
...         return data_dict

Alright, let's construct our pipeline and DataLoader:

>>> pipeline = DefaultPipeline([AddData(), RemoveMean()])
>>> data_loader = RangeDataLoader()

Now we can run the pipeline on the data manually:

>>> results = []
>>> for data_dict in data_loader:
...     results.append(pipeline(data_dict))
>>> results
[{'data': [-4.5, -3.5, -2.5, -1.5, -0.5, 0.5, 1.5, 2.5, 3.5, 4.5]}]

Or we can use a :class:`.Runner` to run the pipeline on the data:

>>> from brain_pipe.pipeline.runner import Runner
>>> runner = Runner((data_loader, pipeline))
>>> results = runner.run()
>>> results
[{'data': [-4.5, -3.5, -2.5, -1.5, -0.5, 0.5, 1.5, 2.5, 3.5, 4.5]}]

For a more elaborate example, look at
`the sparrKULee example <../../../examples/exporl/sparrKULee.py>`_.

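As a closing note, recall that a :class:`.Pipeline` is itself a
:class:`.PipelineStep` (see the note above), so the toy pipeline could also be
nested inside another pipeline. A minimal sketch continuing the session above
(the nesting shown here is purely illustrative, not taken from the package's
examples):

.. code-block:: python

    # Hypothetical continuation of the session above: nest a DefaultPipeline
    # inside another DefaultPipeline. Since a Pipeline is a PipelineStep, it
    # can be passed wherever a step is expected.
    outer = DefaultPipeline([AddData(), DefaultPipeline([RemoveMean()])])
    outer({'data': list(range(9))})
    # -> {'data': [-4.5, -3.5, -2.5, -1.5, -0.5, 0.5, 1.5, 2.5, 3.5, 4.5]}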