Adaptive integration#
Run Adaptive Sweeps in 1D, 2D, 3D, ND
Have uv? ⚡
If you have uv installed, you can instantly open this page as a Jupyter notebook using opennb:
uvx --with "pipefunc[docs]" opennb pipefunc/pipefunc/docs/source/concepts/adaptive-integration.md
This command creates an ephemeral environment with all dependencies and launches the notebook in your browser in about a second, with no manual setup needed! ✨
Alternatively, run:
uv run https://raw.githubusercontent.com/pipefunc/pipefunc/refs/heads/main/get-notebooks.py
to download all documentation as Jupyter notebooks.
Using adaptive sweeps instead of regular gridded sweeps can save a lot of time.
Currently, pipefunc has no deep integration for adaptive sweeps; however, we can still do a poor man’s version of them.
Note
In the future the idea is to allow a syntax like this:
pipeline.map(inputs={"a": Bound(0, 1), "b": Bound(0, 1), "c": [0, 1, 2]})
This would turn into a 2D adaptive sweep (with adaptive.Learner2D) over a and b, performed for each value of c.
This poor man’s version runs pipeline.map for each iteration in the adaptive sweep, creating a new run_folder for each iteration.
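Concretely, the pattern can be sketched as below. Everything here is illustrative rather than pipefunc API: `evaluate` and the folder template are hypothetical names, and the real `pipeline.map` call is replaced by an inline computation (the same formula used by the pipeline defined later on this page), so the sketch runs stand-alone.

```python
# Illustrative sketch of the "poor man's" adaptive pattern: one pipeline.map
# run per requested point, each written to its own fresh run folder.
from itertools import count

_run_counter = count()


def evaluate(c: float) -> float:
    # A new run_folder for every iteration of the adaptive sweep
    run_folder = f"adaptive_runs/run_folder_{next(_run_counter)}"
    # Real pattern (not executed in this stand-alone sketch):
    #   results = pipeline.map({"x": [1, 2, 3, 4], "c": c, "d": 1}, run_folder=run_folder)
    #   return results["sum_"].output
    y = [xi + c**2 / (c**2 + xi**2) for xi in [1, 2, 3, 4]]
    return sum(y) / 1


print(evaluate(0.0))  # -> 10.0
```

The adaptive learner then calls `evaluate` at the points it chooses, and each call leaves behind a complete, inspectable run folder.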
Setting the stage#
Let’s set up a simple pipeline with a reduction operation.
from pipefunc import pipefunc, Pipeline
from pipefunc.typing import Array
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int, c: int) -> float:
return x + c**2 / (c**2 + x**2)
@pipefunc(output_name="sum_")
def take_sum(y: Array[float], d: int) -> float:
return sum(y) / d
pipeline = Pipeline([double_it, take_sum])
inputs = {"x": [1, 2, 3, 4], "c": 1, "d": 2}
run_folder = "my_run_folder"
results = pipeline.map(inputs, run_folder=run_folder)
print(results["y"].output.tolist())
[1.5, 2.2, 3.1, 4.0588235294117645]
print(results["sum_"].output)
5.429411764705883
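To see where these numbers come from, the mapped computation can be reproduced in plain Python: the mapspec `x[i] -> y[i]` applies `double_it` elementwise over `x`, and `take_sum` then reduces the whole `y` array.

```python
# Reproduce the pipeline's computation by hand (no pipefunc involved).
x = [1, 2, 3, 4]
c, d = 1, 2

# mapspec "x[i] -> y[i]": apply double_it elementwise over x
y = [xi + c**2 / (c**2 + xi**2) for xi in x]
print(y)  # [1.5, 2.2, 3.1, 4.0588...]

# take_sum reduces the whole y array
sum_ = sum(y) / d
print(sum_)  # ~5.4294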
This pipeline returns a single number: the sum of y divided by d.
However, often we want to run a pipeline for a range of inputs, e.g., on a 2D grid over c and d.
pipeline2d = pipeline.copy()
pipeline2d.add_mapspec_axis("c", axis="j")
pipeline2d.add_mapspec_axis("d", axis="k")
Now let’s run this on a 2D grid of c and d:
import numpy as np
inputs = {"x": [1, 2, 3, 4], "c": np.linspace(0, 100, 20), "d": np.linspace(-1, 1, 20)}
run_folder = "my_run_folder"
results = pipeline2d.map(inputs, run_folder=run_folder)
We can load the results into an xarray dataset and plot them.
from pipefunc.map import load_xarray_dataset
ds = load_xarray_dataset(run_folder=run_folder)
ds.sum_.astype(float).plot(x="c", y="d")
<matplotlib.collections.QuadMesh at 0x7b9ebd53bb60>
Important
One major advantage of this gridded sweep is that the data is all structured nicely and the parallelism is all captured by the pipeline.map function.
Currently, using adaptive as described below is a bit more cumbersome; however, there are plans to make this more seamless in the future.
Using adaptive for adaptive sweeps#
import adaptive
adaptive.notebook_extension()
We redefine the pipeline with the single reduction operation.
from pipefunc import pipefunc, Pipeline
from pipefunc.typing import Array
@pipefunc(output_name="y", mapspec="x[i] -> y[i]")
def double_it(x: int, c: int) -> float:
return x + c**2 / (c**2 + x**2)
@pipefunc(output_name="sum_")
def take_sum(y: Array[float], d: int) -> float:
return sum(y) / d
pipeline = Pipeline([double_it, take_sum])
Using adaptive.Learner1D for a 1D adaptive sweep#
from pipefunc.map.adaptive import to_adaptive_learner
run_folder_template = "adaptive_1d/run_folder_{}"
learner1d = to_adaptive_learner(
pipeline,
inputs={"x": [1, 2, 3, 4], "d": 1},
adaptive_dimensions={"c": (0, 100)},
adaptive_output="sum_",
run_folder_template=run_folder_template,
)
Then we can drive the learner sequentially, because pipeline.map itself is already parallelized.
adaptive.runner.simple(learner1d, npoints_goal=10)
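For reference, adaptive.runner.simple is essentially a sequential ask/tell loop. The sketch below shows that loop with a toy learner; this is a simplification, not the actual adaptive implementation, and `MidpointLearner` (which just bisects the widest gap between known points) is a hypothetical stand-in for adaptive.Learner1D.

```python
# Toy learner: always proposes the midpoint of the widest gap between
# evaluated points. Not adaptive.Learner1D, just a stand-in.
class MidpointLearner:
    def __init__(self, function, bounds):
        self.function = function
        self.bounds = bounds
        self.data = {}  # x -> y

    @property
    def npoints(self):
        return len(self.data)

    def ask(self, n=1):
        xs = sorted(self.data)
        if not xs:
            return [self.bounds[0]], [0.0]
        if len(xs) == 1:
            return [self.bounds[1]], [0.0]
        # bisect the widest gap between known points
        gaps = [(b - a, (a + b) / 2) for a, b in zip(xs, xs[1:])]
        _, mid = max(gaps)
        return [mid], [0.0]

    def tell(self, x, y):
        self.data[x] = y


def simple_run(learner, npoints_goal):
    # Roughly what adaptive.runner.simple does: ask for a point,
    # evaluate it, and feed the result back, sequentially.
    while learner.npoints < npoints_goal:
        points, _ = learner.ask(1)
        for x in points:
            learner.tell(x, learner.function(x))


learner = MidpointLearner(lambda x: x**2, bounds=(0, 1))
simple_run(learner, npoints_goal=5)
print(sorted(learner.data))  # [0, 0.25, 0.5, 0.75, 1]
```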
We can now inspect the results of the adaptive_output in the learner
learner1d.to_numpy()
array([[ 0. , 10. ],
[ 0.78125 , 10.61165985],
[ 1.5625 , 11.43420931],
[ 3.125 , 12.51595012],
[ 6.25 , 13.40431569],
[ 12.5 , 13.82132852],
[ 25. , 13.95288667],
[ 50. , 13.98805633],
[ 75. , 13.99467783],
[100. , 13.99700354]])
learner1d.plot()
Or inspect all the underlying data
from pathlib import Path
from pipefunc.map import load_xarray_dataset
all_folders = Path(run_folder_template).parent.glob("*")
all_folders = sorted(all_folders)
datasets = [load_xarray_dataset(run_folder=folder) for folder in all_folders]
datasets[0] # just look at the first dataset
<xarray.Dataset> Size: 72B
Dimensions: (i: 4)
Coordinates:
x (i) object 32B 1 2 3 4
Dimensions without coordinates: i
Data variables:
y (i) object 32B 1.0 2.0 3.0 4.0
    sum_     object 8B 10.0
Using adaptive.Learner2D for a 2D adaptive sweep#
run_folder_template = "adaptive_2d/run_folder_{}"
learner2d = to_adaptive_learner(
pipeline,
inputs={"x": [1, 2, 3, 4]},
adaptive_dimensions={"c": (0, 100), "d": (-1, 1)},
adaptive_output="sum_",
run_folder_template=run_folder_template,
)
Even though pipeline.map is already parallelized by default, we can still use adaptive.Runner to run “doubly” parallel: multiple pipeline.map calls run in parallel, in addition to the parallelization within each pipeline.map itself.
runner = adaptive.Runner(learner2d, npoints_goal=10)
runner.live_info()
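The “doubly parallel” idea can be illustrated without adaptive at all: several independent evaluations, each of which would internally be a parallel pipeline.map run, are dispatched concurrently. The sketch below uses a `ThreadPoolExecutor` and replaces the real pipeline.map call with the inline formula, so it runs stand-alone; `evaluate` is an illustrative name, not pipefunc API.

```python
# Outer layer of parallelism: dispatch independent evaluations concurrently.
# Each evaluate() call stands in for one full (internally parallel)
# pipeline.map run at a single (c, d) point.
from concurrent.futures import ThreadPoolExecutor


def evaluate(point: tuple[float, float]) -> float:
    c, d = point
    y = [xi + c**2 / (c**2 + xi**2) for xi in [1, 2, 3, 4]]
    return sum(y) / d


points = [(0.0, 1.0), (100.0, 1.0), (100.0, -1.0)]
with ThreadPoolExecutor() as executor:
    results = list(executor.map(evaluate, points))
print(results)
```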
We can now inspect the results of the adaptive_output in the learner
learner2d.plot(tri_alpha=0.3)
learner2d.to_numpy()
array([[ 0.00000000e+00, -1.00000000e+00, -1.00000000e+01],
[ 0.00000000e+00, 1.00000000e+00, 1.00000000e+01],
[ 1.11111111e+01, 1.11111111e-01, 1.24001139e+02],
[ 3.33333333e+01, -3.33333333e-01, -4.19198497e+01],
[ 3.33333333e+01, 3.33333333e-01, 4.19198497e+01],
[ 4.44444444e+01, 7.77777778e-01, 1.79805891e+01],
[ 6.66666667e+01, -3.33333333e-01, -4.19798036e+01],
[ 6.66666667e+01, 3.33333333e-01, 4.19798036e+01],
[ 1.00000000e+02, -1.00000000e+00, -1.39970035e+01],
[ 1.00000000e+02, 1.00000000e+00, 1.39970035e+01]])
Or inspect all the underlying data
from pathlib import Path
from pipefunc.map import load_xarray_dataset
all_folders = Path(run_folder_template).parent.glob("*")
all_folders = sorted(all_folders)
datasets = [load_xarray_dataset(run_folder=folder) for folder in all_folders]
datasets[0] # just look at the first dataset
<xarray.Dataset> Size: 72B
Dimensions: (i: 4)
Coordinates:
x (i) object 32B 1 2 3 4
Dimensions without coordinates: i
Data variables:
y (i) object 32B 1.0 2.0 3.0 4.0
sum_ object 8B -10.0
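One natural next step is to combine the per-run datasets into a single dataset. A minimal sketch, assuming xarray is available (as it is used above); the toy 0-d datasets here stand in for those loaded with load_xarray_dataset:

```python
# Combine per-iteration datasets along a new "run" dimension (sketch; the
# toy datasets stand in for load_xarray_dataset output).
import xarray as xr

datasets = [xr.Dataset({"sum_": xr.DataArray(value)}) for value in [10.0, 12.5, 14.0]]
combined = xr.concat(datasets, dim="run")
print(combined["sum_"].values.tolist())  # [10.0, 12.5, 14.0]
```

With the real per-run datasets, you could additionally assign the adaptive point of each run as a coordinate along the new dimension.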