Overview

In the BIDS preprocessing package, we use a pipeline structure to organize the processing steps in a way that is customizable and extensible. A simple example is shown in Fig. 2.

Fig. 2 Simple pipeline structure

A DataLoader is used to load data from a source (e.g. files, or data generated on the fly) and structure it into a Python dict. This dict is then passed to a Pipeline, which sequentially runs all of its PipelineSteps on the data. After all steps have completed, the Pipeline returns the final dict to the user.

The relevant classes here are the DataLoader, PipelineStep and Pipeline.

Concept

Our goal is to preprocess brain imaging data in an efficient yet extensible manner. Most of the time, this means that we should:

  1. Find and read data into a format that is easy to work with (DataLoader)

  2. Perform one or multiple preprocessing steps (e.g. bandpass filtering, motion correction, etc.; Pipeline)

  3. (Optional) Save the results to disk with a PipelineStep (Saver)

  4. Return the preprocessed data to the user

You are free to define multiple DataLoader, Pipeline and Saver steps for more complex pipelines. See Fig. 3 for an example.

Note

The Pipeline is a subclass of PipelineStep, so a Pipeline can be used as a PipelineStep in another Pipeline.
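
As a minimal sketch of such nesting (assuming, as in the Small Example below, that DefaultPipeline accepts a list of steps):

>>> from brain_pipe.pipeline.base import PipelineStep
>>> from brain_pipe.pipeline.default import DefaultPipeline
>>> class Identity(PipelineStep):
...     def __call__(self, data_dict: dict):
...         return data_dict
>>> inner = DefaultPipeline([Identity(), Identity()])
>>> # Since a Pipeline is itself a PipelineStep, it can be used as a step:
>>> outer = DefaultPipeline([Identity(), inner])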

Pipelines can be defined in code (i.e. in a Python script) or in a configuration file (e.g. YAML). For more information about configuration files, see Defining and running Pipelines through configuration files.

Fig. 3 A more elaborate pipeline. Multiple DataLoaders and Pipelines can be run sequentially.

The data_dict

To be able to pass both data and metadata between steps, we use a simple Python dict object (commonly called the data_dict in the documentation and code). Practically, each PipelineStep is a functor (i.e. a class that implements the __call__ method) that takes such a dict as input and returns a new dict with the preprocessed data as output.
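
For example, a toy step that doubles the values under a (hypothetical) 'data' key could look like this:

>>> from brain_pipe.pipeline.base import PipelineStep
>>> class Double(PipelineStep):
...     def __call__(self, data_dict: dict):
...         # Read from the dict, transform, and write the result back.
...         data_dict['data'] = [2 * x for x in data_dict['data']]
...         return data_dict
>>> Double()({'data': [1, 2, 3]})
{'data': [2, 4, 6]}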

Note

This also makes it possible for a step to use data generated by steps that came much earlier in the pipeline, not just its immediate predecessor.

Danger

This can waste a lot of memory, as the output of earlier steps is kept in the data_dict throughout the pipeline. This is not a problem for small datasets, but can be for larger ones. You can overwrite or delete data from previous steps if you will not need it again in a later step, as sketched below.
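
For example, a step could consume a large intermediate produced by an earlier step and then drop it (a toy sketch; the 'raw' and 'envelope' keys are hypothetical):

>>> from brain_pipe.pipeline.base import PipelineStep
>>> class ExtractEnvelope(PipelineStep):
...     def __call__(self, data_dict: dict):
...         # Use the raw signal produced by an earlier step...
...         data_dict['envelope'] = [abs(x) for x in data_dict['raw']]
...         # ...and delete it, since no later step needs it anymore.
...         del data_dict['raw']
...         return data_dict
>>> ExtractEnvelope()({'raw': [-1, 2, -3]})
{'envelope': [1, 2, 3]}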

Convenient classes

In the brain_pipe module, a lot of PipelineSteps are already defined for you. Additionally, there is a DefaultPipeline class that can be used to easily create a pipeline without worrying about the specific implementation.

Additional considerations

The Pipeline works on a datapoint-by-datapoint basis, i.e. it receives a datapoint from the DataLoader and runs all steps sequentially on it before starting on the next datapoint.

It is possible to parallelize this process using modules like multiprocessing, allowing multiple datapoints to be processed at the same time.

A default implementation of this is provided in bids_preprocessing/utils/multiprocessing.
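
To sketch the general idea with the standard library (this is not the bundled implementation; pipeline and data_loader are assumed to be constructed as in the Small Example below):

>>> import multiprocessing
>>> # In a script, guard this with `if __name__ == '__main__':` on platforms
>>> # that spawn fresh worker processes (e.g. Windows and macOS).
>>> with multiprocessing.Pool(processes=4) as pool:
...     # Each worker runs the full pipeline on one data_dict at a time.
...     results = pool.map(pipeline, data_loader)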

Danger

Most multiprocessing libraries require the data in the initial data_dict and any data stored on the PipelineStep/Pipeline objects to be picklable. For the default pickle module, this means that non-picklable objects (e.g. lambda functions) will most likely cause an error.
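
In practice, this means preferring named, module-level functions (or functools.partial objects wrapping them) over lambdas when configuring steps:

>>> import functools, pickle
>>> def scale(data, factor):
...     return [factor * x for x in data]
>>> step_fn = functools.partial(scale, factor=2.0)
>>> _ = pickle.dumps(step_fn)  # fine: partial of a named function pickles
>>> # A lambda such as `lambda data: [2.0 * x for x in data]` would raise a
>>> # PicklingError here instead.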

Warning

While multiprocessing can speed up processing immensely, it comes at the cost of higher RAM usage. As most brain imaging data is already quite large, RAM usage is the bottleneck for most pipelines. When RAM usage exceeds the available memory, the operating system will start using the hard drive as temporary storage. This is called “swapping” and will slow down processing considerably.

To be able to easily add, remove or relocate PipelineSteps in your pipeline, it is recommended to keep your steps as modular as possible. This means that each step should only do one thing, and that steps should be as independent of each other as possible.

Note

When contributing to the package, please try to keep this in mind. This will make it easier for others to use your code, and will make it easier to maintain the code in the future.

For your own personal project though, it is completely up to you how you want to structure your pipeline. If you want to have a single step that does everything, that is fine by me ;).

The Runner class can be used to run multiple Pipelines sequentially.

Writing your own PipelineStep

Following the recommendations above, you can easily write your own step by subclassing PipelineStep and implementing the __call__ method. This method should take a single dict (the data_dict) as input and return a single dict as output.

Note

Strictly speaking, the subclassing isn’t necessary, but it is recommended as some helper functions are provided in the PipelineStep class.

In theory, any function that takes a single dict as input and returns a single dict as output can be used in lieu of a PipelineStep.
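
For instance, this plain function could be dropped into a pipeline as-is (a toy sketch):

>>> def add_offset(data_dict: dict) -> dict:
...     # Any callable mapping a dict to a dict can act as a step.
...     data_dict['data'] = [x + 1 for x in data_dict['data']]
...     return data_dict
>>> add_offset({'data': [1, 2, 3]})
{'data': [2, 3, 4]}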

Small Example

This toy example shows how to create a simple DataLoader and a DefaultPipeline with two steps. The DataLoader will generate some data; the first step will add a bit of data, and the second will remove the mean from the data.

Let’s first import the necessary base classes:

>>> from brain_pipe.pipeline.base import PipelineStep
>>> from brain_pipe.pipeline.default import DefaultPipeline
>>> from brain_pipe.pipeline.data_loader import DataLoader

Now we can create our steps:

>>> class RangeDataLoader(DataLoader):
...     def __iter__(self):
...         yield {'data': list(range(9))}
>>> class AddData(PipelineStep):
...     def __call__(self, data_dict: dict):
...         data_dict['data'].append(9)
...         return data_dict
>>> class RemoveMean(PipelineStep):
...     def __call__(self, data_dict: dict):
...         mean = sum(data_dict['data']) / len(data_dict['data'])
...         data_dict['data'] = [x - mean for x in data_dict['data']]
...         return data_dict

Alright, let’s construct our pipeline and DataLoader:

>>> pipeline = DefaultPipeline([AddData(), RemoveMean()])
>>> data_loader = RangeDataLoader()

Now we can run the pipeline on the data manually:

>>> results = []
>>> for data_dict in data_loader:
...     results.append(pipeline(data_dict))
>>> results
[{'data': [-4.5, -3.5, -2.5, -1.5, -0.5, 0.5, 1.5, 2.5, 3.5, 4.5]}]

Or we can use a Runner to run the pipeline on the data:

>>> from brain_pipe.pipeline.runner import Runner
>>> runner = Runner((data_loader, pipeline))
>>> results = runner.run()
>>> results
[{'data': [-4.5, -3.5, -2.5, -1.5, -0.5, 0.5, 1.5, 2.5, 3.5, 4.5]}]

For a more elaborate example, look at the sparrKULee example.