pipefunc.helpers module#

The pipefunc.helpers module provides utility functions and classes for working with pipefunc pipelines.

class pipefunc.helpers.FileArray(folder, shape, internal_shape=None, shape_mask=None, *, filename_template='__{:d}__.pickle')[source]#

Bases: StorageBase

Array interface to a folder of files on disk.

__getitem__ returns “np.ma.masked” for non-existent files.

dump(key, value)[source]#

Dump ‘value’ into the file associated with ‘key’.

Return type:

None

Examples

>>> arr = FileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
property dump_in_subprocess: bool#

Indicates if the storage can be dumped in a subprocess and read by the main process.

classmethod from_data(data, folder)[source]#

Create a FileArray from an existing list or NumPy array.

This method serializes the provided data (which can be a Python list or a NumPy array) into the FileArray’s on-disk format within the specified folder. Each element of the input data is dumped individually.

This is useful for preparing large datasets for use with pipeline.map in distributed environments (like SLURM), as it allows pipefunc to pass around a lightweight FileArray object instead of serializing the entire large dataset for each task.

Parameters:
  • data (list[Any] | ndarray) – The list or NumPy-like array to store in the FileArray.

  • folder (str | Path) – The directory where the FileArray will store its data files. This folder must be accessible by all worker nodes if used in a distributed setting.

Returns:

A new FileArray instance populated with the provided data.

Return type:

FileArray
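The per-element dump pattern described above can be sketched in plain Python. This is a hypothetical illustration of the idea, not the actual FileArray implementation; the `dump_elements` helper and the file naming are illustrative (the name pattern mirrors the default filename_template):

```python
import pickle
import tempfile
from pathlib import Path

def dump_elements(data, folder: Path) -> list[Path]:
    """Pickle each element of `data` into its own file inside `folder`."""
    folder.mkdir(parents=True, exist_ok=True)
    paths = []
    for i, element in enumerate(data):
        # One file per element, so a worker can load a single item
        # without deserializing the whole dataset.
        path = folder / f"__{i}__.pickle"
        path.write_bytes(pickle.dumps(element))
        paths.append(path)
    return paths

folder = Path(tempfile.mkdtemp())
paths = dump_elements([10, 20, 30], folder)
print(pickle.loads(paths[1].read_bytes()))  # 20
```

Because each element lives in its own file, only the folder path (not the data itself) needs to be shipped to distributed workers.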

get_from_index(index)[source]#

Return the data associated with the given linear index.

Return type:

Any

has_index(index)[source]#

Return whether the given linear index exists.

Return type:

bool

property mask: MaskedArray#

Return a masked numpy array containing the mask.

The returned numpy array has dtype “bool” and a mask for masking out missing data.

mask_linear()[source]#

Return a list of booleans indicating which elements are missing.

Return type:

list[bool]

requires_serialization: bool = True#
storage_id: str = 'file_array'#
to_array(*, splat_internal=None)[source]#

Return a masked numpy array containing all the data.

The returned numpy array has dtype “object” and a mask for masking out missing data.

Parameters:

splat_internal (bool | None) – If True, the internal array dimensions will be splatted out. If None, it will happen if and only if internal_shape is provided.

Return type:

MaskedArray

Returns:

The array containing all the data.

class pipefunc.helpers.FileValue(path)[source]#

Bases: object

A reference to a value stored in a file.

This class provides a way to store and load values from files, which is useful for passing large objects between processes without serializing them directly.

Parameters:

path (str | Path) – Path to the file containing the serialized value.

Examples

>>> ref = FileValue.from_data([1, 2, 3], Path("data.pkl"))
>>> ref.load()
[1, 2, 3]
load()[source]#

Load the stored data from disk.

Return type:

Any

classmethod from_data(data, path)[source]#

Serializes data to the given file path and returns a FileValue pointing to it.

This is useful for preparing a single large, non-iterable object for use with pipeline.map in distributed environments. The object is stored once on disk, and the lightweight FileValue can be passed to tasks, which then load the data on demand.

Parameters:
  • data (Any) – The Python object to serialize and store.

  • path (Path) – The full file path (including filename) where the data will be stored. This path must be accessible by all worker nodes if used in a distributed setting.

Returns:

A new FileValue instance pointing to the stored data.

Return type:

FileValue
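The store-once, load-on-demand pattern behind FileValue can be sketched in plain Python. This is a hypothetical stand-in (here called `PathRef`), not the real FileValue class; it only illustrates why passing a lightweight file reference is cheaper than serializing a large object into every task:

```python
import pickle
import tempfile
from pathlib import Path

class PathRef:
    """Hypothetical sketch: a lightweight reference to pickled data on disk."""

    def __init__(self, path):
        self.path = Path(path)

    @classmethod
    def from_data(cls, data, path):
        # Serialize once; afterwards only the small PathRef travels.
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(pickle.dumps(data))
        return cls(path)

    def load(self):
        # Each worker loads the data on demand.
        return pickle.loads(self.path.read_bytes())

path = Path(tempfile.mkdtemp()) / "data.pkl"
ref = PathRef.from_data({"weights": list(range(5))}, path)
print(ref.load())  # {'weights': [0, 1, 2, 3, 4]}
```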

pipefunc.helpers.chain(functions, *, copy=True)[source]#

Return a new list of PipeFuncs connected linearly by applying minimal renames.

The i+1-th function’s first parameter is renamed to the i-th function’s output name, creating a linear data flow. Other parameters (including additional inputs) are untouched.

Parameters:
  • functions (Sequence[PipeFunc | Callable]) – Sequence of PipeFuncs (or callables). Callables are wrapped as PipeFuncs with output_name=f.__name__.

  • copy (bool) – If True (default), return copies of the input PipeFuncs; original instances are not modified.

Returns:

New PipeFunc objects with renames applied so the data flows linearly.

Return type:

list[PipeFunc]

Notes

  • If a downstream function already has an unbound parameter matching an upstream output name, no rename is applied (prefer existing matches).

  • When no explicit match exists, the first parameter is renamed to the upstream output. The first parameter must not be bound; if it is, a ValueError is raised.

  • If a function has zero parameters (and is not the first in the chain), a ValueError is raised.
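The rename rule in the notes above can be sketched with `inspect`. This is a hypothetical simplification of what chain computes per link (the real function also handles bound parameters and PipeFunc wrapping); `minimal_rename` and `process` are illustrative names:

```python
import inspect

def minimal_rename(upstream_output: str, downstream) -> dict[str, str]:
    """Return the rename mapping chain-style wiring would apply."""
    params = list(inspect.signature(downstream).parameters)
    if upstream_output in params:
        return {}  # an existing parameter match is preferred: no rename
    # Otherwise the first parameter is renamed to the upstream output name.
    return {params[0]: upstream_output}

def process(data, factor=2):
    return data * factor

print(minimal_rename("load", process))  # {'data': 'load'}
print(minimal_rename("data", process))  # {}
```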

pipefunc.helpers.collect_kwargs(parameters, *, annotations=None, function_name='call')[source]#

Returns a callable with the signature specified in parameters that collects its arguments into a dict.

Parameters:
  • parameters (tuple[str, ...]) – Tuple of names, these names will be used for the function parameters.

  • annotations (tuple[type, ...] | None) – Optionally, provide type-annotations for the parameters. Must be the same length as parameters or None.

  • function_name (str) – The __name__ that is assigned to the returned callable.

Return type:

Callable[..., dict[str, Any]]

Returns:

Callable that returns the parameters in a dictionary.

Examples

This creates def yolo(a: int, b: list[int]) -> dict[str, Any]:

>>> f = collect_kwargs(("a", "b"), annotations=(int, list[int]), function_name="yolo")
>>> f(a=1, b=2)
{'a': 1, 'b': 2}
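Conceptually, collect_kwargs builds a function that simply echoes its keyword arguments back as a dict. A minimal sketch of that idea (hypothetical `make_collector` helper, without the dynamic signature and annotations the real function generates):

```python
def make_collector(parameters, function_name="call"):
    """Return a function that gathers the named kwargs into a dict."""
    def collector(**kwargs):
        return {name: kwargs[name] for name in parameters}
    collector.__name__ = function_name
    return collector

f = make_collector(("a", "b"), function_name="yolo")
print(f(a=1, b=2))  # {'a': 1, 'b': 2}
```

The real collect_kwargs additionally fabricates an explicit signature (e.g. `def yolo(a: int, b: list[int])`) so introspection and pipefunc's argument wiring work as expected.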
async pipefunc.helpers.gather_maps(*async_maps, max_concurrent=1, max_completed_tabs=None, _tabs=None)[source]#

Run AsyncMap objects with a limit on simultaneous executions.

Parameters:
  • async_maps (AsyncMap) – AsyncMap objects created with pipeline.map_async(..., start=False).

  • max_concurrent (int) – Maximum number of concurrent jobs.

  • max_completed_tabs (int | None) – Maximum number of completed tabs to show. If None, all completed tabs are shown. Only used if display_widgets=True.

Return type:

list[ResultDict]

Returns:

List of results from each AsyncMap’s task.
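The concurrency cap that gather_maps enforces follows a standard asyncio pattern: a semaphore limits how many coroutines run at once while `asyncio.gather` collects all results in submission order. A hypothetical standalone sketch (the real function works on AsyncMap objects and manages display widgets):

```python
import asyncio

async def run_limited(coro_factories, max_concurrent=1):
    """Run coroutines with at most `max_concurrent` executing at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(factory):
        async with semaphore:  # blocks while max_concurrent tasks are active
            return await factory()

    # gather preserves the input order in its result list.
    return await asyncio.gather(*(run_one(f) for f in coro_factories))

async def work(i):
    await asyncio.sleep(0)
    return i * i

results = asyncio.run(
    run_limited([lambda i=i: work(i) for i in range(4)], max_concurrent=2)
)
print(results)  # [0, 1, 4, 9]
```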

pipefunc.helpers.get_attribute_factory(attribute_name, parameter_name, parameter_annotation, return_annotation, function_name='get_attribute')[source]#

Returns a callable that retrieves an attribute from its input parameter.

Parameters:
  • attribute_name (str) – The name of the attribute to access.

  • parameter_name (str) – The name of the input parameter.

  • parameter_annotation (type) – Optional type annotation for the input parameter.

  • return_annotation (type) – Optional type annotation for the return value.

  • function_name (str) – The __name__ that is assigned to the returned callable.

Return type:

Callable[[Any], Any]

Returns:

Callable that returns an attribute of its input parameter.

Examples

This creates def get_data(obj: MyClass) -> int:

>>> class MyClass:
...     def __init__(self, data: int) -> None:
...         self.data = data
>>> f = get_attribute_factory("data", parameter_name="obj", parameter_annotation=MyClass, return_annotation=int, function_name="get_data")
>>> f(MyClass(data=123))
123
pipefunc.helpers.launch_maps(*async_maps, max_concurrent=1, max_completed_tabs=None)[source]#

Launch a collection of map operations to run concurrently in the background.

This is a user-friendly, non-blocking wrapper around gather_maps. It immediately returns an asyncio.Task object, which can be awaited later to retrieve the results. This is ideal for use in interactive environments like Jupyter notebooks.

Parameters:
  • async_maps (AsyncMap) – AsyncMap objects created with pipeline.map_async(..., start=False).

  • max_concurrent (int) – Maximum number of map operations to run at the same time.

  • max_completed_tabs (int | None) – Maximum number of completed tabs to show. If None, all completed tabs are shown. Only used if display_widgets=True.

Returns:

A task handle representing the background execution of the maps. Await this task to retrieve the list of results.

Return type:

asyncio.Task

Examples

>>> # In a Jupyter notebook cell:
>>> task = launch_maps(runners, max_concurrent=2)
>>> # In a later cell:
>>> results = await task
>>> print("Computation finished!")
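The non-blocking behavior described above is the standard `asyncio.create_task` pattern: scheduling work on the running event loop and returning a handle immediately. A hypothetical minimal sketch (`slow_sum` is an illustrative placeholder for the gathered maps):

```python
import asyncio

async def main():
    async def slow_sum():
        await asyncio.sleep(0)  # stand-in for the actual map computations
        return 42

    # create_task returns immediately; the work runs in the background.
    task = asyncio.create_task(slow_sum())
    # ... other work can happen here, e.g. in later notebook cells ...
    return await task  # await later to retrieve the result

print(asyncio.run(main()))  # 42
```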