brain_pipe.pipeline.cache.pipeline.CachingPreprocessingPipeline
- class CachingPreprocessingPipeline(steps: Sequence[PipelineStep], pipeline_cache: PipelineCache, overwrite=False)
Bases: DefaultPipeline
Default caching preprocessing pipeline implementation.
Attributes
CONTINUE
ON_ERROR
RAISE
STOP
on_error – Get the on_error value.
- __init__(steps: Sequence[PipelineStep], pipeline_cache: PipelineCache, overwrite=False)
Create a new CachingPreprocessingPipeline instance.
- Parameters:
steps (Sequence[PipelineStep]) – The preprocessing steps to be run.
pipeline_cache (PipelineCache) – The pipeline cache class to be used.
overwrite (bool) – Whether to overwrite existing cache files.
Methods
__init__(steps, pipeline_cache[, overwrite]) – Create a new CachingPreprocessingPipeline instance.
check_reload(steps, data_dicts) – Check if we can reload data from a step.
iterate_over_steps(steps, data_dict) – Iterate over a sequence of preprocessing steps.
parse_dict_keys(key[, name, ...]) – Parse a key or a sequence of keys.
run_step(step, data_dict[, step_index]) – Run a single preprocessing step.
- check_reload(steps: Sequence[PipelineStep], data_dicts: Sequence[Dict[str, Any]])
Check if we can reload data from a step.
- Parameters:
steps (Sequence[PipelineStep]) – The steps that are applied to the data_dicts.
data_dicts (Sequence[Dict[str, Any]]) – The data_dicts that are passed through the pipeline.
- Returns:
The steps and data_dicts that can be used. If reloaded, the steps are truncated to the step that was reloaded.
- Return type:
Tuple[Sequence[PipelineStep], Sequence[Dict[str, Any]]]
Notes
Steps are checked from the end of the pipeline to the beginning, and the first reloadable step found is reloaded.
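The backward scan described in the note can be sketched as follows. This is a minimal illustration, not the brain_pipe implementation: `ToyStep`, `ToyCache` and `check_reload_sketch` are hypothetical stand-ins, and two behaviours are assumptions made for the example: that "truncated" means only the steps after the reloaded one remain to be run, and that `overwrite=True` skips the cache lookup entirely.

```python
from typing import Any, Dict, List, Optional, Sequence, Tuple

DataDict = Dict[str, Any]


class ToyStep:
    """Hypothetical stand-in for a pipeline step (not brain_pipe's PipelineStep)."""

    def __init__(self, name: str) -> None:
        self.name = name


class ToyCache:
    """Hypothetical in-memory cache keyed by step name (not brain_pipe's PipelineCache)."""

    def __init__(self) -> None:
        self._store: Dict[str, List[DataDict]] = {}

    def save(self, step: ToyStep, data_dicts: Sequence[DataDict]) -> None:
        self._store[step.name] = list(data_dicts)

    def load(self, step: ToyStep) -> Optional[List[DataDict]]:
        # Returns None when nothing was cached for this step.
        return self._store.get(step.name)


def check_reload_sketch(
    steps: Sequence[ToyStep],
    data_dicts: Sequence[DataDict],
    cache: ToyCache,
    overwrite: bool = False,
) -> Tuple[List[ToyStep], List[DataDict]]:
    """Scan the steps from last to first and reload the first cached one."""
    if overwrite:
        # Assumption: overwriting means cached results are ignored.
        return list(steps), list(data_dicts)
    for index in range(len(steps) - 1, -1, -1):
        cached = cache.load(steps[index])
        if cached is not None:
            # Assumption: only the steps after the reloaded one still have to run.
            return list(steps[index + 1:]), cached
    # Nothing reloadable: run every step on the original data.
    return list(steps), list(data_dicts)


steps = [ToyStep("load"), ToyStep("filter"), ToyStep("resample")]
cache = ToyCache()
cache.save(steps[0], [{"stage": "load"}])
cache.save(steps[1], [{"stage": "filter"}])

remaining, data = check_reload_sketch(steps, [{"stage": "raw"}], cache)
print([step.name for step in remaining], data)
# prints: ['resample'] [{'stage': 'filter'}]
```

Scanning from the end means the latest cached result wins, so as little work as possible is repeated.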
- iterate_over_steps(steps: Sequence[PipelineStep], data_dict: Sequence[Dict[str, Any]] | Dict[str, Any]) → Sequence[Dict[str, Any]]
Iterate over a sequence of preprocessing steps.
- Parameters:
steps (Sequence[PipelineStep]) – A sequence of preprocessing steps to be applied.
data_dict (Union[Sequence[Dict[str, Any]], Dict[str, Any]]) – A data dictionary or a sequence of data dictionaries.
- Returns:
A sequence of data dictionaries.
- Return type:
Sequence[Dict[str, Any]]
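One way to picture the contract above (a single data dictionary or a sequence of them goes in, a sequence comes out) is the sketch below. It is an illustration under stated assumptions rather than the library's code: steps are modelled as plain callables instead of PipelineStep objects, and `iterate_over_steps_sketch`, `split_halves` and `add_length` are hypothetical names.

```python
from typing import Any, Callable, Dict, List, Sequence, Union

DataDict = Dict[str, Any]
# Hypothetical step type: a callable returning one data dict or several.
Step = Callable[[DataDict], Union[DataDict, Sequence[DataDict]]]


def iterate_over_steps_sketch(
    steps: Sequence[Step],
    data_dict: Union[DataDict, Sequence[DataDict]],
) -> List[DataDict]:
    """Apply each step in order, always carrying a list of data dicts."""
    # Normalise the input so a single dict and a sequence are handled alike.
    current = [data_dict] if isinstance(data_dict, dict) else list(data_dict)
    for step in steps:
        next_dicts: List[DataDict] = []
        for item in current:
            result = step(item)
            if isinstance(result, dict):
                next_dicts.append(result)
            else:
                # A step may fan out into several data dicts.
                next_dicts.extend(result)
        current = next_dicts
    return current


def split_halves(d: DataDict) -> List[DataDict]:
    """Toy step that turns one data dict into two."""
    mid = len(d["samples"]) // 2
    return [{"samples": d["samples"][:mid]}, {"samples": d["samples"][mid:]}]


def add_length(d: DataDict) -> DataDict:
    """Toy step that adds a derived field."""
    return {**d, "length": len(d["samples"])}


result = iterate_over_steps_sketch([split_halves, add_length], {"samples": [1, 2, 3, 4]})
print(result)
# prints: [{'samples': [1, 2], 'length': 2}, {'samples': [3, 4], 'length': 2}]
```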
- property on_error
Get the on_error value.
- Returns:
The on_error value. One of ON_ERROR.
- Return type:
- parse_dict_keys(key: str | Sequence[str] | Mapping[str, str], name='key', require_ordered_dict=False) → OrderedDict[str, str]
Parse a key or a sequence of keys.
- Parameters:
- Returns:
A mapping of input keys to output keys.
- Return type:
OrderedDict[str, str]
- Raises:
TypeError – If the key is not a string, a sequence of strings or a mapping of strings, or if the key is a mapping that is not an OrderedDict while require_ordered_dict is True.
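The three accepted key forms and the TypeError conditions can be sketched as below. This is a hedged reconstruction from the signature and the Raises entry only: `parse_dict_keys_sketch` is a hypothetical name, and mapping a plain string (or each string in a sequence) to itself is an assumption, since the original parameter descriptions are not available here.

```python
from collections import OrderedDict
from collections.abc import Mapping, Sequence


def parse_dict_keys_sketch(key, name="key", require_ordered_dict=False):
    """Normalise a key specification into an input-key -> output-key mapping."""
    if isinstance(key, str):
        # Assumption: a single string maps to itself.
        items = [(key, key)]
    elif isinstance(key, Mapping):
        if require_ordered_dict and not isinstance(key, OrderedDict):
            raise TypeError(
                f"'{name}' must be an OrderedDict, got {type(key).__name__}."
            )
        items = list(key.items())
    elif isinstance(key, Sequence):
        # Assumption: each string in a sequence maps to itself.
        items = [(k, k) for k in key]
    else:
        raise TypeError(
            f"'{name}' must be a string, a sequence of strings or a mapping "
            f"of strings, got {type(key).__name__}."
        )
    for in_key, out_key in items:
        if not isinstance(in_key, str) or not isinstance(out_key, str):
            raise TypeError(f"'{name}' must only contain strings.")
    return OrderedDict(items)


print(parse_dict_keys_sketch("eeg"))
# prints: OrderedDict([('eeg', 'eeg')])
print(parse_dict_keys_sketch({"raw": "clean"}))
# prints: OrderedDict([('raw', 'clean')])
```

Returning an OrderedDict in every case lets calling code treat all three input forms the same way.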
- run_step(step: PipelineStep, data_dict: Dict[str, Any], step_index=None) → Dict[str, Any] | Sequence[Dict[str, Any]]
Run a single preprocessing step.
- Parameters:
step (PipelineStep) – The preprocessing step to be run.
data_dict (Dict[str, Any]) – The data dictionary to be preprocessed.
step_index (Optional[int]) – The index of the step in the pipeline. None if the step is not part of a pipeline.
- Returns:
A list containing the data dictionaries for the next step.
- Return type:
Sequence[Dict[str, Any]]