Function Chaining Helper#
pipefunc.helpers.chain connects functions linearly so the output of one becomes the input of the next.
import numpy as np
from pipefunc import Pipeline, pipefunc
from pipefunc.helpers import chain
@pipefunc("to_float")
def to_float(img: np.ndarray) -> np.ndarray:
return img.astype(np.float32)
@pipefunc("normalize")
def normalize(img: np.ndarray, eps: float = 1e-8) -> np.ndarray:
img = img.astype(np.float32)
rng = np.ptp(img) # max - min
return (img - img.min()) / (rng + eps)
@pipefunc("gamma")
def gamma(img: np.ndarray, g: float = 2.2) -> np.ndarray:
return np.power(np.clip(img, 0.0, 1.0), 1.0 / g)
@pipefunc("threshold")
def threshold(img: np.ndarray, t: float = 0.5) -> np.ndarray:
return (img > t).astype(np.float32)
functions = chain(
[
to_float,
gamma,
normalize,
threshold,
normalize.copy(output_name="output"), # apply normalize again, with different output name
],
)
pipe = Pipeline(functions)
img = np.random.randint(0, 255, size=(64, 64), dtype=np.uint8)
out = pipe.run("output", kwargs={"img": img, "t": 0.4})
out.shape, out.dtype
((64, 64), dtype('float32'))
How it works#
If a parameter name matches an upstream output, uses that match
Otherwise, renames the first parameter to the upstream output
Plain callables auto-wrap as
PipeFuncwithoutput_name=f.__name__
Multi-output functions#
By default, the first output is used. To select a different output, name your parameter to match it:
@pipefunc(("a", "b"))
def split(x: int) -> tuple[int, int]:
return x, 10 * x
@pipefunc("sink_b")
def sink_b(b: int) -> int: # parameter 'b' matches second output
return b
Pipeline(chain([split, sink_b])).run("sink_b", kwargs={"x": 7}) # -> 70
70