pipefunc.map module

Contents

pipefunc.map module#

pipefunc.map: Modules that handle MapSpecs and their runs.

class pipefunc.map.DictArray(folder, shape, internal_shape=None, shape_mask=None, *, mapping=None)[source]#

Bases: StorageBase

A numpy.ndarray backed by a dict with internal structure.

dump(key, value)[source]#

Dump 'value' into the location associated with 'key'.

Return type:

None

Examples

>>> arr = DictArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

get_from_index(index)[source]#

Return the data associated with the given linear index.

Return type:

Any

has_index(index)[source]#

Return whether the given linear index exists.

Return type:

bool

load()[source]#

Load the dict storage from disk.

Return type:

None

property mask: MaskedArray#

Return the mask associated with the array.

mask_linear()[source]#

Return a list of booleans indicating which elements are missing.

Return type:

list[bool]

persist()[source]#

Persist the dict storage to disk.

Return type:

None

requires_serialization: bool = False#
storage_id: str = 'dict'#
to_array(*, splat_internal=None)[source]#

Return the array as a NumPy masked array.

Return type:

MaskedArray

class pipefunc.map.FileArray(folder, shape, internal_shape=None, shape_mask=None, *, filename_template='__{:d}__.pickle')[source]#

Bases: StorageBase

Array interface to a folder of files on disk.

__getitem__ returns "np.ma.masked" for non-existent files.

dump(key, value)[source]#

Dump 'value' into the file associated with 'key'.

Return type:

None

Examples

>>> arr = FileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

classmethod from_data(data, folder)[source]#

Create a FileArray from an existing list or NumPy array.

This method serializes the provided data (which can be a Python list or a NumPy array) into the FileArray's on-disk format within the specified folder. Each element of the input data is dumped individually.

This is useful for preparing large datasets for use with pipeline.map in distributed environments (like SLURM), as it allows pipefunc to pass around a lightweight FileArray object instead of serializing the entire large dataset for each task.

Parameters:
  • data (list[Any] | ndarray) – The list or NumPy-like array to store in the FileArray.

  • folder (str | Path) – The directory where the FileArray will store its data files. This folder must be accessible by all worker nodes if used in a distributed setting.

Returns:

A new FileArray instance populated with the provided data.

Return type:

FileArray

get_from_index(index)[source]#

Return the data associated with the given linear index.

Return type:

Any

has_index(index)[source]#

Return whether the given linear index exists.

Return type:

bool

property mask: MaskedArray#

Return a masked numpy array containing the mask.

The returned numpy array has dtype "bool" and a mask for masking out missing data.

mask_linear()[source]#

Return a list of booleans indicating which elements are missing.

Return type:

list[bool]

requires_serialization: bool = True#
storage_id: str = 'file_array'#
to_array(*, splat_internal=None)[source]#

Return a masked numpy array containing all the data.

The returned numpy array has dtype "object" and a mask for masking out missing data.

Parameters:

splat_internal (bool | None) – If True, the internal array dimensions will be splatted out. If None, it will happen if and only if internal_shape is provided.

Return type:

MaskedArray

Returns:

The array containing all the data.

shape: ShapeTuple#
internal_shape: ShapeTuple#
shape_mask: tuple[bool, ...]#
class pipefunc.map.MapSpec(inputs, outputs, _is_generated=False)[source]#

Bases: object

Specification for how to map input axes to output axes.

Examples

>>> mapped = MapSpec.from_string("a[i, j], b[i, j], c[k] -> q[i, j, k]")
>>> partial_reduction = MapSpec.from_string("a[i, :], b[:, k] -> q[i, k]")
add_axes(*axis)[source]#

Return a new MapSpec with additional axes.

Return type:

MapSpec

property external_indices: tuple[str, ...][source]#

Output indices that are shared with the input indices.

classmethod from_string(expr)[source]#

Construct a MapSpec from a string.

Return type:

MapSpec

property input_indices: set[str]#

Return the index names of the input arrays.

input_keys(shape, linear_index)[source]#

Return keys for indexing inputs of this map.

Parameters:
  • shape (tuple[int, ...]) – The shape of the map output.

  • linear_index (int) – The index of the element for which to return the keys.

Return type:

dict[str, tuple[slice | int, ...]]

Examples

>>> spec = MapSpec.from_string("x[i, j], y[j, :, k] -> z[i, j, k]")
>>> spec.input_keys((5, 2, 3), 23)
{'x': (3, 1), 'y': (1, slice(None, None, None), 2)}
property input_names: tuple[str, ...]#

Return the parameter names of this mapspec.

property output_indices: tuple[str, ...]#

Return the index names of the output array.

output_key(shape, linear_index)[source]#

Return a key used for indexing the output of this map.

Parameters:
  • shape (tuple[int, ...]) – The shape of the map output.

  • linear_index (int) – The index of the element for which to return the key.

Return type:

tuple[int, ...]

Examples

>>> spec = MapSpec.from_string("x[i, j], y[j, :, k] -> z[i, j, k]")
>>> spec.output_key((5, 2, 3), 23)
(3, 1, 2)
property output_names: tuple[str, ...]#

Return the names of the output arrays.

rename(renames)[source]#

Return a new renamed MapSpec if any of the names are in 'renames'.

Return type:

MapSpec

rename_axes(renames)[source]#

Return a new MapSpec with renamed axes.

Parameters:

renames (dict[str, str]) – Dictionary mapping old axis names to new axis names.

Return type:

MapSpec

Returns:

A new MapSpec with renamed axes applied to all inputs and outputs.

Examples

>>> spec = MapSpec.from_string("a[i, j], b[i, j] -> c[i, j]")
>>> renamed = spec.rename_axes({"i": "x", "j": "y"})
>>> str(renamed)
'a[x, y], b[x, y] -> c[x, y]'
shape(input_shapes, internal_shapes=None)[source]#

Return the shape of the output of this MapSpec.

Parameters:
  • input_shapes (dict[str, tuple[int | Literal['?'], ...]]) – Shapes of the inputs, keyed by name.

  • internal_shapes (dict[str, tuple[int | Literal['?'], ...]] | None) – Shapes of the outputs, keyed by name. Provide this only if the output has an axis not shared with any input.

Return type:

tuple[tuple[int | Literal['?'], ...], tuple[bool, ...]]

to_string()[source]#

Return a faithful representation of a MapSpec as a string.

Return type:

str

inputs: tuple[ArraySpec, ...]#
outputs: tuple[ArraySpec, ...]#
class pipefunc.map.RunInfo(inputs, defaults, all_output_names, shapes, resolved_shapes, internal_shapes, shape_masks, run_folder, mapspecs_as_strings, storage, error_handling='raise', pipefunc_version='0.90.2.dev1+None.gb2ba6a8.dirty')[source]#

Bases: object

Information about a pipeline.map() run.

The data in this class is immutable, except for resolved_shapes which is updated as new shapes are resolved.

classmethod create(run_folder, pipeline, inputs, internal_shapes=None, *, executor=None, storage, error_handling='raise', cleanup=None, resume=False, resume_validation='auto')[source]#
Return type:

RunInfo

property defaults_path: Path#
dump()[source]#

Dump the RunInfo to a file.

Return type:

None

error_handling: Literal['raise', 'continue'] = 'raise'#
init_store()[source]#
Return type:

dict[str, StorageBase | Path | DirectValue]

property input_paths: dict[str, Path]#
classmethod load(run_folder)[source]#
Return type:

RunInfo

property mapspecs: list[MapSpec][source]#
static path(run_folder)[source]#
Return type:

Path

pipefunc_version: str = '0.90.2.dev1+None.gb2ba6a8.dirty'#
resolve_downstream_shapes(output_name, store, output=None, shape=None)[source]#
Return type:

None

storage_class(output_name)[source]#
Return type:

type[StorageBase]

inputs: dict[str, Any]#
defaults: dict[str, Any]#
all_output_names: set[str]#
shapes: dict[str | tuple[str, ...], tuple[int | Literal['?'], ...]]#
resolved_shapes: dict[str | tuple[str, ...], tuple[int | Literal['?'], ...]]#
internal_shapes: dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None#
shape_masks: dict[str | tuple[str, ...], tuple[bool, ...]]#
run_folder: Path | None#
mapspecs_as_strings: list[str]#
storage: str | dict[str | tuple[str, ...], str]#
class pipefunc.map.SharedMemoryDictArray(folder, shape, internal_shape=None, shape_mask=None, *, mapping=None)[source]#

Bases: DictArray

Array interface to a shared memory dict store.

property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

requires_serialization: bool = True#
storage_id: str = 'shared_memory_dict'#
class pipefunc.map.StorageBase(folder, shape, internal_shape=None, shape_mask=None)[source]#

Bases: ABC

Base class for file-based arrays.

abstractmethod dump(key, value)[source]#
Return type:

None

abstract property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

property full_shape: tuple[int, ...][source]#

Return the full shape of the array.

full_shape_is_resolved()[source]#

Return whether the shape is resolved.

Return type:

bool

abstractmethod get_from_index(index)[source]#
Return type:

Any

abstractmethod has_index(index)[source]#
Return type:

bool

abstract property mask: MaskedArray#
abstractmethod mask_linear()[source]#
Return type:

list[bool]

persist()[source]#

Save a memory-based storage to disk.

Return type:

None

property rank: int#

Return the rank of the array.

property resolved_internal_shape: tuple[int, ...][source]#
property resolved_shape: tuple[int, ...][source]#

Return the resolved shape of the array.

property size: int#

Return number of elements in the array.

property strides: tuple[int, ...][source]#

Return the strides of the array.

abstractmethod to_array(*, splat_internal=None)[source]#
Return type:

MaskedArray

shape: tuple[int | Literal['?'], ...]#
internal_shape: tuple[int | Literal['?'], ...]#
shape_mask: tuple[bool, ...]#
storage_id: str#
requires_serialization: bool#
pipefunc.map.load_all_outputs(run_folder)[source]#

Load all outputs of a run.

Allows loading outputs even from partial runs that failed. Map results are returned as `numpy.masked_array`s where missing values are masked.

Parameters:

run_folder (str | Path) – The run_folder used in pipeline.map or pipeline.map_async.

Return type:

dict[str, Any]

See also

load_outputs

For loading specific outputs.

pipefunc.map.load_dataframe(*output_name, run_folder, load_intermediate=True, backend='pandas')[source]#

Load the output(s) of a pipeline.map as a DataFrame.

Parameters:
  • output_name (str) – The names of the outputs to load. If empty, all outputs are loaded.

  • run_folder (str | Path) – The folder where the pipeline run was stored.

  • load_intermediate (bool) – Whether to load intermediate outputs.

  • backend (Literal['pandas', 'polars']) – DataFrame library to use when constructing the result. Defaults to "pandas".

Return type:

Any

Returns:

A DataFrame from the selected backend containing the outputs of the pipeline run.

pipefunc.map.load_outputs(*output_names, run_folder)[source]#

Load the outputs of a run.

Allows loading outputs even from partial runs that failed. Map results are returned as `numpy.masked_array`s where missing values are masked.

Parameters:
  • output_names (str) – The names of the outputs to load. If empty, no outputs are loaded.

  • run_folder (str | Path) – The run_folder used in pipeline.map or pipeline.map_async.

Return type:

Any

See also

load_all_outputs

For loading all outputs.

pipefunc.map.load_xarray_dataset(*output_name, run_folder, load_intermediate=True)[source]#

Load the output(s) of a pipeline.map as an xarray.Dataset.

Parameters:
  • output_name (str) – The names of the outputs to load. If empty, all outputs are loaded.

  • run_folder (str | Path) – The folder where the pipeline run was stored.

  • load_intermediate (bool) – Whether to load intermediate outputs.

Return type:

Dataset

Returns:

An xarray.Dataset containing the outputs of the pipeline run.

pipefunc.map.register_storage(cls, storage_id=None)[source]#

Register a StorageBase class.

Parameters:
  • cls (type[StorageBase]) – Storage class that should be registered.

  • storage_id (str | None) – Storage identifier, defaults to the storage_id attribute of the class.

Return type:

None

Notes

This function maintains a mapping from storage identifiers to storage classes. When a storage class is registered, it will replace any class previously registered under the same storage identifier, if present.

pipefunc.map.run_map(pipeline, inputs, run_folder=None, internal_shapes=None, *, output_names=None, parallel=True, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise')[source]#

Run a pipeline with MapSpec functions for given inputs.

Parameters:
  • pipeline (Pipeline) – The pipeline to run.

  • inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data; these are either single values for functions without a mapspec, or lists of values or `numpy.ndarray`s for functions with a `mapspec`.

  • run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is provided here, the provided value is used.

  • output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.

  • parallel (bool) – Whether to run the functions in parallel. Is ignored if provided executor is not None.

  • executor (Executor | dict[str | tuple[str, ...], Executor] | None) –

    The executor to use for parallel execution. Can be specified as:

    1. None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).

    2. A concurrent.futures.Executor instance: Used for all outputs.

    3. A dictionary: Specify different executors for different outputs.

      • Use output names as keys and Executor instances as values.

      • Use an empty string "" as a key to set a default executor.

    If parallel is False, this argument is ignored.

  • chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) –

    Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:

    • None: Automatically determine optimal chunk sizes (default)

    • int: Same chunk size for all outputs

    • dict: Different chunk sizes per output where:
      • Keys are output names (or "" for default)

      • Values are either integers or callables

      • Callables take total execution count and return chunk size

    Examples:

    >>> chunksizes = None  # Auto-determine optimal chunk sizes
    >>> chunksizes = 100  # All outputs use chunks of 100
    >>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
    >>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
    

  • storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) –

    The storage class to use for storing intermediate and final results. Can be specified as:

    1. A string: Use a single storage class for all outputs.

    2. A dictionary: Specify different storage classes for different outputs.

      • Use output names as keys and storage class names as values.

      • Use an empty string "" as a key to set a default storage class.

    Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".

  • persist_memory (bool) – Whether to write results to disk when memory based storage is used. Does not have any effect when file based storage is used.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, takes priority over resume parameter. cleanup=True is equivalent to resume=False. cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

  • resume_validation (Literal['auto', 'strict', 'skip']) –

    Controls validation strictness when reusing data from a previous run (only applies when resume=True):

    • "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.

    • "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.

    • "skip": Skip input/default validation entirely. Use when your input objects have broken ``__eq__`` implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.

    Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.

  • fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axes names to indices that should be fixed for the run. If not provided, all indices are iterated over.

  • auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.

  • show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) –

    Whether to display a progress bar. Can be:

    • True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).

    • False: No progress bar.

    • "ipywidgets": Force ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.

    • "rich": Force rich progress bar (text-based). Shown only if rich is installed.

    • "headless": No progress bar, but the progress is still tracked internally.

    • None (default): Shows ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed. Otherwise, no progress bar is shown.

  • return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory. Instead the results are only kept in the set storage. This is useful for very large pipelines where the results do not fit into memory.

  • error_handling (Literal['raise', 'continue']) –

    How to handle errors during function execution:

    • "raise" (default): Stop execution on first error and raise exception

    • "continue": Continue execution, collecting errors as ErrorSnapshot objects

Return type:

ResultDict

pipefunc.map.run_map_async(pipeline, inputs, run_folder=None, internal_shapes=None, *, output_names=None, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise', display_widgets=True, start=True)[source]#

Asynchronously run a pipeline with MapSpec functions for given inputs.

Returns immediately with an AsyncMap instance with a task attribute that can be awaited.

Parameters:
  • pipeline (Pipeline) – The pipeline to run.

  • inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data; these are either single values for functions without a mapspec, or lists of values or `numpy.ndarray`s for functions with a `mapspec`.

  • run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is provided here, the provided value is used.

  • output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.

  • executor (Executor | dict[str | tuple[str, ...], Executor] | None) –

    The executor to use for parallel execution. Can be specified as:

    1. None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).

    2. A concurrent.futures.Executor instance: Used for all outputs.

    3. A dictionary: Specify different executors for different outputs.

      • Use output names as keys and Executor instances as values.

      • Use an empty string "" as a key to set a default executor.

  • chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) –

    Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:

    • None: Automatically determine optimal chunk sizes (default)

    • int: Same chunk size for all outputs

    • dict: Different chunk sizes per output where:
      • Keys are output names (or "" for default)

      • Values are either integers or callables

      • Callables take total execution count and return chunk size

    Examples:

    >>> chunksizes = None  # Auto-determine optimal chunk sizes
    >>> chunksizes = 100  # All outputs use chunks of 100
    >>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
    >>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
    

  • storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) –

    The storage class to use for storing intermediate and final results. Can be specified as:

    1. A string: Use a single storage class for all outputs.

    2. A dictionary: Specify different storage classes for different outputs.

      • Use output names as keys and storage class names as values.

      • Use an empty string "" as a key to set a default storage class.

    Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".

  • persist_memory (bool) – Whether to write results to disk when memory based storage is used. Does not have any effect when file based storage is used.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, takes priority over resume parameter. cleanup=True is equivalent to resume=False. cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

  • resume_validation (Literal['auto', 'strict', 'skip']) –

    Controls validation strictness when reusing data from a previous run (only applies when resume=True):

    • "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.

    • "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.

    • "skip": Skip input/default validation entirely. Use when your input objects have broken ``__eq__`` implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.

    Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.

  • fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axes names to indices that should be fixed for the run. If not provided, all indices are iterated over.

  • auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.

  • show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) –

    Whether to display a progress bar. Can be:

    • True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).

    • False: No progress bar.

    • "ipywidgets": Force ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.

    • "rich": Force rich progress bar (text-based). Shown only if rich is installed.

    • "headless": No progress bar, but the progress is still tracked internally.

    • None (default): Shows ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed. Otherwise, no progress bar is shown.

  • display_widgets (bool) – Whether to call IPython.display.display(...) on widgets. Ignored if outside of a Jupyter notebook.

  • return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory. Instead the results are only kept in the set storage. This is useful for very large pipelines where the results do not fit into memory.

  • error_handling (Literal['raise', 'continue']) –

    How to handle errors during function execution:

    • "raise" (default): Stop execution on first error and raise exception

    • "continue": Continue execution, collecting errors as ErrorSnapshot objects

  • start (bool) – Whether to start the pipeline immediately. If False, the pipeline is not started until the start() method on the AsyncMap instance is called.

Return type:

AsyncMap

class pipefunc.map.ZarrFileArray(folder, shape, internal_shape=None, shape_mask=None, *, store=None, object_codec=None)[source]#

Bases: StorageBase

Array interface to a Zarr store.

Only exists if the zarr package is installed!

dump(key, value)[source]#

Dump 'value' into the location associated with 'key'.

Return type:

None

Examples

>>> arr = ZarrFileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

get_from_index(index)[source]#

Return the data associated with the given linear index.

Return type:

Any

has_index(index)[source]#

Return whether the given linear index exists.

Return type:

bool

property mask: MaskedArray#

Return the mask associated with the array.

mask_linear()[source]#

Return a list of booleans indicating which elements are missing.

Return type:

list[bool]

property rank: int#

Return the rank of the array.

requires_serialization: bool = True#
property size: int#

Return number of elements in the array.

storage_id: str = 'zarr_file_array'#
to_array(*, splat_internal=None)[source]#

Return the array as a NumPy masked array.

Return type:

MaskedArray

class pipefunc.map.ZarrMemoryArray(folder, shape, internal_shape=None, shape_mask=None, *, store=None, object_codec=None)[source]#

Bases: ZarrFileArray

Array interface to an in-memory Zarr store.

Only exists if the zarr package is installed!

property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

load()[source]#

Load the memory storage from disk.

Return type:

None

persist()[source]#

Persist the memory storage to disk.

Return type:

None

property persistent_store: Store | None#

Return the persistent store.

requires_serialization: bool = False#
storage_id: str = 'zarr_memory'#
class pipefunc.map.ZarrSharedMemoryArray(folder, shape, internal_shape=None, shape_mask=None, *, store=None, object_codec=None)[source]#

Bases: ZarrMemoryArray

Array interface to a shared memory Zarr store.

Only exists if the zarr package is installed!

property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

requires_serialization: bool = True#
storage_id: str = 'zarr_shared_memory'#