pipefunc.map module#
pipefunc.map: Modules that handle MapSpecs and their runs.
- class pipefunc.map.DictArray(folder, shape, internal_shape=None, shape_mask=None, *, mapping=None)[source]#
Bases: StorageBase
A numpy.ndarray backed by a dict with internal structure.
- dump(key, value)[source]#
Dump "value" into the location associated with "key".
- Return type:
Examples
>>> arr = DictArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- property mask: MaskedArray#
Return the mask associated with the array.
- requires_serialization: bool = False#
- storage_id: str = 'dict'#
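The dict-backed storage above can be pictured as a mapping from index tuples to values, with the mask derived from which keys have been dumped. The following is a minimal standalone sketch of that idea (illustrative names, not the actual DictArray implementation):

```python
import numpy as np

class MiniDictArray:
    """Toy dict-backed array: dumped keys hold data, absent keys stay masked."""

    def __init__(self, shape):
        self.shape = shape
        self._data = {}  # maps index tuples -> values

    def dump(self, key, value):
        self._data[key] = value

    def to_array(self):
        # Build an object array where positions without a dumped value are masked.
        arr = np.ma.masked_all(self.shape, dtype=object)
        for key, value in self._data.items():
            arr[key] = value  # assignment unmasks this position
        return arr

arr = MiniDictArray((2, 2))
arr.dump((0, 1), {"a": 1})
result = arr.to_array()
```

The real class additionally supports internal shapes and shape masks; this sketch only shows the present-vs-masked bookkeeping.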
- class pipefunc.map.FileArray(folder, shape, internal_shape=None, shape_mask=None, *, filename_template='__{:d}__.pickle')[source]#
Bases: StorageBase
Array interface to a folder of files on disk.
__getitem__ returns np.ma.masked for non-existent files.
- dump(key, value)[source]#
Dump "value" into the file associated with "key".
- Return type:
Examples
>>> arr = FileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- classmethod from_data(data, folder)[source]#
Create a FileArray from an existing list or NumPy array.
This method serializes the provided data (which can be a Python list or a NumPy array) into the FileArray's on-disk format within the specified folder. Each element of the input data is dumped individually.
This is useful for preparing large datasets for use with pipeline.map in distributed environments (like SLURM), as it allows pipefunc to pass around a lightweight FileArray object instead of serializing the entire large dataset for each task.
- Parameters:
- Returns:
A new FileArray instance populated with the provided data.
- Return type:
- property mask: MaskedArray#
Return a masked numpy array containing the mask.
The returned numpy array has dtype โboolโ and a mask for masking out missing data.
- requires_serialization: bool = True#
- storage_id: str = 'file_array'#
- to_array(*, splat_internal=None)[source]#
Return a masked numpy array containing all the data.
The returned numpy array has dtype โobjectโ and a mask for masking out missing data.
- shape: ShapeTuple#
- internal_shape: ShapeTuple#
- shape_mask: tuple[bool, ...]#
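The on-disk layout described above can be approximated as one pickle file per flat index, following the default __{:d}__.pickle filename template, with missing files becoming masked entries. A hedged sketch under those assumptions (not the real FileArray code):

```python
import pickle
import tempfile
from pathlib import Path

import numpy as np

def dump_element(folder, shape, key, value, template="__{:d}__.pickle"):
    # Convert the multi-dimensional key to a flat index for the filename.
    flat = int(np.ravel_multi_index(key, shape))
    (Path(folder) / template.format(flat)).write_bytes(pickle.dumps(value))

def load_as_masked(folder, shape, template="__{:d}__.pickle"):
    # Missing files show up as masked entries, mirroring __getitem__
    # returning np.ma.masked for non-existent files.
    arr = np.ma.masked_all(shape, dtype=object)
    for flat in range(int(np.prod(shape))):
        path = Path(folder) / template.format(flat)
        if path.exists():
            arr[np.unravel_index(flat, shape)] = pickle.loads(path.read_bytes())
    return arr

folder = tempfile.mkdtemp()
dump_element(folder, (2, 3), (1, 2), {"a": 1})
result = load_as_masked(folder, (2, 3))
```

This is also roughly what from_data does in bulk: dump each element of an existing list or array individually so workers can open single files instead of receiving the whole dataset.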
- class pipefunc.map.MapSpec(inputs, outputs, _is_generated=False)[source]#
Bases: object
Specification for how to map input axes to output axes.
Examples
>>> mapped = MapSpec.from_string("a[i, j], b[i, j], c[k] -> q[i, j, k]")
>>> partial_reduction = MapSpec.from_string("a[i, :], b[:, k] -> q[i, k]")
- property external_indices: tuple[str, ...][source]#
Output indices that are shared with the input indices.
- input_keys(shape, linear_index)[source]#
Return keys for indexing inputs of this map.
- Parameters:
- Return type:
Examples
>>> spec = MapSpec.from_string("x[i, j], y[j, :, k] -> z[i, j, k]")
>>> spec.input_keys((5, 2, 3), 23)
{'x': (3, 1), 'y': (1, slice(None, None, None), 2)}
- output_key(shape, linear_index)[source]#
Return a key used for indexing the output of this map.
- Parameters:
- Return type:
Examples
>>> spec = MapSpec.from_string("x[i, j], y[j, :, k] -> z[i, j, k]")
>>> spec.output_key((5, 2, 3), 23)
(3, 1, 2)
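The input_keys/output_key pair boils down to converting a linear index over the output shape into a multi-index, then selecting the axes each name is indexed by. The core arithmetic can be reproduced with numpy alone; this is a sketch of the indexing logic from the docstring examples, not the MapSpec internals:

```python
import numpy as np

shape = (5, 2, 3)   # output shape for z[i, j, k]
linear_index = 23

# output_key: the full multi-index into the output array.
output_key = tuple(int(v) for v in np.unravel_index(linear_index, shape))
i, j, k = output_key

# input_keys: each input selects only the axes it is indexed by.
# Per the docstring example, x[i, j] uses axes i and j, while
# y[j, :, k] uses j, a full slice for the unindexed axis, and k.
x_key = (i, j)
y_key = (j, slice(None), k)
```

For linear_index 23 and shape (5, 2, 3) this reproduces the documented results: output_key (3, 1, 2), x keyed by (3, 1), and y keyed by (1, slice(None), 2).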
- rename(renames)[source]#
Return a new renamed MapSpec if any of the names are in "renames".
- Return type:
- rename_axes(renames)[source]#
Return a new MapSpec with renamed axes.
- Parameters:
renames (dict[str, str]) – Dictionary mapping old axis names to new axis names.
- Return type:
- Returns:
A new MapSpec with renamed axes applied to all inputs and outputs.
Examples
>>> spec = MapSpec.from_string("a[i, j], b[i, j] -> c[i, j]")
>>> renamed = spec.rename_axes({"i": "x", "j": "y"})
>>> str(renamed)
'a[x, y], b[x, y] -> c[x, y]'
- class pipefunc.map.RunInfo(inputs, defaults, all_output_names, shapes, resolved_shapes, internal_shapes, shape_masks, run_folder, mapspecs_as_strings, storage, error_handling='raise', pipefunc_version='0.90.2.dev1+None.gb2ba6a8.dirty')[source]#
Bases: object
Information about a pipeline.map() run.
The data in this class is immutable, except for resolved_shapes, which is updated as new shapes are resolved.
- classmethod create(run_folder, pipeline, inputs, internal_shapes=None, *, executor=None, storage, error_handling='raise', cleanup=None, resume=False, resume_validation='auto')[source]#
- Return type:
- class pipefunc.map.SharedMemoryDictArray[source]#
Bases: DictArray
Array interface to a shared memory dict store.
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- class pipefunc.map.StorageBase(folder, shape, internal_shape=None, shape_mask=None)[source]#
Bases: ABC
Base class for file-based arrays.
- abstract property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- abstract property mask: MaskedArray#
- pipefunc.map.load_all_outputs(run_folder)[source]#
Load all outputs of a run.
Allows loading outputs even from partial runs that failed. Map results are returned as `numpy.masked_array`s where missing values are masked.
- Parameters:
run_folder (str | Path) – The run_folder used in pipeline.map or pipeline.map_async.
- Return type:
See also
load_outputs
For loading specific outputs.
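Because partial or failed runs are supported, mapped results come back as masked arrays, and checking the mask tells you which iterations completed. A numpy-only illustration of the consumption pattern (hypothetical values, not an actual run):

```python
import numpy as np

# Suppose a mapped output of length 4 where index 2 failed or never ran;
# load_all_outputs would hand back something shaped like this.
out = np.ma.masked_all((4,), dtype=object)
out[0], out[1], out[3] = 10, 20, 40

# Completed iterations are the unmasked positions.
completed = [i for i in range(out.size) if not out.mask[i]]
values = [out[i] for i in completed]
```

Iterating only over unmasked entries is the safe way to consume a partial run's output without tripping over masked placeholders.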
- pipefunc.map.load_dataframe(*output_name, run_folder, load_intermediate=True, backend='pandas')[source]#
Load the output(s) of a pipeline.map as a DataFrame.
- Parameters:
output_name (str) – The names of the outputs to load. If empty, all outputs are loaded.
run_folder (str | Path) – The folder where the pipeline run was stored.
load_intermediate (bool) – Whether to load intermediate outputs.
backend (Literal['pandas', 'polars']) – DataFrame library to use when constructing the result. Defaults to "pandas".
- Return type:
- Returns:
A DataFrame from the selected backend containing the outputs of the pipeline run.
- pipefunc.map.load_outputs(*output_names, run_folder)[source]#
Load the outputs of a run.
Allows loading outputs even from partial runs that failed. Map results are returned as `numpy.masked_array`s where missing values are masked.
- Parameters:
- Return type:
See also
load_all_outputs
For loading all outputs.
- pipefunc.map.load_xarray_dataset(*output_name, run_folder, load_intermediate=True)[source]#
Load the output(s) of a pipeline.map as an xarray.Dataset.
- Parameters:
- Return type:
Dataset
- Returns:
An xarray.Dataset containing the outputs of the pipeline run.
- pipefunc.map.register_storage(cls, storage_id=None)[source]#
Register a StorageBase class.
- Parameters:
cls (type[StorageBase]) – Storage class that should be registered.
storage_id (str | None) – Storage identifier; defaults to the storage_id attribute of the class.
- Return type:
Notes
This function maintains a mapping from storage identifiers to storage classes. When a storage class is registered, it will replace any class previously registered under the same storage identifier, if present.
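The registration mechanics amount to a mapping from storage identifier to class, with later registrations replacing earlier ones. A sketch of that behavior under assumed names (the real registry lives in pipefunc.map.storage_registry, and the real function also validates its input):

```python
# Minimal stand-in for the storage registry described above.
storage_registry: dict[str, type] = {}

def register(cls, storage_id=None):
    # Fall back to the class's own storage_id attribute, as register_storage does.
    key = storage_id if storage_id is not None else cls.storage_id
    storage_registry[key] = cls  # silently replaces any previous entry

class DictBackend:
    storage_id = "dict"

class OtherDictBackend:
    storage_id = "dict"

register(DictBackend)
register(OtherDictBackend)  # replaces DictBackend under the same identifier
```

This replacement-on-collision behavior is what the Notes section above describes: registering under an existing identifier overrides the previous class.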
- pipefunc.map.run_map(pipeline, inputs, run_folder=None, internal_shapes=None, *, output_names=None, parallel=True, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise')[source]#
Run a pipeline with MapSpec functions for given inputs.
- Parameters:
pipeline (Pipeline) – The pipeline to run.
inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data: either single values for functions without a mapspec, or lists of values or numpy.ndarrays for functions with a mapspec.
run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.
internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and one is also provided here, the value provided here is used.
output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.
parallel (bool) – Whether to run the functions in parallel. Ignored if the provided executor is not None.
executor (Executor | dict[str | tuple[str, ...], Executor] | None) – The executor to use for parallel execution. Can be specified as:
- None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).
- A concurrent.futures.Executor instance: Used for all outputs.
- A dictionary: Specify different executors for different outputs. Use output names as keys and Executor instances as values. Use an empty string "" as a key to set a default executor.
If parallel is False, this argument is ignored.
chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) – Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:
- None: Automatically determine optimal chunk sizes (default).
- int: Same chunk size for all outputs.
- dict: Different chunk sizes per output, where keys are output names (or "" for the default) and values are either integers or callables; callables take the total execution count and return a chunk size.
Examples:
>>> chunksizes = None  # Auto-determine optimal chunk sizes
>>> chunksizes = 100  # All outputs use chunks of 100
>>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
>>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) – The storage class to use for storing intermediate and final results. Can be specified as:
- A string: Use a single storage class for all outputs.
- A dictionary: Specify different storage classes for different outputs. Use output names as keys and storage class names as values. Use an empty string "" as a key to set a default storage class.
Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".
persist_memory (bool) – Whether to write results to disk when memory based storage is used. Has no effect when file based storage is used.
cleanup (bool | None) – Deprecated since version 0.89.0: use the resume parameter instead; will be removed in version 1.0.0. Whether to clean up the run_folder before running the pipeline. When set, takes priority over the resume parameter. cleanup=True is equivalent to resume=False; cleanup=False is equivalent to resume=True.
resume (bool) – Whether to resume data from a previous run in the run_folder.
- False (default): Clean up the run_folder before running (fresh start).
- True: Attempt to load and resume results from a previous run.
Note: If cleanup is specified, it takes priority over this parameter.
resume_validation (Literal['auto', 'strict', 'skip']) – Controls validation strictness when reusing data from a previous run (only applies when resume=True):
- "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.
- "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.
- "skip": Skip input/default validation entirely. Use this when your input objects have broken __eq__ implementations that return incorrect results; you are responsible for ensuring the inputs are actually identical.
Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.
fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.
auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.
show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) – Whether to display a progress bar. Can be:
- True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).
- False: No progress bar.
- "ipywidgets": Force the ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.
- "rich": Force the rich progress bar (text-based). Shown only if rich is installed.
- "headless": No progress bar, but progress is still tracked internally.
- None (default): Shows the ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed; otherwise, no progress bar is shown.
return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory; the results are only kept in the configured storage. This is useful for very large pipelines whose results do not fit into memory.
error_handling (Literal['raise', 'continue']) – How to handle errors during function execution:
- "raise" (default): Stop execution on the first error and raise an exception.
- "continue": Continue execution, collecting errors as ErrorSnapshot objects.
- Return type:
ResultDict
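The chunksizes options documented above (None, an int, or a dict with "" as the default key and callables mapping total count to chunk size) can be resolved per output with a small helper. This is an illustrative reimplementation of the documented semantics, not pipefunc's internal code:

```python
def resolve_chunksize(chunksizes, output_name, total_count):
    """Resolve the documented chunksizes argument for one output."""
    if chunksizes is None:
        return None  # let the runner auto-determine the chunk size
    if isinstance(chunksizes, int):
        return chunksizes  # same chunk size for all outputs
    # dict form: a specific output name wins; "" is the default key.
    value = chunksizes.get(output_name, chunksizes.get(""))
    if callable(value):
        return value(total_count)  # callables map total count -> chunk size
    return value

# Mirrors the docstring examples:
assert resolve_chunksize(100, "out1", 1000) == 100
assert resolve_chunksize({"out1": 50, "out2": 100}, "out2", 1000) == 100
assert resolve_chunksize({"out1": lambda n: n // 20}, "out1", 1000) == 50
assert resolve_chunksize({"": 50}, "out_other", 1000) == 50
```

The dict lookup order (exact output name first, then the "" default) is the behavior the parameter description implies; the actual resolution code may differ in detail.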
- pipefunc.map.run_map_async(pipeline, inputs, run_folder=None, internal_shapes=None, *, output_names=None, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise', display_widgets=True, start=True)[source]#
Asynchronously run a pipeline with MapSpec functions for given inputs.
Returns immediately with an AsyncMap instance whose task attribute can be awaited.
- Parameters:
pipeline (Pipeline) – The pipeline to run.
inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data: either single values for functions without a mapspec, or lists of values or numpy.ndarrays for functions with a mapspec.
run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.
internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and one is also provided here, the value provided here is used.
output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.
executor (Executor | dict[str | tuple[str, ...], Executor] | None) – The executor to use for parallel execution. Can be specified as:
- None: A concurrent.futures.ProcessPoolExecutor is used.
- A concurrent.futures.Executor instance: Used for all outputs.
- A dictionary: Specify different executors for different outputs. Use output names as keys and Executor instances as values. Use an empty string "" as a key to set a default executor.
chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) – Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:
- None: Automatically determine optimal chunk sizes (default).
- int: Same chunk size for all outputs.
- dict: Different chunk sizes per output, where keys are output names (or "" for the default) and values are either integers or callables; callables take the total execution count and return a chunk size.
Examples:
>>> chunksizes = None  # Auto-determine optimal chunk sizes
>>> chunksizes = 100  # All outputs use chunks of 100
>>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
>>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) – The storage class to use for storing intermediate and final results. Can be specified as:
- A string: Use a single storage class for all outputs.
- A dictionary: Specify different storage classes for different outputs. Use output names as keys and storage class names as values. Use an empty string "" as a key to set a default storage class.
Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".
persist_memory (bool) – Whether to write results to disk when memory based storage is used. Has no effect when file based storage is used.
cleanup (bool | None) – Deprecated since version 0.89.0: use the resume parameter instead; will be removed in version 1.0.0. Whether to clean up the run_folder before running the pipeline. When set, takes priority over the resume parameter. cleanup=True is equivalent to resume=False; cleanup=False is equivalent to resume=True.
resume (bool) – Whether to resume data from a previous run in the run_folder.
- False (default): Clean up the run_folder before running (fresh start).
- True: Attempt to load and resume results from a previous run.
Note: If cleanup is specified, it takes priority over this parameter.
resume_validation (Literal['auto', 'strict', 'skip']) – Controls validation strictness when reusing data from a previous run (only applies when resume=True):
- "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.
- "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.
- "skip": Skip input/default validation entirely. Use this when your input objects have broken __eq__ implementations that return incorrect results; you are responsible for ensuring the inputs are actually identical.
Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.
fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.
auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.
show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) – Whether to display a progress bar. Can be:
- True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).
- False: No progress bar.
- "ipywidgets": Force the ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.
- "rich": Force the rich progress bar (text-based). Shown only if rich is installed.
- "headless": No progress bar, but progress is still tracked internally.
- None (default): Shows the ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed; otherwise, no progress bar is shown.
display_widgets (bool) – Whether to call IPython.display.display(...) on widgets. Ignored if outside of a Jupyter notebook.
return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory; the results are only kept in the configured storage. This is useful for very large pipelines whose results do not fit into memory.
error_handling (Literal['raise', 'continue']) – How to handle errors during function execution:
- "raise" (default): Stop execution on the first error and raise an exception.
- "continue": Continue execution, collecting errors as ErrorSnapshot objects.
start (bool) – Whether to start the pipeline immediately. If False, the pipeline is not started until the start() method on the AsyncMap instance is called.
- Return type:
AsyncMap
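The interface shape described for run_map_async (an object returned immediately, a task attribute to await, and a deferred start() when start=False) can be sketched with plain asyncio. The class below is a hypothetical stand-in to illustrate the calling pattern, not pipefunc's AsyncMap:

```python
import asyncio

class MiniAsyncMap:
    """Toy stand-in for the AsyncMap interface shape: .task plus .start()."""

    def __init__(self, coro, start=True):
        self._coro = coro
        self.task = None
        if start:
            self.start()

    def start(self):
        # Idempotent: only schedule the coroutine once.
        if self.task is None:
            self.task = asyncio.ensure_future(self._coro)

async def fake_run():
    await asyncio.sleep(0)  # pretend to execute the mapped functions
    return {"result": 42}

async def main():
    runner = MiniAsyncMap(fake_run(), start=False)
    runner.start()            # deferred start, as with start=False
    return await runner.task  # awaiting the task yields the results

results = asyncio.run(main())
```

With the default start=True the task is scheduled immediately on construction, matching the "returns immediately with an awaitable task" behavior described above.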
- class pipefunc.map.ZarrFileArray(folder, shape, internal_shape=None, shape_mask=None, *, store=None, object_codec=None)[source]#
Bases: StorageBase
Array interface to a Zarr store.
Only exists if the zarr package is installed!
- dump(key, value)[source]#
Dump "value" into the location associated with "key".
- Return type:
Examples
>>> arr = ZarrFileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- property mask: MaskedArray#
Return the mask associated with the array.
- class pipefunc.map.ZarrMemoryArray(folder, shape, internal_shape=None, shape_mask=None, *, store=None, object_codec=None)[source]#
Bases: ZarrFileArray
Array interface to an in-memory Zarr store.
Only exists if the zarr package is installed!
- class pipefunc.map.ZarrSharedMemoryArray[source]#
Bases: ZarrMemoryArray
Array interface to a shared memory Zarr store.
Only exists if the zarr package is installed!
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.