Tutorial for Pipefunc Package#
The pipefunc package is a Python library designed to simplify the creation and execution of function pipelines. It allows you to define functions as pipeline steps, automatically managing dependencies and execution order. In this tutorial, we will guide you through the key features of pipefunc, including sequential and parallel execution, map-reduce operations, and advanced functionalities.
This page is a Jupyter notebook, executed and rendered in the official documentation.
Have uv? ⚡
If you have uv installed, you can instantly open this page as a Jupyter notebook using opennb:
uvx --with "pipefunc[docs]" opennb pipefunc/pipefunc/example.ipynb
This command creates an ephemeral environment with all dependencies and launches the notebook in your browser in 1 second - no manual setup needed! ✨.
Alternatively, run:
uv run https://raw.githubusercontent.com/pipefunc/pipefunc/refs/heads/main/get-notebooks.py
to download all documentation as Jupyter notebooks.
High level overview#
The pipefunc package allows to create reusable and callable pipelines.
A
Pipelinecontains a list ofPipeFuncobjects.At its core, these
PipeFuncobjects only contain a function and an output name.You can create a
PipeFuncobject directly or using the@pipefuncdecorator.The
Pipelinewill automatically connect all functions based on the output names and function inputs.
Building a Simple Pipeline#
Let’s start by importing pipefunc and Pipeline from the pipefunc module.
from pipefunc import PipeFunc, Pipeline, pipefunc
We then define some functions using the @pipefunc decorator.
The @pipefunc decorator turns these functions into pipeline steps.
For each function, we specify an output_name which will be used to refer to the output of that function in the pipeline.
@pipefunc(output_name="c")
def f(a, b):
return a + b
@pipefunc(output_name="d")
def g(b, c, x=1): # "c" is the output of f
return b * c * x
@pipefunc(output_name="e")
def h(c, d, x=1): # "d" is the output of g
return c * d * x
We now have three functions f, g, and h, which we can use to build a pipeline.
We create a Pipeline object passing the list of functions.
We can also enable debugging, profiling, and caching for the entire pipeline:
pipeline = Pipeline(
[f, g, h],
debug=True, # optionally print debug information
profile=True, # optionally profile the pipeline
cache_type="hybrid", # optionally cache the pipeline
)
Now, we have a pipeline that only requires a and b as inputs and uses the outputs of the functions and automatically passes them as inputs to the next function.
Don’t want to use the @pipefunc decorator? No problem! You can create a PipeFunc object directly:
@pipefunc(output_name="c")
def f(a, b):
return a + b
# is equivalent to
def f(a, b):
return a + b
f = PipeFunc(f, output_name="c")
Visualizing the Pipeline#
You can visualize your pipeline using the visualize() method, and print the nodes in the graph using the graph.nodes attribute.
Interactive visualization with graphviz-anywidget
In a live Jupyter notebook, the output below allows interaction with the pipeline visualization.
You will be able to zoom by scrolling, pan by dragging the image, and click on nodes to highlight all connected nodes. Click Escape to reset the view.
print("Graph nodes:", pipeline.graph.nodes)
pipeline.visualize()
Graph nodes: [PipeFunc(f), 'a', 'b', PipeFunc(g), 'x', PipeFunc(h)]
Executing the Pipeline#
There are two ways to execute the pipeline:
Call the pipeline as a function (sequentially) and get a specific output:
pipeline(output_name, **kwargs)pipeline.run(output_name, kwargs)
Evaluate the entire pipeline (parallel) including map-reduce operations:
pipeline.map(kwargs)
We start with calling the pipeline directly and then introduce the map method.
See this documentation page for more information on the difference between run and map.
Using pipeline(...) (Sequential Execution)#
If the pipeline has a unique leaf node (single final output), then we can directly call the pipeline object with the input arguments.
pipeline(a=1, b=2)
2026-01-10 00:45:50.366972 - Function returning 'c' was invoked as `f(a=1, b=2)` and returned `3`. The execution time was 6.99e-05 seconds on average.
2026-01-10 00:45:50.367481 - Function returning 'd' was invoked as `g(x=1, b=2, c=3)` and returned `6`. The execution time was 3.66e-05 seconds on average.
2026-01-10 00:45:50.367651 - Function returning 'e' was invoked as `h(x=1, c=3, d=6)` and returned `18`. The execution time was 5.32e-05 seconds on average.
18
# The above returns the output for:
pipeline.unique_leaf_node
PipeFunc(h)
We can also specify the desired output as the first argument of the pipeline function:
print("`e` is:", pipeline("e", a=1, b=2))
print("`d` is:", pipeline("d", a=1, b=2))
2026-01-10 00:45:50.375168 - Function returning 'c' was invoked as `f(a=1, b=2)` and returned `3`. The execution time was 2.73e-04 seconds on average.
2026-01-10 00:45:50.375443 - Function returning 'd' was invoked as `g(x=1, b=2, c=3)` and returned `6`. The execution time was 5.05e-05 seconds on average.
2026-01-10 00:45:50.375595 - Function returning 'e' was invoked as `h(x=1, c=3, d=6)` and returned `18`. The execution time was 5.26e-05 seconds on average.
`e` is: 18
2026-01-10 00:45:50.375819 - Function returning 'c' was invoked as `f(a=1, b=2)` and returned `3`. The execution time was 1.99e-04 seconds on average.
2026-01-10 00:45:50.375950 - Function returning 'd' was invoked as `g(x=1, b=2, c=3)` and returned `6`. The execution time was 4.76e-05 seconds on average.
`d` is: 6
Using pipeline.run(...) (Sequential Execution)#
Similar to calling the pipeline object, we can use the run method to execute the pipeline.
Note: The
pipeline(...)call is just a wrapper around therunmethod.
result = pipeline.run("e", kwargs={"a": 1, "b": 2})
print(result)
2026-01-10 00:45:50.379280 - Function returning 'c' was invoked as `f(a=1, b=2)` and returned `3`. The execution time was 2.01e-04 seconds on average.
2026-01-10 00:45:50.379642 - Function returning 'd' was invoked as `g(x=1, b=2, c=3)` and returned `6`. The execution time was 4.50e-05 seconds on average.
2026-01-10 00:45:50.379819 - Function returning 'e' was invoked as `h(x=1, c=3, d=6)` and returned `18`. The execution time was 5.40e-05 seconds on average.
18
or get all function outputs and inputs by specifying full_output=True:
result = pipeline.run("e", kwargs={"a": 1, "b": 2}, full_output=True)
print(result)
2026-01-10 00:45:50.383200 - Function returning 'c' was invoked as `f(a=1, b=2)` and returned `3`. The execution time was 2.26e-04 seconds on average.
2026-01-10 00:45:50.383460 - Function returning 'd' was invoked as `g(x=1, b=2, c=3)` and returned `6`. The execution time was 4.74e-05 seconds on average.
2026-01-10 00:45:50.383598 - Function returning 'e' was invoked as `h(x=1, c=3, d=6)` and returned `18`. The execution time was 5.05e-05 seconds on average.
{'a': 1, 'b': 2, 'c': 3, 'd': 6, 'e': 18}
Using pipeline.map(...) (Parallel Execution)#
pipeline.map allows you to execute your pipeline over a set of inputs in parallel.
Note: The
mapspecargument in the@pipefuncdecorator defines how inputs are mapped to outputs.
Note: ⚠️ The mapping computation of the pipeline is done in parallel using the
concurrent.futures.ProcessPoolExecutorwheneverpipeline.map(..., parallel=True)(default).
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
assert isinstance(x, int)
return 2 * x
pipeline_double = Pipeline([double_it])
inputs = {"x": [1, 2, 3, 4, 5]}
run_folder = "my_run_folder" # save the results in this folder
result = pipeline_double.map(inputs, run_folder)
print(result["y"].output)
[2 4 6 8 10]
Syntax of mapspec:
input1[i], input2[j] -> output[i, j]
iandjare indices over which the function maps.input1[i]means the function will receiveinput1at indexi.output[i, j]means the function will produceoutputwith indicesiandj.
Instead of defining mapspec manually, you can use the add_mapspec_axis method on the pipeline object:
# Take `pipeline` defined above and add a 2D mapspec
pipeline2 = pipeline.copy()
pipeline2.debug = False # Turn off debugging print statements
pipeline2.add_mapspec_axis("a", axis="i")
pipeline2.add_mapspec_axis("b", axis="j")
run_folder = "my_run_folder"
result = pipeline2.map({"a": [1, 2], "b": [3, 4]}, run_folder, show_progress=True)
result["e"].output # This is now a 2D array
/home/docs/checkouts/readthedocs.org/user_builds/pipefunc/checkouts/930/pipefunc/map/_run.py:229: UserWarning: `profile=True` is not supported with `parallel=True` using process-based executors.
prep = prepare_run(
array([[48, 100],
[75, 144]], dtype=object)
The methods above will automatically generate the mapspec for you, which is now:
pipeline2.mapspecs_as_strings
['a[i], b[j] -> c[i, j]',
'c[i, j], b[j] -> d[i, j]',
'd[i, j], c[i, j] -> e[i, j]']
The pipeline.map method is powerful and can handle complex map-reduce operations, which we will demonstrate next.
Map-reduce or fan-in / fan-out operations#
The script below demonstrates a two-step pipeline: doubling each integer in an input list, followed by summing all the doubled values.
import numpy as np
from pipefunc import Pipeline, pipefunc
from pipefunc.typing import Array
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
assert isinstance(x, int)
return 2 * x
@pipefunc(output_name="sum") # no mapspec, so receives y[:] as input
def take_sum(y: Array[int]) -> int:
assert isinstance(y, np.ndarray)
return sum(y)
pipeline_map = Pipeline([double_it, take_sum])
pipeline_map.visualize()
What is mapspec?
In double_it, mapspec="x[i] -> y[i]" specifies that each element i of the input array x is independently processed to produce the corresponding element i in the output array y.
Because take_sum does not have a mapspec, it receives the entire array y for aggregation.
Note that the mapspecs are present in the plot. For example, x is now x[i].
inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
results = pipeline_map.map(inputs, run_folder=run_folder)
Check the results in the resulting dict
assert results["y"].output.tolist() == [0, 2, 4, 6]
assert results["sum"].output == 12
Or load the outputs from disk
from pipefunc.map import load_outputs
assert load_outputs("y", run_folder=run_folder).tolist() == [0, 2, 4, 6]
assert load_outputs("sum", run_folder=run_folder) == 12
Or also load from disk but as an xarray.Dataset:
from pipefunc.map import load_xarray_dataset
load_xarray_dataset(run_folder=run_folder)
<xarray.Dataset> Size: 72B
Dimensions: (i: 4)
Coordinates:
x (i) object 32B 0 1 2 3
Dimensions without coordinates: i
Data variables:
y (i) object 32B 0 2 4 6
sum object 8B 12Advanced features#
Below are some advanced features of the pipefunc package.
You will find more features in the FAQ.
Working with Resources Report#
The print_profiling_stats() method of the pipeline provides useful information on the performance of the functions in the pipeline such as CPU usage, memory usage, average time, and the number of times each function was called.
This feature is only available if profile=True when creating the pipeline.
# This will print the number of times each function was called
# CPU, memory, and time usage is also reported
pipeline.print_profiling_stats()
Resource Usage Report:
---------+-------------------+-----------------------+--------------+----------------+----------------
Function | Avg CPU Usage (%) | Max Memory Usage (MB) | Avg Time (s) | Total Time (%) | Number of Calls
-------- | ----------------- | --------------------- | ------------ | -------------- | ---------------
f | 0.00 | 148.57 | 2.26e-04 | 72.07 | 5
g | 0.00 | 148.82 | 4.74e-05 | 15.09 | 5
h | 0.00 | 0.00 | 5.05e-05 | 12.85 | 4
---------+-------------------+-----------------------+--------------+----------------+----------------
This report can be beneficial in performance tuning and identifying bottlenecks in your pipeline. You can identify which functions are consuming the most resources and adjust your pipeline accordingly.
You can also look all the stats directly with:
pipeline.profiling_stats
{'f': ProfilingStats(cpu=ResourceStats(num_executions=1, average=0.0000e+00, max=0.0000e+00, std=0.0000e+00), memory=ResourceStats(num_executions=1, average=1.5578e+08, max=1.5578e+08, std=0.0000e+00), time=ResourceStats(num_executions=5, average=2.2645e-04, max=4.7616e-04, std=1.7949e-04)),
'g': ProfilingStats(cpu=ResourceStats(num_executions=2, average=0.0000e+00, max=0.0000e+00, std=0.0000e+00), memory=ResourceStats(num_executions=2, average=1.5598e+08, max=1.5605e+08, std=9.2682e+04), time=ResourceStats(num_executions=5, average=4.7406e-05, max=6.4360e-05, std=1.2518e-05)),
'h': ProfilingStats(cpu=ResourceStats(num_executions=0, average=0.0000e+00, max=0.0000e+00, std=0.0000e+00), memory=ResourceStats(num_executions=0, average=0.0000e+00, max=0.0000e+00, std=0.0000e+00), time=ResourceStats(num_executions=4, average=5.0460e-05, max=5.6880e-05, std=7.4489e-06))}
Handling Multiple Outputs#
Functions can return multiple results at once.
The output_name argument allows you to specify multiple outputs by providing a tuple of strings.
By default, this assumes the output is a tuple.
However, if you provide a output_picker function, you can return any type of object.
As long as the output name can be used to get the desired output from the returned object.
from pipefunc import Pipeline, pipefunc
# Returns 2 outputs as a tuple: 'c' and 'const'.
@pipefunc(output_name=("c", "const"))
def add_ab(a, b):
return (a + b, 1)
def get_dict_output(output, key):
return output[key]
# Function that returns a dictionary, output_picker is used
# to pick out "d" and "e".
@pipefunc(output_name=("d", "e"), output_picker=get_dict_output)
def mul_bc(b, c, x=1):
return {"d": b * c, "e": x}
# Function returns an object with attributes 'g' and 'h'.
# output_picker is used to pick out 'g' and 'h'.
@pipefunc(output_name=("g", "h"), output_picker=getattr)
def calc_cde(c, d, e, x):
from types import SimpleNamespace
return SimpleNamespace(g=c * d * x, h=c + e)
# Define a function add_gh with a single output 'i'.
@pipefunc(output_name="i")
def add_gh(e, g):
return e + g
# Create a pipeline with the defined functions and visualize it.
pipeline_multiple = Pipeline([add_ab, mul_bc, calc_cde, add_gh])
pipeline_multiple.visualize()
pipeline_multiple(a=1, b=2, x=3)
57
Using the renames Feature#
The renames attribute in pipefunc allows you to rename the inputs and outputs of a function before passing them to the next step in the pipeline.
This can be particularly useful when the same function is used multiple times in a pipeline, or when you want to provide more meaningful names to the inputs and outputs.
In the example below, we demonstrate how to use the renames attribute to rename the inputs of a function before they are passed to the next step in the pipeline.
⚠️ Instead of using the
@pipefuncdecorator (which createspipefunc.PipeFuncobject), we will createPipeFuncobjects directly and specify therenamesattribute.
from pipefunc import PipeFunc, Pipeline
def prod(a, b):
return a * b
def subtract(a, b):
return a - b
# We're going to use these functions multiple times in the pipeline
functions = [
PipeFunc(prod, output_name="prod1"),
PipeFunc(prod, output_name="prod2", renames={"a": "x", "b": "y"}),
PipeFunc(subtract, output_name="delta1", renames={"a": "prod1", "b": "prod2"}),
PipeFunc(subtract, output_name="delta2", renames={"a": "prod2", "b": "prod1"}),
PipeFunc(prod, output_name="result", renames={"a": "delta1", "b": "delta2"}),
]
pipeline_renames = Pipeline(functions)
inputs = {"a": 1, "b": 2, "x": 3, "y": 4}
results = pipeline_renames("result", **inputs)
# Output the results
print("Results:", results)
pipeline_renames.visualize()
Results: -100
Explanation:
Function Definitions:
prod(a, b): Multiples two numbers and returns the result.subtract(a, b): Subtractsbfromaand returns the result.
Pipeline Construction:
We are just using the prod and subtract functions multiple times, but change the names of the inputs and outputs to create a pipeline from it.
One can also apply the renames afterwards using the update_renames method. Or even to the entire pipeline, like:
pipeline_renames2 = pipeline_renames.copy()
pipeline_renames2.update_renames(
{
"a": "aa",
"b": "bb",
"x": "xx",
"y": "yy",
"result": "final_result", # Rename the `output_name` of the last function
},
update_from="current", # update from the current renames, not the original
)
pipeline_renames2(aa=1, bb=2, xx=3, yy=4)
-100
Also check out these Pipeline methods:
Pipeline.update_defaultsPipeline.update_bound
and these PipeFunc methods:
PipeFunc.update_renamesPipeFunc.update_defaultsPipeFunc.update_bound
Custom Parallelism#
By default when pipeline.map(..., parallel=True) is used, the pipeline is executed in parallel using the concurrent.futures.ProcessPoolExecutor. However, you can also specify a custom executor to control the parallelism of the pipeline execution.
It works with any custom executor that has the concurrent.futures.Executor interface, so for example it works with:
concurrent.futures.ProcessPoolExecutorconcurrent.futures.ThreadPoolExecutoripyparallel.Client().executor()dask.distributed.Client().get_executor()mpi4py.futures.MPIPoolExecutor()loky.get_reusable_executor()executorlib.SingleNodeExecutor,executorlib.SlurmClusterExecutor,executorlib.SlurmJobExecutor,executorlib.FluxClusterExecutor,executorlib.FluxJobExecutor
To just change the number of cores while using the default executor, use
import datetime
import time
from concurrent.futures import ProcessPoolExecutor
from pipefunc import Pipeline, pipefunc
@pipefunc(output_name="double", mapspec="x[i] -> double[i]")
def double_it(x: int) -> int:
print(f"{datetime.datetime.now()} - Running double_it for x={x}")
time.sleep(1)
return 2 * x
@pipefunc(output_name="half", mapspec="x[i] -> half[i]")
def half_it(x: int) -> int:
print(f"{datetime.datetime.now()} - Running half_it for x={x}")
time.sleep(1)
return x // 2
@pipefunc(output_name="sum")
def take_sum(half: np.ndarray, double: np.ndarray) -> int:
print(f"{datetime.datetime.now()} - Running take_sum")
return sum(half + double)
pipeline_parallel = Pipeline([double_it, half_it, take_sum])
inputs = {"x": [0, 1, 2, 3]}
run_folder = "my_run_folder"
executor = ProcessPoolExecutor(max_workers=8) # use 8 processes
results = pipeline_parallel.map(
inputs,
run_folder=run_folder,
parallel=True,
executor=executor,
storage="shared_memory_dict",
)
print(results["sum"].output)
14
⚠️ In this pipeline,
double_itandhalf_itare doubly parallel; both the map is parallel and the two functions are executed at the same time, note the timestamps and thesleep()calls. See thevisualize()output to see the structure of the pipeline.
pipeline_parallel.visualize()
Combining Pipelines#
Different pipelines can be combined into a single pipeline using the Pipeline.join method or the | operator.
Note
tl;dr
Use pipeline1 | pipeline2 to join two pipelines.
In cases the output names and arugments do not match up, we can rename the parameters of an entire pipeline using the update_renames method.
from pipefunc import Pipeline, pipefunc
@pipefunc(output_name="c")
def f(a, b):
return a + b
@pipefunc(output_name="d")
def g(b, c, x=1):
return b + c + x
pl1 = Pipeline([f, g])
@pipefunc(output_name="e")
def h(cc, dd, xx=2):
return cc + dd + xx
pl2 = Pipeline([h])
# We now have two pipelines, `pl1` and `pl2`, that we want to combine
# into a single pipeline. However, they have different inputs and defaults.
# Let's update the renames and defaults of `pl2` to match `pl1`.
pl2_ = pl2.copy()
pl2_.update_renames({"cc": "c", "dd": "d", "xx": "x"})
pl2_.update_defaults({"x": 1})
combined_pipeline = pl1 | pl2_ # or use `pl1.combine(pl2_)`
combined_pipeline.visualize()
# The combined pipeline can now be used as a single pipeline
result = combined_pipeline(a=2, b=3, x=2)
print(result) # Output: 17
17
Just to see another quick example of combining pipelines (even though it makes no sense to combine these pipelines):
pipeline_silly = pipeline_renames | combined_pipeline
pipeline_silly.visualize()
# e.g., if we want to get the output of `result` in the `pipeline` (not the leaf node!):
pipeline_silly("result", a=1, b=2, y=3)
-1
Caching Results#
To enable caching, simply set the cache attribute to True for each function.
This can be useful to avoid recomputing results when calling the same function with the same arguments multiple times.
Note
Some cache types support shared memory, which means that the cache can be shared between different processes when running in parallel.
@pipefunc(output_name="y", cache=True)
def my_function(a: int, b: int) -> int:
time.sleep(1) # Pretend this is a slow function
print("Function is called!")
return a + b
# multiple cache_type options are available, e.g., "lru", "hybrid", "disk", and "simple"
pipeline_cache = Pipeline([my_function], cache_type="lru")
# lets call the function 10 times with the same arguments
for _ in range(10):
pipeline_cache(a=2, b=3)
Function is called!
print(f"Cache object: {pipeline_cache.cache}")
pipeline_cache.cache.cache
Cache object: <pipefunc.cache.LRUCache object at 0x7fbd9330c050>
{('y', (('a', 2), ('b', 3))): 5}
The cache is populated even when using parallel execution. To see the cache, you can use the cache attribute on the pipeline.
Note
If calling the pipeline like a function (in contrast to using pipeline.map) keys of the cache are always in terms of the root arguments of the pipeline. When using pipeline.map, the keys are in terms of the arguments of the function.
The key is constructed from the function name and the (root) arguments passed to the function. If the arguments are not hashable, the pipefunc.cache.to_hashable function is used to attempt to convert them to a hashable form.
One can also enable caching after the pipeline is created by setting the cache attribute to True for each function.
for f in pipeline.functions:
f.cache = True
Function Argument Combinations#
As we showed in the first example, we can call the functions in the pipeline by either providing the root inputs or by providing the output of the previous function ourselves.
To see all the possible combinations of arguments that can be passed to each function, you can use the all_arg_combinations property. This will return a dictionary, with function output names as keys and sets of argument tuples as values.
all_args = pipeline.all_arg_combinations
assert all_args == {
# means we can call `pipeline("c", a=1, b=2)`
"c": {("a", "b")},
# means we can call `pipeline("d", a=1, b=2, x=3)` or `pipeline("d", b=2, c=3, x=4)`
"d": {("a", "b", "x"), ("b", "c", "x")},
# means we can call `pipeline("e", a=1, b=2, x=3)` or `pipeline("e", b=2, d=3, x=4)`, etc.
"e": {("a", "b", "x"), ("a", "b", "d", "x"), ("b", "c", "x"), ("c", "d", "x")},
}
# We can get root arguments for a specific function
assert pipeline.root_args("e") == ("a", "b", "x")
More mapspec Examples#
This section shows additional mapspec examples.
Cross-product of two inputs#
This example shows how to compute the outer product of two input vectors (x and y) and then aggregate the resulting matrix along rows, and finally reduce the computation to a single float by taking the norm of the resulting aggregated vector.
from pipefunc import Pipeline, pipefunc
@pipefunc(output_name="z", mapspec="x[i], y[j] -> z[i, j]")
def multiply_elements(x: int, y: int) -> int:
"""Multiply two integers."""
return x * y
@pipefunc(output_name="aggregated", mapspec="z[i, :] -> aggregated[i]")
def aggregate_rows(z: np.ndarray) -> np.ndarray:
"""Sum the elements of each row in matrix z."""
return np.sum(z)
@pipefunc(output_name="norm")
def compute_norm(aggregated: np.ndarray) -> float:
"""Compute the Euclidean norm of the vector aggregated."""
return np.linalg.norm(aggregated)
pipeline_norm = Pipeline([multiply_elements, aggregate_rows, compute_norm])
inputs = {"x": [1, 2, 3], "y": [4, 5, 6]}
results = pipeline_norm.map(inputs, run_folder="my_run_folder")
print("Norm of the aggregated sums:", results["norm"].output)
Norm of the aggregated sums: 56.124860801609124
pipeline_norm.visualize()
Explanation:
Matrix Creation (
multiply_elements):Each combination of elements from arrays
xandyis multiplied to form the matrixz. Themapspec"x[i], y[j] -> z[i, j]"ensures that every pair of elements is processed to generate a 2D matrix.
Row Aggregation (
aggregate_rows):The matrix
zis then processed row by row to sum the values, creating an aggregated result for each row. Themapspec"z[i, :] -> aggregated[i]"directs the pipeline to apply the summation across each row, transforming a 2D array into a 1D array of row sums.
Vector Norm Calculation (
compute_norm):Finally, the norm of the aggregated vector is computed, providing a single scalar value that quantifies the magnitude of the vector formed from row sums. This step does not require a
mapspecas it takes the entire output from the previous step and produces a single output.
Dynamic Output Shapes and internal_shapes#
In most cases, pipefunc automatically infers the output shape of each function based on the mapspec and the input shapes.
However, use the internal_shapes argument if a function returns an iterable/array that the next function will iterate over using a mapspec.
The most common case is when the mapspec of the first function is ... -> output1[i] and the mapspec of the second function is output1[i] -> output2[i].
How to use internal_shapes:
Provide a tuple in
@pipefunc(internal_shape=(...))representing the shape of the output of that function. You can use?for unknown dimensions.Provide a dictionary in
pipeline.map(internal_shapes={...})where keys are function output names, and values are tuples representing the shape added by that function. You can use?for unknown dimensions.Or omit
internal_shapesand letpipefuncinfer the shapes automatically (missing out on some consistency checks).
Minimal example:
@pipefunc(output_name="x", internal_shape=(10, 20)) # or `internal_shape=("?", "?")`
def generate_ints() -> np.ndarray:
return np.ones((10, 20))
# or
pipeline.map(..., internal_shapes={"x": (10, 20)}) # or `internal_shapes={"x": ("?", "?")}`
Full example:
We generate a list of integers with a length determined by an input parameter n.
from pipefunc import Pipeline, pipefunc
from pipefunc.typing import Array
@pipefunc(output_name="x")
def generate_ints(n: int) -> list[int]:
"""Generate a list of integers from 0 to n-1."""
return list(range(n))
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
"""Double the input integer."""
return 2 * x
@pipefunc(output_name="sum")
def take_sum(y: Array[int]) -> int:
"""Sum a list of integers."""
return sum(y)
pipeline_sum = Pipeline([generate_ints, double_it, take_sum])
Autogenerated MapSpec for `generate_ints(...) → x`: `... -> x[i]`
Here, generate_ints creates a list of length n.
In the function double_it, we map over the resulting list and double each element.
Note that PipeFunc automatically generated the mapspec="... -> x[i]" for generate_ints, which means that the output is an array with index i that can be mapped over in the double_it function.
We indicate that the output is a 1D array with an unknown number of elements by doing either:
setting the
internal_shapeargument of thegenerate_intsdecorator to@pipefunc(output_name="x", internal_shapes="?"), orby providing a dictionary to the
internal_shapesargument inpipeline.map:
Using option 2:
inputs = {"n": 4}
results = pipeline_sum.map(
inputs,
run_folder="my_run_folder",
internal_shapes={"x": ("?",)}, # Or if we know the shape of the output `{"x": (4,)}`
)
print("Sum of doubled integers:", results["sum"].output)
Sum of doubled integers: 12
Or we can omit the internal_shapes argument and let pipefunc infer the shapes automatically:
results = pipeline_sum.map(inputs, run_folder="my_run_folder")
print("Sum of doubled integers:", results["sum"].output)
Sum of doubled integers: 12
Zipped inputs#
This pipeline processes zipped inputs x and y with independent z to compute a function across all combinations, producing a 2D matrix r.
from pipefunc import Pipeline, pipefunc
@pipefunc(output_name="r")
def process_elements(x: int, y: int, z: int) -> float:
return x * y + z
pipeline_proc = Pipeline([(process_elements, "x[a], y[a], z[b] -> r[a, b]")])
inputs = {"x": [1, 2, 3], "y": [4, 5, 6], "z": [7, 8]}
results = pipeline_proc.map(inputs, run_folder="my_run_folder")
output_matrix = results["r"].output
print("Output Matrix:\n", output_matrix)
Output Matrix:
[[11 12]
[17 18]
[25 26]]
Explanation:
Function
process_elements:Takes three inputs:
x,y, andz. For each pair(x[a], y[a]), the function is applied with eachz[b].
Pipeline Definition:
The
mapspec"x[a], y[a], z[b] -> r[a, b]"specifies how elements from the inputs are to be combined. It states that each element from the paired inputsxandy(indexed bya) should be processed with each element fromz(indexed byb), resulting in a 2D output arrayr.
Outputs:
The output
ris a 2-dimensional matrix where the dimensions are determined by the lengths ofx/yandz. Each element of this matrix represents the computation result for a specific combination of inputs.
Nesting Pipelines for Modularity and Reusability#
pipefunc allows you to create modular and reusable pipeline components by nesting pipelines within each other using the pipefunc.NestedPipeFunc class or the nest_funcs() method. This is particularly useful for:
Encapsulating a sequence of steps that logically belong together.
Reusing a part of a pipeline in multiple projects or within a larger pipeline.
Abstracting away internal details of a complex sub-process.
Selectively avoid returning intermediate results when using
pipeline.map(e.g., to prevent serializing large objects and passing it around).
Creating Nested Pipelines:
You can manually create a NestedPipeFunc by passing a list of functions to its constructor.
However, a potentially more convenient way is to use the pipefunc.Pipeline.nest_funcs() method, which allows you to combine existing functions within a pipeline into a nested one:
from pipefunc import Pipeline, pipefunc
@pipefunc(output_name="c")
def f1(a, b):
return a + b
@pipefunc(output_name="d")
def f2(c):
return c * 2
@pipefunc(output_name="e")
def f3(d, x):
return d + x
pipeline = Pipeline([f1, f2, f3])
# Nest f1 and f2 into a single NestedPipeFunc
nested_pipeline = pipeline.copy()
nested_func = nested_pipeline.nest_funcs(
{"c", "d"},
new_output_name="d", # Only returns "d" and not "c"
function_name="f1_f2",
)
nested_pipeline.visualize()
This creates a nested_pipeline where f1 and f2 are combined into a NestedPipeFunc named f1_f2.
The new nested function only returns "d" and not "c".
The new_output_name must be a subset of the outputs of the nested pipeline.
You can optionally specify the name of the function using the function_name argument.
Inspecting the Nested Pipeline:
The nested_func object contains its own internal pipeline, accessible via the pipeline attribute:
nested_func.pipeline.visualize()
Using the Nested Pipeline:
You can now use the nested_pipeline like any other pipeline. When executed, the NestedPipeFunc will run its internal pipeline, taking the required inputs and producing the specified output.
result = nested_pipeline(a=1, b=2, x=3)
print(f"{result=}")
nested_result = nested_func(a=1, b=2)
print(f"{nested_result=}")
result=9
nested_result=6
Limitations with mapspec:
While NestedPipeFunc offers powerful modularity, there are limitations when using it with mapspec:
No Map-Reduce Operations: The
mapspecof functions within aNestedPipeFunccannot contain reductions (e.g.,x[i, j] -> y[i]).No Dynamic Axis Generation: The
mapspeccannot dynamically generate new axes (e.g.,... -> out[i]). In other words, it cannot return an output with aninternal_shape.Allowed
mapspecs: You can use mapspecs that do not reduce or create new axes.Bound arguments: The
boundarguments do not appear as parameters to the nested pipeline. To update the bound arguments, usenested_func.pipeline["output_name"].update_bound(...).
These limitations stem from the fact that the nested pipeline is treated as a single unit, and its internal operations are not directly exposed to the outer pipeline’s mapping logic.
Benefits of Nesting (Despite Limitations):
Modularity: Create self-contained, reusable pipeline components.
Abstraction: Hide internal complexity behind a well-defined interface.
Reusability: Easily integrate nested pipelines into other projects or larger workflows.
Clarity: Improve the overall structure and readability of your pipelines.
Control over intermediate results: When using
pipeline.map, usenest_funcsto avoid returning intermediate results.
Full Examples#
Example: Physics based example#
This section has been moved to the Physics based example page.
Example: Sensor Data Processing Pipeline#
This section has been moved to the Sensor Data Processing Pipeline page.
Example: Image Processing Workflow Example with mapspec#
This section has been moved to the Image Processing Workflow Example with mapspec page.
Example: Natural Language Processing Pipeline for Text Summarization#
This section has been moved to the Natural Language Processing Pipeline for Text Summarization page.
Example: Weather Simulation and Analysis Pipeline with xarray#
This section has been moved to the Weather Simulation and Analysis Pipeline with xarray page.