brain_pipe.pipeline.cache.pipeline.CachingPreprocessingPipeline

class CachingPreprocessingPipeline(steps: Sequence[PipelineStep], pipeline_cache: PipelineCache, overwrite=False)

Bases: DefaultPipeline

Default caching preprocessing pipeline implementation.

Attributes

CONTINUE

ON_ERROR

RAISE

STOP

on_error

Get the on_error value.

__init__(steps: Sequence[PipelineStep], pipeline_cache: PipelineCache, overwrite=False)

Create a new CachingPreprocessingPipeline instance.

Parameters:
  • steps (Sequence[PipelineStep]) – The preprocessing steps to be run.

  • pipeline_cache (PipelineCache) – The pipeline cache instance to be used.

  • overwrite (bool) – Whether to overwrite existing cache files.
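A small file-based helper illustrates what overwrite controls. This is a minimal sketch that assumes one pickle file per cached result. save_to_cache and load_from_cache are hypothetical names and are not part of brain_pipe or the PipelineCache API.

```python
import os
import pickle
import tempfile
from typing import Any, Dict


def save_to_cache(path: str, data_dict: Dict[str, Any], overwrite: bool = False) -> bool:
    """Pickle data_dict to path and return True if a file was written.

    Hypothetical helper: an existing cache file is left untouched
    unless overwrite=True.
    """
    if os.path.exists(path) and not overwrite:
        return False
    with open(path, "wb") as f:
        pickle.dump(data_dict, f)
    return True


def load_from_cache(path: str) -> Dict[str, Any]:
    """Hypothetical helper: read a previously pickled data_dict."""
    with open(path, "rb") as f:
        return pickle.load(f)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "step_0.pkl")
        print(save_to_cache(path, {"x": 1}))                  # True: new file
        print(save_to_cache(path, {"x": 2}))                  # False: existing file kept
        print(load_from_cache(path))                          # {'x': 1}
        print(save_to_cache(path, {"x": 2}, overwrite=True))  # True: file replaced
        print(load_from_cache(path))                          # {'x': 2}
```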

Methods

__init__(steps, pipeline_cache[, overwrite])

Create a new CachingPreprocessingPipeline instance.

check_reload(steps, data_dicts)

Check if we can reload data from a step.

iterate_over_steps(steps, data_dict)

Iterate over a sequence of preprocessing steps.

parse_dict_keys(key[, name, ...])

Parse a key or a sequence of keys.

run_step(step, data_dict[, step_index])

Run a single preprocessing step.

check_reload(steps: Sequence[PipelineStep], data_dicts: Sequence[Dict[str, Any]])

Check if we can reload data from a step.

Parameters:
  • steps (Sequence[PipelineStep]) – The steps that are applied to the data_dicts.

  • data_dicts (Sequence[Dict[str, Any]]) – The data_dicts that are passed through the pipeline.

Returns:

The steps and data_dicts that can be used. If reloaded, the steps are truncated to the step that was reloaded.

Return type:

Tuple[Sequence[PipelineStep], Sequence[Dict[str, Any]]]

Notes

This checks the steps from the end of the pipeline back to the beginning and reloads the first reloadable step it finds.
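The backward scan described in the notes can be sketched as follows. This assumes a hypothetical cache, a plain dict mapping a step index to the data_dicts that step produced, and assumes the returned steps are the ones after the reloaded step. The library's PipelineCache and its exact truncation rule may differ; check_reload_sketch is a hypothetical name.

```python
from typing import Any, Callable, Dict, List, Sequence, Tuple

DataDict = Dict[str, Any]
Step = Callable[[DataDict], Any]


def check_reload_sketch(
    steps: Sequence[Step],
    data_dicts: Sequence[DataDict],
    cache: Dict[int, List[DataDict]],
) -> Tuple[Sequence[Step], Sequence[DataDict]]:
    """Reload the output of the last cached step, scanning from the end.

    `cache` is a hypothetical stand-in (step index -> data_dicts produced
    by that step); it is not brain_pipe's PipelineCache API.
    """
    for index in range(len(steps) - 1, -1, -1):
        if index in cache:
            # Assumption: steps[index] is already reflected in the cached
            # output, so only the steps after it still need to run.
            return steps[index + 1:], cache[index]
    # Nothing is reloadable: run every step on the original input.
    return steps, data_dicts


if __name__ == "__main__":
    steps = [lambda d: d, lambda d: d, lambda d: d]
    cache = {0: [{"a": 1}], 1: [{"b": 2}]}
    remaining, dicts = check_reload_sketch(steps, [{"raw": 0}], cache)
    print(len(remaining), dicts)  # 1 [{'b': 2}]
```

Scanning from the end means the most advanced cached result wins, so the fewest steps have to be recomputed.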

iterate_over_steps(steps: Sequence[PipelineStep], data_dict: Sequence[Dict[str, Any]] | Dict[str, Any]) → Sequence[Dict[str, Any]]

Iterate over a sequence of preprocessing steps.

Parameters:
  • steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied.

  • data_dict (Union[Sequence[Dict[str, Any]], Dict[str, Any]]) – A data dictionary or a sequence of data dictionaries.

Returns:

A sequence of data dictionaries.

Return type:

Sequence[Dict[str, Any]]
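A minimal breadth-first sketch of iterating over steps, assuming each step is a callable that returns either one data dictionary or a sequence of them. iterate_over_steps_sketch is a hypothetical name, and the library may traverse the steps in a different order.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

DataDict = Dict[str, Any]
StepOutput = Union[DataDict, Sequence[DataDict]]


def iterate_over_steps_sketch(
    steps: Sequence[Callable[[DataDict], StepOutput]],
    data_dict: Union[Sequence[DataDict], DataDict],
) -> List[DataDict]:
    # Accept a single data_dict or a sequence of them.
    current = [data_dict] if isinstance(data_dict, dict) else list(data_dict)
    for step in steps:
        next_dicts: List[DataDict] = []
        for item in current:
            output = step(item)
            # Assumption: a step may return one dict or several (fan-out).
            if isinstance(output, dict):
                next_dicts.append(output)
            else:
                next_dicts.extend(output)
        current = next_dicts
    return current


if __name__ == "__main__":
    def split(d):
        return [{"x": d["x"]}, {"x": d["x"] + 1}]

    def double(d):
        return {"x": d["x"] * 2}

    print(iterate_over_steps_sketch([split, double], {"x": 1}))  # [{'x': 2}, {'x': 4}]
```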

property on_error

Get the on_error value.

Returns:

The on_error value. One of the values in ON_ERROR.

Return type:

str
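To illustrate how on_error relates to the ON_ERROR attribute, here is a sketch that assumes ON_ERROR is a tuple of the CONTINUE, RAISE and STOP constants. The string values, the OnErrorSketch class and its constructor argument are hypothetical and not taken from brain_pipe.

```python
class OnErrorSketch:
    # Hypothetical string values; the library's actual constants may differ.
    CONTINUE = "continue"
    RAISE = "raise"
    STOP = "stop"
    # Assumption: ON_ERROR collects every allowed on_error value.
    ON_ERROR = (CONTINUE, RAISE, STOP)

    def __init__(self, on_error: str = RAISE):
        # Reject anything that is not one of the ON_ERROR values.
        if on_error not in self.ON_ERROR:
            raise ValueError(
                f"on_error must be one of {self.ON_ERROR}, got {on_error!r}"
            )
        self._on_error = on_error

    @property
    def on_error(self) -> str:
        """Read-only access to the validated on_error value."""
        return self._on_error
```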

parse_dict_keys(key: str | Sequence[str] | Mapping[str, str], name='key', require_ordered_dict=False) → OrderedDict[str, str]

Parse a key or a sequence of keys.

Parameters:
  • key (Union[str, Sequence[str], Mapping[str, str]]) – A key or a sequence of keys.

  • name (str) – The name of the key. Used for error messages.

  • require_ordered_dict (bool) – If True, a key given as a mapping must be an OrderedDict. If False, an ordinary dict is also accepted.

Returns:

A mapping of input keys to output keys.

Return type:

OrderedDict[str, str]

Raises:

TypeError – If the key is not a string, a sequence of strings, or a mapping of strings; or if the key is a mapping that is not an OrderedDict while require_ordered_dict is True.
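The normalization rules above can be sketched as a standalone function. Mapping a single string (or each string in a sequence) to itself is an assumption, as are the error messages; parse_dict_keys_sketch is a hypothetical name, not the library's implementation.

```python
from __future__ import annotations

from collections import OrderedDict
from collections.abc import Mapping, Sequence
from typing import Union


def parse_dict_keys_sketch(
    key: Union[str, Sequence[str], Mapping[str, str]],
    name: str = "key",
    require_ordered_dict: bool = False,
) -> OrderedDict[str, str]:
    # Strings are Sequences too, so handle them first.
    if isinstance(key, str):
        # Assumption: a single key maps to itself.
        return OrderedDict([(key, key)])
    if isinstance(key, Mapping):
        if require_ordered_dict and not isinstance(key, OrderedDict):
            raise TypeError(f"{name} must be an OrderedDict, got {type(key).__name__}")
        if not all(isinstance(k, str) and isinstance(v, str) for k, v in key.items()):
            raise TypeError(f"{name} must map strings to strings")
        return OrderedDict(key)
    if isinstance(key, Sequence) and all(isinstance(k, str) for k in key):
        # Assumption: each key in a sequence maps to itself, order preserved.
        return OrderedDict((k, k) for k in key)
    raise TypeError(f"{name} must be a string, a sequence of strings or a mapping of strings")
```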

run_step(step: PipelineStep, data_dict: Dict[str, Any], step_index=None) → Dict[str, Any] | Sequence[Dict[str, Any]]

Run a single preprocessing step.

Parameters:
  • step (PipelineStep) – The preprocessing step to be run.

  • data_dict (Dict[str, Any]) – The data dictionary to be preprocessed.

  • step_index (Optional[int]) – The index of the step in the pipeline. None if the step is not part of a pipeline.

Returns:

A list containing the data dictionaries for the next step.

Return type:

Sequence[Dict[str, Any]]
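A sketch of running one step and normalizing its output into a list for the next step. It assumes a step is a callable that takes a data dictionary. Using step_index only to add context to error messages is a hypothetical choice, not documented behavior, and run_step_sketch is a hypothetical name.

```python
from typing import Any, Callable, Dict, List, Optional, Sequence, Union

DataDict = Dict[str, Any]


def run_step_sketch(
    step: Callable[[DataDict], Union[DataDict, Sequence[DataDict]]],
    data_dict: DataDict,
    step_index: Optional[int] = None,
) -> List[DataDict]:
    try:
        output = step(data_dict)
    except Exception as exc:
        # Hypothetical: report where the failing step sits in the pipeline.
        where = "outside a pipeline" if step_index is None else f"at index {step_index}"
        raise RuntimeError(f"Step {where} failed") from exc
    # Normalize to a list so the next step always receives the same shape.
    return [output] if isinstance(output, dict) else list(output)
```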