brain_pipe.pipeline.default.DefaultPipeline

class DefaultPipeline(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)

Bases: Pipeline

A default implementation of a pipeline (a sequence) of preprocessing steps.

This pipeline adds logic to iterate over brain_pipe.pipeline.base.PipelineStep objects in a thread- and multiprocessing-safe manner. It also handles errors that occur during preprocessing and can optionally keep a history of the applied steps.

Attributes

CONTINUE

ON_ERROR

RAISE

STOP

on_error

Get the on_error value.

__init__(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)

Create a preprocessing pipeline.

Parameters:
  • steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.

  • previous_steps_key (str) – Key to use to store the metadata of the preprocessing in the data_dicts.

  • error_handler_fn (Callable[[Exception, Dict[str,Any]], Any]) – Function that handles exceptions when they occur.

  • on_error (str) – What to do when a step raises an error. One of ON_ERROR; defaults to 'stop'.
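
The documented type of error_handler_fn is Callable[[Exception, Dict[str,Any]], Any], where the dictionary is presumably the data dictionary being processed. A minimal sketch of a custom handler that fits this signature follows; the name log_and_skip and the choice to return None are illustrative assumptions, and how DefaultPipeline uses the returned value is not specified on this page.

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("pipeline_errors")


def log_and_skip(error: Exception, data_dict: Dict[str, Any]) -> Any:
    """Hypothetical handler matching Callable[[Exception, Dict[str, Any]], Any].

    Logs the exception type, its message and the keys of the data
    dictionary that triggered it, then returns None.
    """
    logger.warning(
        "Step failed with %s: %s (data_dict keys: %s)",
        type(error).__name__,
        error,
        sorted(data_dict.keys()),
    )
    return None
```

Such a handler would be passed as DefaultPipeline(steps, error_handler_fn=log_and_skip).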

Methods

__init__(steps[, previous_steps_key, ...])

Create a preprocessing pipeline.

check_reload(steps, data_dicts)

Check if we can reload data from a step.

iterate_over_steps(steps, data_dict)

Iterate over a sequence of preprocessing steps.

parse_dict_keys(key[, name, ...])

Parse a key or a sequence of keys.

run_step(step, data_dict[, step_index])

Run a single preprocessing step.

check_reload(steps: Sequence[PipelineStep], data_dicts: Sequence[Dict[str, Any]])

Check if we can reload data from a step.

Parameters:
  • steps (Sequence[PipelineStep]) – The steps that are applied to the data_dicts.

  • data_dicts (Sequence[Dict[str, Any]]) – The data_dicts that are passed through the pipeline.

Returns:

The steps and data_dicts that can be used. If reloaded, the steps are truncated to the step that was reloaded.

Return type:

Tuple[Sequence[PipelineStep], Sequence[Dict[str, Any]]]

Notes

This will check steps from the end of the pipeline to the beginning, and reload the first step that is reloadable.
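
The reverse scan described in these Notes can be sketched as follows. This is not the library's implementation: the ReloadableStep class and its is_reloadable and reload methods are hypothetical, and "truncated" is interpreted here as keeping only the steps that come after the reloaded one.

```python
from typing import Any, Dict, List, Sequence, Tuple


class ReloadableStep:
    """Hypothetical step that can reload its output from a cache."""

    def __init__(self, name: str, cache: Dict[str, List[Dict[str, Any]]]):
        self.name = name
        self.cache = cache

    def is_reloadable(self, data_dicts: Sequence[Dict[str, Any]]) -> bool:
        # Assumption: a step is reloadable when its output is cached.
        return self.name in self.cache

    def reload(self, data_dicts: Sequence[Dict[str, Any]]) -> List[Dict[str, Any]]:
        return self.cache[self.name]


def check_reload_sketch(
    steps: Sequence[Any], data_dicts: Sequence[Dict[str, Any]]
) -> Tuple[Sequence[Any], Sequence[Dict[str, Any]]]:
    # Walk from the last step to the first, so the latest cached result wins.
    for index in range(len(steps) - 1, -1, -1):
        step = steps[index]
        if getattr(step, "is_reloadable", None) and step.is_reloadable(data_dicts):
            # Only the steps after the reloaded one still need to run.
            return steps[index + 1 :], step.reload(data_dicts)
    # Nothing is reloadable: run every step on the original data.
    return steps, data_dicts
```

Scanning backwards means that when several steps have cached output, the one closest to the end of the pipeline is used, which skips the most work.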

iterate_over_steps(steps: Sequence[PipelineStep], data_dict: Sequence[Dict[str, Any]] | Dict[str, Any]) → Sequence[Dict[str, Any]]

Iterate over a sequence of preprocessing steps.

Parameters:
  • steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.

  • data_dict (Union[Sequence[Dict[str, Any]], Dict[str, Any]]) – A data dictionary or a sequence of data dictionaries.

Returns:

A sequence of data dictionaries.

Return type:

Sequence[Dict[str, Any]]
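
Because a step may return either one data dictionary or a sequence of them (see run_step), iterating over steps involves flattening the intermediate results. A minimal single-process sketch, assuming each step is a plain callable that takes one data dictionary; the breadth-first order, the helper names and the example steps are hypothetical, and the thread/multiprocessing handling and error handling of DefaultPipeline are left out.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

DataDict = Dict[str, Any]
StepResult = Union[DataDict, Sequence[DataDict]]


def iterate_over_steps_sketch(
    steps: Sequence[Callable[[DataDict], StepResult]],
    data_dict: Union[DataDict, Sequence[DataDict]],
) -> List[DataDict]:
    # Normalise the input to a list of data dictionaries.
    current: List[DataDict] = [data_dict] if isinstance(data_dict, dict) else list(data_dict)
    for step in steps:
        next_dicts: List[DataDict] = []
        for item in current:
            result = step(item)
            # A step may fan one dictionary out into several.
            if isinstance(result, dict):
                next_dicts.append(result)
            else:
                next_dicts.extend(result)
        current = next_dicts
    return current


def split_channels(d: DataDict) -> List[DataDict]:
    """Hypothetical step: one output dictionary per channel."""
    return [{**d, "channel": ch} for ch in d["channels"]]


def add_label(d: DataDict) -> DataDict:
    """Hypothetical step: one output dictionary per input."""
    return {**d, "label": f"{d['subject']}-{d['channel']}"}
```

Calling iterate_over_steps_sketch([split_channels, add_label], {"subject": "s1", "channels": ["Cz", "Pz"]}) yields two dictionaries, labelled "s1-Cz" and "s1-Pz".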

property on_error

Get the on_error value.

Returns:

The on_error value. One of ON_ERROR.

Return type:

str
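
The class lists CONTINUE, RAISE and STOP next to ON_ERROR, and the default on_error is 'stop'. A sketch of a validated, read-only on_error property follows, assuming the lowercase string values 'continue' and 'raise' and that ON_ERROR is the collection of all three. Only 'stop' is confirmed by the signature, the class name OnErrorSketch is hypothetical, and this page does not describe what each mode does.

```python
from typing import Tuple


class OnErrorSketch:
    """Hypothetical stand-in showing a validated on_error property."""

    CONTINUE = "continue"  # assumed value
    RAISE = "raise"  # assumed value
    STOP = "stop"  # matches the documented default
    ON_ERROR: Tuple[str, ...] = (CONTINUE, RAISE, STOP)

    def __init__(self, on_error: str = STOP):
        # Reject unknown modes early instead of failing mid-pipeline.
        if on_error not in self.ON_ERROR:
            raise ValueError(f"on_error must be one of {self.ON_ERROR}, got {on_error!r}")
        self._on_error = on_error

    @property
    def on_error(self) -> str:
        """Get the on_error value (read-only in this sketch)."""
        return self._on_error
```

Validating in the constructor keeps an invalid mode from surfacing only when the first error occurs.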

parse_dict_keys(key: str | Sequence[str] | Mapping[str, str], name='key', require_ordered_dict=False) → OrderedDict[str, str]

Parse a key or a sequence of keys.

Parameters:
  • key (Union[str, Sequence[str], Mapping[str,str]]) – A key or a sequence of keys.

  • name (str) – The name of the key. Used for error messages.

  • require_ordered_dict (bool) – If True, the key must be an OrderedDict. If False, the key can also be an ordinary dict.

Returns:

A mapping of input keys to output keys.

Return type:

OrderedDict[str, str]

Raises:

TypeError – If the key is not a string, a sequence of strings, or a mapping of strings to strings. Also raised if the key is a mapping, require_ordered_dict is True, and the mapping is not an OrderedDict.
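
From the parameter, return and Raises descriptions, parse_dict_keys normalises three input shapes into an OrderedDict of input keys to output keys. A standalone sketch of that normalisation, assuming that a plain string or a sequence of strings maps each key to itself; the function name parse_dict_keys_sketch is hypothetical and this is not the library's code.

```python
from collections import OrderedDict
from collections.abc import Mapping, Sequence
from typing import Union


def parse_dict_keys_sketch(
    key: Union[str, Sequence, Mapping],
    name: str = "key",
    require_ordered_dict: bool = False,
) -> "OrderedDict[str, str]":
    # A single string maps to itself (assumed identity mapping).
    if isinstance(key, str):
        return OrderedDict([(key, key)])
    # A mapping keeps its input -> output pairs.
    if isinstance(key, Mapping):
        if require_ordered_dict and not isinstance(key, OrderedDict):
            raise TypeError(f"{name} must be an OrderedDict, got {type(key).__name__}")
        if not all(isinstance(k, str) and isinstance(v, str) for k, v in key.items()):
            raise TypeError(f"{name} must map strings to strings")
        return OrderedDict(key)
    # A sequence of strings maps each entry to itself (assumed identity mapping).
    if isinstance(key, Sequence) and all(isinstance(k, str) for k in key):
        return OrderedDict((k, k) for k in key)
    raise TypeError(f"{name} must be a str, a sequence of str, or a mapping of str to str")
```

Requiring an OrderedDict can matter when the order of output keys is significant, since an arbitrary Mapping implementation is not guaranteed to preserve insertion order.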

run_step(step: PipelineStep, data_dict: Dict[str, Any], step_index: int | None = None) → Dict[str, Any] | Sequence[Dict[str, Any]]

Run a single preprocessing step.

Parameters:
  • step (PipelineStep) – The preprocessing step to be run.

  • data_dict (Dict[str, Any]) – The data dictionary to be preprocessed.

  • step_index (Optional[int]) – The index of the step in the pipeline.

Returns:

A data dictionary or a sequence of data dictionaries.

Return type:

Union[Dict[str, Any], Sequence[Dict[str, Any]]]
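
A minimal sketch of what running a single step might involve, combining error handling with a history stored under previous_steps_key. Everything here is an assumption for illustration: the function name run_step_sketch, the history record format (the step's __name__ plus step_index), and the mode handling. In the sketch, 'raise' re-raises and any other mode delegates to the error handler; how STOP and CONTINUE differ is not described on this page, so they are treated the same at the single-step level.

```python
from typing import Any, Callable, Dict, Optional, Sequence, Union

DataDict = Dict[str, Any]
StepResult = Union[DataDict, Sequence[DataDict]]


def run_step_sketch(
    step: Callable[[DataDict], StepResult],
    data_dict: DataDict,
    step_index: Optional[int] = None,
    previous_steps_key: str = "previous_steps",
    on_error: str = "stop",
    # Hypothetical default handler: drop the failing data dictionary.
    error_handler_fn: Callable[[Exception, DataDict], Any] = lambda error, d: [],
) -> Any:
    try:
        result = step(data_dict)
    except Exception as error:  # deliberately broad in this sketch
        if on_error == "raise":
            raise
        # 'stop' and 'continue' are not distinguished here (assumption).
        return error_handler_fn(error, data_dict)

    record = {"step": getattr(step, "__name__", repr(step)), "index": step_index}
    outputs = [result] if isinstance(result, dict) else list(result)
    for out in outputs:
        # Build a fresh history list so fanned-out dicts never share one list.
        out[previous_steps_key] = list(data_dict.get(previous_steps_key, [])) + [record]
    return result
```

Copying the history per output keeps a step that splits one dictionary into several from leaking later history entries between siblings.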