pipefunc module#
PipeFunc: A Python library for defining, managing, and executing function pipelines.
- class pipefunc.ErrorSnapshot(function, exception, args, kwargs, timestamp=<factory>, user=<factory>, machine=<factory>, ip_address=<factory>, current_directory=<factory>)[source]#
Bases: object
A snapshot that represents an error in a function call.
- classmethod load_from_file(filename)[source]#
Load an error snapshot from a file using cloudpickle.
- Return type:
ErrorSnapshot
- class pipefunc.NestedPipeFunc(pipefuncs, output_name=None, function_name=None, *, renames=None, mapspec=None, resources=None, resources_scope='map', bound=None, cache=None, variant=None, variant_group=None)[source]#
Bases: PipeFunc
Combine multiple PipeFunc instances into a single function with an internal Pipeline.
- Parameters:
pipefuncs (list[PipeFunc]) – A sequence of at least 2 PipeFunc instances to combine into a single function.
output_name (str | tuple[str, ...] | None) – The identifier for the output of the wrapped function. If None, it is automatically constructed from all the output names of the PipeFunc instances. Must be a subset of the output names of the PipeFunc instances.
function_name (str | None) – The name of the nested function; if None, the name will be set to "NestedPipeFunc_{output_name[0]}_{output_name[...]}".
mapspec (str | MapSpec | None) – MapSpec for the joint function. If None, the mapspec is inferred from the individual PipeFunc instances. None of the MapSpec instances should have a reduction and all should use identical axes.
resources (dict | Resources | None) – Same as the PipeFunc class. However, if it is None here, it is inferred from the PipeFunc instances; specifically, it takes the maximum of the resources. Unlike the PipeFunc class, the resources argument cannot be a callable.
resources_scope (Literal['map', 'element']) – Same as the PipeFunc class. Determines how resources are allocated in relation to the mapspec:
- "map": Allocate resources for the entire mapspec operation (default).
- "element": Allocate resources for each element in the mapspec.
If no mapspec is defined, this parameter is ignored.
cache (bool | None) – Flag indicating whether the wrapped function should be cached. If None, cache will be set to True if any of the PipeFunc instances have caching enabled.
bound (dict[str, Any] | None) – Same as the PipeFunc class. Bind arguments to the functions. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names.
variant (str | dict[str | None, str] | None) – Same as the PipeFunc class. Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution. Can be specified in two formats:
- A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.
- A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.
Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have "preprocessing" variants ("v1"/"v2") independent from "computation" variants ("fast"/"accurate"), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}.
variant_group (str | None) – DEPRECATED in v0.58.0: Use variant instead.
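Conceptually, nesting fuses several functions into a single callable that exposes only the root inputs and hides the intermediate outputs. A minimal plain-Python sketch of the idea (made-up function names; not the library API):

```python
def add(a, b):        # first stage, produces intermediate output "x"
    return a + b

def double(x):        # second stage, consumes "x" and produces "y"
    return 2 * x

def nested(a, b):     # the fused function: only root inputs are visible
    return double(add(a, b))

assert nested(1, 2) == 6  # add -> 3, double -> 6
```

NestedPipeFunc does the analogous fusion for PipeFunc instances, with the internal Pipeline wiring the intermediate outputs.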
Notes
The NestedPipeFunc class is a subclass of the PipeFunc class that allows you to combine multiple PipeFunc instances into a single function that has an internal Pipeline instance.
- copy(**update)[source]#
Create a copy of the PipeFunc instance, optionally updating the attributes.
- Return type:
NestedPipeFunc
- property original_parameters: dict[str, Any][source]#
Return the original (before renames) parameters of the wrapped function.
- Returns:
A mapping of the original parameters of the wrapped function to their respective inspect.Parameter objects.
- class pipefunc.PipeFunc(func, output_name, *, output_picker=None, renames=None, defaults=None, bound=None, profile=False, debug=False, print_error=True, cache=False, mapspec=None, internal_shape=None, post_execution_hook=None, resources=None, resources_variable=None, resources_scope='map', scope=None, variant=None, variant_group=None)[source]#
Bases: Generic[T]
Function wrapper class for pipeline functions with additional attributes.
- Parameters:
func (TypeVar(T, bound=Callable[..., Any])) – The original function to be wrapped.
output_name (str | tuple[str, ...]) – The identifier for the output of the wrapped function. Provide a tuple of strings for multiple outputs.
output_picker (Callable[[Any, str], Any] | None) – A function that takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output. If None, the output of the wrapped function is returned as is.
renames (dict[str, str] | None) – A dictionary for renaming function arguments and outputs. The keys are the original names (as defined in the function signature or the output_name), and the values are the new names to be used. This allows you to change how the function is called without modifying its internal logic. For example, {"old_name": "new_name"} would allow the function to be called with new_name instead of old_name. If renaming the output_name, include it in this dictionary as well.
defaults (dict[str, Any] | None) – Set defaults for parameters. Overwrites any current defaults. Must be in terms of the renamed argument names.
bound (dict[str, Any] | None) – Bind arguments to the function. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names.
profile (bool) – Flag indicating whether the wrapped function should be profiled. Profiling is only available for sequential execution.
debug (bool) – Flag indicating whether debug information should be printed.
print_error (bool) – Flag indicating whether errors raised during the function execution should be printed.
cache (bool) – Flag indicating whether the wrapped function should be cached.
mapspec (str | MapSpec | None) – A specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output.
internal_shape (int | Literal['?'] | tuple[int | Literal['?'], ...] | None) – The shape of the output produced by this function when it is used within a mapspec context. Can be an int, a tuple of ints, "?" for unknown dimensions, or a tuple with a mix of both. If not provided, the shape will be inferred from the first execution of the function. If provided, the shape will be validated against the actual shape of the output. This parameter is required only when a mapspec like ... -> out[i] is used, indicating that the shape cannot be derived from the inputs. If there are multiple outputs, provide the shape for one of them; this works because the shapes of all outputs are required to be identical.
post_execution_hook (Callable[[PipeFunc, Any, dict[str, Any]], None] | None) – A callback function that is invoked after the function is executed. The callback signature is hook(func: PipeFunc, result: Any, kwargs: dict) -> None. This hook can be used for logging, visualization of intermediate results, debugging, statistics collection, or other side effects. The hook is executed synchronously after the function returns but before the result is passed to the next function in the pipeline. Keep the hook lightweight to avoid impacting performance.
resources (dict | Resources | Callable[[dict[str, Any]], Resources | dict[str, Any]] | None) – A dictionary or Resources instance containing the resources required for the function. This can be used to specify the number of CPUs, GPUs, memory, wall time, queue, partition, and any extra job scheduler arguments. This is not used by pipefunc directly but can be used by job schedulers to manage the resources required for the function. Alternatively, provide a callable that receives a dict with the input values and returns a Resources instance.
resources_variable (str | None) – If provided, the resources will be passed as the specified argument name to the function. This requires that the function has a parameter with the same name. For example, if resources_variable="resources", the function will be called as func(..., resources=Resources(...)). This is useful when the function handles internal parallelization.
resources_scope (Literal['map', 'element']) – Determines how resources are allocated in relation to the mapspec:
- "map": Allocate resources for the entire mapspec operation (default).
- "element": Allocate resources for each element in the mapspec.
If no mapspec is defined, this parameter is ignored.
scope (str | None) – If provided, all parameter names and output names of the function will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the PipeFunc.update_scope method. When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a PipeFunc instance with scopes "foo" and "bar", the parameters can be provided as: func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).
variant (str | dict[str | None, str] | None) – Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution. Can be specified in two formats:
- A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.
- A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.
Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have "preprocessing" variants ("v1"/"v2") independent from "computation" variants ("fast"/"accurate"), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}.
variant_group (str | None) – DEPRECATED in v0.58.0: Use variant instead.
- Return type:
A PipeFunc instance that wraps the original function with the specified return identifier.
- error_snapshot#
If an error occurs while calling the function, this attribute will contain an ErrorSnapshot instance with information about the error.
Examples
>>> def add_one(a, b):
...     return a + 1, b + 1
>>> add_one_func = PipeFunc(
...     add_one,
...     output_name="c",
...     renames={"a": "x", "b": "y"},
... )
>>> add_one_func(x=1, y=2)
(2, 3)
>>> add_one_func.update_defaults({"x": 1, "y": 1})
>>> add_one_func()
(2, 2)
- property bound: dict[str, Any]#
Return the bound arguments for the function. These are arguments that are fixed.
See also
update_bound
Update the bound parameters via this method.
- copy(**update)[source]#
Create a copy of the PipeFunc instance, optionally updating the attributes.
- Return type:
PipeFunc
- property defaults: dict[str, Any][source]#
Return the defaults for the function arguments.
- Return type:
A dictionary of default values for the keyword arguments.
See also
update_defaults
Update the defaults via this method.
- property original_parameters: dict[str, Parameter]#
Return the original (before renames) parameters of the wrapped function.
- Returns:
A mapping of the original parameters of the wrapped function to their respective inspect.Parameter objects.
- property output_annotation: dict[str, Any][source]#
Return the type annotation of the wrapped function's output.
- property output_name: str | tuple[str, ...][source]#
Return the output name(s) of the wrapped function.
- Return type:
The output name(s) of the wrapped function.
- property output_picker: Callable[[Any, str], Any] | None[source]#
Return the output picker function for the wrapped function.
The output picker function takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output.
- property parameter_annotations: dict[str, Any][source]#
Return the type annotations of the wrapped function's parameters.
- property parameter_scopes: set[str][source]#
Return the scopes of the function parameters.
These are constructed from the parameter names that contain a dot: if the parameter is foo.bar, the scope is foo.
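As a rough plain-Python sketch (not the library's implementation), the scopes can be derived from the dotted parameter names like this:

```python
# "foo.bar" belongs to scope "foo"; undotted names ("x") have no scope.
parameters = ("foo.bar", "foo.baz", "x")
scopes = {p.split(".", 1)[0] for p in parameters if "." in p}
assert scopes == {"foo"}
```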
- property renames: dict[str, str]#
Return the renames for the function arguments and output name.
See also
update_renames
Update the renames via this method.
- property unscoped_parameters: tuple[str, ...][source]#
Return the parameters with the scope stripped off.
- update_bound(bound, *, overwrite=False)[source]#
Update the bound (fixed) arguments for the function.
- update_defaults(defaults, *, overwrite=False)[source]#
Update defaults to the provided keyword arguments.
- update_mapspec_axes(renames)[source]#
Update the MapSpec by renaming axes.
This method renames axes in the MapSpec while preserving the structure of the array specifications. It uses the MapSpec.rename_axes() method to perform type-safe axis renaming.
- Parameters:
renames (dict[str, str]) – Dictionary mapping old axis names to new axis names.
- Return type:
None
Examples
>>> @pipefunc(output_name="c", mapspec="a[i, j], b[i, j] -> c[i, j]")
... def f(a, b):
...     return a + b
>>> f.update_mapspec_axes({"i": "x", "j": "y"})
>>> str(f.mapspec)
'a[x, y], b[x, y] -> c[x, y]'
- update_renames(renames, *, update_from='current', overwrite=False)[source]#
Update the renames of the function arguments and output_name of the wrapped function.
When renaming the output_name, and if it is a tuple of strings, the renames must be provided as individual strings in the tuple.
- Parameters:
renames (dict[str, str]) – A dictionary of renames for the function arguments or output_name.
update_from (Literal['current', 'original']) – Whether to update the renames from the "current" parameter names (PipeFunc.parameters) or from the "original" parameter names as in the function signature (PipeFunc.original_parameters). If also updating the output_name, "original" means the name that was provided to the PipeFunc instance.
overwrite (bool) – Whether to overwrite the existing renames. If False, the new renames will be added to the existing renames.
- Return type:
None
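The difference between update_from="current" and update_from="original" can be illustrated with a small plain-Python sketch (a hypothetical helper, not the library's implementation): renames are stored against the original names, so "current" keys must first be translated back.

```python
def merge_renames(renames, new, update_from):
    # renames maps original -> current names; new is the update to apply.
    if update_from == "original":
        return {**renames, **new}  # keys already refer to original names
    # "current": translate current-name keys back to original names first.
    current_to_original = {v: k for k, v in renames.items()}
    merged = dict(renames)
    for current, new_name in new.items():
        merged[current_to_original.get(current, current)] = new_name
    return merged

renames = {"a": "x"}  # parameter "a" is currently called "x"
assert merge_renames(renames, {"x": "y"}, "current") == {"a": "y"}
assert merge_renames(renames, {"b": "z"}, "original") == {"a": "x", "b": "z"}
```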
- update_scope(scope, inputs=None, outputs=None, exclude=None)[source]#
Update the scope for the PipeFunc by adding (or removing) a prefix to the input and output names.
This method updates the names of the specified inputs and outputs by adding the provided scope as a prefix. The scope is added to the names using the format f"{scope}.{name}". If an input or output name already starts with the scope prefix, it remains unchanged. If there is an existing scope, it is replaced with the new scope.
Internally, this simply calls PipeFunc.update_renames with renames={name: f"{scope}.{name}", ...}.
When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a PipeFunc instance with scopes "foo" and "bar", the parameters can be provided as: func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).
- Parameters:
scope (str | None) – The scope to set for the inputs and outputs. If None, the scope of inputs and outputs is removed.
inputs (set[str] | Literal['*'] | None) – Specific input names to include, or "*" to include all inputs. If None, no inputs are included.
outputs (set[str] | Literal['*'] | None) – Specific output names to include, or "*" to include all outputs. If None, no outputs are included.
exclude (set[str] | None) – Names to exclude from the scope. This can include both inputs and outputs. Can be used with inputs or outputs being "*" to exclude specific names.
- Return type:
None
Examples
>>> f.update_scope("my_scope", inputs="*", outputs="*")  # Add scope to all inputs and outputs
>>> f.update_scope("my_scope", "*", "*", exclude={"output1"})  # Add to all except "output1"
>>> f.update_scope("my_scope", inputs="*", outputs={"output2"})  # Add scope to all inputs and "output2"
>>> f.update_scope(None, inputs="*", outputs="*")  # Remove scope from all inputs and outputs
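The two equivalent ways of passing scoped values shown throughout this page can be related by a small flattening step. A hypothetical helper (not part of pipefunc) sketching the idea:

```python
def flatten_scopes(kwargs):
    # Expand nested scope dicts into "scope.name" keys:
    # {"foo": {"a": 1}} becomes {"foo.a": 1}; plain values pass through.
    flat = {}
    for key, value in kwargs.items():
        if isinstance(value, dict):
            for name, inner in value.items():
                flat[f"{key}.{name}"] = inner
        else:
            flat[key] = value
    return flat

assert flatten_scopes({"foo": {"a": 1, "b": 2}, "bar": {"a": 3}}) == {
    "foo.a": 1,
    "foo.b": 2,
    "bar.a": 3,
}
```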
- class pipefunc.Pipeline(functions, *, lazy=False, debug=None, print_error=None, profile=None, cache_type=None, cache_kwargs=None, validate_type_annotations=True, scope=None, default_resources=None, name=None, description=None)[source]#
Bases: object
Pipeline class for managing and executing a sequence of functions.
- Parameters:
functions (list[PipeFunc | tuple[PipeFunc, str | MapSpec]]) – A list of functions that form the pipeline. Note that the functions are copied when added to the pipeline using PipeFunc.copy.
lazy (bool) – Flag indicating whether the pipeline should be lazy.
debug (bool | None) – Flag indicating whether debug information should be printed. If None, the value of each PipeFunc's debug attribute is used.
print_error (bool | None) – Flag indicating whether errors raised during the function execution should be printed. If None, the value of each PipeFunc's print_error attribute is used.
profile (bool | None) – Flag indicating whether profiling information should be collected. If None, the value of each PipeFunc's profile attribute is used. Profiling is only available for sequential execution.
cache_type (Literal['lru', 'hybrid', 'disk', 'simple'] | None) – The type of cache to use. See the notes below for more important information.
cache_kwargs (dict[str, Any] | None) – Keyword arguments passed to the cache constructor.
validate_type_annotations (bool) – Flag indicating whether type validation should be performed. If True, the type annotations of the functions are validated during the pipeline initialization. If False, the type annotations are not validated.
scope (str | None) – If provided, all parameter names and output names of the pipeline functions will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the Pipeline.update_scope method. When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a Pipeline instance with scopes "foo" and "bar", the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).
default_resources (dict[str, Any] | Resources | None) – Default resources to use for the pipeline functions. If None, the resources are not set. Either a dict or a pipefunc.resources.Resources instance can be provided. If provided, the resources in the PipeFunc instances are updated with the default resources.
name (str | None) – A name for the pipeline. If provided, it will be used to generate, e.g., docs and MCP server descriptions.
description (str | None) – A description of the pipeline. If provided, it will be used to generate, e.g., docs and MCP server descriptions.
Notes
Important note about caching: the caching behavior differs between pipeline.map and pipeline.run / pipeline(...).
For pipeline.run and pipeline(...) ("calling the pipeline as a function"):
- The cache key is computed based solely on the root arguments provided to the pipeline.
- Only the root arguments need to be hashable.
- The root arguments uniquely determine the output across the entire pipeline, allowing caching to be simple and effective when computing the final result.
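The root-argument keying described above can be sketched in a few lines of plain Python (a hypothetical stand-in, not the library's cache implementation):

```python
calls = {"count": 0}

def run_pipeline(a, b):
    # Stand-in for executing the whole pipeline from its root arguments.
    calls["count"] += 1
    return (a + b) * 2

cache = {}

def cached_call(**root_args):
    key = tuple(sorted(root_args.items()))  # key on root arguments only
    if key not in cache:
        cache[key] = run_pipeline(**root_args)
    return cache[key]

assert cached_call(a=1, b=2) == 6
assert cached_call(a=1, b=2) == 6   # served from the cache
assert calls["count"] == 1          # the pipeline ran only once
```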
For pipeline.map:
- The cache key is computed based on the input values of each PipeFunc.
- So a PipeFunc with cache=True must have hashable input values.
- When using pipeline.map(..., parallel=True), the cache itself will be serialized, so one must use a cache that supports shared memory, such as LRUCache with shared=True, or use a disk cache like DiskCache.
For both methods:
- The pipefunc.cache.to_hashable function is used to attempt to ensure that input values are hashable, which is a requirement for storing results in a cache.
- This function works for many common types but is not guaranteed to work for all types.
- If to_hashable cannot make a value hashable, it falls back to using the serialized representation of the value.
The key difference is that pipeline.run's output is uniquely determined by the root arguments, while pipeline.map's is not, because it may contain reduction operations as described by MapSpec.
- add(f, mapspec=None)[source]#
Add a function to the pipeline.
Always creates a copy of the PipeFunc instance to avoid side effects.
- Parameters:
f (PipeFunc | Callable) – The function to add to the pipeline.
mapspec (str | MapSpec | None) – A specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output.
- Return type:
PipeFunc
- add_mapspec_axis(*parameter, axis)[source]#
Add a new axis to parameter's MapSpec.
- Parameters:
parameter (str) – The parameter to add an axis to.
axis (str) – The axis to add to the MapSpec of all functions that depend on parameter. Provide a new axis name to add a new axis, or an existing axis name to zip the parameter with the existing axis. Can be a comma-separated string to add multiple axes at once.
- Return type:
None
- property all_arg_combinations: dict[str | tuple[str, ...], set[tuple[str, ...]]][source]#
Compute all possible argument mappings for the pipeline.
- Returns:
A dictionary mapping function names to sets of tuples containing possible argument combinations.
- property all_root_args: dict[str | tuple[str, ...], tuple[str, ...]][source]#
Return the root arguments required to compute all outputs.
- cli(description=None)[source]#
Automatically construct a command-line interface using argparse.
This method creates an argparse.ArgumentParser instance, adds arguments for each root parameter in the pipeline using a Pydantic model, sets default values if they exist, parses the command-line arguments, and runs one of three subcommands:
- cli: Supply individual input parameters as command-line options.
- json: Load all input parameters from a JSON file.
- docs: Display the pipeline documentation (using pipeline.print_documentation).
Mapping options (prefixed with --map-) are available for the cli and json subcommands to control parallel execution, storage method, and cleanup behavior.
Usage Examples:
- CLI mode: python cli-example.py cli --x 2 --y 3 --map-parallel false --map-cleanup true
- JSON mode: python cli-example.py json --json-file inputs.json --map-parallel false --map-cleanup true
- Docs mode: python cli-example.py docs
- Parameters:
- Raises:
ValueError – If an invalid subcommand is specified.
FileNotFoundError – If the JSON input file does not exist (in JSON mode).
json.JSONDecodeError – If the JSON input file is not formatted correctly.
- Return type:
None
Examples
>>> if __name__ == "__main__":
...     pipeline = create_my_pipeline()
...     pipeline.cli()
See also
pydantic_model
Generate a Pydantic model for pipeline root input parameters.
print_documentation
Print the pipeline documentation as a table formatted with Rich.
- property error_snapshot: ErrorSnapshot | None#
Return an error snapshot for the pipeline.
This value is None if no errors have occurred during the pipeline execution.
- func_dependencies(output_name)[source]#
Return the functions required to compute a specific output.
See also
func_dependents
- func_dependents(name)[source]#
Return the functions that depend on a specific input/output.
See also
func_dependencies
- property graph: DiGraph[source]#
Create a directed graph representing the pipeline.
- Returns:
A directed graph with nodes representing functions and edges representing dependencies between functions.
- independent_axes_in_mapspecs(output_name)[source]#
Return the axes that are both in the output and in the root arguments.
Identifies axes that are cross-products and can be computed independently.
- info(*, print_table=False)[source]#
Return information about inputs and outputs of the Pipeline.
- Parameters:
print_table (bool) – Whether to print a rich-formatted table to the console. Requires the rich package.
- Returns:
If print_table is False, returns a dictionary containing information about the inputs and outputs of the Pipeline, with the following keys:
- inputs: The input arguments of the Pipeline.
- outputs: The output arguments of the Pipeline.
- intermediate_outputs: The intermediate output arguments of the Pipeline.
- required_inputs: The required input arguments of the Pipeline.
- optional_inputs: The optional input arguments of the Pipeline (see Pipeline.defaults).
If print_table is True, prints a rich-formatted table to the console and returns None.
- Return type:
dict or None
See also
defaults
A dictionary with input name to default value mappings.
leaf_nodes
The leaf nodes of the pipeline as PipeFunc objects.
root_args
The root arguments (inputs) required to compute the output of the pipeline.
print_documentation
Print formatted documentation of the pipeline to the console.
- join(*pipelines)[source]#
Join multiple pipelines into a single new pipeline.
The new pipeline has no default_resources set; instead, each function has a Resources attribute created via Resources.maybe_with_defaults(f.resources, pipeline.default_resources).
- property leaf_nodes: list[PipeFunc][source]#
Return the leaf nodes in the pipeline's execution graph.
- map(inputs, run_folder=None, internal_shapes=None, *, output_names=None, parallel=True, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, return_results=True, error_handling='raise', scheduling_strategy='generation')[source]#
Run a pipeline with MapSpec functions for the given inputs.
- Parameters:
inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions, and the values should be the corresponding input data: single values for functions without mapspec, or lists of values or numpy.ndarrays for functions with mapspec.
run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.
internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is also provided here, the value provided here is used.
output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.
parallel (bool) – Whether to run the functions in parallel. Ignored if the provided executor is not None.
executor (Executor | dict[str | tuple[str, ...], Executor] | None) – The executor to use for parallel execution. Can be specified as:
- None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).
- A concurrent.futures.Executor instance: Used for all outputs.
- A dictionary: Specify different executors for different outputs. Use output names as keys and Executor instances as values. Use an empty string "" as a key to set a default executor.
If parallel is False, this argument is ignored.
chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) – Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:
- None: Automatically determine optimal chunk sizes (default).
- int: Same chunk size for all outputs.
- dict: Different chunk sizes per output, where keys are output names (or "" for default) and values are either integers or callables. Callables take the total execution count and return a chunk size.
Examples:
>>> chunksizes = None  # Auto-determine optimal chunk sizes
>>> chunksizes = 100  # All outputs use chunks of 100
>>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
>>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
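Conceptually, a chunk size just groups consecutive elements into batches before dispatching them to the executor. A plain-Python sketch (not the library's scheduler):

```python
def chunk(items, chunksize):
    # Group consecutive items into batches of at most `chunksize`.
    return [items[i:i + chunksize] for i in range(0, len(items), chunksize)]

n = 10
size = max(1, n // 4)  # a callable chunksize could compute this from n
assert chunk(list(range(n)), size) == [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
```

Larger chunks mean fewer executor round-trips but coarser load balancing, which is the trade-off the chunksizes option exposes.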
storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) – The storage class to use for storing intermediate and final results. Can be specified as:
- A string: Use a single storage class for all outputs.
- A dictionary: Specify different storage classes for different outputs. Use output names as keys and storage class names as values. Use an empty string "" as a key to set a default storage class.
Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".
persist_memory (bool) – Whether to write results to disk when memory-based storage is used. Has no effect when file-based storage is used.
cleanup (bool | None) – Deprecated since version 0.89.0: Use the resume parameter instead. Will be removed in version 1.0.0. Whether to clean up the run_folder before running the pipeline. When set, takes priority over the resume parameter. cleanup=True is equivalent to resume=False; cleanup=False is equivalent to resume=True.
resume (bool) – Whether to resume from data of a previous run in the run_folder.
- False (default): Clean up the run_folder before running (fresh start).
- True: Attempt to load and resume results from a previous run.
Note: If cleanup is specified, it takes priority over this parameter.
resume_validation (Literal['auto', 'strict', 'skip']) – Controls validation strictness when reusing data from a previous run (only applies when resume=True):
- "auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.
- "strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.
- "skip": Skip input/default validation entirely. Use when your input objects have broken __eq__ implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.
Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.
fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.
auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.
show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) – Whether to display a progress bar. Can be:
- True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).
- False: No progress bar.
- "ipywidgets": Force the ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.
- "rich": Force the rich progress bar (text-based). Shown only if rich is installed.
- "headless": No progress bar, but the progress is still tracked internally.
- None (default): Shows the ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed; otherwise, no progress bar is shown.
return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory; instead, the results are only kept in the set storage. This is useful for very large pipelines whose results do not fit into memory.
error_handling (Literal['raise', 'continue']) – How to handle errors during function execution:
- "raise" (default): Stop execution on the first error and raise an exception.
- "continue": Continue execution, collecting errors as ErrorSnapshot objects.
scheduling_strategy (Literal['generation', 'eager']) – Strategy for scheduling pipeline function execution:
- "generation" (default): Executes functions in strict topological generations, waiting for all functions in a generation to complete before starting the next. Provides predictable execution order but may not maximize parallelism.
- "eager": Dynamically schedules functions as soon as their dependencies are met, without waiting for entire generations to complete. Can improve performance by maximizing parallel execution, especially for complex dependency graphs with varied execution times.
See also
map_asyncThe asynchronous version of this method.
- Return type:
ResultDict- Returns:
A ResultDict containing the results of the pipeline. The values are of type Result, use Result.output to get the actual result.
- map_async(inputs, run_folder=None, internal_shapes=None, *, output_names=None, executor=None, chunksizes=None, storage=None, persist_memory=True, cleanup=None, resume=False, resume_validation='auto', fixed_indices=None, auto_subpipeline=False, show_progress=None, display_widgets=True, return_results=True, scheduling_strategy='generation', error_handling='raise', start=True)[source]#
Asynchronously run a pipeline with MapSpec functions for the given inputs.

Returns immediately with an AsyncMap instance whose task attribute can be awaited.
- Parameters:
inputs (dict[str, Any] | BaseModel) – The inputs to the pipeline. The keys should be the names of the input parameters of the pipeline functions and the values should be the corresponding input data; these are either single values for functions without a mapspec, or lists of values or numpy.ndarrays for functions with a mapspec.

run_folder (str | Path | None) – The folder to store the run information. If None, either a temporary folder is created or no folder is used, depending on whether the storage class requires serialization.

internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The shapes for intermediary outputs that cannot be inferred from the inputs. If not provided, the shapes will be inferred from the first execution of the function. If provided, the shapes will be validated against the actual shapes of the outputs. The values can be either integers or "?" for unknown dimensions. The internal_shape can also be provided via the PipeFunc(..., internal_shape=...) argument. If a PipeFunc has an internal_shape argument and it is provided here, the provided value is used.

output_names (set[str | tuple[str, ...]] | None) – The output(s) to calculate. If None, the entire pipeline is run and all outputs are computed.

executor (Executor | dict[str | tuple[str, ...], Executor] | None) –

The executor to use for parallel execution. Can be specified as:

None: A concurrent.futures.ProcessPoolExecutor is used (only if parallel=True).
A concurrent.futures.Executor instance: Used for all outputs.
A dictionary: Specify different executors for different outputs. Use output names as keys and Executor instances as values. Use an empty string "" as a key to set a default executor.

chunksizes (int | dict[str | tuple[str, ...], int | Callable[[int], int] | None] | None) –

Controls batching of MapSpec computations for parallel execution. Reduces overhead by grouping multiple function calls into single tasks. Can be specified as:

None: Automatically determine optimal chunk sizes (default).
int: Same chunk size for all outputs.
dict: Different chunk sizes per output, where keys are output names (or "" for the default) and values are either integers or callables. Callables take the total execution count and return a chunk size.

Examples:

>>> chunksizes = None  # Auto-determine optimal chunk sizes
>>> chunksizes = 100  # All outputs use chunks of 100
>>> chunksizes = {"out1": 50, "out2": 100}  # Different sizes per output
>>> chunksizes = {"": 50, "out1": lambda n: n // 20}  # Default and dynamic
storage (str | Literal['file_array', 'dict', 'shared_memory_dict'] | dict[str | tuple[str, ...], Literal['file_array', 'dict', 'shared_memory_dict'] | str] | None) –

The storage class to use for storing intermediate and final results. Can be specified as:

A string: Use a single storage class for all outputs.
A dictionary: Specify different storage classes for different outputs. Use output names as keys and storage class names as values. Use an empty string "" as a key to set a default storage class.

Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict". Defaults to "file_array" if run_folder is provided, otherwise "dict".

persist_memory (bool) – Whether to write results to disk when memory-based storage is used. Has no effect when file-based storage is used.

cleanup (bool | None) –

Deprecated since version 0.89.0: Use the resume parameter instead. Will be removed in version 1.0.0.

Whether to clean up the run_folder before running the pipeline. When set, takes priority over the resume parameter. cleanup=True is equivalent to resume=False; cleanup=False is equivalent to resume=True.

resume (bool) –

Whether to resume data from a previous run in the run_folder.

False (default): Clean up the run_folder before running (fresh start).
True: Attempt to load and resume results from a previous run.

Note: If cleanup is specified, it takes priority over this parameter.

resume_validation (Literal['auto', 'strict', 'skip']) –

Controls validation strictness when reusing data from a previous run (only applies when resume=True):

"auto" (default): Validate that inputs/defaults match the previous run. If equality comparison fails (returns None), warn but proceed anyway.
"strict": Validate that inputs/defaults match. Raise an error if equality comparison fails.
"skip": Skip input/default validation entirely. Use when your input objects have broken __eq__ implementations that return incorrect results. You are responsible for ensuring inputs are actually identical.

Note: Shapes and MapSpecs are always validated regardless of this setting. Ignored when resume=False.

fixed_indices (
dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.

auto_subpipeline (bool) – If True, a subpipeline is created with the specified inputs, using Pipeline.subpipeline. This allows providing intermediate results in the inputs instead of providing the root arguments. If False, all root arguments must be provided, and an exception is raised if any are missing.

show_progress (bool | Literal['rich', 'ipywidgets', 'headless'] | None) –

Whether to display a progress bar. Can be:

True: Display a progress bar. Auto-selects based on environment: ipywidgets in Jupyter (if installed), otherwise rich (if installed).
False: No progress bar.
"ipywidgets": Force ipywidgets progress bar (HTML-based). Shown only if in a Jupyter notebook and ipywidgets is installed.
"rich": Force rich progress bar (text-based). Shown only if rich is installed.
"headless": No progress bar, but the progress is still tracked internally.
None (default): Shows ipywidgets progress bar only if running in a Jupyter notebook and ipywidgets is installed. Otherwise, no progress bar is shown.

display_widgets (bool) – Whether to call IPython.display.display(...) on widgets. Ignored if outside of a Jupyter notebook.

return_results (bool) – Whether to return the results of the pipeline. If False, the pipeline is run without keeping the results in memory. Instead the results are only kept in the set storage. This is useful for very large pipelines where the results do not fit into memory.

scheduling_strategy (Literal['generation', 'eager']) –

Strategy for scheduling pipeline function execution:

"generation" (default): Executes functions in strict topological generations, waiting for all functions in a generation to complete before starting the next. Provides predictable execution order but may not maximize parallelism.
"eager": Dynamically schedules functions as soon as their dependencies are met, without waiting for entire generations to complete. Can improve performance by maximizing parallel execution, especially for complex dependency graphs with varied execution times.

error_handling (Literal['raise', 'continue']) –

How to handle errors during function execution:

"raise" (default): Stop execution on the first error and raise an exception.
"continue": Continue execution, collecting errors as ErrorSnapshot objects.

start (bool) – Whether to start the pipeline immediately. If False, the pipeline is not started until the start() method on the AsyncMap instance is called.

See also

map – The synchronous version of this method.

- Return type:
AsyncMap
- Returns:
An AsyncMap instance that contains run_info, progress, and task. The task can be awaited to get the final result of the pipeline.
- property mapspec_axes: dict[str, tuple[str, ...]][source]#
Return the axes for each array parameter in the pipeline.
- property mapspec_dimensions: dict[str, int][source]#
Return the number of dimensions for each array parameter in the pipeline.
- property mapspecs_as_strings: list[str][source]#
Return the MapSpecs for all functions in the pipeline as strings.
- nest_funcs(output_names, new_output_name=None, function_name=None)[source]#
Replace a set of output names with a single nested function, in place.

- Parameters:

output_names (set[str | tuple[str, ...]] | Literal['*']) – The output names to nest in a NestedPipeFunc. Can also be "*" to nest all functions in the pipeline into a single NestedPipeFunc.

new_output_name (str | tuple[str, ...] | None) – The identifier for the output of the wrapped function. If None, it is automatically constructed from all the output names of the PipeFunc instances. Must be a subset of the output names of the PipeFunc instances.

function_name (str | None) – The name of the nested function; if None the name will be set to "NestedPipeFunc_{output_name[0]}_{output_name[...]}".

- Return type:
NestedPipeFunc
- Returns:
The newly added NestedPipeFunc instance.
- property node_mapping: dict[str | tuple[str, ...], PipeFunc | str][source]#
Return a mapping from node names to nodes.
- Return type:
dict[str | tuple[str, ...], PipeFunc | str]
- Returns:
A mapping from node names to nodes.
- property output_annotations: dict[str, Any][source]#
Return the (final and intermediate) output annotations for the pipeline.
- property output_to_func: dict[str | tuple[str, ...], PipeFunc][source]#
Return a mapping from output names to functions.
The mapping includes functions with multiple outputs both as individual outputs and as tuples of outputs. For example, if a function has the output name
("a", "b"), the mapping will include "a", "b", and ("a", "b") as keys.

See also

__getitem__ – Shortcut for accessing the function corresponding to a specific output name.
- property parameter_annotations: dict[str, Any][source]#
Return the parameter annotations for the pipeline.
The parameter annotations are computed by traversing the pipeline graph in topological order and collecting the annotations from the functions. If there are conflicting annotations for the same parameter, a warning is issued and the first encountered annotation is used.
- print_documentation(*, borders=False, skip_optional=False, skip_intermediate=True, description_table=True, parameters_table=True, returns_table=True, order='topological')[source]#
Print the documentation for the pipeline as a table formatted with Rich.
- Parameters:
borders (
bool) – Whether to include borders in the tables.

skip_optional (bool) – Whether to skip optional parameters.

skip_intermediate (bool) – Whether to skip intermediate outputs and only show root parameters.

description_table (bool) – Whether to generate the function description table.

parameters_table (bool) – Whether to generate the function parameters table.

returns_table (bool) – Whether to generate the function returns table.

order (Literal['topological', 'alphabetical']) –

The order in which to display the functions in the documentation. Options are:

topological: Display functions in topological order.
alphabetical: Display functions in alphabetical order (using output_name).

emojis – Whether to use emojis in the documentation.

- Return type:
None

See also

info – Returns the input and output information for the pipeline.
- property print_error: bool | None#
Flag indicating whether errors raised during the function execution should be printed.
- print_profiling_stats()[source]#
Display the resource usage report for each function in the pipeline.
- Return type:
- property profiling_stats: dict[str, ProfilingStats]#
Return the profiling data for each function in the pipeline.
- pydantic_model(model_name='InputModel')[source]#
Generate a Pydantic model for pipeline root input parameters.
Inspects the pipeline to extract defaults, type annotations, and docstrings to create a model that validates and coerces input data (e.g., from JSON) to the correct types. This is useful for ensuring that inputs meet the pipeline's requirements and for generating a CLI.
Multidimensional Array Handling: Array inputs specified via mapspecs are annotated as nested lists because Pydantic cannot directly coerce JSON arrays into NumPy arrays. After validation, these lists are converted to NumPy ndarrays.
- Parameters:
model_name (
str) β Name for the generated Pydantic model class.- Returns:
A dynamically generated Pydantic model class for validating pipeline inputs. It:
Validates and coerces input data to the expected types.
Annotates multidimensional arrays as nested lists and converts them to NumPy arrays.
Facilitates CLI creation by ensuring proper input validation.
- Return type:
type[pydantic.BaseModel]
Examples
>>> from pipefunc import Pipeline, pipefunc
>>> @pipefunc("foo")
... def foo(x: int, y: int = 1) -> int:
...     return x + y
>>> pipeline = Pipeline([foo])
>>> InputModel = pipeline.pydantic_model()
>>> inputs = {"x": "10", "y": "2"}
>>> model = InputModel(**inputs)
>>> model.x, model.y
(10, 2)
>>> results = pipeline.map(model)  # Equivalent to `pipeline.map(inputs)`
Notes
If available, detailed parameter descriptions are extracted from docstrings using griffe.
This method is especially useful for CLI generation, ensuring that user inputs are properly validated and converted before pipeline execution.
See also
cli – Automatically construct a command-line interface using argparse.

print_documentation – Print the pipeline documentation as a table formatted with Rich.
- root_args(output_name=None)[source]#
Return the root arguments required to compute a specific (or all) output(s).
- Parameters:
output_name (
str | tuple[str, ...] | None) – The identifier for the return value of the pipeline. If None, the root arguments for all outputs are returned.

- Return type:
tuple[str, ...]
- Returns:
A tuple containing the root arguments required to compute the output. The tuple is sorted in alphabetical order.
- property root_nodes: list[PipeFunc][source]#
Return the root nodes in the pipeline's execution graph.
- run(output_name, *, full_output=False, kwargs, allow_unused=False)[source]#
Execute the pipeline for a specific return value.
- Parameters:
output_name (
str | tuple[str, ...] | list[str | tuple[str, ...]]) – The identifier for the return value of the pipeline. Can be a single output name or a list of output names.

full_output (bool) – Whether to return the outputs of all function executions as a dictionary mapping function names to their return values.

kwargs (dict[str, Any]) – Keyword arguments to be passed to the pipeline functions.

allow_unused (bool) – Whether to allow unused keyword arguments. If False, an error is raised if any keyword arguments are unused. If True, unused keyword arguments are ignored.
- Return type:
- Returns:
A dictionary mapping function names to their return values if
full_output is True. Otherwise, the return value is the return value of the pipeline function specified by output_name. If output_name is a list, the return value is a tuple of the return values of the pipeline functions.
- simplified_pipeline(output_name=None, *, conservatively_combine=False)[source]#
Simplify pipeline with combined function nodes.
Generate a simplified version of the pipeline where combinable function nodes have been merged into single function nodes.
This method identifies combinable nodes in the pipeline's execution graph (i.e., functions that share the same root arguments) and merges them into single function nodes. This results in a simplified pipeline where each key function only depends on nodes that cannot be further combined.
- Parameters:
output_name (
str | tuple[str, ...] | None) – The name of the output from the pipeline function we are starting the simplification from. If None, the unique tip of the pipeline graph is used (if there is one).

conservatively_combine (
bool) β If True, only combine a function node with its predecessors if all of its predecessors have the same root arguments as the function node itself. If False, combine a function node with its predecessors if any of its predecessors have the same root arguments as the function node.
- Return type:
Pipeline
- Returns:
The simplified version of the pipeline.
- property sorted_functions: list[PipeFunc][source]#
Return the functions in the pipeline in topological order.
- split_disconnected(**pipeline_kwargs)[source]#
Split disconnected components of the pipeline into separate pipelines.
- subpipeline(inputs=None, output_names=None)[source]#
Create a new pipeline containing only the nodes between the specified inputs and outputs.
- Parameters:
inputs (
set[str] | None) – Set of input names to include in the subpipeline. If None, all root nodes of the original pipeline will be used as inputs.

output_names (set[str | tuple[str, ...]] | None) – Set of output names to include in the subpipeline. If None, all leaf nodes of the original pipeline will be used as outputs.

- Return type:
Pipeline
- Returns:
A new pipeline containing only the nodes and connections between the specified inputs and outputs.
Notes
The subpipeline is created by copying the original pipeline and then removing the nodes that are not part of the path from the specified inputs to the specified outputs. The resulting subpipeline will have the same behavior as the original pipeline for the selected inputs and outputs.
If inputs is provided, the subpipeline will use those nodes as the new root nodes. If output_names is provided, the subpipeline will use those nodes as the new leaf nodes.

Examples
>>> import numpy as np
>>> @pipefunc(output_name="y", mapspec="x[i] -> y[i]")
... def f(x: int) -> int:
...     return x
...
>>> @pipefunc(output_name="z")
... def g(y: np.ndarray) -> int:
...     return sum(y)
...
>>> pipeline = Pipeline([f, g])
>>> inputs = {"x": [1, 2, 3]}
>>> results = pipeline.map(inputs, "tmp_path")
>>> partial = pipeline.subpipeline({"y"})
>>> r = partial.map({"y": results["y"].output}, "tmp_path")
>>> assert len(r) == 1
>>> assert r["z"].output == 6
- property topological_generations: Generations[source]#
Return the functions in the pipeline grouped by topological generation.
This method uses networkx.topological_generations on the pipeline graph to group functions by their dependency order. The result includes:

Root arguments: Initial inputs to the pipeline.
Function generations: Subsequent groups of functions in topological order.

Nullary functions (those without parameters) are handled specially to ensure they're included in the generations rather than treated as root arguments.
- update_defaults(defaults, *, overwrite=False)[source]#
Update defaults to the provided keyword arguments.
Automatically traverses the pipeline graph to find all functions that the defaults can be applied to.
If overwrite is False, the new defaults will be added to the existing defaults. If overwrite is True, the existing defaults will be replaced with the new defaults.
- update_renames(renames, *, update_from='current', overwrite=False)[source]#
Update the renames for the pipeline.
Automatically traverses the pipeline graph to find all functions that the renames can be applied to.
- Parameters:
renames (
dict[str, str]) – A dictionary mapping old parameter names to new parameter and output names.

update_from (Literal['current', 'original']) – Whether to update the renames from the current parameter names (PipeFunc.parameters) or from the original parameter names (PipeFunc.original_parameters).

overwrite (bool) – Whether to overwrite the existing renames. If False, the new renames will be added to the existing renames.
- Return type:
- update_scope(scope, inputs=None, outputs=None, exclude=None)[source]#
Update the scope for the pipeline by adding (or removing) a prefix to the input and output names.
This method updates the names of the specified inputs and outputs by adding the provided scope as a prefix. The scope is added to the names using the format f"{scope}.{name}". If an input or output name already starts with the scope prefix, it remains unchanged. If there is an existing scope, it is replaced with the new scope. inputs are the root arguments of the pipeline. Inputs to functions which are outputs of other functions are considered to be outputs.

Internally, this simply calls PipeFunc.update_renames with renames={name: f"{scope}.{name}", ...}.

When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a Pipeline instance with scopes "foo" and "bar", the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

- Parameters:
scope (
str | None) – The scope to set for the inputs and outputs. If None, the scope of inputs and outputs is removed.

inputs (set[str] | Literal['*'] | None) – Specific input names to include, or "*" to include all inputs. The inputs are only the root arguments of the pipeline. If None, no inputs are included.

outputs (set[str] | Literal['*'] | None) – Specific output names to include, or "*" to include all outputs. If None, no outputs are included.

exclude (set[str] | None) – Names to exclude from the scope. Both inputs and outputs can be excluded. Can be used with inputs or outputs being "*" to exclude specific names.
- Raises:
ValueError – If no function's scope was updated, e.g., when both inputs=None and outputs=None.

- Return type:
None
Examples
>>> pipeline.update_scope("my_scope", inputs="*", outputs="*")  # Add scope to all inputs and outputs
>>> pipeline.update_scope("my_scope", "*", "*", exclude={"output1"})  # Add to all except "output1"
>>> pipeline.update_scope("my_scope", inputs="*", outputs={"output2"})  # Add scope to all inputs and "output2"
>>> pipeline.update_scope(None, inputs="*", outputs="*")  # Remove scope from all inputs and outputs
- validate()[source]#
Validate the pipeline (checks its scopes, renames, defaults, mapspec, type hints).
This is automatically called when the pipeline is created and when calling state-updating methods like Pipeline.update_renames or Pipeline.update_defaults. It should be called manually after e.g. manually updating pipeline.validate_type_annotations or changing some other attributes.
- Return type:
- visualize(*, backend=None, **kwargs)[source]#
Visualize the pipeline as a directed graph.
If running in a Jupyter notebook and not in VS Code, a widget-based backend will be used if available.
- Parameters:
backend (
Literal['matplotlib', 'graphviz', 'graphviz_widget', 'holoviews'] | None) – The plotting backend to use. If None, the best backend available will be used in the following order: Graphviz (widget), Graphviz, Matplotlib, and HoloViews.

kwargs (Any) – Additional keyword arguments passed to the plotting function.
- Return type:
- Returns:
The output of the plotting function.
See also
visualize_graphviz – Create a directed graph using Graphviz (backend="graphviz").

visualize_graphviz_widget – Create a directed graph using Graphviz and ipywidgets (backend="graphviz_widget").

visualize_matplotlib – Create a directed graph using Matplotlib (backend="matplotlib").

visualize_holoviews – Create a directed graph using HoloViews (backend="holoviews").
- visualize_graphviz(*, figsize=None, collapse_scopes=False, min_arg_group_size=None, hide_default_args=False, filename=None, style=None, orient='LR', graphviz_kwargs=None, show_legend=True, include_full_mapspec=False, return_type=None)[source]#
Visualize the pipeline as a directed graph using Graphviz.
- Parameters:
figsize (
tuple[int, int] | int | None) – The width and height of the figure in inches. If a single integer is provided, the figure will be a square. If None, the size will be determined automatically.

collapse_scopes (bool | Sequence[str]) – Whether to collapse scopes in the graph. If True, scopes are collapsed into a single node. If a sequence of scope names, only the specified scopes are collapsed.

min_arg_group_size (int | None) – Minimum number of parameters to combine into a single node. Only applies to parameters used exclusively by one PipeFunc. If None, no grouping is performed.

hide_default_args (bool) – Whether to hide default arguments in the graph.

filename (str | Path | None) – The filename to save the figure to, if provided.

style (GraphvizStyle | None) – Style for the graph visualization.

orient (Literal['TB', 'LR', 'BT', 'RL']) – Graph orientation: 'TB', 'LR', 'BT', 'RL'.

graphviz_kwargs (dict[str, Any] | None) – Graphviz-specific keyword arguments for customizing the graph's appearance.

show_legend (bool) – Whether to show the legend in the graph visualization.

include_full_mapspec (bool) – Whether to include the full mapspec as a separate line in the PipeFunc labels.

return_type (Literal['graphviz', 'html'] | None) – The format to return the visualization in. If 'html', the visualization is returned as an IPython.display.HTML object; if 'graphviz', the graphviz.Digraph object is returned. If None, the format is 'html' if running in a Jupyter notebook, otherwise 'graphviz'.
- Returns:
The resulting Graphviz Digraph object.
- Return type:
graphviz.Digraph
- visualize_graphviz_widget(*, orient='TB', graphviz_kwargs=None)[source]#
Create an interactive visualization of the pipeline as a directed graph.
Creates a widget that allows interactive exploration of the pipeline graph. The widget provides the following interactions:
Zoom: Use mouse scroll
Pan: Click and drag
Node selection: Click on nodes to highlight connected nodes
Multi-select: Shift-click on nodes to select multiple routes
Search: Use the search box to highlight matching nodes
Reset view: Press Escape
Requires the graphviz-anywidget package to be installed, which is maintained by the pipefunc authors, see pipefunc/graphviz-anywidget
- Parameters:
orient (
Literal['TB', 'LR', 'BT', 'RL']) – Graph orientation, controlling the main direction of the graph flow. Options are: 'TB' (top to bottom), 'LR' (left to right), 'BT' (bottom to top), 'RL' (right to left).

graphviz_kwargs (dict[str, Any] | None) – Graphviz-specific keyword arguments for customizing the graph's appearance.
- Returns:
Interactive widget containing the graph visualization.
- Return type:
ipywidgets.VBox
- visualize_holoviews(*, show=False)[source]#
Visualize the pipeline as a directed graph using HoloViews.
- visualize_matplotlib(figsize=(10, 10), filename=None, *, color_combinable=False, conservatively_combine=False, output_name=None)[source]#
Visualize the pipeline as a directed graph.
- Parameters:
figsize (
tuple[int, int] | int) – The width and height of the figure in inches. If a single integer is provided, the figure will be a square.

filename (str | Path | None) – The filename to save the figure to.

color_combinable (bool) – Whether to color combinable nodes differently.

conservatively_combine (bool) – Argument as passed to Pipeline.simplified_pipeline.

output_name (str | tuple[str, ...] | None) – Argument as passed to Pipeline.simplified_pipeline.
- Return type:
- class pipefunc.VariantPipeline(functions, *, default_variant=None, lazy=False, debug=None, print_error=None, profile=None, cache_type=None, cache_kwargs=None, validate_type_annotations=True, scope=None, default_resources=None, name=None, description=None)[source]#
Bases:
object

A pipeline container that supports multiple implementations (variants) of functions.

VariantPipeline allows you to define multiple implementations of functions and select which variant to use at runtime. This is particularly useful for:
Experimenting with algorithm variations
Managing multiple processing options
Creating configurable pipelines
The pipeline can have multiple variant groups, where each group contains alternative implementations of a function. Functions can be assigned to variant groups using the variant parameter, which can be a single string (for the default group) or a dictionary mapping group names to variant names.

All parameters below (except functions and default_variant) are simply passed to the Pipeline constructor when creating a new pipeline with the selected variant(s) using the with_variant method.

- Parameters:
default_variant (str | dict[str | None, str] | None) – Default variant to use if none is specified in with_variant. Either a single variant name or a dictionary mapping variant groups to variants.

lazy (bool) – Flag indicating whether the pipeline should be lazy.

debug (bool | None) – Flag indicating whether debug information should be printed. If None, the value of each PipeFunc's debug attribute is used.

print_error (bool | None) – Flag indicating whether errors raised during the function execution should be printed. If None, the value of each PipeFunc's print_error attribute is used.

profile (bool | None) – Flag indicating whether profiling information should be collected. If None, the value of each PipeFunc's profile attribute is used. Profiling is only available for sequential execution.

cache_type (Literal['lru', 'hybrid', 'disk', 'simple'] | None) – The type of cache to use. See the notes below for more important information.

cache_kwargs (dict[str, Any] | None) – Keyword arguments passed to the cache constructor.

validate_type_annotations (bool) – Flag indicating whether type validation should be performed. If True, the type annotations of the functions are validated during the pipeline initialization. If False, the type annotations are not validated.

scope (str | None) –

If provided, all parameter names and output names of the pipeline functions will be prefixed with the specified scope followed by a dot ('.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the Pipeline.update_scope method.

When providing parameter values for pipelines that have scopes, they can be provided either as a dictionary for the scope, or by using the f'{scope}.{name}' notation. For example, for a Pipeline instance with scopes "foo" and "bar", the parameters can be provided as: pipeline(output_name, foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or pipeline(output_name, **{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}).

default_resources (dict[str, Any] | None) – Default resources to use for the pipeline functions. If None, the resources are not set. Either a dict or a pipefunc.resources.Resources instance can be provided. If provided, the resources in the PipeFunc instances are updated with the default resources.

name (str | None) – A name for the pipeline. If provided, it will be used to generate e.g. docs and MCP server descriptions.

description (str | None) – A description of the pipeline. If provided, it will be used to generate e.g. docs and MCP server descriptions.
Examples
Simple variant selection:
>>> @pipefunc(output_name="c", variant="add")
... def f(a, b):
...     return a + b
...
>>> @pipefunc(output_name="c", variant="sub")
... def f_alt(a, b):
...     return a - b
...
>>> @pipefunc(output_name="d")
... def g(b, c):
...     return b * c
...
>>> pipeline = VariantPipeline([f, f_alt, g], default_variant="add")
>>> pipeline_add = pipeline.with_variant()  # Uses default variant
>>> pipeline_sub = pipeline.with_variant(select="sub")
>>> pipeline_add(a=2, b=3)  # (2 + 3) * 3 = 15
15
>>> pipeline_sub(a=2, b=3)  # (2 - 3) * 3 = -3
-3
Multiple variant groups:
>>> @pipefunc(output_name="c", variant={"method": "add"})
... def f1(a, b):
...     return a + b
...
>>> @pipefunc(output_name="c", variant={"method": "sub"})
... def f2(a, b):
...     return a - b
...
>>> @pipefunc(output_name="d", variant={"analysis": "mul"})
... def g1(b, c):
...     return b * c
...
>>> @pipefunc(output_name="d", variant={"analysis": "div"})
... def g2(b, c):
...     return b / c
...
>>> pipeline = VariantPipeline(
...     [f1, f2, g1, g2],
...     default_variant={"method": "add", "analysis": "mul"}
... )
>>> # Select specific variants for each group
>>> pipeline_sub_div = pipeline.with_variant(
...     select={"method": "sub", "analysis": "div"}
... )
Notes
Functions without variants can be included in the pipeline and will be used regardless of variant selection.
When using with_variant(), if all variants are resolved, a regular Pipeline is returned. If some variants remain unselected, another VariantPipeline is returned.
The default_variant can be a single string (if there's only one variant group) or a dictionary mapping variant groups to their default variants.
Variants in the same group can have different output names, allowing for flexible pipeline structures.
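The merge of a partial selection with a dictionary-valued default_variant described above can be sketched in plain Python (illustrative only, not pipefunc internals):

```python
# Illustrative sketch of the documented merge semantics: an explicit
# (possibly partial) selection overrides the dict-valued default_variant.
default_variant = {"method": "add", "analysis": "mul"}
select = {"analysis": "div"}  # partial selection: only one group given

merged = {**default_variant, **select}
print(merged)  # {'method': 'add', 'analysis': 'div'}
```

Any group not named in the selection keeps its default variant.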
See also
pipefunc.Pipeline: The base pipeline class.
pipefunc.PipeFunc: Function wrapper that supports variants.
- copy(**kwargs)[source]#
Return a copy of the VariantPipeline.
- Parameters:
kwargs (
Any) – Keyword arguments passed to the VariantPipeline constructor instead of the original values.
- Return type:
VariantPipeline
- classmethod from_pipelines(*variant_pipeline)[source]#
Create a new VariantPipeline from multiple Pipeline instances.
This method constructs a VariantPipeline by combining functions from multiple Pipeline instances, identifying common functions and assigning variants based on the input tuples.
Each input tuple can be either a 2-tuple or a 3-tuple:
- A 2-tuple contains: (variant_name, pipeline).
- A 3-tuple contains: (variant_group, variant_name, pipeline).
Functions that are identical across all input pipelines (as determined by the is_identical_pipefunc function) are considered "common" and are added to the resulting VariantPipeline without any variant information.
Functions that are unique to a specific pipeline are added with their corresponding variant information (if provided in the input tuple).
- Parameters:
*variant_pipeline (
tuple[str, str, Pipeline] | tuple[str, Pipeline]) – Variable number of tuples, where each tuple represents a pipeline and its associated variant information. Each tuple can be either:
- (variant_name, pipeline): Specifies the variant name for all functions in the pipeline. The variant group will be set to None (default group).
- (variant_group, variant_name, pipeline): Specifies both the variant group and variant name for all functions in the pipeline.
- Return type:
VariantPipeline
- Returns:
A new VariantPipeline instance containing the combined functions from the input pipelines, with appropriate variant assignments.
Examples
>>> @pipefunc(output_name="x")
... def f(a, b):
...     return a + b
...
>>> @pipefunc(output_name="y")
... def g(x, c):
...     return x * c
...
>>> pipeline1 = Pipeline([f, g])
>>> pipeline2 = Pipeline([f, g.copy(func=lambda x, c: x / c)])
>>> variant_pipeline = VariantPipeline.from_pipelines(
...     ("add_mul", pipeline1),
...     ("add_div", pipeline2)
... )
>>> add_mul_pipeline = variant_pipeline.with_variant(select="add_mul")
>>> add_div_pipeline = variant_pipeline.with_variant(select="add_div")
>>> add_mul_pipeline(a=1, b=2, c=3)  # (1 + 2) * 3 = 9
9
>>> add_div_pipeline(a=1, b=2, c=3)  # (1 + 2) / 3 = 1.0
1.0
Notes
The is_identical_pipefunc function is used to determine if two PipeFunc instances are identical.
If multiple pipelines contain the same function but with different variant information, the function will be included multiple times in the resulting VariantPipeline, each with its respective variant assignment.
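The two accepted tuple shapes can be thought of as normalizing to a single (variant_group, variant_name, pipeline) form; a minimal sketch with a hypothetical helper (strings stand in for Pipeline objects):

```python
def normalize_variant_tuple(t):
    """Normalize a 2- or 3-tuple to (variant_group, variant_name, pipeline)."""
    if len(t) == 2:
        # (variant_name, pipeline): the variant group defaults to None
        variant_name, pipeline = t
        return (None, variant_name, pipeline)
    return t  # already (variant_group, variant_name, pipeline)

print(normalize_variant_tuple(("add_mul", "pipeline1")))
# (None, 'add_mul', 'pipeline1')
print(normalize_variant_tuple(("method", "add", "pipeline1")))
# ('method', 'add', 'pipeline1')
```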
- visualize(**kwargs)[source]#
Visualize the VariantPipeline with interactive variant selection.
- Parameters:
kwargs (
Any) – Additional keyword arguments passed to the pipefunc.Pipeline.visualize method.
- Return type:
- Returns:
The output of the widget.
- with_variant(select=None, **kwargs)[source]#
Create a new Pipeline or VariantPipeline with the specified variant selected.
- Parameters:
select (
str | dict[str | None, str] | None) – Name of the variant to select. If not provided, default_variant is used. If select is a string, it selects a single variant if no ambiguity exists. If select is a dictionary, it selects a variant for each variant group, where the keys are variant group names and the values are variant names. If a partial dictionary is provided (not covering all variant groups) and default_variant is a dictionary, it will merge the defaults with the selection. kwargs (
Any) – Keyword arguments for changing the parameters of a Pipeline or VariantPipeline.
- Return type:
Pipeline | VariantPipeline
- Returns:
A new Pipeline or VariantPipeline with the selected variant(s). If variants remain, a VariantPipeline is returned. If no variants remain, a Pipeline is returned.
- Raises:
ValueError – If the specified variant is ambiguous or unknown, or if an invalid variant type is provided.
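The ambiguity rule for a string select can be sketched in plain Python (the helper is hypothetical; the actual resolution happens inside VariantPipeline):

```python
def resolve_select(select, groups):
    """Resolve a string selection against variant groups; a name that
    occurs in more than one group (or in none) raises ValueError."""
    matches = [group for group, names in groups.items() if select in names]
    if len(matches) != 1:
        raise ValueError(f"variant {select!r} is ambiguous or unknown")
    return {matches[0]: select}

groups = {"method": {"add", "sub"}, "analysis": {"mul", "div"}}
print(resolve_select("sub", groups))  # {'method': 'sub'}
```

A string select only works when the name uniquely identifies one variant in one group; otherwise a dictionary selection is required.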
- pipefunc.pipefunc(output_name, *, output_picker=None, renames=None, defaults=None, bound=None, profile=False, debug=False, print_error=True, cache=False, mapspec=None, internal_shape=None, post_execution_hook=None, resources=None, resources_variable=None, resources_scope='map', scope=None, variant=None, variant_group=None)[source]#
A decorator that wraps a function in a PipeFunc instance.
- Parameters:
output_name (
str | tuple[str, ...]) – The identifier for the output of the decorated function. Provide a tuple of strings for multiple outputs. output_picker (
Callable[[Any, str], Any] | None) – A function that takes the output of the wrapped function as first argument and the output_name (str) as second argument, and returns the desired output. If None, the output of the wrapped function is returned as is. renames (
dict[str, str] | None) – A dictionary for renaming function arguments and outputs. The keys are the original names (as defined in the function signature or the output_name), and the values are the new names to be used. This allows you to change how the function is called without modifying its internal logic. For example, {"old_name": "new_name"} would allow the function to be called with new_name instead of old_name. If renaming the output_name, include it in this dictionary as well. defaults (
dict[str, Any] | None) – Set defaults for parameters. Overwrites any current defaults. Must be in terms of the renamed argument names. bound (
dict[str, Any] | None) – Bind arguments to the function. These are arguments that are fixed. Even when providing different values, the bound values will be used. Must be in terms of the renamed argument names. profile (
bool) – Flag indicating whether the decorated function should be profiled. debug (
bool) – Flag indicating whether debug information should be printed. print_error (
bool) – Flag indicating whether errors raised during the function execution should be printed. cache (
bool) – Flag indicating whether the decorated function should be cached. mapspec (
str | MapSpec | None) – This is a specification for mapping that dictates how input values should be merged together. If None, the default behavior is that the input directly maps to the output. internal_shape (
int | Literal['?'] | tuple[int | Literal['?'], ...] | None) – The shape of the output produced by this function when it is used within a mapspec context. Can be an int, a tuple of ints, '?' for unknown dimensions, or a tuple with a mix of both. If not provided, the shape will be inferred from the first execution of the function. If provided, the shape will be validated against the actual shape of the output. This parameter is required only when a mapspec like ... -> out[i] is used, indicating that the shape cannot be derived from the inputs. In case there are multiple outputs, provide the shape for one of the outputs. This works because the shapes of all outputs are required to be identical. post_execution_hook (
Callable[[PipeFunc, Any, dict[str, Any]], None] | None) – A callback function that is invoked after the function is executed. The callback signature is hook(func: PipeFunc, result: Any, kwargs: dict) -> None. This hook can be used for logging, visualization of intermediate results, debugging, statistics collection, or other side effects. The hook is executed synchronously after the function returns but before the result is passed to the next function in the pipeline. Keep the hook lightweight to avoid impacting performance. resources (
dict | Resources | Callable[[dict[str, Any]], Resources | dict[str, Any]] | None) – A dictionary or Resources instance containing the resources required for the function. This can be used to specify the number of CPUs, GPUs, memory, wall time, queue, partition, and any extra job scheduler arguments. This is not used by pipefunc directly but can be used by job schedulers to manage the resources required for the function. Alternatively, provide a callable that receives a dict with the input values and returns a Resources instance. resources_variable (
str | None) – If provided, the resources will be passed as the specified argument name to the function. This requires that the function has a parameter with the same name. For example, if resources_variable="resources", the function will be called as func(..., resources=Resources(...)). This is useful when the function handles internal parallelization. resources_scope (
Literal['map', 'element']) – Determines how resources are allocated in relation to the mapspec:
"map": Allocate resources for the entire mapspec operation (default).
"element": Allocate resources for each element in the mapspec.
If no mapspec is defined, this parameter is ignored.
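The callable form of resources described above receives the input values and returns the resources for that call; a minimal sketch (the resource keys and sizing rule are illustrative, not prescribed by pipefunc):

```python
# Sketch of a callable `resources`: it receives the input values as a
# dict and returns a resources dict sized to the problem.
def dynamic_resources(kwargs):
    n = len(kwargs.get("data", []))
    if n > 1000:
        return {"cpus": 4, "memory": "8GB"}
    return {"cpus": 1, "memory": "1GB"}

print(dynamic_resources({"data": list(range(10))}))    # {'cpus': 1, 'memory': '1GB'}
print(dynamic_resources({"data": list(range(2000))}))  # {'cpus': 4, 'memory': '8GB'}
```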
scope (str | None) – If provided, all parameter names and output names of the function will be prefixed with the specified scope followed by a dot (
'.'), e.g., parameter x with scope foo becomes foo.x. This allows multiple functions in a pipeline to have parameters with the same name without conflict. To be selective about which parameters and outputs to include in the scope, use the PipeFunc.update_scope method. When providing parameter values for functions that have scopes, they can be provided either as a dictionary for the scope, or by using the
f'{scope}.{name}' notation. For example, for a PipeFunc instance with scopes "foo" and "bar", the parameters can be provided as func(foo=dict(a=1, b=2), bar=dict(a=3, b=4)) or func(**{"foo.a": 1, "foo.b": 2, "bar.a": 3, "bar.b": 4}). variant (
str | dict[str | None, str] | None) – Identifies this function as an alternative implementation in a VariantPipeline and specifies which variant groups it belongs to. When multiple functions share the same output_name, variants allow selecting which implementation to use during pipeline execution. Can be specified in two formats:
- A string (e.g., "fast"): Places the function in the default unnamed group (None) with the specified variant name. Equivalent to {None: "fast"}.
- A dictionary (e.g., {"algorithm": "fast", "optimization": "level1"}): Assigns the function to multiple variant groups simultaneously, with a specific variant name in each group.
Functions with the same output_name but different variant specifications represent alternative implementations. The VariantPipeline.with_variant method selects which variants to use for execution. For example, you might have "preprocessing" variants ("v1"/"v2") independent from "computation" variants ("fast"/"accurate"), allowing you to select specific combinations like {"preprocessing": "v1", "computation": "fast"}. variant_group (
str | None) – DEPRECATED in v0.58.0: Use variant instead.
- Return type:
- Returns:
A wrapped function that takes the original function and output_name and creates a PipeFunc instance with the specified return identifier.
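The post_execution_hook parameter described above expects the signature hook(func, result, kwargs); a minimal sketch, using a stand-in object in place of a real PipeFunc so the example is self-contained:

```python
# A lightweight post_execution_hook sketch: record each call's output
# name, result, and inputs for later inspection.
log = []

def logging_hook(func, result, kwargs):
    log.append((getattr(func, "output_name", None), result, dict(kwargs)))

class FakeFunc:  # stand-in for a PipeFunc instance (illustration only)
    output_name = "c"

# Simulate the pipeline invoking the hook after a function executes:
logging_hook(FakeFunc(), 5, {"a": 2, "b": 3})
print(log)  # [('c', 5, {'a': 2, 'b': 3})]
```

In real use the hook would be passed as post_execution_hook=logging_hook to the @pipefunc decorator.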
See also
Examples
>>> @pipefunc(output_name="c")
... def add(a, b):
...     return a + b
>>> add(a=1, b=2)
3
>>> add.update_renames({"a": "x", "b": "y"})
>>> add(x=1, y=2)
3
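The equivalence between the two scoped calling styles described for the scope parameter can be illustrated by flattening nested scope dicts into 'scope.name' keys (the helper below is hypothetical, not part of pipefunc):

```python
def flatten_scopes(scoped):
    """Flatten {"scope": {"name": value}} into {"scope.name": value}."""
    return {
        f"{scope}.{name}": value
        for scope, params in scoped.items()
        for name, value in params.items()
    }

flat = flatten_scopes({"foo": {"a": 1, "b": 2}, "bar": {"a": 3, "b": 4}})
print(flat)  # {'foo.a': 1, 'foo.b': 2, 'bar.a': 3, 'bar.b': 4}
```

Calling func(foo=dict(a=1, b=2)) and func(**{"foo.a": 1, "foo.b": 2}) supply the same values.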