pipefunc module


PipeFunc: A Python library for defining, managing, and executing function pipelines.

class pipefunc.ErrorSnapshot(function, exception, args, kwargs, timestamp=<factory>, user=<factory>, machine=<factory>, ip_address=<factory>, current_directory=<factory>)[source]#

Bases: object

A snapshot that represents an error in a function call.

classmethod load_from_file(filename)[source]#

Load an error snapshot from a file using cloudpickle.

Return type:

ErrorSnapshot

reproduce()[source]#

Attempt to recreate the error by calling the function with stored arguments.

Return type:

Any | None

save_to_file(filename)[source]#

Save the error snapshot to a file using cloudpickle.

Return type:

None

function: Callable[..., Any]#
exception: Exception#
args: tuple[Any, ...]#
kwargs: dict[str, Any]#
traceback: str#
timestamp: str#
user: str#
machine: str#
ip_address: str#
current_directory: str#
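
The snapshot pattern described above can be pictured with a small standard-library sketch. `SnapshotSketch` and `divide` are hypothetical names used only for illustration; this is not pipefunc's implementation, and it omits the file persistence that ErrorSnapshot does with cloudpickle.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable


@dataclass
class SnapshotSketch:
    """Hypothetical stand-in for ErrorSnapshot; not pipefunc's implementation."""

    function: Callable[..., Any]
    exception: Exception
    args: tuple
    kwargs: dict
    # Filled in automatically, like the <factory> defaults in the signature above.
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def reproduce(self) -> Any:
        # Re-run the failing call with the stored arguments.
        return self.function(*self.args, **self.kwargs)


def divide(a: float, b: float) -> float:
    return a / b


try:
    divide(1, b=0)
except ZeroDivisionError as exc:
    snapshot = SnapshotSketch(divide, exc, args=(1,), kwargs={"b": 0})

# Replaying the stored call raises the same kind of error again.
try:
    snapshot.reproduce()
    reproduced = False
except ZeroDivisionError:
    reproduced = True
```

Storing the callable together with its exact arguments is what makes the failure replayable later, for example in a debugger on another machine.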
class pipefunc.NestedPipeFunc(pipefuncs, output_name=None, function_name=None, *, renames=None, mapspec=None, resources=None, resources_scope='map', bound=None, cache=None, variant=None, variant_group=None)[source]#

Bases: PipeFunc

Combine multiple PipeFunc instances into a single function with an internal Pipeline.

Parameters:
  • pipefuncs (list[PipeFunc]) – A sequence of at least 2 PipeFunc instances to combine into a single function.

  • output_name (str | tuple[str, ...] | None) – The identifier for the output of the wrapped function. If None, it is automatically constructed from all the output names of the PipeFunc instances. Must be a subset of the output names of the PipeFunc instances.

  • function_name (str | None) – The name of the nested function. If None, the name is set to "NestedPipeFunc_{output_name[0]}_{output_name[...]}".

  • mapspec (str | MapSpec | None) – MapSpec for the joint function. If None, the mapspec is inferred from the individual PipeFunc instances. None of the MapSpec instances should have a reduction and all should use identical axes.

  • resources (dict | Resources | None) – Same as the PipeFunc class. However, if it is None here, it is inferred from the PipeFunc instances. Specifically, it takes the maximum of the resources. Unlike the PipeFunc class, the resources argument cannot be a callable.

  • resources_scope (Literal['map', 'element']) –

    Same as the PipeFunc class. Determines how resources are allocated in relation to the mapspec:

    • "map": Allocate resources for the entire mapspec operation (default).

    • "element": Allocate resources for each element in the mapspec.

    If no mapspec is defined, this parameter is ignored.

  • cache (bool | None) – Flag indicating whether the wrapped function should be cached. If None, cache will be set to True if any of the PipeFunc instances have caching enabled.

  • bound (dict[str, Any] | None) – Same as the PipeFunc class. Bind arguments to the functions. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names.

  • variant (str | dict[str | None, str] | None) –

    Same as the PipeFunc class. Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution.

    Can be specified in two formats:

    • A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.

    • A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.

    Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have "preprocessing" variants ("v1"/"v2") independent from "computation" variants ("fast"/"accurate"), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}.

  • variant_group (str | None) – DEPRECATED in v0.58.0: Use variant instead.

pipefuncs#

List of PipeFunc instances (copies of input) that are used in the internal pipeline.

pipeline#

The Pipeline instance that manages the PipeFunc instances.

Notes

The NestedPipeFunc class is a subclass of the PipeFunc class that allows you to combine multiple PipeFunc instances into a single function that has an internal Pipeline instance.
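
As a rough mental model, nesting means running several named steps behind one callable that returns all their outputs as a tuple. The sketch below uses only plain Python; `nest` and the step tuples are hypothetical and do not reflect how NestedPipeFunc builds its internal Pipeline.

```python
from typing import Any, Callable

# Each step is (output_name, function, input_names), listed in execution order.
Step = tuple[str, Callable[..., Any], tuple[str, ...]]


def nest(steps: list[Step]) -> Callable[..., tuple[Any, ...]]:
    """Hypothetical helper: chain steps and return every step's output."""

    def combined(**kwargs: Any) -> tuple[Any, ...]:
        values = dict(kwargs)
        for output_name, func, input_names in steps:
            # Feed earlier outputs and root inputs into the next step.
            values[output_name] = func(*(values[name] for name in input_names))
        return tuple(values[name] for name, _, _ in steps)

    return combined


combined = nest(
    [
        ("y", lambda x: 2 * x, ("x",)),
        ("z", lambda y: y + 1, ("y",)),
    ]
)
outputs = combined(x=3)
```

Here `outputs` holds one value per step, which mirrors the tuple-returning `func` property listed below.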

copy(**update)[source]#

Create a copy of the PipeFunc instance, optionally updating the attributes.

Return type:

NestedPipeFunc

property func: Callable[[...], tuple[Any, ...]][source]#
property original_parameters: dict[str, Any][source]#

Return the original (before renames) parameters of the wrapped function.

Returns:

A mapping of the original parameters of the wrapped function to their respective inspect.Parameter objects.

property output_annotation: dict[str, Any][source]#

Return the type annotation of the wrapped function’s output.

property parameter_annotations: dict[str, Any][source]#

Return the type annotations of the wrapped function’s parameters.

class pipefunc.PipeFunc(func, output_name, *, output_picker=None, renames=None, defaults=None, bound=None, profile=False, debug=False, print_error=True, cache=False, mapspec=None, internal_shape=None, post_execution_hook=None, resources=None, resources_variable=None, resources_scope='map', scope=None, variant=None, variant_group=None)[source]#

Bases: Generic[T]

Function wrapper class for pipeline functions with additional attributes.

Parameters:
  • func (TypeVar(T, bound= Callable[..., Any])) – The original function to be wrapped.

  • output_name (str | tuple[str, ...]) – The identifier for the output of the wrapped function. Provide a tuple of strings for multiple outputs.

  • output_picker (Callable[[Any, str], Any] | None) – A function that takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output. If None, the output of the wrapped function is returned as is.

  • renames (dict[str, str] | None) – A dictionary for renaming function arguments and outputs. The keys are the original names (as defined in the function signature or the output_name), and the values are the new names to be used. This allows you to change how the function is called without modifying its internal logic. For example, {"old_name": "new_name"} would allow the function to be called with new_name instead of old_name. If renaming the output_name, include it in this dictionary as well.

  • defaults (dict[str, Any] | None) – Set defaults for parameters. Overwrites any current defaults. Must be in terms of the renamed argument names.

  • bound (dict[str, Any] | None) – Bind arguments to the function. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names.

  • profile (bool) – Flag indicating whether the wrapped function should be profiled. Profiling is only available for sequential execution.

  • debug (bool) – Flag indicating whether debug information should be printed.

  • print_error (bool) – Flag indicating whether errors raised during the function execution should be printed.

  • cache (bool) – Flag indicating whether the wrapped function should be cached.

  • mapspec (str | MapSpec | None) – This is a specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output.

  • internal_shape (int | Literal['?'] | tuple[int | Literal['?'], ...] | None) – The shape of the output produced by this function when it is used within a mapspec context. Can be an int, a tuple of ints, "?" for unknown dimensions, or a tuple mixing ints and "?". If not provided, the shape will be inferred from the first execution of the function. If provided, the shape will be validated against the actual shape of the output. This parameter is needed only when a mapspec like -> out[i] is used, indicating that the shape cannot be derived from the inputs. In case there are multiple outputs, provide the shape for one of the outputs. This works because the shapes of all outputs are required to be identical.

  • post_execution_hook (Callable[[PipeFunc, Any, dict[str, Any]], None] | None) – A callback function that is invoked after the function is executed. The callback signature is hook(func: PipeFunc, result: Any, kwargs: dict) -> None. This hook can be used for logging, visualization of intermediate results, debugging, statistics collection, or other side effects. The hook is executed synchronously after the function returns but before the result is passed to the next function in the pipeline. Keep the hook lightweight to avoid impacting performance.

  • resources (dict | Resources | Callable[[dict[str, Any]], Resources | dict[str, Any]] | None) – A dictionary or Resources instance containing the resources required for the function. This can be used to specify the number of CPUs, GPUs, memory, wall time, queue, partition, and any extra job scheduler arguments. This is not used by the pipefunc directly but can be used by job schedulers to manage the resources required for the function. Alternatively, provide a callable that receives a dict with the input values and returns a Resources instance.

  • resources_variable (str | None) – If provided, the resources will be passed as the specified argument name to the function. This requires that the function has a parameter with the same name. For example, if resources_variable="resources", the function will be called as func(..., resources=Resources(...)). This is useful when the function handles internal parallelization.

  • resources_scope (Literal['map', 'element']) –

    Determines how resources are allocated in relation to the mapspec:

    • "map": Allocate resources for the entire mapspec operation (default).

    • "element": Allocate resources for each element in the mapspec.

    If no mapspec is defined, this parameter is ignored.

  • scope (str | None) –

    If provided, all parameter names and output names of the function will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the PipeFunc.update_scope method.

    When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, with the scopes "foo" and "bar", the parameters can be provided as: func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

  • variant (str | dict[str | None, str] | None) –

    Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution.

    Can be specified in two formats:

    • A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.

    • A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.

    Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have "preprocessing" variants ("v1"/"v2") independent from "computation" variants ("fast"/"accurate"), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}.

  • variant_group (str | None) – DEPRECATED in v0.58.0: Use variant instead.

Return type:

A PipeFunc instance that wraps the original function with the specified return identifier.

error_snapshot#

If an error occurs while calling the function, this attribute will contain an ErrorSnapshot instance with information about the error.

Examples

>>> def add_one(a, b):
...     return a + 1, b + 1
>>> add_one_func = PipeFunc(
...     add_one,
...     output_name="c",
...     renames={"a": "x", "b": "y"},
... )
>>> add_one_func(x=1, y=2)
(2, 3)
>>> add_one_func.update_defaults({"x": 1, "y": 1})
>>> add_one_func()
(2, 2)
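
The interplay of renames, defaults, and bound described in the parameter list can be sketched in plain Python. `call_with_renames` is a hypothetical helper that mimics the documented semantics (bound values win over caller-supplied values, which win over defaults); it is not pipefunc code.

```python
from typing import Any, Callable


def call_with_renames(
    func: Callable[..., Any],
    renames: dict[str, str],
    defaults: dict[str, Any],
    bound: dict[str, Any],
    **kwargs: Any,
) -> Any:
    """Hypothetical helper mimicking the documented semantics."""
    # Precedence: defaults < caller-supplied values < bound values.
    merged = {**defaults, **kwargs, **bound}
    # Translate the renamed names back to the names in the function signature.
    new_to_old = {new: old for old, new in renames.items()}
    original_kwargs = {new_to_old.get(name, name): value for name, value in merged.items()}
    return func(**original_kwargs)


def add_one(a, b):
    return a + 1, b + 1


result = call_with_renames(
    add_one,
    renames={"a": "x", "b": "y"},
    defaults={"y": 1},
    bound={"x": 10},
    x=1,  # ignored, because x is bound to 10
)
```

Note that defaults and bound are keyed by the renamed names (`x`, `y`), matching the "must be in terms of the renamed argument names" requirement above.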
property bound: dict[str, Any]#

Return the bound arguments for the function. These are arguments that are fixed.

See also

update_bound

Update the bound parameters via this method.

copy(**update)[source]#

Create a copy of the PipeFunc instance, optionally updating the attributes.

Return type:

PipeFunc

property defaults: dict[str, Any][source]#

Return the defaults for the function arguments.

Return type:

A dictionary of default values for the keyword arguments.

See also

update_defaults

Update the defaults via this method.

property original_parameters: dict[str, Parameter]#

Return the original (before renames) parameters of the wrapped function.

Returns:

A mapping of the original parameters of the wrapped function to their respective inspect.Parameter objects.

property output_annotation: dict[str, Any][source]#

Return the type annotation of the wrapped function’s output.

property output_name: str | tuple[str, ...][source]#

Return the output name(s) of the wrapped function.

Return type:

The output name(s) of the wrapped function.

property output_picker: Callable[[Any, str], Any] | None[source]#

Return the output picker function for the wrapped function.

The output picker function takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output.
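
A minimal sketch of a picker with that signature, assuming the wrapped function returns a dict keyed by output name. `pick_from_dict` and `stats` are hypothetical names chosen for illustration.

```python
from typing import Any


def pick_from_dict(output: Any, output_name: str) -> Any:
    # Matches the documented signature: (output, output_name) -> desired output.
    return output[output_name]


def stats(values: list[float]) -> dict[str, float]:
    return {"mean": sum(values) / len(values), "total": sum(values)}


raw = stats([1.0, 2.0, 3.0])
# One lookup per declared output name.
picked = {name: pick_from_dict(raw, name) for name in ("mean", "total")}
```

Such a function would typically be supplied as output_picker together with a tuple output_name such as ("mean", "total").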

property parameter_annotations: dict[str, Any][source]#

Return the type annotations of the wrapped function’s parameters.

property parameter_scopes: set[str][source]#

Return the scopes of the function parameters.

These are constructed from the parameter names that contain a dot. So if the parameter is foo.bar, the scope is foo.
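
The rule can be illustrated with a one-line helper. `scopes_of` is hypothetical, and it assumes the scope is the text before the first dot.

```python
def scopes_of(parameters: tuple[str, ...]) -> set[str]:
    # Assumption: everything before the first dot is the scope.
    return {name.split(".", 1)[0] for name in parameters if "." in name}


scopes = scopes_of(("foo.a", "foo.b", "bar.x", "c"))
```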

property parameters: tuple[str, ...][source]#
property profile: bool#

Return whether profiling is enabled for the wrapped function.

property renames: dict[str, str]#

Return the renames for the function arguments and output name.

See also

update_renames

Update the renames via this method.

property requires_mapping: bool[source]#
property unscoped_parameters: tuple[str, ...][source]#

Return the parameters with the scope stripped off.

update_bound(bound, *, overwrite=False)[source]#

Update the bound arguments for the function that are fixed.

Parameters:
  • bound (dict[str, Any]) – A dictionary of bound arguments for the function.

  • overwrite (bool) – Whether to overwrite the existing bound arguments. If False, the new bound arguments will be added to the existing bound arguments.

Return type:

None

update_defaults(defaults, *, overwrite=False)[source]#

Update defaults to the provided keyword arguments.

Parameters:
  • defaults (dict[str, Any]) – A dictionary of default values for the keyword arguments.

  • overwrite (bool) – Whether to overwrite the existing defaults. If False, the new defaults will be added to the existing defaults.

Return type:

None
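
The overwrite flag, which update_bound and update_defaults share, can be pictured as the choice between merging and replacing a dictionary. `update_mapping` is a hypothetical helper for illustration only.

```python
from typing import Any


def update_mapping(
    current: dict[str, Any],
    new: dict[str, Any],
    *,
    overwrite: bool = False,
) -> dict[str, Any]:
    # overwrite=True discards the existing entries; otherwise new entries are merged in.
    return dict(new) if overwrite else {**current, **new}


merged = update_mapping({"x": 1, "y": 1}, {"y": 5})
replaced = update_mapping({"x": 1, "y": 1}, {"y": 5}, overwrite=True)
```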

update_mapspec_axes(renames)[source]#

Update the MapSpec by renaming axes.

This method renames axes in the MapSpec while preserving the structure of the array specifications. It uses the MapSpec.rename_axes() method to perform type-safe axis renaming.

Parameters:

renames (dict[str, str]) – Dictionary mapping old axis names to new axis names.

Return type:

None

Examples

>>> @pipefunc(output_name="c", mapspec="a[i, j], b[i, j] -> c[i, j]")
... def f(a, b):
...     return a + b
>>> f.update_mapspec_axes({"i": "x", "j": "y"})
>>> str(f.mapspec)
'a[x, y], b[x, y] -> c[x, y]'
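
To make the renaming concrete, here is a string-level sketch that renames indices only inside the square brackets of a mapspec string. `rename_axes` is a hypothetical helper; MapSpec.rename_axes() operates on parsed MapSpec objects and may behave differently.

```python
import re


def rename_axes(mapspec: str, renames: dict[str, str]) -> str:
    """Hypothetical string-based axis renaming; array names are left untouched."""

    def replace_indices(match: re.Match) -> str:
        axes = [axis.strip() for axis in match.group(1).split(",")]
        return "[" + ", ".join(renames.get(axis, axis) for axis in axes) + "]"

    # Only the text between "[" and "]" is rewritten.
    return re.sub(r"\[([^\]]*)\]", replace_indices, mapspec)


renamed = rename_axes("a[i, j], b[i, j] -> c[i, j]", {"i": "x", "j": "y"})
```

Restricting the substitution to bracketed indices avoids accidentally renaming an array that happens to share a name with an axis.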
update_renames(renames, *, update_from='current', overwrite=False)[source]#

Update renames to function arguments and output_name for the wrapped function.

When renaming the output_name and if it is a tuple of strings, the renames must be provided as individual strings in the tuple.

Parameters:
  • renames (dict[str, str]) – A dictionary of renames for the function arguments or output_name.

  • update_from (Literal['current', 'original']) – Whether to update the renames from the "current" parameter names (PipeFunc.parameters) or from the "original" parameter names as in the function signature (PipeFunc.original_parameters). If also updating the output_name, original means the name that was provided to the PipeFunc instance.

  • overwrite (bool) – Whether to overwrite the existing renames. If False, the new renames will be added to the existing renames.

Return type:

None

update_scope(scope, inputs=None, outputs=None, exclude=None)[source]#

Update the scope for the PipeFunc by adding (or removing) a prefix to the input and output names.

This method updates the names of the specified inputs and outputs by adding the provided scope as a prefix. The scope is added to the names using the format f"{scope}.{name}". If an input or output name already starts with the scope prefix, it remains unchanged. If there is an existing scope, it is replaced with the new scope.

Internally, simply calls PipeFunc.update_renames with renames={name: f"{scope}.{name}", ...}.

When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, with the scopes "foo" and "bar", the parameters can be provided as: func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).
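
The equivalence of the two notations can be sketched by flattening the dictionary form into dotted names. `flatten_scoped_kwargs` is a hypothetical helper, not part of pipefunc.

```python
from typing import Any


def flatten_scoped_kwargs(kwargs: dict[str, Any], scopes: set[str]) -> dict[str, Any]:
    """Hypothetical helper: expand {scope: {name: value}} into {"scope.name": value}."""
    flat: dict[str, Any] = {}
    for key, value in kwargs.items():
        if key in scopes and isinstance(value, dict):
            for name, inner in value.items():
                flat[f"{key}.{name}"] = inner
        else:
            # Already in dotted form, or an unscoped parameter.
            flat[key] = value
    return flat


known_scopes = {"foo", "bar"}
nested = flatten_scoped_kwargs({"foo": dict(a=1, b=2), "bar": dict(a=3, b=4)}, known_scopes)
dotted = flatten_scoped_kwargs({"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}, known_scopes)
```

Both calls produce the same flat mapping, which is why either form can be used when calling a scoped function.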

Parameters:
  • scope (str | None) – The scope to set for the inputs and outputs. If None, the scope of inputs and outputs is removed.

  • inputs (set[str] | Literal['*'] | None) – Specific input names to include, or "*" to include all inputs. If None, no inputs are included.

  • outputs (set[str] | Literal['*'] | None) – Specific output names to include, or "*" to include all outputs. If None, no outputs are included.

  • exclude (set[str] | None) – Names to exclude from the scope. This can include both inputs and outputs. Can be used with inputs or outputs being "*" to exclude specific names.

Return type:

None

Examples

>>> f.update_scope("my_scope", inputs="*", outputs="*")  # Add scope to all inputs and outputs
>>> f.update_scope("my_scope", "*", "*", exclude={"output1"}) # Add to all except "output1"
>>> f.update_scope("my_scope", inputs="*", outputs={"output2"})  # Add scope to all inputs and "output2"
>>> f.update_scope(None, inputs="*", outputs="*")  # Remove scope from all inputs and outputs
class pipefunc.Pipeline(functions, *, lazy=False, debug=None, print_error=None, profile=None, cache_type=None, cache_kwargs=None, validate_type_annotations=True, scope=None, default_resources=None, name=None, description=None)[source]#

Bases: object

Pipeline class for managing and executing a sequence of functions.

Parameters:
  • functions (list[PipeFunc | tuple[PipeFunc, str | MapSpec]]) – A list of functions that form the pipeline. Note that the functions are copied when added to the pipeline using PipeFunc.copy.

  • lazy (bool) – Flag indicating whether the pipeline should be lazy.

  • debug (bool | None) – Flag indicating whether debug information should be printed. If None, the value of each PipeFunc’s debug attribute is used.

  • print_error (bool | None) – Flag indicating whether errors raised during the function execution should be printed. If None, the value of each PipeFunc’s print_error attribute is used.

  • profile (bool | None) – Flag indicating whether profiling information should be collected. If None, the value of each PipeFunc’s profile attribute is used. Profiling is only available for sequential execution.

  • cache_type (Literal['lru', 'hybrid', 'disk', 'simple'] | None) – The type of cache to use. See the notes below for more important information.

  • cache_kwargs (dict[str, Any] | None) – Keyword arguments passed to the cache constructor.

  • validate_type_annotations (bool) – Flag indicating whether type validation should be performed. If True, the type annotations of the functions are validated during the pipeline initialization. If False, the type annotations are not validated.

  • scope (str | None) –

    If provided, all parameter names and output names of the pipeline functions will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the Pipeline.update_scope method.

    When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, with the scopes "foo" and "bar", the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

  • default_resources (dict[str, Any] | Resources | None) – Default resources to use for the pipeline functions. If None, the resources are not set. Either a dict or a pipefunc.resources.Resources instance can be provided. If provided, the resources in the PipeFunc instances are updated with the default resources.

  • name (str | None) – A name for the pipeline. If provided, it will be used to generate e.g., docs and MCP server descriptions.

  • description (str | None) – A description of the pipeline. If provided, it will be used to generate e.g., docs and MCP server descriptions.

Notes

Important note about caching: The caching behavior differs between pipeline.map and pipeline.run / pipeline(...).

  1. For pipeline.run and pipeline(...) ("calling the pipeline as a function"):

  • The cache key is computed based solely on the root arguments provided to the pipeline.

  • Only the root arguments need to be hashable.

  • The root arguments uniquely determine the output across the entire pipeline, allowing caching to be simple and effective when computing the final result.

  2. For pipeline.map:

  • The cache key is computed based on the input values of each PipeFunc.

  • So a PipeFunc with cache=True must have hashable input values.

  • When using pipeline.map(..., parallel=True), the cache itself will be serialized, so one must use a cache that supports shared memory, such as LRUCache with shared=True, or a disk cache such as DiskCache.

For both methods:

  • The pipefunc.cache.to_hashable function is used to attempt to ensure that input values are hashable, which is a requirement for storing results in a cache.

  • This function works for many common types but is not guaranteed to work for all types.

  • If to_hashable cannot make a value hashable, it falls back to using the serialized representation of the value.

The key difference is that pipeline.run’s output is uniquely determined by the root arguments, while pipeline.map is not because it may contain reduction operations as described by MapSpec.
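
To illustrate the hashability requirement and the serialization fallback, here is a standard-library sketch of building a cache key from root arguments. `to_hashable_sketch` and `cache_key` are hypothetical; the real pipefunc.cache.to_hashable may handle more types and produce different keys.

```python
import pickle
from collections.abc import Hashable
from typing import Any


def to_hashable_sketch(value: Any) -> Hashable:
    """Hypothetical stand-in for pipefunc.cache.to_hashable."""
    try:
        hash(value)
        return value
    except TypeError:
        pass
    if isinstance(value, dict):
        return ("dict", tuple(sorted((k, to_hashable_sketch(v)) for k, v in value.items())))
    if isinstance(value, (list, tuple)):
        return (type(value).__name__, tuple(to_hashable_sketch(v) for v in value))
    if isinstance(value, set):
        return ("set", frozenset(to_hashable_sketch(v) for v in value))
    # Fallback: use the serialized representation of the value.
    return ("pickled", pickle.dumps(value))


def cache_key(root_args: dict[str, Any]) -> Hashable:
    # In the pipeline.run style, the key depends only on the root arguments.
    return tuple(sorted((name, to_hashable_sketch(v)) for name, v in root_args.items()))


key1 = cache_key({"x": [1, 2], "opts": {"tol": 1e-3}})
key2 = cache_key({"opts": {"tol": 1e-3}, "x": [1, 2]})
cache = {key1: "result"}
fallback = to_hashable_sketch(bytearray(b"ab"))
```

Sorting by argument name makes the key independent of the order in which keyword arguments are passed, so `key1` and `key2` address the same cache entry.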

add(f, mapspec=None)[source]#

Add a function to the pipeline.

Always creates a copy of the PipeFunc instance to avoid side effects.

Parameters:
  • f (PipeFunc | Callable) – The function to add to the pipeline.

  • profile – Flag indicating whether profiling information should be collected.

  • mapspec (str | MapSpec | None) – This is a specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output.

Return type:

PipeFunc

add_mapspec_axis(*parameter, axis)[source]#

Add a new axis to parameter’s MapSpec.

Parameters:
  • parameter (str) – The parameter to add an axis to.

  • axis (str) – The axis to add to the MapSpec of all functions that depend on parameter. Provide a new axis name to add a new axis or an existing axis name to zip the parameter with the existing axis. Can be a comma-separated string to add multiple axes at once.

Return type:

None

property all_arg_combinations: dict[str | tuple[str, ...], set[tuple[str, ...]]][source]#

Compute all possible argument mappings for the pipeline.

Returns:

A dictionary mapping function names to sets of tuples containing possible argument combinations.

property all_output_names: set[str][source]#
property all_root_args: dict[str | tuple[str, ...], tuple[str, ...]][source]#

Return the root arguments required to compute all outputs.

arg_combinations(output_name)[source]#

Return the arguments required to compute a specific output.

Parameters:

output_name (str | tuple[str, ...]) – The identifier for the return value of the pipeline.

Return type:

set[tuple[str, ...]]

Returns:

A set of tuples containing possible argument combinations. The tuples are sorted in lexicographical order.

cli(description=None)[source]#

Automatically construct a command-line interface using argparse.

This method creates an argparse.ArgumentParser instance, adds arguments for each root parameter in the pipeline using a Pydantic model, sets default values if they exist, parses the command-line arguments, and runs one of three subcommands:

  • cli: Supply individual input parameters as command-line options.

  • json: Load all input parameters from a JSON file.

  • docs: Display the pipeline documentation (using pipeline.print_documentation).

Mapping options (prefixed with --map-) are available for the cli and json subcommands to control parallel execution, storage method, and cleanup behavior.

Usage Examples:

CLI mode:

python cli-example.py cli --x 2 --y 3 --map-parallel false --map-cleanup true

JSON mode:

python cli-example.py json --json-file inputs.json --map-parallel false --map-cleanup true

Docs mode:

python cli-example.py docs
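
The three-subcommand layout can be sketched with argparse alone. The parser below is a hypothetical illustration: the `--x` and `--y` options stand in for root parameters, the string defaults for the mapping options are assumptions, and the real method derives its arguments from a Pydantic model rather than hard-coding them.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    # Hypothetical parser layout mirroring the three documented subcommands.
    parser = argparse.ArgumentParser(description="Pipeline CLI sketch")
    sub = parser.add_subparsers(dest="mode", required=True)

    cli = sub.add_parser("cli", help="Pass inputs as command-line options")
    cli.add_argument("--x", type=int, required=True)
    cli.add_argument("--y", type=int, required=True)

    from_json = sub.add_parser("json", help="Load inputs from a JSON file")
    from_json.add_argument("--json-file", required=True)

    sub.add_parser("docs", help="Print the pipeline documentation")

    # Mapping options are shared by the cli and json subcommands only.
    for subparser in (cli, from_json):
        subparser.add_argument("--map-parallel", default="true")
        subparser.add_argument("--map-cleanup", default="true")
    return parser


args = build_parser().parse_args(
    ["cli", "--x", "2", "--y", "3", "--map-parallel", "false"]
)
```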

Parameters:
  • pipeline – The PipeFunc pipeline instance to be executed.

  • description (str | None) – A custom description for the CLI help message. If not provided, a default description is used.

Raises:
Return type:

None

Examples

>>> if __name__ == "__main__":
...     pipeline = create_my_pipeline()
...     pipeline.cli()

See also

pydantic_model

Generate a Pydantic model for pipeline root input parameters.

print_documentation

Print the pipeline documentation as a table formatted with Rich.

copy(**update)[source]#

Return a copy of the pipeline.

Parameters:

update (Any) – Keyword arguments passed to the Pipeline constructor instead of the original values.

Return type:

Pipeline

property debug: bool | None#

Flag indicating whether debug information should be printed.

property defaults: dict[str, Any][source]#
drop(*, f=None, output_name=None)[source]#

Drop a function from the pipeline.

Parameters:
  • f (PipeFunc | None) – The function to drop from the pipeline.

  • output_name (str | tuple[str, ...] | None) – The name of the output to drop from the pipeline.

Return type:

None

property error_snapshot: ErrorSnapshot | None#

Return an error snapshot for the pipeline.

This value is None if no errors have occurred during the pipeline execution.

func(output_name)[source]#

Create a composed function that can be called with keyword arguments.

Parameters:

output_name (str | tuple[str, ...] | list[str | tuple[str, ...]]) – The identifier for the return value of the composed function.

Return type:

_PipelineAsFunc

Returns:

The composed function that can be called with keyword arguments.

func_dependencies(output_name)[source]#

Return the functions required to compute a specific output.

See also

func_dependents

Return type:

list[str | tuple[str, ...]]

func_dependents(name)[source]#

Return the functions that depend on a specific input/output.

Return type:

list[str | tuple[str, ...]]

property graph: DiGraph[source]#

Create a directed graph representing the pipeline.

Returns:

A directed graph with nodes representing functions and edges representing dependencies between functions.

independent_axes_in_mapspecs(output_name)[source]#

Return the axes that are both in the output and in the root arguments.

Identifies axes that are cross-products and can be computed independently.

Return type:

set[str]

info(*, print_table=False)[source]#

Return information about inputs and outputs of the Pipeline.

Parameters:

print_table (bool) – Whether to print a rich-formatted table to the console. Requires the rich package.

Returns:

If print_table is False, returns a dictionary containing information about the inputs and outputs of the Pipeline, with the following keys:

  • inputs: The input arguments of the Pipeline.

  • outputs: The output arguments of the Pipeline.

  • intermediate_outputs: The intermediate output arguments of the Pipeline.

  • required_inputs: The required input arguments of the Pipeline.

  • optional_inputs: The optional input arguments of the Pipeline (see Pipeline.defaults).

If print_table is True, prints a rich-formatted table to the console and returns None.

Return type:

dict or None

See also

defaults

A dictionary with input name to default value mappings.

leaf_nodes

The leaf nodes of the pipeline as PipeFunc objects.

root_args

The root arguments (inputs) required to compute the output of the pipeline.

print_documentation

Print formatted documentation of the pipeline to the console.

join(*pipelines)[source]#

Join multiple pipelines into a single new pipeline.

The new pipeline has no default_resources set. Instead, each function has a Resources attribute that is created via Resources.maybe_with_defaults(f.resources, pipeline.default_resources).

Parameters:

pipelines (Pipeline | PipeFunc) – The pipelines to join. Can also be individual PipeFunc instances.

Return type:

Pipeline

Returns:

A new pipeline containing all functions from the original pipelines.

property leaf_nodes: list[PipeFunc][source]#

Return the leaf nodes in the pipeline’s execution graph.

map(inputs, run_folder=None, internal_shapes=None, *, output_names=None, parallel=True, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise', scheduling_strategy='generation')[source]#

Run a pipeline with MapSpec functions for given inputs.

Parameters:
  • inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data. These are either single values for functions without a mapspec, or lists of values or numpy.ndarrays for functions with a mapspec.

  • run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is provided here, the provided value is used.

  • output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.

  • parallel (bool) – Whether to run the functions in parallel. Ignored if executor is not None.

  • executor (Executor | dict[str | tuple[str, ...], Executor] | None) –

    The executor to use for parallel execution. Can be specified as:

    1. None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).

    2. A concurrent.futures.Executor instance: Used for all outputs.

    3. A dictionary: Specify different executors for different outputs.

      • Use output names as keys and Executor instances as values.

      • Use an empty string "" as a key to set a default executor.

    If parallel is False, this argument is ignored.

  • chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) –

    Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:

    • None: Automatically determine optimal chunk sizes (default)

    • int: Same chunk size for all outputs

    • dict: Different chunk sizes per output where:
      • Keys are output names (or "" for default)

      • Values are either integers or callables

      • Callables take total execution count and return chunk size

    Examples:

    >>> chunksizes = None  # Auto-determine optimal chunk sizes
    >>> chunksizes = 100  # All outputs use chunks of 100
    >>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
    >>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
    

  • storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) –

    The storage class to use for storing intermediate and final results. Can be specified as:

    1. A string: Use a single storage class for all outputs.

    2. A dictionary: Specify different storage classes for different outputs.

      • Use output names as keys and storage class names as values.

      • Use an empty string "" as a key to set a default storage class.

    Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".

  • persist_memory (bool) – Whether to write results to disk when memory based storage is used. Does not have any effect when file based storage is used.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, takes priority over resume parameter. cleanup=True is equivalent to resume=False. cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

  • resume_validation (Literal['auto', 'strict', 'skip']) –

    Controls validation strictness when reusing data from a previous run (only applies when resume=True):

    • "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.

    • "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.

    • "skip": Skip input/default validation entirely. Use when your input objects have broken __eq__ implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.

    Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.

  • fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axes names to indices that should be fixed for the run. If not provided, all indices are iterated over.

  • auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.

  • show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) –

    Whether to display a progress bar. Can be:

    • True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).

    • False: No progress bar.

    • "ipywidgets": Force ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.

    • "rich": Force rich progress bar (text-based). Shown only if rich is installed.

    • "headless": No progress bar, but the progress is still tracked internally.

    • None (default): Shows ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed. Otherwise, no progress bar is shown.

  • return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory. Instead the results are only kept in the set storage. This is useful for very large pipelines where the results do not fit into memory.

  • error_handling (Literal['raise', 'continue']) –

    How to handle errors during function execution:

    • "raise" (default): Stop execution on first error and raise exception

    • "continue": Continue execution, collecting errors as ErrorSnapshot objects

  • scheduling_strategy (Literal['generation', 'eager']) –

    Strategy for scheduling pipeline function execution:

    • "generation" (default): Executes functions in strict topological generations, waiting for all functions in a generation to complete before starting the next. Provides predictable execution order but may not maximize parallelism.

    • "eager": Dynamically schedules functions as soon as their dependencies are met, without waiting for entire generations to complete. Can improve performance by maximizing parallel execution, especially for complex dependency graphs with varied execution times.
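The chunksizes rules listed for this method (None, a single int, or a dict with an optional "" default key and callable values) can be sketched in plain Python. The helper resolve_chunksize below is hypothetical and is not part of pipefunc; it only illustrates how one output's chunk size could be looked up under those rules.

```python
def resolve_chunksize(chunksizes, output_name, n_total):
    """Hypothetical helper: pick the chunk size for a single output."""
    if chunksizes is None or isinstance(chunksizes, int):
        # None means "determine automatically"; an int applies to every output.
        return chunksizes
    # Fall back to the "" key when the output has no entry of its own.
    spec = chunksizes.get(output_name, chunksizes.get(""))
    if callable(spec):
        # Callables receive the total execution count and return a chunk size.
        return spec(n_total)
    return spec


chunksizes = {"": 50, "out1": lambda n: n // 20}
size_out1 = resolve_chunksize(chunksizes, "out1", 400)  # callable: 400 // 20
size_out2 = resolve_chunksize(chunksizes, "out2", 400)  # default entry
```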

See also

map_async

The asynchronous version of this method.

Return type:

ResultDict

Returns:

A ResultDict containing the results of the pipeline. The values are of type Result; use Result.output to get the actual result.

map_async(inputs, run_folder=None, internal_shapes=None, *, output_names=None, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, display_widgets=True, return_results=True, scheduling_strategy='generation', error_handling='raise', start=True)[source]#

Asynchronously run a pipeline with MapSpec functions for given inputs.

Returns immediately with an AsyncMap instance with a task attribute that can be awaited.

Parameters:
  • inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data: single values for functions without a mapspec, or lists of values or numpy.ndarrays for functions with a mapspec.

  • run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is provided here, the provided value is used.

  • output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.

  • executor (Executor | dict[str | tuple[str, ...], Executor] | None) –

    The executor to use for parallel execution. Can be specified as:

    1. None: A concurrent.futures.ProcessPoolExecutor is used.

    2. A concurrent.futures.Executor instance: Used for all outputs.

    3. A dictionary: Specify different executors for different outputs.

      • Use output names as keys and Executor instances as values.

      • Use an empty string "" as a key to set a default executor.

  • chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) –

    Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:

    • None: Automatically determine optimal chunk sizes (default)

    • int: Same chunk size for all outputs

    • dict: Different chunk sizes per output where:
      • Keys are output names (or "" for default)

      • Values are either integers or callables

      • Callables take total execution count and return chunk size

    Examples:

    >>> chunksizes = None  # Auto-determine optimal chunk sizes
    >>> chunksizes = 100  # All outputs use chunks of 100
    >>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
    >>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
    

  • storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) –

    The storage class to use for storing intermediate and final results. Can be specified as:

    1. A string: Use a single storage class for all outputs.

    2. A dictionary: Specify different storage classes for different outputs.

      • Use output names as keys and storage class names as values.

      • Use an empty string "" as a key to set a default storage class.

    Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".

  • persist_memory (bool) – Whether to write results to disk when memory based storage is used. Does not have any effect when file based storage is used.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, takes priority over resume parameter. cleanup=True is equivalent to resume=False. cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

  • resume_validation (Literal['auto', 'strict', 'skip']) –

    Controls validation strictness when reusing data from a previous run (only applies when resume=True):

    • "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.

    • "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.

    • "skip": Skip input/default validation entirely. Use when your input objects have broken __eq__ implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.

    Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.

  • fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axes names to indices that should be fixed for the run. If not provided, all indices are iterated over.

  • auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.

  • show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) –

    Whether to display a progress bar. Can be:

    • True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).

    • False: No progress bar.

    • "ipywidgets": Force ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.

    • "rich": Force rich progress bar (text-based). Shown only if rich is installed.

    • "headless": No progress bar, but the progress is still tracked internally.

    • None (default): Shows ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed. Otherwise, no progress bar is shown.

  • display_widgets (bool) – Whether to call IPython.display.display(...) on widgets. Ignored if outside of a Jupyter notebook.

  • return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory. Instead the results are only kept in the set storage. This is useful for very large pipelines where the results do not fit into memory.

  • scheduling_strategy (Literal['generation', 'eager']) –

    Strategy for scheduling pipeline function execution:

    • "generation" (default): Executes functions in strict topological generations, waiting for all functions in a generation to complete before starting the next. Provides predictable execution order but may not maximize parallelism.

    • "eager": Dynamically schedules functions as soon as their dependencies are met, without waiting for entire generations to complete. Can improve performance by maximizing parallel execution, especially for complex dependency graphs with varied execution times.

  • error_handling (Literal['raise', 'continue']) –

    How to handle errors during function execution:

    • "raise" (default): Stop execution on first error and raise exception

    • "continue": Continue execution, collecting errors as ErrorSnapshot objects

  • start (bool) – Whether to start the pipeline immediately. If False, the pipeline is not started until the start() method on the AsyncMap instance is called.

See also

map

The synchronous version of this method.

Return type:

AsyncMap

Returns:

An AsyncMap instance that contains run_info, progress and task. The task can be awaited to get the final result of the pipeline.

property mapspec_axes: dict[str, tuple[str, ...]][source]#

Return the axes for each array parameter in the pipeline.

property mapspec_dimensions: dict[str, int][source]#

Return the number of dimensions for each array parameter in the pipeline.

property mapspec_names: set[str][source]#
mapspecs(*, ordered=True)[source]#

Return the MapSpecs for all functions in the pipeline.

Return type:

list[MapSpec]

property mapspecs_as_strings: list[str][source]#

Return the MapSpecs for all functions in the pipeline as strings.

nest_funcs(output_names, new_output_name=None, function_name=None)[source]#

Replaces a set of output names with a single nested function in place.

Parameters:
  • output_names (set[str | tuple[str, ...]] | Literal['*']) – The output names to nest in a NestedPipeFunc. Can also be "*" to nest all functions in the pipeline into a single NestedPipeFunc.

  • new_output_name (str | tuple[str, ...] | None) – The identifier for the output of the wrapped function. If None, it is automatically constructed from all the output names of the PipeFunc instances. Must be a subset of the output names of the PipeFunc instances.

  • function_name (str | None) – The name of the nested function. If None, the name will be set to "NestedPipeFunc_{output_name[0]}_{output_name[...]}".

Return type:

NestedPipeFunc

Returns:

The newly added NestedPipeFunc instance.

property node_mapping: dict[str | tuple[str, ...], PipeFunc | str][source]#

Return a mapping from node names to nodes.

Returns:

A mapping from node names to nodes.

property output_annotations: dict[str, Any][source]#

Return the (final and intermediate) output annotations for the pipeline.

property output_to_func: dict[str | tuple[str, ...], PipeFunc][source]#

Return a mapping from output names to functions.

The mapping includes functions with multiple outputs both as individual outputs and as tuples of outputs. For example, if a function has the output name ("a", "b"), the mapping will include both "a", "b", and ("a", "b") as keys.
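The key layout described above can be illustrated without pipefunc. The helper build_output_to_func below is a hypothetical stand-in, not the library's implementation; it shows how a function with the tuple output name ("a", "b") ends up under three keys.

```python
def build_output_to_func(funcs):
    """Hypothetical sketch: funcs maps an output name (str or tuple) to a function."""
    mapping = {}
    for output_name, func in funcs.items():
        mapping[output_name] = func
        if isinstance(output_name, tuple):
            # Also register each individual output of a multi-output function.
            for name in output_name:
                mapping[name] = func
    return mapping


def split():
    return 1, 2


def total(a, b):
    return a + b


mapping = build_output_to_func({("a", "b"): split, "c": total})
```

Here "a", "b", and ("a", "b") all point to split, while "c" points to total.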

See also

__getitem__

Shortcut for accessing the function corresponding to a specific output name.

property parameter_annotations: dict[str, Any][source]#

Return the parameter annotations for the pipeline.

The parameter annotations are computed by traversing the pipeline graph in topological order and collecting the annotations from the functions. If there are conflicting annotations for the same parameter, a warning is issued and the first encountered annotation is used.
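The "first annotation wins, with a warning" rule can be sketched in plain Python. The helper collect_annotations and its input format are assumptions made for illustration only; they do not mirror pipefunc internals.

```python
import warnings


def collect_annotations(functions_in_topological_order):
    """Hypothetical sketch: items are (function_name, {parameter: annotation})."""
    collected = {}
    for func_name, annotations in functions_in_topological_order:
        for param, annotation in annotations.items():
            if param in collected and collected[param] != annotation:
                # Conflict: warn and keep the annotation seen first.
                warnings.warn(
                    f"Conflicting annotation for {param!r} in {func_name!r}",
                    stacklevel=2,
                )
                continue
            collected.setdefault(param, annotation)
    return collected


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    annotations = collect_annotations(
        [("f", {"x": int, "y": float}), ("g", {"x": str})],
    )
```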

print_documentation(*, borders=False, skip_optional=False, skip_intermediate=True, description_table=True, parameters_table=True, returns_table=True, order='topological')[source]#

Print the documentation for the pipeline as a table formatted with Rich.

Parameters:
  • borders (bool) – Whether to include borders in the tables.

  • skip_optional (bool) – Whether to skip optional parameters.

  • skip_intermediate (bool) – Whether to skip intermediate outputs and only show root parameters.

  • description_table (bool) – Whether to generate the function description table.

  • parameters_table (bool) – Whether to generate the function parameters table.

  • returns_table (bool) – Whether to generate the function returns table.

  • order (Literal['topological', 'alphabetical']) –

    The order in which to display the functions in the documentation. Options are:

    • topological: Display functions in topological order.

    • alphabetical: Display functions in alphabetical order (using output_name).

  • emojis – Whether to use emojis in the documentation.

Return type:

None

See also

info

Returns the input and output information for the pipeline.

property print_error: bool | None#

Flag indicating whether errors raised during the function execution should be printed.

print_profiling_stats()[source]#

Display the resource usage report for each function in the pipeline.

Return type:

None

property profile: bool | None#

Flag indicating whether profiling information should be collected.

property profiling_stats: dict[str, ProfilingStats]#

Return the profiling data for each function in the pipeline.

pydantic_model(model_name='InputModel')[source]#

Generate a Pydantic model for pipeline root input parameters.

Inspects the pipeline to extract defaults, type annotations, and docstrings to create a model that validates and coerces input data (e.g., from JSON) to the correct types. This is useful for ensuring that inputs meet the pipeline’s requirements and for generating a CLI.

Multidimensional Array Handling: Array inputs specified via mapspecs are annotated as nested lists because Pydantic cannot directly coerce JSON arrays into NumPy arrays. After validation, these lists are converted to NumPy ndarrays.

Parameters:

model_name (str) – Name for the generated Pydantic model class.

Returns:

A dynamically generated Pydantic model class for validating pipeline inputs. It:

  • Validates and coerces input data to the expected types.

  • Annotates multidimensional arrays as nested lists and converts them to NumPy arrays.

  • Facilitates CLI creation by ensuring proper input validation.

Return type:

type[pydantic.BaseModel]

Examples

>>> from pipefunc import Pipeline, pipefunc
>>> @pipefunc("foo")
... def foo(x: int, y: int = 1) -> int:
...     return x + y
>>> pipeline = Pipeline([foo])
>>> InputModel = pipeline.pydantic_model()
>>> inputs = {"x": "10", "y": "2"}
>>> model = InputModel(**inputs)
>>> model.x, model.y
(10, 2)
>>> results = pipeline.map(model)  # Equivalent to `pipeline.map(inputs)`

Notes

  • If available, detailed parameter descriptions are extracted from docstrings using griffe.

  • This method is especially useful for CLI generation, ensuring that user inputs are properly validated and converted before pipeline execution.

See also

cli

Automatically construct a command-line interface using argparse.

print_documentation

Print the pipeline documentation as a table formatted with Rich.

replace(new, old=None)[source]#

Replace a function in the pipeline with another function.

Parameters:
  • new (PipeFunc) – The function to add to the pipeline.

  • old (PipeFunc | None) – The function to replace in the pipeline. If None, old is assumed to be the function with the same output name as new.

Return type:

None

root_args(output_name=None)[source]#

Return the root arguments required to compute a specific (or all) output(s).

Parameters:

output_name (str | tuple[str, ...] | None) – The identifier for the return value of the pipeline. If None, the root arguments for all outputs are returned.

Return type:

tuple[str, ...]

Returns:

A tuple containing the root arguments required to compute the output. The tuple is sorted in alphabetical order.

property root_nodes: list[PipeFunc][source]#

Return the root nodes in the pipeline’s execution graph.

run(output_name, *, full_output=False, kwargs, allow_unused=False)[source]#

Execute the pipeline for a specific return value.

Parameters:
  • output_name (str | tuple[str, ...] | list[str | tuple[str, ...]]) – The identifier for the return value of the pipeline. Can be a single output name or a list of output names.

  • full_output (bool) – Whether to return the outputs of all function executions as a dictionary mapping function names to their return values.

  • kwargs (dict[str, Any]) – Keyword arguments to be passed to the pipeline functions.

  • allow_unused (bool) – Whether to allow unused keyword arguments. If False, an error is raised if any keyword arguments are unused. If True, unused keyword arguments are ignored.

Return type:

Any

Returns:

A dictionary mapping function names to their return values if full_output is True. Otherwise, the return value is the return value of the pipeline function specified by output_name. If output_name is a list, the return value is a tuple of the return values of the pipeline functions.

property scopes: set[str][source]#
simplified_pipeline(output_name=None, *, conservatively_combine=False)[source]#

Simplify pipeline with combined function nodes.

Generate a simplified version of the pipeline where combinable function nodes have been merged into single function nodes.

This method identifies combinable nodes in the pipeline’s execution graph (i.e., functions that share the same root arguments) and merges them into single function nodes. This results in a simplified pipeline where each key function only depends on nodes that cannot be further combined.

Parameters:
  • output_name (str | tuple[str, ...] | None) – The name of the output from the pipeline function we are starting the simplification from. If None, the unique tip of the pipeline graph is used (if there is one).

  • conservatively_combine (bool) – If True, only combine a function node with its predecessors if all of its predecessors have the same root arguments as the function node itself. If False, combine a function node with its predecessors if any of its predecessors have the same root arguments as the function node.

Return type:

Pipeline

Returns:

The simplified version of the pipeline.
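The difference between the two conservatively_combine settings comes down to "all predecessors" versus "any predecessor". The helper should_combine below is a hypothetical sketch of that decision rule only; it is not how pipefunc performs the actual graph rewrite.

```python
def should_combine(node_root_args, predecessor_root_args, *, conservatively_combine=False):
    """Hypothetical sketch: decide whether a node merges with its predecessors."""
    same = [root_args == node_root_args for root_args in predecessor_root_args]
    # Conservative mode requires every predecessor to share the node's root
    # arguments; the default mode is satisfied by a single match.
    return all(same) if conservatively_combine else any(same)


node = {"x"}
predecessors = [{"x"}, {"x", "y"}]
default_mode = should_combine(node, predecessors)
conservative_mode = should_combine(node, predecessors, conservatively_combine=True)
```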

property sorted_functions: list[PipeFunc][source]#

Return the functions in the pipeline in topological order.

split_disconnected(**pipeline_kwargs)[source]#

Split disconnected components of the pipeline into separate pipelines.

Parameters:

pipeline_kwargs (Any) – Keyword arguments to pass to the Pipeline constructor.

Return type:

tuple[Pipeline, ...]

Returns:

Tuple of fully connected Pipeline objects.

subpipeline(inputs=None, output_names=None)[source]#

Create a new pipeline containing only the nodes between the specified inputs and outputs.

Parameters:
  • inputs (set[str] | None) – Set of input names to include in the subpipeline. If None, all root nodes of the original pipeline will be used as inputs.

  • output_names (set[str | tuple[str, ...]] | None) – Set of output names to include in the subpipeline. If None, all leaf nodes of the original pipeline will be used as outputs.

Return type:

Pipeline

Returns:

A new pipeline containing only the nodes and connections between the specified inputs and outputs.

Notes

The subpipeline is created by copying the original pipeline and then removing the nodes that are not part of the path from the specified inputs to the specified outputs. The resulting subpipeline will have the same behavior as the original pipeline for the selected inputs and outputs.

If inputs is provided, the subpipeline will use those nodes as the new root nodes. If output_names is provided, the subpipeline will use those nodes as the new leaf nodes.

Examples

>>> import numpy as np
>>> from pipefunc import Pipeline, pipefunc
>>> @pipefunc(output_name="y", mapspec="x[i] -> y[i]")
... def f(x: int) -> int:
...     return x
...
>>> @pipefunc(output_name="z")
... def g(y: np.ndarray) -> int:
...     return sum(y)
...
>>> pipeline = Pipeline([f, g])
>>> inputs = {"x": [1, 2, 3]}
>>> results = pipeline.map(inputs, "tmp_path")
>>> partial = pipeline.subpipeline({"y"})
>>> r = partial.map({"y": results["y"].output}, "tmp_path")
>>> assert len(r) == 1
>>> assert r["z"].output == 6
property topological_generations: Generations[source]#

Return the functions in the pipeline grouped by topological generation.

This method uses networkx.topological_generations on the pipeline graph to group functions by their dependency order. The result includes:

  • Root arguments: Initial inputs to the pipeline.

  • Function generations: Subsequent groups of functions in topological order.

Nullary functions (those without parameters) are handled specially to ensure they’re included in the generations rather than treated as root arguments.
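To make the grouping concrete, here is a pure-Python sketch of topological generations. It is a stand-in for networkx.topological_generations written for illustration, not the code pipefunc runs, and the dependency dictionary format is an assumption.

```python
def topological_generations(dependencies):
    """Group nodes into generations.

    dependencies maps every node to the set of nodes it depends on;
    each dependency must itself appear as a key.
    """
    remaining = {node: set(deps) for node, deps in dependencies.items()}
    generations = []
    while remaining:
        # A node is ready once all of its dependencies have been emitted.
        ready = sorted(node for node, deps in remaining.items() if not deps)
        if not ready:
            raise ValueError("cycle detected or unknown dependency")
        generations.append(ready)
        for node in ready:
            del remaining[node]
        for deps in remaining.values():
            deps.difference_update(ready)
    return generations


# "x" plays the role of a root argument; "y", "z", and "w" are functions.
graph = {"x": set(), "y": {"x"}, "z": {"x"}, "w": {"y", "z"}}
generations = topological_generations(graph)
```

The first generation holds the root argument, and "y" and "z" share a generation because neither depends on the other.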

property unique_leaf_node: PipeFunc[source]#

Return the unique leaf node of the pipeline graph.

update_defaults(defaults, *, overwrite=False)[source]#

Update defaults to the provided keyword arguments.

Automatically traverses the pipeline graph to find all functions that the defaults can be applied to.

If overwrite is False, the new defaults will be added to the existing defaults. If overwrite is True, the existing defaults will be replaced with the new defaults.

Parameters:
  • defaults (dict[str, Any | dict[str, Any]]) – A dictionary of default values for the keyword arguments.

  • overwrite (bool) – Whether to overwrite the existing defaults. If False, the new defaults will be added to the existing defaults.

Return type:

None
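The effect of overwrite can be sketched with plain dictionaries. The helper merge_defaults is hypothetical, and it assumes that with overwrite=False a new value replaces an existing default of the same name.

```python
def merge_defaults(existing, new, *, overwrite=False):
    """Hypothetical sketch of the overwrite semantics described above."""
    if overwrite:
        # Existing defaults are discarded and replaced by the new ones.
        return dict(new)
    # New defaults are added on top of the existing ones
    # (assumption: new values win for shared names).
    return {**existing, **new}


existing = {"a": 1, "b": 2}
added = merge_defaults(existing, {"b": 20, "c": 30})
replaced = merge_defaults(existing, {"b": 20, "c": 30}, overwrite=True)
```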

update_mapspec_axes(renames)[source]#

Update the axes in the MapSpecs for the pipeline.

Parameters:

renames (dict[str, str]) – A dictionary mapping old axis names to new axis names.

Return type:

None

update_renames(renames, *, update_from='current', overwrite=False)[source]#

Update the renames for the pipeline.

Automatically traverses the pipeline graph to find all functions that the renames can be applied to.

Parameters:
  • renames (dict[str, str]) – A dictionary mapping old parameter names to new parameter and output names.

  • update_from (Literal['current', 'original']) – Whether to update the renames from the current parameter names (PipeFunc.parameters) or from the original parameter names (PipeFunc.original_parameters).

  • overwrite (bool) – Whether to overwrite the existing renames. If False, the new renames will be added to the existing renames.

Return type:

None

update_scope(scope, inputs=None, outputs=None, exclude=None)[source]#

Update the scope for the pipeline by adding (or removing) a prefix to the input and output names.

This method updates the names of the specified inputs and outputs by adding the provided scope as a prefix. The scope is added to the names using the format f"{scope}.{name}". If an input or output name already starts with the scope prefix, it remains unchanged. If there is an existing scope, it is replaced with the new scope.

inputs are the root arguments of the pipeline. Inputs to functions which are outputs of other functions are considered to be outputs.

Internally, simply calls PipeFunc.update_renames with renames={name: f"{scope}.{name}", ...}.

When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a Pipeline instance with scopes "foo" and "bar", the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).
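The equivalence of the two notations can be shown with a small stand-alone sketch. The helper flatten_scoped_kwargs is hypothetical and not part of pipefunc; it only demonstrates how a per-scope dictionary corresponds to f"{scope}.{name}" keys.

```python
def flatten_scoped_kwargs(kwargs, scopes):
    """Hypothetical sketch: expand {scope: {name: value}} into "scope.name" keys."""
    flat = {}
    for key, value in kwargs.items():
        if key in scopes and isinstance(value, dict):
            for name, inner in value.items():
                flat[f"{key}.{name}"] = inner
        else:
            # Unscoped or already dotted names pass through unchanged.
            flat[key] = value
    return flat


nested = {"foo": {"a": 1, "b": 2}, "bar": {"a": 3, "b": 4}}
flat = flatten_scoped_kwargs(nested, scopes={"foo", "bar"})
```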

Parameters:
  • scope (str | None) – The scope to set for the inputs and outputs. If None, the scope of inputs and outputs is removed.

  • inputs (set[str] | Literal['*'] | None) – Specific input names to include, or "*" to include all inputs. The inputs are only the root arguments of the pipeline. If None, no inputs are included.

  • outputs (set[str] | Literal['*'] | None) – Specific output names to include, or "*" to include all outputs. If None, no outputs are included.

  • exclude (set[str] | None) – Names to exclude from the scope. Both inputs and outputs can be excluded. Can be used with inputs or outputs being "*" to exclude specific names.

Raises:

ValueError – If no function’s scope was updated, e.g., when both inputs=None and outputs=None.

Return type:

None

Examples

>>> pipeline.update_scope("my_scope", inputs="*", outputs="*")  # Add scope to all inputs and outputs
>>> pipeline.update_scope("my_scope", "*", "*", exclude={"output1"}) # Add to all except "output1"
>>> pipeline.update_scope("my_scope", inputs="*", outputs={"output2"})  # Add scope to all inputs and "output2"
>>> pipeline.update_scope(None, inputs="*", outputs="*")  # Remove scope from all inputs and outputs
validate()[source]#

Validate the pipeline (checks its scopes, renames, defaults, mapspec, type hints).

This is automatically called when the pipeline is created and when calling state-updating methods like Pipeline.update_renames or Pipeline.update_defaults. It should be called manually after, e.g., manually updating pipeline.validate_type_annotations or changing some other attributes.

Return type:

None

visualize(*, backend=None, **kwargs)[source]#

Visualize the pipeline as a directed graph.

If running in a Jupyter notebook and not in VS Code, a widget-based backend will be used if available.

Parameters:
  • backend (Literal['matplotlib', 'graphviz', 'graphviz_widget', 'holoviews'] | None) – The plotting backend to use. If None, the best backend available will be used in the following order: Graphviz (widget), Graphviz, Matplotlib, and HoloViews.

  • kwargs (Any) – Additional keyword arguments passed to the plotting function.

Return type:

Any

Returns:

The output of the plotting function.

See also

visualize_graphviz

Create a directed graph using Graphviz (backend="graphviz").

visualize_graphviz_widget

Create a directed graph using Graphviz and ipywidgets (backend="graphviz_widget").

visualize_matplotlib

Create a directed graph using Matplotlib (backend="matplotlib").

visualize_holoviews

Create a directed graph using HoloViews (backend="holoviews").
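The fallback order described for backend=None can be sketched as a "first available wins" loop. This is an illustration only, not pipefunc's implementation: the probed module names (in particular graphviz_anywidget) and the helpers module_installed and pick_backend are assumptions, and the Jupyter/VS Code check is left out.

```python
from collections.abc import Callable
from importlib.util import find_spec

# Preference order as documented; the module names probed are assumptions.
BACKEND_MODULES = [
    ("graphviz_widget", "graphviz_anywidget"),
    ("graphviz", "graphviz"),
    ("matplotlib", "matplotlib"),
    ("holoviews", "holoviews"),
]


def module_installed(module: str) -> bool:
    """Return True if a top-level module can be imported."""
    return find_spec(module) is not None


def pick_backend(is_available: Callable[[str], bool] = module_installed) -> str:
    """Return the first backend whose module is available."""
    for backend, module in BACKEND_MODULES:
        if is_available(module):
            return backend
    raise ImportError("no plotting backend is installed")


# Simulate an environment where only Matplotlib and HoloViews are installed.
chosen = pick_backend(lambda module: module in {"matplotlib", "holoviews"})
print(chosen)
```

Passing the availability check as a callable keeps the ordering logic testable without installing any plotting package.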

visualize_graphviz(*, figsize=None, collapse_scopes=False, min_arg_group_size=None, hide_default_args=False, filename=None, style=None, orient='LR', graphviz_kwargs=None, show_legend=True, include_full_mapspec=False, return_type=None)[source]#

Visualize the pipeline as a directed graph using Graphviz.

Parameters:
  • figsize (tuple[int, int] | int | None) – The width and height of the figure in inches. If a single integer is provided, the figure will be a square. If None, the size will be determined automatically.

  • collapse_scopes (bool | Sequence[str]) – Whether to collapse scopes in the graph. If True, scopes are collapsed into a single node. If a sequence of scope names, only the specified scopes are collapsed.

  • min_arg_group_size (int | None) – Minimum number of parameters to combine into a single node. Only applies to parameters used exclusively by one PipeFunc. If None, no grouping is performed.

  • hide_default_args (bool) – Whether to hide default arguments in the graph.

  • filename (str | Path | None) – The filename to save the figure to, if provided.

  • style (GraphvizStyle | None) – Style for the graph visualization.

  • orient (Literal['TB', 'LR', 'BT', 'RL']) – Graph orientation: ‘TB’, ‘LR’, ‘BT’, or ‘RL’.

  • graphviz_kwargs (dict[str, Any] | None) – Graphviz-specific keyword arguments for customizing the graph’s appearance.

  • show_legend (bool) – Whether to show the legend in the graph visualization.

  • include_full_mapspec (bool) – Whether to include the full mapspec as a separate line in the PipeFunc labels.

  • return_type (Literal['graphviz', 'html'] | None) – The format to return the visualization in. If 'html', the visualization is returned as an IPython.display.HTML object; if 'graphviz', the graphviz.Digraph object is returned. If None, the format is 'html' when running in a Jupyter notebook, otherwise 'graphviz'.

Returns:

The resulting Graphviz Digraph object.

Return type:

graphviz.Digraph

visualize_graphviz_widget(*, orient='TB', graphviz_kwargs=None)[source]#

Create an interactive visualization of the pipeline as a directed graph.

Creates a widget that allows interactive exploration of the pipeline graph. The widget provides the following interactions:

  • Zoom: Use mouse scroll

  • Pan: Click and drag

  • Node selection: Click on nodes to highlight connected nodes

  • Multi-select: Shift-click on nodes to select multiple routes

  • Search: Use the search box to highlight matching nodes

  • Reset view: Press Escape

Requires the graphviz-anywidget package, which is maintained by the pipefunc authors (see pipefunc/graphviz-anywidget).

Parameters:
  • orient (Literal['TB', 'LR', 'BT', 'RL']) – Graph orientation, controlling the main direction of the graph flow. Options are:

    • ‘TB’: Top to bottom

    • ‘LR’: Left to right

    • ‘BT’: Bottom to top

    • ‘RL’: Right to left

  • graphviz_kwargs (dict[str, Any] | None) – Graphviz-specific keyword arguments for customizing the graph’s appearance.

Returns:

Interactive widget containing the graph visualization.

Return type:

ipywidgets.VBox

visualize_holoviews(*, show=False)[source]#

Visualize the pipeline as a directed graph using HoloViews.

Parameters:

show (bool) – Whether to show the plot. Uses bokeh.plotting.show(holoviews.render(plot)). If False, the holoviews.Graph object is returned.

Return type:

Graph | None

visualize_matplotlib(figsize=(10, 10), filename=None, *, color_combinable=False, conservatively_combine=False, output_name=None)[source]#

Visualize the pipeline as a directed graph using Matplotlib.

Parameters:
  • figsize (tuple[int, int] | int) – The width and height of the figure in inches. If a single integer is provided, the figure will be a square.

  • filename (str | Path | None) – The filename to save the figure to.

  • color_combinable (bool) – Whether to color combinable nodes differently.

  • conservatively_combine (bool) – Argument as passed to Pipeline.simplify_pipeline.

  • output_name (str | tuple[str, ...] | None) – Argument as passed to Pipeline.simplify_pipeline.

Return type:

None

class pipefunc.VariantPipeline(functions, *, default_variant=None, lazy=False, debug=None, print_error=None, profile=None, cache_type=None, cache_kwargs=None, validate_type_annotations=True, scope=None, default_resources=None, name=None, description=None)[source]#

Bases: object

A pipeline container that supports multiple implementations (variants) of functions.

VariantPipeline allows you to define multiple implementations of functions and select which variant to use at runtime. This is particularly useful for:

  • A/B testing different implementations

  • Experimenting with algorithm variations

  • Managing multiple processing options

  • Creating configurable pipelines

The pipeline can have multiple variant groups, where each group contains alternative implementations of a function. Functions can be assigned to variant groups using the variant parameter which can be a single string (for the default group) or a dictionary mapping group names to variant names.

All parameters below (except functions and default_variant) are simply passed to the Pipeline constructor when creating a new pipeline with the selected variant(s) using the with_variant method.

Parameters:
  • functions (list[PipeFunc]) – List of PipeFunc instances.

  • default_variant (str | dict[str | None, str] | None) – Default variant to use if none is specified in with_variant. Either a single variant name or a dictionary mapping variant groups to variants.

  • lazy (bool) – Flag indicating whether the pipeline should be lazy.

  • debug (bool | None) – Flag indicating whether debug information should be printed. If None, the value of each PipeFunc’s debug attribute is used.

  • print_error (bool | None) – Flag indicating whether errors raised during the function execution should be printed. If None, the value of each PipeFunc’s print_error attribute is used.

  • profile (bool | None) – Flag indicating whether profiling information should be collected. If None, the value of each PipeFunc’s profile attribute is used. Profiling is only available for sequential execution.

  • cache_type (Literal['lru', 'hybrid', 'disk', 'simple'] | None) – The type of cache to use. See the notes below for more important information.

  • cache_kwargs (dict[str, Any] | None) – Keyword arguments passed to the cache constructor.

  • validate_type_annotations (bool) – Flag indicating whether type validation should be performed. If True, the type annotations of the functions are validated during the pipeline initialization. If False, the type annotations are not validated.

  • scope (str | None) –

    If provided, all parameter names and output names of the pipeline functions will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the Pipeline.update_scope method.

    When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a Pipeline instance with scopes “foo” and “bar”, the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

  • default_resources (dict[str, Any] | None) – Default resources to use for the pipeline functions. If None, the resources are not set. Either a dict or a pipefunc.resources.Resources instance can be provided. If provided, the resources in the PipeFunc instances are updated with the default resources.

  • name (str | None) – A name for the pipeline. If provided, it will be used to generate e.g., docs and MCP server descriptions.

  • description (str | None) – A description of the pipeline. If provided, it will be used to generate e.g., docs and MCP server descriptions.
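The two equivalent ways of passing scoped parameter values described under scope (a dictionary per scope, or the dotted f'{scope}.{name}' notation) can be illustrated with a plain-Python sketch. The helper flatten_scoped_kwargs is hypothetical and only shows the idea of normalizing both forms to dotted keys; it is not how pipefunc handles this internally.

```python
from __future__ import annotations

from typing import Any


def flatten_scoped_kwargs(kwargs: dict[str, Any], scopes: set[str]) -> dict[str, Any]:
    """Expand {"foo": {"a": 1}} into {"foo.a": 1} for every known scope (hypothetical helper)."""
    flat: dict[str, Any] = {}
    for key, value in kwargs.items():
        if key in scopes and isinstance(value, dict):
            # A dictionary passed under a scope name holds that scope's parameters.
            for name, inner in value.items():
                flat[f"{key}.{name}"] = inner
        else:
            # Already a plain or dotted parameter name.
            flat[key] = value
    return flat


scopes = {"foo", "bar"}
nested = flatten_scoped_kwargs({"foo": dict(a=1, b=2), "bar": dict(a=3, b=4)}, scopes)
dotted = flatten_scoped_kwargs({"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}, scopes)
print(nested == dotted)
```

Both call styles end up as the same flat mapping, which is why they can be used interchangeably.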

Examples

Simple variant selection:

>>> @pipefunc(output_name="c", variant="add")
... def f(a, b):
...     return a + b
...
>>> @pipefunc(output_name="c", variant="sub")
... def f_alt(a, b):
...     return a - b
...
>>> @pipefunc(output_name="d")
... def g(b, c):
...     return b * c
...
>>> pipeline = VariantPipeline([f, f_alt, g], default_variant="add")
>>> pipeline_add = pipeline.with_variant()  # Uses default variant
>>> pipeline_sub = pipeline.with_variant(select="sub")
>>> pipeline_add(a=2, b=3)  # (2 + 3) * 3 = 15
15
>>> pipeline_sub(a=2, b=3)  # (2 - 3) * 3 = -3
-3

Multiple variant groups:

>>> @pipefunc(output_name="c", variant={"method": "add"})
... def f1(a, b):
...     return a + b
...
>>> @pipefunc(output_name="c", variant={"method": "sub"})
... def f2(a, b):
...     return a - b
...
>>> @pipefunc(output_name="d", variant={"analysis": "mul"})
... def g1(b, c):
...     return b * c
...
>>> @pipefunc(output_name="d", variant={"analysis": "div"})
... def g2(b, c):
...     return b / c
...
>>> pipeline = VariantPipeline(
...     [f1, f2, g1, g2],
...     default_variant={"method": "add", "analysis": "mul"}
... )
>>> # Select specific variants for each group
>>> pipeline_sub_div = pipeline.with_variant(
...     select={"method": "sub", "analysis": "div"}
... )

Notes

  • Functions without variants can be included in the pipeline and will be used regardless of variant selection.

  • When using with_variant(), if all variants are resolved, a regular Pipeline is returned. If some variants remain unselected, another VariantPipeline is returned.

  • The default_variant can be a single string (if there’s only one variant group) or a dictionary mapping variant groups to their default variants.

  • Variants in the same group can have different output names, allowing for flexible pipeline structures.

See also

pipefunc.Pipeline

The base pipeline class.

pipefunc.PipeFunc

Function wrapper that supports variants.

copy(**kwargs)[source]#

Return a copy of the VariantPipeline.

Parameters:

kwargs (Any) – Keyword arguments passed to the VariantPipeline constructor instead of the original values.

Return type:

VariantPipeline

classmethod from_pipelines(*variant_pipeline)[source]#

Create a new VariantPipeline from multiple Pipeline instances.

This method constructs a VariantPipeline by combining functions from multiple Pipeline instances, identifying common functions and assigning variants based on the input tuples.

Each input tuple can be either a 2-tuple or a 3-tuple:

  • A 2-tuple contains: (variant_name, pipeline).

  • A 3-tuple contains: (variant_group, variant_name, pipeline).

Functions that are identical across all input pipelines (as determined by the is_identical_pipefunc function) are considered “common” and are added to the resulting VariantPipeline without any variant information.

Functions that are unique to a specific pipeline are added with their corresponding variant information (if provided in the input tuple).

Parameters:

*variant_pipeline (tuple[str, str, Pipeline] | tuple[str, Pipeline]) – Variable number of tuples, where each tuple represents a pipeline and its associated variant information. Each tuple can be either:

  • (variant_name, pipeline): Specifies the variant name for all functions in the pipeline. The variant group will be set to None (default group).

  • (variant_group, variant_name, pipeline): Specifies both the variant group and variant name for all functions in the pipeline.

Return type:

VariantPipeline

Returns:

A new VariantPipeline instance containing the combined functions from the input pipelines, with appropriate variant assignments.

Examples

>>> @pipefunc(output_name="x")
... def f(a, b):
...     return a + b
...
>>> @pipefunc(output_name="y")
... def g(x, c):
...     return x * c
...
>>> pipeline1 = Pipeline([f, g])
>>> pipeline2 = Pipeline([f, g.copy(func=lambda x, c: x / c)])
>>> variant_pipeline = VariantPipeline.from_pipelines(
...     ("add_mul", pipeline1),
...     ("add_div", pipeline2)
... )
>>> add_mul_pipeline = variant_pipeline.with_variant(select="add_mul")
>>> add_div_pipeline = variant_pipeline.with_variant(select="add_div")
>>> add_mul_pipeline(a=1, b=2, c=3)  # (1 + 2) * 3 = 9
9
>>> add_div_pipeline(a=1, b=2, c=3)  # (1 + 2) / 3 = 1.0
1.0

Notes

  • The is_identical_pipefunc function is used to determine if two PipeFunc instances are identical.

  • If multiple pipelines contain the same function but with different variant information, the function will be included multiple times in the resulting VariantPipeline, each with its respective variant assignment.
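The split into common and variant-specific functions can be sketched in plain Python. This is a simplified illustration under stated assumptions: functions are represented by hashable stand-in strings, ordinary equality replaces is_identical_pipefunc, only the 2-tuple form is modeled, and the helper name combine is hypothetical.

```python
from __future__ import annotations


def combine(*variant_pipelines: tuple[str, list[str]]) -> list[tuple[str, str | None]]:
    """Return (function, variant) pairs; common functions get variant None."""
    function_sets = [set(functions) for _, functions in variant_pipelines]
    # A function is "common" only if it appears in every input pipeline.
    common = set.intersection(*function_sets)
    combined: list[tuple[str, str | None]] = [(f, None) for f in sorted(common)]
    for variant, functions in variant_pipelines:
        # Everything else is tagged with the variant of the pipeline it came from.
        combined += [(f, variant) for f in functions if f not in common]
    return combined


result = combine(
    ("add_mul", ["f", "g_mul"]),
    ("add_div", ["f", "g_div"]),
)
print(result)
```

In this sketch "f" is shared by both pipelines and carries no variant, while "g_mul" and "g_div" are kept as alternatives under their respective variant names.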

variants_mapping()[source]#

Return a dictionary of variant groups and their variants.

Return type:

dict[str | None, set[str]]
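To make the shape of the returned dictionary concrete, the sketch below builds a mapping of the same type from a list of variant specifications. It is plain Python and does not use pipefunc; the helper collect_variants is hypothetical, and it assumes that a bare string variant belongs to the default group None, as described for the variant parameter.

```python
from __future__ import annotations


def collect_variants(
    variants: list[str | dict[str | None, str] | None],
) -> dict[str | None, set[str]]:
    """Group variant names by variant group (hypothetical helper)."""
    mapping: dict[str | None, set[str]] = {}
    for variant in variants:
        if variant is None:
            continue  # Functions without a variant do not contribute.
        if isinstance(variant, str):
            variant = {None: variant}  # A bare string means the default group.
        for group, name in variant.items():
            mapping.setdefault(group, set()).add(name)
    return mapping


grouped = collect_variants(
    [{"method": "add"}, {"method": "sub"}, {"analysis": "mul"}, {"analysis": "div"}, None]
)
default_group = collect_variants(["add", "sub"])
print(grouped)
print(default_group)
```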

visualize(**kwargs)[source]#

Visualize the VariantPipeline with interactive variant selection.

Parameters:

kwargs (Any) – Additional keyword arguments passed to the pipefunc.Pipeline.visualize method.

Return type:

Any

Returns:

The output of the widget.

with_variant(select=None, **kwargs)[source]#

Create a new Pipeline or VariantPipeline with the specified variant selected.

Parameters:
  • select (str | dict[str | None, str] | None) – Name of the variant to select. If not provided, default_variant is used. If select is a string, it selects a single variant if no ambiguity exists. If select is a dictionary, it selects a variant for each variant group, where the keys are variant group names and the values are variant names. If a partial dictionary is provided (not covering all variant groups) and default_variant is a dictionary, it will merge the defaults with the selection.

  • kwargs (Any) – Keyword arguments for changing the parameters for a Pipeline or VariantPipeline.

Return type:

Pipeline | VariantPipeline

Returns:

A new Pipeline or VariantPipeline with the selected variant(s). If variants remain, a VariantPipeline is returned. If no variants remain, a Pipeline is returned.

Raises:
  • ValueError – If the specified variant is ambiguous or unknown, or if an invalid variant type is provided.

  • TypeError – If select is not a string or a dictionary.
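The selection rules above (fall back to default_variant, resolve an unambiguous string, merge a partial dictionary with dictionary defaults, and raise on invalid input) can be sketched as follows. This is a plain-Python illustration, not pipefunc's implementation; resolve_selection and its groups argument are hypothetical.

```python
from __future__ import annotations


def resolve_selection(
    select: str | dict[str | None, str] | None,
    default_variant: str | dict[str | None, str] | None,
    groups: dict[str | None, set[str]],
) -> dict[str | None, str]:
    """Return the chosen variant per group (hypothetical helper)."""
    if select is None:
        select = default_variant
    if select is None:
        return {}  # Nothing selected: all variants remain open.
    if isinstance(select, str):
        # A string is valid only if exactly one group contains that variant.
        matches = [group for group, names in groups.items() if select in names]
        if len(matches) != 1:
            raise ValueError(f"variant {select!r} is ambiguous or unknown")
        return {matches[0]: select}
    if not isinstance(select, dict):
        raise TypeError("select must be a string or a dictionary")
    # A partial dictionary is merged on top of dictionary defaults.
    merged = dict(default_variant) if isinstance(default_variant, dict) else {}
    merged.update(select)
    for group, name in merged.items():
        if name not in groups.get(group, set()):
            raise ValueError(f"unknown variant {name!r} for group {group!r}")
    return merged


groups = {"method": {"add", "sub"}, "analysis": {"mul", "div"}}
defaults = {"method": "add", "analysis": "mul"}
partial = resolve_selection({"method": "sub"}, defaults, groups)
by_name = resolve_selection("div", None, groups)
print(partial, by_name)
```

In the sketch, groups missing from the returned dictionary correspond to the case where a VariantPipeline rather than a Pipeline would be returned.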

pipefunc.pipefunc(output_name, *, output_picker=None, renames=None, defaults=None, bound=None, profile=False, debug=False, print_error=True, cache=False, mapspec=None, internal_shape=None, post_execution_hook=None, resources=None, resources_variable=None, resources_scope='map', scope=None, variant=None, variant_group=None)[source]#

A decorator that wraps a function in a PipeFunc instance.

Parameters:
  • output_name (str | tuple[str, ...]) – The identifier for the output of the decorated function. Provide a tuple of strings for multiple outputs.

  • output_picker (Callable[[Any, str], Any] | None) – A function that takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output. If None, the output of the wrapped function is returned as is.

  • renames (dict[str, str] | None) – A dictionary for renaming function arguments and outputs. The keys are the original names (as defined in the function signature or the output_name), and the values are the new names to be used. This allows you to change how the function is called without modifying its internal logic. For example, {"old_name": "new_name"} would allow the function to be called with new_name instead of old_name. If renaming the output_name, include it in this dictionary as well.

  • defaults (dict[str, Any] | None) – Set defaults for parameters. Overwrites any current defaults. Must be in terms of the renamed argument names.

  • bound (dict[str, Any] | None) – Bind arguments to the function. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names.

  • profile (bool) – Flag indicating whether the decorated function should be profiled.

  • debug (bool) – Flag indicating whether debug information should be printed.

  • print_error (bool) – Flag indicating whether errors raised during the function execution should be printed.

  • cache (bool) – Flag indicating whether the decorated function should be cached.

  • mapspec (str | MapSpec | None) – This is a specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output.

  • internal_shape (int | Literal['?'] | tuple[int | Literal['?'], ...] | None) – The shape of the output produced by this function when it is used within a mapspec context. Can be an int or a tuple of ints, or “?” for unknown dimensions, or a tuple with a mix of both. If not provided, the shape will be inferred from the first execution of the function. If provided, the shape will be validated against the actual shape of the output. This parameter is required only when a mapspec like -> out[i] is used, indicating that the shape cannot be derived from the inputs. In case there are multiple outputs, provide the shape for one of the outputs. This works because the shapes of all outputs are required to be identical.

  • post_execution_hook (Callable[[PipeFunc, Any, dict[str, Any]], None] | None) – A callback function that is invoked after the function is executed. The callback signature is hook(func: PipeFunc, result: Any, kwargs: dict) -> None. This hook can be used for logging, visualization of intermediate results, debugging, statistics collection, or other side effects. The hook is executed synchronously after the function returns but before the result is passed to the next function in the pipeline. Keep the hook lightweight to avoid impacting performance.

  • resources (dict | Resources | Callable[[dict[str, Any]], Resources | dict[str, Any]] | None) – A dictionary or Resources instance containing the resources required for the function. This can be used to specify the number of CPUs, GPUs, memory, wall time, queue, partition, and any extra job scheduler arguments. This is not used by pipefunc directly but can be used by job schedulers to manage the resources required for the function. Alternatively, provide a callable that receives a dict with the input values and returns a Resources instance.

  • resources_variable (str | None) – If provided, the resources will be passed as the specified argument name to the function. This requires that the function has a parameter with the same name. For example, if resources_variable="resources", the function will be called as func(..., resources=Resources(...)). This is useful when the function handles internal parallelization.

  • resources_scope (Literal['map', 'element']) –

    Determines how resources are allocated in relation to the mapspec:

    • "map": Allocate resources for the entire mapspec operation (default).

    • "element": Allocate resources for each element in the mapspec.

    If no mapspec is defined, this parameter is ignored.

  • scope (str | None) –

    If provided, all parameter names and output names of the function will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the PipeFunc.update_scope method.

    When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a PipeFunc instance with scopes “foo” and “bar”, the parameters can be provided as: func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

  • variant (str | dict[str | None, str] | None) –

    Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution.

    Can be specified in two formats:

    • A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.

    • A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.

    Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have “preprocessing” variants (“v1”/“v2”) independent from “computation” variants (“fast”/“accurate”), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}.

  • variant_group (str | None) – DEPRECATED in v0.58.0: Use variant instead.

Return type:

Callable[[Callable[..., Any]], PipeFunc]

Returns:

A decorator that takes the original function and returns a PipeFunc instance with the specified output_name.

See also

PipeFunc

A function wrapper class for pipeline functions. Has the same functionality as the pipefunc decorator but can be used to wrap existing functions.

Examples

>>> @pipefunc(output_name="c")
... def add(a, b):
...     return a + b
>>> add(a=1, b=2)
3
>>> add.update_renames({"a": "x", "b": "y"})
>>> add(x=1, y=2)
3
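The output_picker contract described above (a callable that receives the raw function output and one output name, and returns the value for that name) can be illustrated without pipefunc. The wrapper pick_outputs below is a hypothetical stand-in for what a pipeline does with multiple outputs, not the library's implementation.

```python
from __future__ import annotations

from typing import Any, Callable


def pick_outputs(
    func: Callable[..., Any],
    output_names: tuple[str, ...],
    output_picker: Callable[[Any, str], Any],
) -> Callable[..., dict[str, Any]]:
    """Wrap func so each named output is extracted with output_picker (hypothetical helper)."""

    def call(**kwargs: Any) -> dict[str, Any]:
        raw = func(**kwargs)
        # The picker is called once per output name with (raw_output, name).
        return {name: output_picker(raw, name) for name in output_names}

    return call


def stats(values: list[float]) -> dict[str, float]:
    """Return several results in one dictionary."""
    return {"mean": sum(values) / len(values), "total": sum(values)}


# dict.__getitem__(raw, name) is equivalent to raw[name].
picked = pick_outputs(stats, ("mean", "total"), dict.__getitem__)(values=[1, 2, 3])
print(picked)
```

Any two-argument callable works as a picker, for example getattr for functions that return an object with named attributes.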