brain_pipe.pipeline.default.DefaultPipeline
- class DefaultPipeline(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)
Bases: Pipeline
A default implementation of a pipeline (sequence) of PreprocessingSteps.
This pipeline adds logic to iterate over brain_pipe.pipeline.base.PipelineStep objects in a thread/multiprocessing-safe manner. It also adds logic to handle errors that occur during preprocessing and can optionally keep the history of the applied steps.
See also
Overview, brain_pipe.pipeline.base.PipelineStep, brain_pipe.pipeline.base.Pipeline, pipeline
Attributes
CONTINUE
ON_ERROR
RAISE
STOP
on_error: Get the on_error value.
- __init__(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)
Create a preprocessing pipeline.
- Parameters:
steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.
previous_steps_key (str) – Key to use to store the metadata of the preprocessing in the data_dicts.
error_handler_fn (Callable[[Exception, Dict[str,Any]], Any]) – Function that handles exceptions when they occur.
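The error_handler_fn signature above (an exception plus the data_dict being processed) can be illustrated with a small standalone sketch. The handler name `collecting_error_handler` and the module-level `errors` list are hypothetical and not part of brain_pipe. The sketch only shows the shape of such a callable, not how DefaultPipeline invokes it for each on_error setting.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical store for failures; not part of brain_pipe.
errors: List[Tuple[str, Dict[str, Any]]] = []


def collecting_error_handler(error: Exception, data_dict: Dict[str, Any]) -> None:
    """Record the exception and a copy of the data_dict that triggered it."""
    errors.append((repr(error), dict(data_dict)))


# Simulate a failing step outside of any pipeline.
try:
    raise ValueError("bad sample rate")
except ValueError as exc:
    collecting_error_handler(exc, {"data_path": "sub-01.npy"})
```

A callable with this shape could then be supplied as the error_handler_fn argument when constructing the pipeline.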
Methods
__init__(steps[, previous_steps_key, ...]): Create a preprocessing pipeline.
check_reload(steps, data_dicts): Check if we can reload data from a step.
iterate_over_steps(steps, data_dict): Iterate over a sequence of preprocessing steps.
parse_dict_keys(key[, name, ...]): Parse a key or a sequence of keys.
run_step(step, data_dict[, step_index]): Run a single preprocessing step.
- check_reload(steps: Sequence[PipelineStep], data_dicts: Sequence[Dict[str, Any]])
Check if we can reload data from a step.
- Parameters:
steps (Sequence[PipelineStep]) – The steps that are applied to the data_dicts
data_dicts (Sequence[Dict[str, Any]]) – The data_dicts that are passed through the pipeline
- Returns:
The steps and data_dicts that can be used. If reloaded, the steps are truncated to the step that was reloaded.
- Return type:
Tuple[Sequence[PipelineStep], Sequence[Dict[str, Any]]]
Notes
This will check steps from the end of the pipeline to the beginning, and reload the first step that is reloadable.
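The backward scan described in the note can be sketched as follows. This is a standalone illustration, not the library's implementation. `SketchStep`, its `cached` attribute, and `check_reload_sketch` are hypothetical names, and the sketch assumes that "truncated" means only the steps after the reloaded one remain to be run.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

DataDict = Dict[str, Any]


class SketchStep:
    """Hypothetical stand-in for a PipelineStep (not the brain_pipe class)."""

    def __init__(self, name: str, cached: Optional[List[DataDict]] = None):
        self.name = name
        self.cached = cached  # None means there is nothing to reload


def check_reload_sketch(
    steps: Sequence[SketchStep], data_dicts: Sequence[DataDict]
) -> Tuple[Sequence[SketchStep], Sequence[DataDict]]:
    # Walk from the last step to the first and stop at the first reloadable one.
    for index in range(len(steps) - 1, -1, -1):
        cached = steps[index].cached
        if cached is not None:
            # Assumption: only the steps after the reloaded one still need to run.
            return steps[index + 1:], cached
    # Nothing could be reloaded: keep all steps and the original data_dicts.
    return steps, data_dicts


steps = [
    SketchStep("load"),
    SketchStep("filter", cached=[{"x": 1}]),
    SketchStep("resample"),
]
remaining, data = check_reload_sketch(steps, [{"x": 0}])
```

Scanning from the end means the latest available intermediate result is used, so as few steps as possible are recomputed.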
- iterate_over_steps(steps: Sequence[PipelineStep], data_dict: Sequence[Dict[str, Any]] | Dict[str, Any]) → Sequence[Dict[str, Any]]
Iterate over a sequence of preprocessing steps.
- Parameters:
steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.
data_dict (Union[Sequence[Dict[str, Any]], Dict[str, Any]]) – A data dictionary or a sequence of data dictionaries.
- Returns:
A sequence of data dictionaries.
- Return type:
Sequence[Dict[str, Any]]
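Because the input may be a single data dictionary or a sequence of them, and the result is always a sequence, the iteration can be pictured as below. This is a minimal sketch under assumptions, not the library's code. Plain functions stand in for PipelineStep objects, and `iterate_over_steps_sketch`, `add_length`, and `split_in_two` are hypothetical names. Error handling and history tracking are left out.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

DataDict = Dict[str, Any]
StepFn = Callable[[DataDict], Union[DataDict, Sequence[DataDict]]]


def iterate_over_steps_sketch(
    steps: Sequence[StepFn], data_dict: Union[DataDict, Sequence[DataDict]]
) -> List[DataDict]:
    # Normalise the input so the loop always works on a list of dicts.
    current = [data_dict] if isinstance(data_dict, dict) else list(data_dict)
    for step in steps:
        next_dicts: List[DataDict] = []
        for item in current:
            result = step(item)
            if isinstance(result, dict):
                next_dicts.append(result)  # one dict in, one dict out
            else:
                next_dicts.extend(result)  # one dict in, several dicts out
        current = next_dicts
    return current


def add_length(d: DataDict) -> DataDict:
    return {**d, "length": len(d["samples"])}


def split_in_two(d: DataDict) -> List[DataDict]:
    half = len(d["samples"]) // 2
    return [
        {**d, "samples": d["samples"][:half]},
        {**d, "samples": d["samples"][half:]},
    ]


out = iterate_over_steps_sketch([add_length, split_in_two], {"samples": [1, 2, 3, 4]})
```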
- property on_error
Get the on_error value.
- Returns:
The on_error value. One of ON_ERROR.
- Return type:
- parse_dict_keys(key: str | Sequence[str] | Mapping[str, str], name='key', require_ordered_dict=False) → OrderedDict[str, str]
Parse a key or a sequence of keys.
- Parameters:
- Returns:
A mapping of input keys to output keys.
- Return type:
OrderedDict[str, str]
- Raises:
TypeError – If the key is not a string, a sequence of strings or a mapping of strings. Also raised if the key is a mapping, require_ordered_dict is True, and the mapping is not an OrderedDict.
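The accepted key forms and the TypeError conditions above can be sketched in a few lines. This is an illustrative reimplementation under assumptions, not the library's code. The name `parse_dict_keys_sketch` is hypothetical, and the sketch assumes that a plain string or a sequence of strings maps each key to itself.

```python
from collections import OrderedDict
from collections.abc import Mapping, Sequence


def parse_dict_keys_sketch(key, name="key", require_ordered_dict=False):
    """Return an OrderedDict mapping input keys to output keys."""
    if isinstance(key, str):
        # Assumption: a single string maps to itself.
        return OrderedDict([(key, key)])
    if isinstance(key, Mapping):
        if require_ordered_dict and not isinstance(key, OrderedDict):
            raise TypeError(f"'{name}' must be an OrderedDict when order is required.")
        return OrderedDict(key)
    if isinstance(key, Sequence) and all(isinstance(k, str) for k in key):
        # Assumption: each string in a sequence maps to itself.
        return OrderedDict((k, k) for k in key)
    raise TypeError(
        f"'{name}' must be a string, a sequence of strings or a mapping of strings."
    )


single = parse_dict_keys_sketch("eeg")
several = parse_dict_keys_sketch(["eeg", "envelope"])
renamed = parse_dict_keys_sketch({"eeg": "eeg_filtered"})
```

Returning one mapping type for every input form lets the calling code treat all three cases uniformly.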
- run_step(step: PipelineStep, data_dict: Dict[str, Any], step_index: int | None = None) → Dict[str, Any] | Sequence[Dict[str, Any]]
Run a single preprocessing step.
- Parameters:
step (PipelineStep) – The preprocessing step to be run.
data_dict (Dict[str, Any]) – The data dictionary to be preprocessed.
step_index (Optional[int]) – The index of the step in the pipeline.
- Returns:
A data dictionary or a sequence of data dictionaries.
- Return type:
Union[Dict[str, Any], Sequence[Dict[str, Any]]]