brain_pipe.pipeline.default.DefaultPipeline
- class DefaultPipeline(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)
 Bases: Pipeline
A default implementation of a pipeline (sequence) of PreprocessingSteps.
This pipeline adds logic to iterate over brain_pipe.pipeline.base.PipelineStep objects in a thread/multiprocessing-safe manner. It also adds logic to handle errors that occur during preprocessing and can optionally keep a history of the applied steps.
See also
Overview, brain_pipe.pipeline.base.PipelineStep, pipeline, brain_pipe.pipeline.base.Pipeline, pipeline
Attributes
CONTINUE
ON_ERROR
RAISE
STOP
on_error – Get the on_error value.
- __init__(steps: Sequence[PipelineStep], previous_steps_key: str = 'previous_steps', error_handler_fn=<function default_error_handler_fn>, on_error: str = 'stop', *args, **kwargs)
 Create a preprocessing pipeline.
- Parameters:
 steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.
previous_steps_key (str) – Key to use to store the metadata of the preprocessing in the data_dicts.
error_handler_fn (Callable[[Exception, Dict[str, Any]], Any]) – Function that handles exceptions when they occur.
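A custom error_handler_fn only has to match the callable signature given above. The following is a minimal sketch of such a handler, not brain_pipe's own default_error_handler_fn; the function name logging_error_handler, the logger name and the "data_path" key are hypothetical and chosen only for illustration.

```python
import logging
from typing import Any, Dict

# Hypothetical logger name, used only in this sketch.
logger = logging.getLogger("pipeline_errors")


def logging_error_handler(error: Exception, data_dict: Dict[str, Any]) -> None:
    """Log the exception together with the keys of the failing data_dict.

    Matches Callable[[Exception, Dict[str, Any]], Any]. This sketch
    returns None; it makes no claim about how a pipeline uses the result.
    """
    logger.error(
        "Step failed with %s: %s (data_dict keys: %s)",
        type(error).__name__,
        error,
        sorted(data_dict),
    )


# Stand-alone call, outside of any pipeline, to show the expected arguments.
logging_error_handler(ValueError("bad sample rate"), {"data_path": "sub-01.npy"})
```

Given the constructor signature documented above, a handler like this would be passed by keyword, for example error_handler_fn=logging_error_handler.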
Methods
__init__(steps[, previous_steps_key, ...]) – Create a preprocessing pipeline.
check_reload(steps, data_dicts) – Check if we can reload data from a step.
iterate_over_steps(steps, data_dict) – Iterate over a sequence of preprocessing steps.
parse_dict_keys(key[, name, ...]) – Parse a key or a sequence of keys.
run_step(step, data_dict[, step_index]) – Run a single preprocessing step.
- check_reload(steps: Sequence[PipelineStep], data_dicts: Sequence[Dict[str, Any]])
 Check if we can reload data from a step.
- Parameters:
 steps (Sequence[PipelineStep]) – The steps that are applied to the data_dicts
data_dicts (Sequence[Dict[str, Any]]) – The data_dicts that are passed through the pipeline
- Returns:
 The steps and data_dicts that can be used. If reloaded, the steps are truncated to the step that was reloaded.
- Return type:
 Tuple[Sequence[PipelineStep], Sequence[Dict[str, Any]]]
Notes
This will check steps from the end of the pipeline to the beginning, and reload the first step that is reloadable.
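The backwards search described in the note can be sketched as follows. This is an illustration under stated assumptions, not the library's implementation: SketchStep and its can_reload and reload methods are hypothetical stand-ins for a PipelineStep, and the sketch assumes that "truncated" means only the steps after the reloaded one remain to be run.

```python
from typing import Any, Dict, List, Sequence, Tuple


class SketchStep:
    """Hypothetical stand-in for a PipelineStep (not the brain_pipe class)."""

    def __init__(self, name: str, cached: bool = False):
        self.name = name
        self.cached = cached

    def can_reload(self, data_dict: Dict[str, Any]) -> bool:  # hypothetical method
        return self.cached

    def reload(self, data_dict: Dict[str, Any]) -> Dict[str, Any]:  # hypothetical method
        return {**data_dict, "reloaded_from": self.name}


def check_reload_sketch(
    steps: Sequence[SketchStep], data_dicts: Sequence[Dict[str, Any]]
) -> Tuple[Sequence[SketchStep], Sequence[Dict[str, Any]]]:
    # Walk from the last step to the first so the latest reloadable step wins.
    for index in range(len(steps) - 1, -1, -1):
        step = steps[index]
        if data_dicts and all(step.can_reload(d) for d in data_dicts):
            reloaded: List[Dict[str, Any]] = [step.reload(d) for d in data_dicts]
            # Assumption: only the steps after the reloaded one still need to run.
            return steps[index + 1:], reloaded
    # Nothing could be reloaded: run every step on the original data_dicts.
    return steps, data_dicts


steps = [
    SketchStep("load"),
    SketchStep("filter", cached=True),
    SketchStep("resample", cached=True),
    SketchStep("save"),
]
remaining, dicts = check_reload_sketch(steps, [{"id": 1}])
```

Here both "filter" and "resample" are reloadable, but the search starts at the end, so "resample" is reloaded and only "save" is left to run.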
- iterate_over_steps(steps: Sequence[PipelineStep], data_dict: Sequence[Dict[str, Any]] | Dict[str, Any]) → Sequence[Dict[str, Any]]
 Iterate over a sequence of preprocessing steps.
- Parameters:
 steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied in a specific order.
data_dict (Union[Sequence[Dict[str, Any]], Dict[str, Any]]) – A data dictionary or a sequence of data dictionaries.
- Returns:
 A sequence of data dictionaries.
- Return type:
 Sequence[Dict[str, Any]]
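The signature accepts either one data dictionary or a sequence of them and always returns a sequence. A minimal sketch of that normalise-and-flatten loop is shown here, assuming each step is a plain callable that returns a dictionary or a sequence of dictionaries. The names iterate_over_steps_sketch, add_gain and split_channels are hypothetical, and the error handling that the real method performs is left out.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

DataDict = Dict[str, Any]
StepFn = Callable[[DataDict], Union[DataDict, Sequence[DataDict]]]


def iterate_over_steps_sketch(
    steps: Sequence[StepFn], data_dict: Union[DataDict, Sequence[DataDict]]
) -> List[DataDict]:
    # A single dictionary is wrapped so both input forms are handled alike.
    data_dicts = [data_dict] if isinstance(data_dict, dict) else list(data_dict)
    for step in steps:
        next_dicts: List[DataDict] = []
        for current in data_dicts:
            result = step(current)
            if isinstance(result, dict):
                next_dicts.append(result)
            else:
                # A step may fan out into several dictionaries.
                next_dicts.extend(result)
        data_dicts = next_dicts
    return data_dicts


def add_gain(d: DataDict) -> DataDict:  # hypothetical step
    return {**d, "gain": 2}


def split_channels(d: DataDict) -> List[DataDict]:  # hypothetical step: one in, several out
    return [{**d, "channel": c} for c in d["channels"]]


result = iterate_over_steps_sketch([add_gain, split_channels], {"channels": ["Cz", "Pz"]})
```

With these two steps, the single input dictionary ends up as two dictionaries, one per channel, each carrying the "gain" entry added by the first step.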
- property on_error
 Get the on_error value.
- Returns:
 The on_error value. One of ON_ERROR.
- Return type:
 
- parse_dict_keys(key: str | Sequence[str] | Mapping[str, str], name='key', require_ordered_dict=False) → OrderedDict[str, str]
 Parse a key or a sequence of keys.
- Parameters:
- Returns:
 A mapping of input keys to output keys.
- Return type:
 OrderedDict[str, str]
- Raises:
 TypeError – If the key is not a string, a sequence of strings or a mapping of strings. If the key is a mapping but require_ordered_dict is True and the mapping is not an OrderedDict.
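The documented behaviour can be sketched as a stand-alone function. This is an illustration based only on the signature and the Raises description above, not the library's code. In particular, it is an assumption that a plain string, or each string in a sequence, maps to itself; parse_dict_keys_sketch is a hypothetical name.

```python
from collections import OrderedDict
from typing import Mapping, Sequence, Union


def parse_dict_keys_sketch(
    key: Union[str, Sequence[str], Mapping[str, str]],
    name: str = "key",
    require_ordered_dict: bool = False,
) -> "OrderedDict[str, str]":
    # str must be tested first, because a str is itself a Sequence.
    if isinstance(key, str):
        # Assumption: a single key maps to itself.
        return OrderedDict([(key, key)])
    if isinstance(key, Mapping):
        if require_ordered_dict and not isinstance(key, OrderedDict):
            raise TypeError(f"'{name}' must be an OrderedDict when a mapping is given.")
        if not all(isinstance(k, str) and isinstance(v, str) for k, v in key.items()):
            raise TypeError(f"'{name}' must map strings to strings.")
        return OrderedDict(key)
    if isinstance(key, Sequence) and all(isinstance(k, str) for k in key):
        # Assumption: each key in a sequence maps to itself.
        return OrderedDict((k, k) for k in key)
    raise TypeError(
        f"'{name}' must be a string, a sequence of strings or a mapping of strings."
    )


single = parse_dict_keys_sketch("eeg")
several = parse_dict_keys_sketch(["eeg", "envelope"])
renamed = parse_dict_keys_sketch({"eeg": "eeg_filtered"})
```

Under these assumptions, single and several map every key to itself, while renamed keeps the input-to-output mapping that was passed in.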
- run_step(step: PipelineStep, data_dict: Dict[str, Any], step_index: int | None = None) → Dict[str, Any] | Sequence[Dict[str, Any]]
 Run a single preprocessing step.
- Parameters:
 step (PipelineStep) – The preprocessing step to be run.
data_dict (Dict[str, Any]) – The data dictionary to be preprocessed.
step_index (Optional[int]) – The index of the step in the pipeline.
- Returns:
 A data dictionary or a sequence of data dictionaries.
- Return type: