# Parallelism and Execution
Have `uv`? ⚡

If you have `uv` installed, you can instantly open this page as a Jupyter notebook using `opennb`:

```bash
uvx --with "pipefunc[docs]" opennb pipefunc/pipefunc/docs/source/concepts/execution-and-parallelism.md
```

This command creates an ephemeral environment with all dependencies and launches the notebook in your browser in about a second, with no manual setup needed. ✨

Alternatively, run:

```bash
uv run https://raw.githubusercontent.com/pipefunc/pipefunc/refs/heads/main/get-notebooks.py
```

to download all of the documentation pages as Jupyter notebooks.
## What is the difference between `pipeline.run` and `pipeline.map`?

Both methods execute the pipeline, but they serve different use cases:
- `pipeline.run(output_name, kwargs)` is used to execute the pipeline as a function and is fully sequential. It allows going from any input arguments to any output arguments. It does not support map-reduce operations. Internally, it keeps all intermediate results in memory in a dictionary.
- `pipeline.map(inputs, ...)` is used to execute the pipeline in parallel. It supports map-reduce operations and is optimized for parallel execution. Internally, it puts all intermediate results in a `StorageBase` (there are implementations for storing on disk or in memory).
Note
Internally, the `pipeline.run` method is called when using the pipeline as a function; the following are equivalent:

- `pipeline.run(output_name, kwargs)`
- `pipeline(output_name, **kwargs)`
- `f = pipeline.func(output_name)` and then `f(**kwargs)`
Here is a table summarizing the differences between `pipeline.run` and `pipeline.map`:

| Feature | `pipeline.run` | `pipeline.map` |
|---|---|---|
| Execution mode | Sequential | Parallel (any `concurrent.futures.Executor`) |
| Map-reduce support (via `mapspec`) | No | Yes |
| Input arguments | Can provide any input arguments for any function in the pipeline | Requires the root arguments (use `Pipeline.subpipeline` to obtain a smaller pipeline) |
| Output arguments | Can request the output of any function in the pipeline | Calculates all function nodes in the entire pipeline (use `Pipeline.subpipeline` to obtain a smaller pipeline) |
| Intermediate results storage | In-memory dictionary | Configurable storage (`StorageBase`), e.g., on disk or in memory |
| Use case | Executing the pipeline as a regular function, going from any input to any output | Optimized for parallel execution and map-reduce operations |
| Calling syntax | `pipeline.run(output_name, kwargs)` or `pipeline(output_name, **kwargs)` | `pipeline.map(inputs, ...)` |
In summary, `pipeline.run` is used for sequential execution and allows flexible input and output arguments, while `pipeline.map` is optimized for parallel execution and map-reduce operations but requires structured inputs and outputs based on the `mapspec` of the functions.
## Mixing executors and storage backends for I/O-bound and CPU-bound work
You can mix different executors and storage backends in a pipeline.
Imagine that some `PipeFunc`s are trivial to execute, some are CPU-bound, and some are I/O-bound.
Let’s consider an example where we have two `PipeFunc`s, `f` and `g`.
`f` is I/O-bound and `g` is CPU-bound.
We can use a `ThreadPoolExecutor` for `f` and a `ProcessPoolExecutor` for `g`.
We will store the results of `f` in memory and the results of `g` in a file.
```python
import multiprocessing
import threading
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

import numpy as np

from pipefunc import Pipeline, pipefunc


@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def f(x):
    time.sleep(1)  # Simulate I/O-bound work
    return threading.current_thread().name


@pipefunc(output_name="z", mapspec="x[i] -> z[i]")
def g(x):
    np.linalg.eig(np.random.rand(10, 10))  # CPU-bound work
    return multiprocessing.current_process().name


pipeline = Pipeline([f, g])
inputs = {"x": [1, 2, 3]}
executor = {
    "y": ThreadPoolExecutor(max_workers=2),
    "": ProcessPoolExecutor(max_workers=2),  # empty string means default executor
}
storage = {
    "z": "file_array",
    "": "dict",  # empty string means default storage
}
results = pipeline.map(inputs, run_folder="run_folder", executor=executor, storage=storage)

# Get the results to check the thread and process names
thread_names = results["y"].output.tolist()
process_names = results["z"].output.tolist()
print(f"thread_names: {thread_names}")
print(f"process_names: {process_names}")
```

```text
thread_names: ['ThreadPoolExecutor-0_0', 'ThreadPoolExecutor-0_1', 'ThreadPoolExecutor-0_0']
process_names: ['ForkServerProcess-1', 'ForkServerProcess-2', 'ForkServerProcess-1']
```
In both `executor` and `storage` you can use the special key `""` to apply the default executor or storage.
Note
The `executor` parameter supports any executor that is compliant with the `concurrent.futures.Executor` interface.
That includes:

- `concurrent.futures.ProcessPoolExecutor`
- `concurrent.futures.ThreadPoolExecutor`
- `ipyparallel.Client().executor()`
- `dask.distributed.Client().get_executor()`
- `mpi4py.futures.MPIPoolExecutor()`
- `loky.get_reusable_executor()`
- `executorlib.SingleNodeExecutor`, `executorlib.SlurmClusterExecutor`, `executorlib.SlurmJobExecutor`, `executorlib.FluxClusterExecutor`, `executorlib.FluxJobExecutor`
## How to use post-execution hooks?
Post-execution hooks allow you to execute custom code after a function completes. This is useful for logging, debugging, or collecting statistics about function execution.
You can set a post-execution hook in two ways:

1. When creating a `PipeFunc` using the `post_execution_hook` parameter
2. When using the `@pipefunc` decorator

The hook function receives three arguments:

1. The `PipeFunc` instance
2. The return value of the function
3. A dictionary of the input arguments
Here’s an example:
```python
from pipefunc import Pipeline, pipefunc


def my_hook(func, result, kwargs):
    print(f"Function {func.__name__} returned {result} with inputs {kwargs}")


@pipefunc(output_name="c", post_execution_hook=my_hook)
def f(a, b):
    return a + b


# The hook will print after each execution
f(a=1, b=2)  # Prints: Function f returned 3 with inputs {'a': 1, 'b': 2}


# Hooks also work in pipelines and with map operations
@pipefunc(output_name="d")
def g(c):
    return c * 2


pipeline = Pipeline([f, g])
pipeline(a=1, b=2)  # Hook is called when f executes in the pipeline
```

```text
Function f returned 3 with inputs {'a': 1, 'b': 2}
Function f returned 3 with inputs {'a': 1, 'b': 2}
6
```
Post-execution hooks are particularly useful for:

- Debugging: Print intermediate results and inputs
- Logging: Record function execution details
- Profiling: Collect timing or resource usage statistics
- Validation: Check that results or inputs meet certain criteria
- Monitoring: Track pipeline progress
Note that hooks are executed synchronously after the function returns but before the result is passed to the next function in the pipeline. They should be kept lightweight to avoid impacting performance.
## Running multiple `map` calls concurrently

In some scenarios, you might need to run `pipeline.map` multiple times with different sets of inputs or even with different pipelines.
`pipefunc` provides a convenient way to manage and execute these concurrent map operations, giving you control over the degree of parallelism.
This is particularly useful when dealing with tasks that have varying computational requirements or when you want to orchestrate a series of related but independent parameter sweeps.
The core functions for this are `launch_maps()` and `gather_maps()`.
The workflow is as follows:
1. Create a list of `AsyncMap` runners by calling `map_async()` with `start=False`. This prepares the map operations without immediately executing them.
2. Pass these runners to `launch_maps` or `gather_maps` to execute them.

- `launch_maps()`: A non-blocking function ideal for interactive environments like Jupyter. It starts the execution in the background and returns an `asyncio.Task` that you can `await` later.
- `gather_maps()`: A blocking `async` function that runs the maps and waits for all of them to complete before returning.
Let’s see an example:
```python
from pipefunc import Pipeline, pipefunc
from pipefunc.helpers import launch_maps


@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int) -> int:
    return 2 * x


pipeline = Pipeline([double_it])

# Define two different sets of inputs with different sizes
inputs1 = {"x": [1, 2, 3]}
inputs2 = {"x": [4, 5, 6, 7]}

# 1. Prepare the runners
runners = [
    pipeline.map_async(inputs1, start=False),
    pipeline.map_async(inputs2, start=False),
]

# 2. Launch the maps concurrently
# This will run at most 2 maps at the same time.
task = launch_maps(*runners, max_concurrent=2)
```
In a Jupyter notebook, `launch_maps` will automatically display a tabbed widget to monitor the progress of each map operation.
To get the results, you can `await` the task in a later cell:
```python
# In a new cell
results = await task
print(results[0]["y"].output)
print(results[1]["y"].output)
```

```text
[2 4 6]
[8 10 12 14]
```
### Controlling Concurrency

The `max_concurrent` parameter in `launch_maps` and `gather_maps` controls how many of the map operations are allowed to run at the same time. For example, if you have 10 map operations to run but set `max_concurrent=3`, only three will execute in parallel at any given time.
### Sequential Execution

If you want to run the maps one after another, simply set `max_concurrent=1`.

```python
task = launch_maps(*runners, max_concurrent=1)
```
This is useful when subsequent map operations might depend on the resources freed up by preceding ones, or when you want to avoid overloading a system.
### Why run maps concurrently?
This feature is beneficial in several situations:

- Heterogeneous Workloads: When you have map operations with different input sizes or computational costs.
- Resource Constraints: When the design of your `pipefunc`s requires that all maps produce arrays of the same shape, but your tasks naturally have different input sizes. Running them as separate map calls allows you to handle this.
- Complex Workflows: For orchestrating multiple, independent parameter sweeps as part of a larger computational experiment.
By using `launch_maps`, you can manage these complex scenarios with simple, readable code while retaining fine-grained control over the execution.