Error Handling#
Have uv? ⚡
If you have uv installed, you can instantly open this page as a Jupyter notebook using opennb:
uvx --with "pipefunc[docs]" opennb pipefunc/pipefunc/docs/source/concepts/error-handling.md
This command creates an ephemeral environment with all dependencies and launches the notebook in your browser in 1 second - no manual setup needed! ✨.
Alternatively, run:
uv run https://raw.githubusercontent.com/pipefunc/pipefunc/refs/heads/main/get-notebooks.py
to download all documentation as Jupyter notebooks.
Overview#
pipefunc exposes two error-handling strategies when running pipelines:
error_handling="raise"(default): stop on the first failure and raise the underlying exception.error_handling="continue": convert failures raised inside user-definedPipeFunccallables intoErrorSnapshotobjects so the run can continue and downstream tasks can inspect the failure. Infrastructure issues (e.g. pickling the return value, shape validation, storage I/O) still abort the run regardless of the setting.
The sections below show how to inspect snapshots, how propagation works, and why the behaviour is consistent across synchronous, parallel, and async runs.
Capturing error snapshots#
When a PipeFunc fails, it stores an
ErrorSnapshot on the callable itself and on the owning
pipeline. The attribute reflects the most recent failure on that
PipeFunc. When multiple errors occur (for example in threaded or
process-based executors) each failing invocation also returns its own snapshot,
so the per-element objects in the result record the correct kwargs and
exceptions even if the shared attribute is overwritten by later failures.
from pipefunc import Pipeline, pipefunc
from pipefunc.exceptions import ErrorSnapshot
@pipefunc(output_name="c")
def faulty_function(a: int, b: int) -> int:
raise ValueError("Intentional error")
pipeline = Pipeline([faulty_function])
try:
pipeline(a=1, b=2)
except Exception: # noqa: BLE001
func_snapshot = faulty_function.error_snapshot
pipeline_snapshot = pipeline.error_snapshot
func_snapshot is pipeline_snapshot, isinstance(func_snapshot, ErrorSnapshot)
An error occurred while calling the function `faulty_function` with the arguments `args=()` and `kwargs={'a': 1, 'b': 2}`.
💥 Error snapshot attached! Use `pipeline.error_snapshot` to debug: `.reproduce()`, `.kwargs`, `.save_to_file()`, `.function`, or just `print()` it. ↓ Scroll down to see the full traceback.
(False, False)
ErrorSnapshot captures the function, arguments, traceback, and useful helper
methods such as ErrorSnapshot.reproduce():
print(func_snapshot)
try:
func_snapshot.reproduce()
except Exception as exc: # noqa: BLE001
type(exc), exc
None
Continue mode walkthrough#
One small example covers the core concepts: failing elements turn into
ErrorSnapshot objects, and downstream functions see
PropagatedErrorSnapshot placeholders when their inputs
contain errors.
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def may_fail(x: int) -> int:
if x == 3:
raise ValueError("Cannot process 3")
return x * 2
@pipefunc(output_name="z", mapspec="y[i] -> z[i]")
def add_ten(y: int) -> int:
return y + 10
pipeline_basic = Pipeline([may_fail, add_ten])
result_basic = pipeline_basic.map(
{"x": [1, 2, 3, 4, 5]},
error_handling="continue",
)
y_outputs = result_basic["y"].output
z_outputs = result_basic["z"].output
y_outputs, z_outputs
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
(array([2, 4, ErrorSnapshot('may_fail', ValueError: Cannot process 3), 8,
10], dtype=object),
array([12, 14,
PropagatedErrorSnapshot('add_ten', reason='input_is_error'), 18,
20], dtype=object))
The error for x == 3 hydrates two complementary objects:
from pipefunc.exceptions import PropagatedErrorSnapshot
type(y_outputs[2]), type(z_outputs[2])
(pipefunc.exceptions.ErrorSnapshot,
pipefunc.exceptions.PropagatedErrorSnapshot)
snapshot = y_outputs[2]
print(snapshot)
snapshot.kwargs
ErrorSnapshot:
--------------
- 🛠 Function: __main__.may_fail
- 🚨 Exception type: ValueError
- 💥 Exception message: Cannot process 3
- 📋 Args: ()
- 🗂 Kwargs: {x=3}
- 🕒 Timestamp: 2026-01-10T00:44:42.679014+00:00
- 👤 User: docs
- 💻 Machine: build-30952248-project-1013168-pipefunc
- 📡 IP Address: 172.17.0.2
- 📂 Current Directory: /home/docs/checkouts/readthedocs.org/user_builds/pipefunc/checkouts/930/docs/source/concepts
🔁 Reproduce the error by calling `error_snapshot.reproduce()`.
📄 Or see the full stored traceback using `error_snapshot.traceback`.
🔍 Inspect `error_snapshot.args` and `error_snapshot.kwargs`.
💾 Or save the error to a file using `error_snapshot.save_to_file(filename)` and load it using `ErrorSnapshot.load_from_file(filename)`.
{'x': 3}
Downstream code can walk back to the original failures:
propagated = z_outputs[2]
propagated.get_root_causes()
[ErrorSnapshot('may_fail', ValueError: Cannot process 3)]
Parallel and async consistency#
continue mode produces the same per-index snapshots even when work is chunked
across executors or awaited asynchronously.
from concurrent.futures import ThreadPoolExecutor
inputs = {"x": list(range(10))}
parallel_outputs = pipeline_basic.map(
inputs,
parallel=True,
executor=ThreadPoolExecutor(),
chunksizes=4,
error_handling="continue",
)
async_result = pipeline_basic.map_async(
inputs,
executor=ThreadPoolExecutor(),
chunksizes=4,
error_handling="continue",
)
results = await async_result.task
async_outputs = results["y"].output
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
Recap#
error_handling="raise"aborts immediately;"continue"recordsErrorSnapshotobjects while allowing the run to finish.ErrorSnapshotinstances preserve everything needed to reproduce the exception locally or offline.PropagatedErrorSnapshothighlights which downstream inputs contained errors and lets you walk back to the root causes.The semantics are identical for sequential, threaded, process-based, and async execution so downstream code can rely on consistent error metadata.
Note
Reduction limitation: For reductions where a function receives an array that
contains one or more errors (reason: array_contains_errors),
get_root_causes() currently returns
an empty list. Root-cause enumeration is only available when the upstream input
parameter itself is a single error (“full” case). You can still use the
reason and error_info metadata to detect which inputs contained errors.