pipefunc.helpers module#
Various helper tools for working with pipefunc.
- class pipefunc.helpers.FileArray(folder, shape, internal_shape=None, shape_mask=None, *, filename_template='__{:d}__.pickle')[source]#
Bases: StorageBase
Array interface to a folder of files on disk.
__getitem__ returns np.ma.masked for non-existent files.
- dump(key, value)[source]#
Dump ‘value’ into the file associated with ‘key’.
- Return type:
Examples
>>> arr = FileArray(...)
>>> arr.dump((2, 1, 5), dict(a=1, b=2))
- property dump_in_subprocess: bool#
Indicates if the storage can be dumped in a subprocess and read by the main process.
- classmethod from_data(data, folder)[source]#
Create a FileArray from an existing list or NumPy array.
This method serializes the provided data (which can be a Python list or a NumPy array) into the FileArray’s on-disk format within the specified folder. Each element of the input data is dumped individually.
This is useful for preparing large datasets for use with pipeline.map in distributed environments (like SLURM), as it allows pipefunc to pass around a lightweight FileArray object instead of serializing the entire large dataset for each task.
- Parameters:
- Returns:
A new FileArray instance populated with the provided data.
- Return type:
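For intuition, the per-element layout that from_data produces can be sketched with the standard library alone. This is a hypothetical stand-in, not the real FileArray implementation: it assumes 1-D data and pickle serialization, and uses the default filename_template ('__{:d}__.pickle') from the signature above.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical sketch of FileArray's per-element on-disk layout (1-D case):
# element i is pickled to filename_template.format(i) inside `folder`.
def dump_elements(data, folder, filename_template="__{:d}__.pickle"):
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)
    paths = []
    for i, element in enumerate(data):
        path = folder / filename_template.format(i)
        with path.open("wb") as f:
            pickle.dump(element, f)  # each element is dumped individually
        paths.append(path)
    return paths

with tempfile.TemporaryDirectory() as tmp:
    paths = dump_elements([10, 20, 30], tmp)
    print([p.name for p in paths])  # ['__0__.pickle', '__1__.pickle', '__2__.pickle']
```

Because each element lives in its own file, a worker only needs to open the files for the indices it is assigned, which is what makes the FileArray handle cheap to ship to tasks.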
- property mask: MaskedArray#
Return a masked numpy array containing the mask.
The returned numpy array has dtype “bool” and a mask for masking out missing data.
- requires_serialization: bool = True#
- storage_id: str = 'file_array'#
- class pipefunc.helpers.FileValue(path)[source]#
Bases: object
A reference to a value stored in a file.
This class provides a way to store and load values from files, which is useful for passing large objects between processes without serializing them directly.
Examples
>>> ref = FileValue.from_data([1, 2, 3], Path("data.pkl"))
>>> ref.load()
[1, 2, 3]
- classmethod from_data(data, path)[source]#
Serializes data to the given file path and returns a FileValue pointing to it.
This is useful for preparing a single large, non-iterable object for use with pipeline.map in distributed environments. The object is stored once on disk, and the lightweight FileValue can be passed to tasks, which then load the data on demand.
- Parameters:
- Returns:
A new FileValue instance pointing to the stored data.
- Return type:
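The store-once / load-on-demand pattern that FileValue implements can be sketched with the standard library. The class below is a hypothetical illustration (it assumes pickle serialization), not the real FileValue:

```python
import pickle
import tempfile
from pathlib import Path

# Minimal sketch of the FileValue pattern: serialize once, pass a lightweight
# path-holding object around, deserialize only where the value is needed.
class FileValueSketch:
    def __init__(self, path):
        self.path = Path(path)

    @classmethod
    def from_data(cls, data, path):
        path = Path(path)
        with path.open("wb") as f:
            pickle.dump(data, f)  # stored once on disk
        return cls(path)

    def load(self):
        with self.path.open("rb") as f:
            return pickle.load(f)  # loaded on demand by the task

with tempfile.TemporaryDirectory() as tmp:
    ref = FileValueSketch.from_data([1, 2, 3], Path(tmp) / "data.pkl")
    print(ref.load())  # [1, 2, 3]
```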
- pipefunc.helpers.chain(functions, *, copy=True)[source]#
Return a new list of PipeFuncs connected linearly by applying minimal renames.
The i+1-th function’s first parameter is renamed to the i-th function’s output name, creating a linear data flow. Other parameters (including additional inputs) are untouched.
- Parameters:
- Returns:
New PipeFunc objects with renames applied so the data flows linearly.
- Return type:
Notes
If a downstream function already has an unbound parameter matching an upstream output name, no rename is applied (prefer existing matches).
When no explicit match exists, the first parameter is renamed to the upstream output. The first parameter must not be bound; if it is, a ValueError is raised.
If a function has zero parameters (and is not the first in the chain), a ValueError is raised.
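The data flow that the renames establish is equivalent to calling the functions one after another, feeding each output into the next function's first parameter. A conceptual sketch with plain callables (chain() itself operates on PipeFunc objects and performs renames rather than direct calls; double and add_one are hypothetical examples):

```python
# Conceptual equivalent of the linear flow chain() sets up: the output of
# function i becomes the first argument of function i+1.
def run_linearly(functions, first_input):
    value = first_input
    for fn in functions:
        value = fn(value)
    return value

def double(x):
    return 2 * x

def add_one(y):  # chain() would rename 'y' to double's output name
    return y + 1

print(run_linearly([double, add_one], 3))  # 7
```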
- pipefunc.helpers.collect_kwargs(parameters, *, annotations=None, function_name='call')[source]#
Returns a callable with a signature as specified in parameters which returns a dict.
- Parameters:
- parameters (tuple[str, ...]) – Tuple of names; these names will be used for the function parameters.
- annotations (tuple[type, ...] | None) – Optionally, provide type annotations for the parameters. Must be the same length as parameters, or None.
- function_name (str) – The __name__ that is assigned to the returned callable.
- Return type:
- Returns:
Callable that returns the parameters in a dictionary.
Examples
This creates def yolo(a: int, b: list[int]) -> dict[str, Any]:
>>> f = collect_kwargs(("a", "b"), annotations=(int, list[int]), function_name="yolo")
>>> f(a=1, b=2)
{"a": 1, "b": 2}
- async pipefunc.helpers.gather_maps(*async_maps, max_concurrent=1, max_completed_tabs=None, _tabs=None)[source]#
Run AsyncMap objects with a limit on simultaneous executions.
- Parameters:
- Return type:
list[ResultDict]
- Returns:
List of results from each AsyncMap's task.
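The concurrency limit described above follows the standard asyncio semaphore pattern. A hedged, stdlib-only sketch of that pattern on generic awaitables (AsyncMap objects, ResultDicts, and widget tabs are pipefunc specifics not reproduced here; gather_with_limit and work are hypothetical names):

```python
import asyncio

# Sketch of limiting simultaneous executions with asyncio.Semaphore:
# at most `max_concurrent` awaitables are in flight at any moment,
# while results keep the order of the inputs (as asyncio.gather does).
async def gather_with_limit(coros, max_concurrent=1):
    sem = asyncio.Semaphore(max_concurrent)

    async def run(coro):
        async with sem:  # blocks while max_concurrent tasks are running
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))

async def work(i):
    await asyncio.sleep(0)
    return i * i

print(asyncio.run(gather_with_limit([work(i) for i in range(4)], max_concurrent=2)))
# [0, 1, 4, 9]
```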
- pipefunc.helpers.get_attribute_factory(attribute_name, parameter_name, parameter_annotation, return_annotation, function_name='get_attribute')[source]#
Returns a callable that retrieves an attribute from its input parameter.
- Parameters:
- attribute_name (str) – The name of the attribute to access.
- parameter_name (str) – The name of the input parameter.
- parameter_annotation (type) – Optional, type annotation for the input parameter.
- return_annotation (type) – Optional, type annotation for the return value.
- function_name (str) – The __name__ that is assigned to the returned callable.
- Return type:
- Returns:
Callable that returns an attribute of its input parameter.
Examples
This creates def get_data(obj: MyClass) -> int:
>>> class MyClass:
...     def __init__(self, data: int) -> None:
...         self.data = data
>>> f = get_attribute_factory("data", parameter_name="obj", parameter_annotation=MyClass, return_annotation=int, function_name="get_data")
>>> f(MyClass(data=123))
123
- pipefunc.helpers.launch_maps(*async_maps, max_concurrent=1, max_completed_tabs=None)[source]#
Launch a collection of map operations to run concurrently in the background.
This is a user-friendly, non-blocking wrapper around gather_maps. It immediately returns an asyncio.Task object, which can be awaited later to retrieve the results. This is ideal for use in interactive environments like Jupyter notebooks.
- Parameters:
- async_maps (AsyncMap) – AsyncMap objects created with pipeline.map_async(..., start=False).
- max_concurrent (int) – Maximum number of map operations to run at the same time.
- max_completed_tabs (int | None) – Maximum number of completed tabs to show. If None, all completed tabs are shown. Only used if display_widgets=True.
- Returns:
A task handle representing the background execution of the maps. await this task to get the list of results.
- Return type:
Examples
>>> # In a Jupyter notebook cell:
>>> task = launch_maps(runners, max_concurrent=2)
>>> # In a later cell:
>>> results = await task
>>> print("Computation finished!")