Error Handling#

Overview#

pipefunc exposes two error-handling strategies when running pipelines:

  • error_handling="raise" (default): stop on the first failure and raise the underlying exception.

  • error_handling="continue": convert failures raised inside user-defined PipeFunc callables into ErrorSnapshot objects so the run can continue and downstream tasks can inspect the failure. Infrastructure issues (e.g. pickling the return value, shape validation, storage I/O) still abort the run regardless of the setting.

The sections below show how to inspect snapshots, how propagation works, and why the behaviour is consistent across synchronous, parallel, and async runs.
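The difference between the two strategies can be sketched with a plain stdlib helper (illustrative only; this is not pipefunc's implementation, and the exception placeholder stands in for an ErrorSnapshot):

```python
def run_elements(func, items, error_handling="raise"):
    """Apply func to each item, mimicking the two error-handling modes."""
    results = []
    for item in items:
        try:
            results.append(func(item))
        except Exception as exc:
            if error_handling == "raise":
                raise  # stop on the first failure, like the default mode
            results.append(exc)  # placeholder standing in for an ErrorSnapshot
    return results

def double(x):
    if x == 3:
        raise ValueError("Cannot process 3")
    return x * 2

out = run_elements(double, [1, 2, 3, 4], error_handling="continue")
# out == [2, 4, ValueError('Cannot process 3'), 8]
```

With `error_handling="raise"` the same call would abort at `x == 3`; with `"continue"` the run finishes and the failure travels through the results as data.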

Capturing error snapshots#

When a PipeFunc fails, it stores an ErrorSnapshot on the callable itself and on the owning pipeline. The attribute reflects the most recent failure on that PipeFunc. When multiple errors occur (for example in threaded or process-based executors), each failing invocation also returns its own snapshot, so the per-element objects in the result record the correct kwargs and exceptions even if the shared attribute is overwritten by later failures.
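The shared-attribute bookkeeping described above can be sketched with a plain decorator. The `Snapshot` class and `capture_errors` helper below are illustrative assumptions, not pipefunc's real ErrorSnapshot machinery:

```python
import functools

class Snapshot:
    """Simplified stand-in for ErrorSnapshot: records callable, kwargs, exception."""
    def __init__(self, function, kwargs, exception):
        self.function = function
        self.kwargs = kwargs
        self.exception = exception

def capture_errors(func):
    @functools.wraps(func)
    def wrapper(**kwargs):
        try:
            return func(**kwargs)
        except Exception as exc:
            snap = Snapshot(func, kwargs, exc)
            wrapper.error_snapshot = snap  # shared attribute: last failure wins
            return snap  # per-invocation snapshot keeps the correct kwargs
    wrapper.error_snapshot = None
    return wrapper

@capture_errors
def fail_on_three(x):
    if x == 3:
        raise ValueError("Cannot process 3")
    return x * 2

results = [fail_on_three(x=x) for x in (1, 3, 5)]
# results[1] is a Snapshot whose kwargs are {'x': 3}; the shared
# error_snapshot attribute points at the most recent failure.
```

Because each call returns its own snapshot, concurrent failures cannot clobber each other's kwargs even though the shared attribute only remembers the last one.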

from pipefunc import Pipeline, pipefunc
from pipefunc.exceptions import ErrorSnapshot

@pipefunc(output_name="c")
def faulty_function(a: int, b: int) -> int:
    raise ValueError("Intentional error")

pipeline = Pipeline([faulty_function])

try:
    pipeline(a=1, b=2)
except Exception:  # noqa: BLE001
    func_snapshot = faulty_function.error_snapshot
    pipeline_snapshot = pipeline.error_snapshot

func_snapshot is pipeline_snapshot, isinstance(func_snapshot, ErrorSnapshot)
An error occurred while calling the function `faulty_function` with the arguments `args=()` and `kwargs={'a': 1, 'b': 2}`.
💥 Error snapshot attached!
 Use `pipeline.error_snapshot` to debug:
 `.reproduce()`, `.kwargs`, `.save_to_file()`, `.function`, or just `print()` it.
 ↓ Scroll down to see the full traceback.
(False, False)

ErrorSnapshot captures the function, arguments, traceback, and useful helper methods such as ErrorSnapshot.reproduce():

print(func_snapshot)

try:
    func_snapshot.reproduce()
except Exception as exc:  # noqa: BLE001
    print(type(exc), exc)
None
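The idea behind `reproduce()` is simple: keep the callable and its arguments so the exact failing call can be re-run later. A minimal stdlib sketch (the `MiniSnapshot` class and its method names are assumptions modeled on the docs, not pipefunc's implementation):

```python
import pickle
import traceback

class MiniSnapshot:
    """Illustrative stand-in for ErrorSnapshot: enough state to re-run a failing call."""
    def __init__(self, function, args, kwargs, exception):
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.exception = exception
        self.traceback = traceback.format_exc()  # captured inside the except block

    def reproduce(self):
        # Re-invoke with the recorded arguments; raises the same error again.
        return self.function(*self.args, **self.kwargs)

    def save_to_file(self, filename):
        # Persist the snapshot for offline debugging.
        with open(filename, "wb") as f:
            pickle.dump(self, f)

def boom(a, b):
    raise ValueError("Intentional error")

try:
    boom(1, b=2)
except ValueError as exc:
    snap = MiniSnapshot(boom, (1,), {"b": 2}, exc)
# snap.reproduce() raises ValueError("Intentional error") again.
```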

Continue mode walkthrough#

One small example covers the core concepts: failing elements turn into ErrorSnapshot objects, and downstream functions see PropagatedErrorSnapshot placeholders when their inputs contain errors.

@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def may_fail(x: int) -> int:
    if x == 3:
        raise ValueError("Cannot process 3")
    return x * 2

@pipefunc(output_name="z", mapspec="y[i] -> z[i]")
def add_ten(y: int) -> int:
    return y + 10

pipeline_basic = Pipeline([may_fail, add_ten])

result_basic = pipeline_basic.map(
    {"x": [1, 2, 3, 4, 5]},
    error_handling="continue",
)

y_outputs = result_basic["y"].output
z_outputs = result_basic["z"].output

y_outputs, z_outputs
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
(array([2, 4, ErrorSnapshot('may_fail', ValueError: Cannot process 3), 8,
        10], dtype=object),
 array([12, 14,
        PropagatedErrorSnapshot('add_ten', reason='input_is_error'), 18,
        20], dtype=object))

The failure for x == 3 produces two complementary objects:

from pipefunc.exceptions import PropagatedErrorSnapshot

type(y_outputs[2]), type(z_outputs[2])
(pipefunc.exceptions.ErrorSnapshot,
 pipefunc.exceptions.PropagatedErrorSnapshot)
snapshot = y_outputs[2]
print(snapshot)
snapshot.kwargs
ErrorSnapshot:
--------------
- 🛠 Function: __main__.may_fail
- 🚨 Exception type: ValueError
- 💥 Exception message: Cannot process 3
- 📋 Args: ()
- 🗂 Kwargs: {x=3}
- 🕒 Timestamp: 2026-01-10T00:44:42.679014+00:00
- 👤 User: docs
- 💻 Machine: build-30952248-project-1013168-pipefunc
- 📡 IP Address: 172.17.0.2
- 📂 Current Directory: /home/docs/checkouts/readthedocs.org/user_builds/pipefunc/checkouts/930/docs/source/concepts

🔁 Reproduce the error by calling `error_snapshot.reproduce()`.
📄 Or see the full stored traceback using `error_snapshot.traceback`.
🔍 Inspect `error_snapshot.args` and `error_snapshot.kwargs`.
💾 Or save the error to a file using `error_snapshot.save_to_file(filename)` and load it using `ErrorSnapshot.load_from_file(filename)`.
{'x': 3}

Downstream code can walk back to the original failures:

propagated = z_outputs[2]
propagated.get_root_causes()
[ErrorSnapshot('may_fail', ValueError: Cannot process 3)]
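The root-cause walk can be sketched in a few lines. The classes below mirror the names in this page but are illustrative stand-ins, not pipefunc's ErrorSnapshot and PropagatedErrorSnapshot:

```python
class Err:
    """Stand-in for an original ErrorSnapshot."""
    def __init__(self, message):
        self.message = message

class Propagated:
    """Stand-in for PropagatedErrorSnapshot: a placeholder pointing upstream."""
    def __init__(self, sources):
        self.sources = sources  # the inputs this placeholder depends on

    def get_root_causes(self):
        causes = []
        for src in self.sources:
            if isinstance(src, Propagated):
                causes.extend(src.get_root_causes())  # keep walking upstream
            elif isinstance(src, Err):
                causes.append(src)  # an original failure
        return causes

root = Err("Cannot process 3")
hop = Propagated([root])       # like z[2] depending on the failing y[2]
two_hops = Propagated([hop])   # a consumer even further downstream
# two_hops.get_root_causes() == [root]
```

The recursion is why a chain of downstream placeholders still resolves to the original failing call rather than to the nearest placeholder.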

Parallel and async consistency#

error_handling="continue" produces the same per-index snapshots even when work is chunked across executors or awaited asynchronously.

from concurrent.futures import ThreadPoolExecutor

inputs = {"x": list(range(10))}
parallel_outputs = pipeline_basic.map(
    inputs,
    parallel=True,
    executor=ThreadPoolExecutor(),
    chunksizes=4,
    error_handling="continue",
)

async_result = pipeline_basic.map_async(
    inputs,
    executor=ThreadPoolExecutor(),
    chunksizes=4,
    error_handling="continue",
)
results = await async_result.task
async_outputs = results["y"].output
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
An error occurred while calling the function `may_fail` with the arguments `args=()` and `kwargs={'x': 3}`.
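Why chunking does not change the result can be seen in a stdlib sketch: each element is wrapped individually, so a failure only replaces its own slot regardless of how elements are grouped into chunks. This is an illustration of the invariant, not pipefunc's executor code:

```python
from concurrent.futures import ThreadPoolExecutor

def double_or_fail(x):
    if x == 3:
        raise ValueError("Cannot process 3")
    return x * 2

def safe(x):
    """Per-element wrapper: a failure becomes a placeholder in that slot."""
    try:
        return double_or_fail(x)
    except Exception as exc:
        return exc  # stands in for an ErrorSnapshot

def run_chunked(items, chunksize):
    """Group items into chunks, map them across threads, and flatten."""
    chunks = [items[i:i + chunksize] for i in range(0, len(items), chunksize)]
    with ThreadPoolExecutor() as pool:
        chunk_results = list(pool.map(lambda c: [safe(x) for x in c], chunks))
    return [r for chunk in chunk_results for r in chunk]

sequential = [safe(x) for x in range(10)]
parallel = run_chunked(list(range(10)), chunksize=4)
# Both runs place the failure at index 3 and agree everywhere else.
```

Because the placeholder is produced per element, the chunk size and executor type only affect scheduling, never which index carries the error.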

Recap#

  • error_handling="raise" aborts immediately; "continue" records ErrorSnapshot objects while allowing the run to finish.

  • ErrorSnapshot instances preserve everything needed to reproduce the exception locally or offline.

  • PropagatedErrorSnapshot highlights which downstream inputs contained errors and lets you walk back to the root causes.

  • The semantics are identical for sequential, threaded, process-based, and async execution so downstream code can rely on consistent error metadata.

Note

Reduction limitation: For reductions where a function receives an array that contains one or more errors (reason: array_contains_errors), get_root_causes() currently returns an empty list. Root-cause enumeration is only available when the upstream input parameter itself is a single error (“full” case). You can still use the reason and error_info metadata to detect which inputs contained errors.
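Until root-cause enumeration covers the array case, a reduction can detect offending elements itself by scanning its input. The `Err` class and helper names below are illustrative, not pipefunc's API:

```python
class Err:
    """Stand-in for an error placeholder inside an input array."""
    def __init__(self, message):
        self.message = message

def find_error_indices(values):
    """Return the indices of error placeholders inside an input array."""
    return [i for i, v in enumerate(values) if isinstance(v, Err)]

def safe_sum(values):
    """A reduction that refuses to aggregate over errored elements."""
    bad = find_error_indices(values)
    if bad:
        return Err(f"inputs at indices {bad} contained errors")
    return sum(values)

data = [2, 4, Err("Cannot process 3"), 8]
# find_error_indices(data) == [2]; safe_sum(data) returns an Err placeholder.
```

The same scan-before-aggregate pattern works with the real metadata: check which elements are error objects first, then decide whether to aggregate, skip, or substitute defaults.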