Source code for pipefunc.map.adaptive

"""Provides `adaptive` integration for `pipefunc`."""

from __future__ import annotations

import functools
from collections import UserDict
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal, NamedTuple, TypeAlias

import numpy as np
from adaptive import Learner1D, Learner2D, LearnerND, SequenceLearner, runner

from pipefunc._utils import at_least_tuple, prod
from pipefunc.map._shapes import shape_is_resolved

from ._mapspec import MapSpec
from ._prepare import _reduced_axes, _validate_fixed_indices
from ._run import (
    _func_kwargs,
    _load_from_store,
    _mask_fixed_axes,
    _process_task,
    _run_iteration_and_process,
    _submit_func,
    run_map,
)
from ._run_info import RunInfo
from ._shapes import external_shape_from_mask, map_shapes
from ._storage_array._base import iterate_shape_indices

if TYPE_CHECKING:
    from collections.abc import Callable, Generator

    import adaptive_scheduler
    import numpy.typing as npt

    from pipefunc import PipeFunc, Pipeline
    from pipefunc._pipeline._types import OUTPUT_TYPE
    from pipefunc.cache import _CacheBase
    from pipefunc.resources import Resources
    from pipefunc.sweep import Sweep

    from ._result import StoreType
    from ._storage_array._base import StorageBase
    from ._types import ShapeTuple, UserShapeDict
    from .adaptive_scheduler import AdaptiveSchedulerDetails


[docs] class LearnerPipeFunc(NamedTuple): """A tuple with `~adaptive.SequenceLearner` and `~pipefunc.PipeFunc`.""" learner: SequenceLearner pipefunc: PipeFunc
[docs] class AxisIndex(NamedTuple): """A named tuple to store the axis and index for a fixed axis.""" axis: str idx: int | slice # not called `index` to avoid shadowing the built-in
LearnersDictType: TypeAlias = UserDict[tuple[AxisIndex, ...] | None, list[list[LearnerPipeFunc]]]
[docs] class LearnersDict(LearnersDictType): """A dictionary of adaptive learners for a pipeline as returned by `create_learners`.""" def __init__( self, learners_dict: LearnersDictType | None = None, run_info: RunInfo | None = None, ) -> None: """Create a dictionary of adaptive learners for a pipeline.""" super().__init__(learners_dict or {}) self.run_info: RunInfo | None = run_info
[docs] def flatten(self) -> dict[OUTPUT_TYPE, list[SequenceLearner]]: """Flatten the learners into a dictionary with the output names as keys.""" flat_learners: dict[OUTPUT_TYPE, list[SequenceLearner]] = {} for learners_lists in self.data.values(): for learners in learners_lists: for learner_with_pipefunc in learners: output_name = learner_with_pipefunc.pipefunc.output_name flat_learners.setdefault(output_name, []).append(learner_with_pipefunc.learner) return flat_learners
[docs] def simple_run(self) -> None: """Run all the learners in the dictionary in order using `adaptive.runner.simple`.""" for learner_list in self.flatten().values(): for learner in learner_list: runner.simple(learner)
[docs] def to_slurm_run( self, default_resources: dict | Resources | None = None, *, ignore_resources: bool = False, returns: Literal["run_manager", "kwargs", "namedtuple"] = "kwargs", **slurm_run_kwargs: Any, ) -> dict[str, Any] | adaptive_scheduler.RunManager | AdaptiveSchedulerDetails: """Helper for `adaptive_scheduler.slurm_run` which returns a `adaptive_scheduler.RunManager`. Parameters ---------- default_resources The default resources to use for the run. Only needed if not all `PipeFunc`s have resources. ignore_resources Whether to ignore the resources of the `PipeFunc`s and use the `default_resources` for all of them. returns What to return. Can be one of "run_manager", "kwargs", or "namedtuple". If "run_manager", returns a `adaptive_scheduler.RunManager`. If "kwargs", returns a dictionary that can be passed to `adaptive_scheduler.slurm_run`. If "namedtuple", returns an `AdaptiveSchedulerDetails`. slurm_run_kwargs Additional keyword arguments to pass to `adaptive_scheduler.slurm_run`. Returns ------- The output depends on the value of `returns`. """ from .adaptive_scheduler import slurm_run_setup if self.run_info is None: msg = "`run_info` must be provided. Set `learners_dict.run_info`." raise ValueError(msg) details: AdaptiveSchedulerDetails = slurm_run_setup( self, default_resources, ignore_resources=ignore_resources, ) if returns == "namedtuple": if slurm_run_kwargs: msg = "Cannot pass `slurm_run_kwargs` when `returns='namedtuple'`." raise ValueError(msg) return details kwargs = details.kwargs() if slurm_run_kwargs: kwargs.update(slurm_run_kwargs) assert self.run_info.run_folder is not None kwargs.setdefault("folder", self.run_info.run_folder / "adaptive_scheduler") if returns == "run_manager": # pragma: no cover return details.run_manager(kwargs) if returns == "kwargs": return kwargs msg = f"Invalid value for `returns`: {returns}" raise ValueError(msg)
[docs] def create_learners( pipeline: Pipeline, inputs: dict[str, Any], run_folder: str | Path | None, internal_shapes: UserShapeDict | None = None, *, storage: str | dict[OUTPUT_TYPE, str] = "file_array", return_output: bool = False, cleanup: bool | None = None, resume: bool = False, fixed_indices: dict[str, int | slice] | None = None, split_independent_axes: bool = False, ) -> LearnersDict: """Create adaptive learners for a single `Pipeline.map` call. Creates learner(s) for each function node in the pipeline graph. The number of learners created for each node depends on the `fixed_indices` and `split_independent_axes` parameters: - If `fixed_indices` is provided or `split_independent_axes` is `False`, a single learner is created for each function node (unless `resources_scope="element"`). - If `split_independent_axes` is `True`, multiple learners are created for each function node, corresponding to different combinations of the independent axes in the pipeline. Returns a dictionary where the keys represent specific combinations of indices for the independent axes, and the values are lists of lists of learners: - The outer lists represent different stages or generations of the pipeline, where the learners in each stage depend on the outputs of the learners in the previous stage. - The inner lists contain learners that can be executed independently within each stage. When `split_independent_axes` is `True`, each key in the dictionary corresponds to a different combination of indices for the independent axes, allowing for parallel execution across different subsets of the input data. If `fixed_indices` is `None` and `split_independent_axes` is `False`, the only key in the dictionary is `None`, indicating that all indices are being processed together. Parameters ---------- pipeline The pipeline to create learners for. inputs The inputs to the pipeline, the same as passed to `pipeline.map`. run_folder The folder to store the run information. internal_shapes The internal shapes to use for the run. storage The storage class to use for storing intermediate and final results. Can be specified as: 1. A string: Use a single storage class for all outputs. 2. A dictionary: Specify different storage classes for different outputs. - Use output names as keys and storage class names as values. - Use an empty string ``""`` as a key to set a default storage class. Available storage classes are registered in `pipefunc.map.storage_registry`. Common options include ``"file_array"``, ``"dict"``, and ``"shared_memory_dict"``. return_output Whether to return the output of the function in the learner. cleanup .. deprecated:: 0.89.0 Use `resume` parameter instead. Will be removed in version 1.0.0. Whether to clean up the ``run_folder`` before running the pipeline. When set, takes priority over ``resume`` parameter. ``cleanup=True`` is equivalent to ``resume=False``. ``cleanup=False`` is equivalent to ``resume=True``. resume Whether to resume data from a previous run in the ``run_folder``. - ``False`` (default): Clean up the ``run_folder`` before running (fresh start). - ``True``: Attempt to load and resume results from a previous run. Note: If ``cleanup`` is specified, it takes priority over this parameter. fixed_indices A dictionary mapping axes names to indices that should be fixed for the run. If not provided, all indices are iterated over. split_independent_axes Whether to split the independent axes into separate learners. Do not use in conjunction with ``fixed_indices``. See Also -------- LearnersDict.to_slurm_run Convert the learners to variables that can be passed to `adaptive_scheduler.slurm_run`. Returns ------- A dictionary where the keys are the fixed indices, e.g., ``(("i", 0), ("j", 0))``, and the values are lists of lists of learners. The learners in the inner list can be executed in parallel, but the outer lists need to be executed in order. If ``fixed_indices`` is ``None`` and ``split_independent_axes`` is ``False``, then the only key is ``None``. """ run_info = RunInfo.create( run_folder, pipeline, inputs, internal_shapes, storage=storage, cleanup=cleanup, resume=resume, ) store = run_info.init_store() learners: LearnersDict = LearnersDict(run_info=run_info) iterator = _maybe_iterate_axes( pipeline, inputs, fixed_indices, split_independent_axes, run_info.internal_shapes, ) for _fixed_indices in iterator: key = _key(_fixed_indices) for gen in pipeline.topological_generations.function_lists: _validate_no_dynamic_shapes(run_info, gen) gen_learners = [] for func in gen: learner = _learner( func=func, run_info=run_info, store=store, fixed_indices=_fixed_indices, # might be None cache=pipeline.cache, return_output=return_output, ) if func.resources_scope == "element": for lrn in _split_sequence_learner(learner): gen_learners.append(LearnerPipeFunc(lrn, func)) # noqa: PERF401 else: gen_learners.append(LearnerPipeFunc(learner, func)) learners.setdefault(key, []).append(gen_learners) return learners
def _validate_no_dynamic_shapes(run_info: RunInfo, generation: list[PipeFunc]) -> None: for func in generation: shape = run_info.resolved_shapes.get(func.output_name, ()) if not shape_is_resolved(shape): msg = "Dynamic `internal_shapes` not supported in `create_learners`." raise ValueError(msg) def _split_sequence_learner(learner: SequenceLearner) -> list[SequenceLearner]: """Split a `SequenceLearner` into multiple learners.""" if len(learner.sequence) == 1: return [learner] return [SequenceLearner(learner._original_function, [x]) for x in learner.sequence] def _learner( func: PipeFunc, run_info: RunInfo, store: dict[str, StoreType], fixed_indices: dict[str, int | slice] | None, cache: _CacheBase | None, *, return_output: bool, ) -> SequenceLearner: if func.requires_mapping: f = functools.partial( _execute_iteration_in_map_spec, func=func, run_info=run_info, store=store, return_output=return_output, cache=cache, ) shape = run_info.resolved_shapes[func.output_name] assert shape_is_resolved(shape) mask = run_info.shape_masks[func.output_name] assert func.mapspec is not None sequence = _sequence(fixed_indices, func.mapspec, shape, mask) return SequenceLearner(f, sequence) f = functools.partial( _execute_iteration_in_single, func=func, run_info=run_info, store=store, return_output=return_output, ) sequence = [None] # type: ignore[list-item,assignment] return SequenceLearner(f, sequence) def _key(fixed_indices: dict[str, int | slice] | None) -> tuple[AxisIndex, ...] | None: if not fixed_indices: return None # Makes `fixed_indices` hashable return tuple(AxisIndex(axis=axis, idx=idx) for axis, idx in sorted(fixed_indices.items())) def _sequence( fixed_indices: dict[str, int | slice] | None, mapspec: MapSpec, shape: tuple[int, ...], mask: tuple[bool, ...], ) -> npt.NDArray[np.int_] | range: if fixed_indices is None: return range(prod(shape)) fixed_mask = _mask_fixed_axes(fixed_indices, mapspec, shape, mask) assert fixed_mask is not None assert len(fixed_mask) == prod(external_shape_from_mask(shape, mask)) return np.flatnonzero(fixed_mask) def _ensure_adaptive_error_mode(run_info: RunInfo) -> None: if run_info.error_handling != "raise": msg = ( "Adaptive learners do not support error_handling='continue'. " "Please rerun with error_handling='raise' or switch to non-adaptive map execution." ) raise NotImplementedError(msg) def _execute_iteration_in_single( _: Any, func: PipeFunc, run_info: RunInfo, store: dict[str, StoreType], *, return_output: bool = False, ) -> Any | None: """Execute a single iteration of a single function. Meets the requirements of `adaptive.SequenceLearner`. """ _ensure_adaptive_error_mode(run_info) output, exists = _load_from_store(func.output_name, store, return_output=return_output) if exists: return output kwargs_task = _submit_func(func, run_info, store, fixed_indices=None, executor=None) result = _process_task(func, kwargs_task, store, run_info, return_results=True) if not return_output: return None assert result is not None output = tuple(result[name].output for name in at_least_tuple(func.output_name)) return output if isinstance(func.output_name, tuple) else output[0] def _execute_iteration_in_map_spec( index: int, func: PipeFunc, run_info: RunInfo, store: dict[str, StoreType], cache: _CacheBase | None, *, return_output: bool = False, ) -> tuple[Any, ...] | None: """Execute a single iteration of a map spec. Meets the requirements of `adaptive.SequenceLearner`. """ _ensure_adaptive_error_mode(run_info) arrays: list[StorageBase] = [store[name] for name in at_least_tuple(func.output_name)] # type: ignore[misc] # Load the data if it exists if all(arr.has_index(index) for arr in arrays): if not return_output: return None return tuple(arr.get_from_index(index) for arr in arrays) # Otherwise, run the function assert isinstance(func.mapspec, MapSpec) kwargs = _func_kwargs(func, run_info, store) shape = run_info.resolved_shapes[func.output_name] assert shape_is_resolved(shape) mask = run_info.shape_masks[func.output_name] outputs = _run_iteration_and_process( index, func, kwargs, shape, mask, arrays, cache, error_handling="raise", force_dump=True, ) if not return_output: return None return outputs if isinstance(func.output_name, tuple) else outputs[0] @dataclass(frozen=True, slots=True) class _MapWrapper: """Wraps the `pipefunc.map.map` function and makes it a callable with a single unused argument. Copies the Pipeline and removes the cache to avoid issues with the parallel execution. """ pipeline: Pipeline inputs: dict[str, Any] run_folder: Path internal_shapes: UserShapeDict | None parallel: bool cleanup: bool | None resume: bool def __call__(self, _: Any) -> None: """Run the pipeline.""" run_map( self.pipeline, self.inputs, self.run_folder, self.internal_shapes, parallel=self.parallel, cleanup=self.cleanup, resume=self.resume, )
[docs] def create_learners_from_sweep( pipeline: Pipeline, sweep: Sweep, run_folder: str | Path, internal_shapes: UserShapeDict | None = None, *, parallel: bool = True, cleanup: bool | None = None, resume: bool = False, ) -> tuple[list[SequenceLearner], list[Path]]: """Create adaptive learners for a sweep. Creates an `adaptive.SequenceLearner` for each sweep run. These learners have a single iteration that executes the map in parallel. This means that here we rely on the internal parallelization of the pipeline. Each learner is fully independent of the others, and they can be executed in parallel. Note that this only parallelizes the nodes with a `MapSpec`, the rest of the nodes are executed in order. Only use this if the sequential execution of the nodes is not a bottleneck. Parameters ---------- pipeline The pipeline to create learners for. sweep The sweep to create learners for, must generate ``input`` dictionaries as expected by `pipeline.map`. run_folder The folder to store the run information. Each sweep run will be stored in a subfolder of this folder. internal_shapes The internal shapes to use for the run, as expected by `pipeline.map`. parallel Whether to run the map in parallel. cleanup .. deprecated:: 0.89.0 Use `resume` parameter instead. Will be removed in version 1.0.0. Whether to clean up the ``run_folder`` before running the pipeline. When set, takes priority over ``resume`` parameter. ``cleanup=True`` is equivalent to ``resume=False``. ``cleanup=False`` is equivalent to ``resume=True``. resume Whether to resume data from a previous run in the ``run_folder``. - ``False`` (default): Clean up the ``run_folder`` before running (fresh start). - ``True``: Attempt to load and resume results from a previous run. Note: If ``cleanup`` is specified, it takes priority over this parameter. Returns ------- A tuple of lists where the first list contains the learners and the second list contains the run folders for each sweep run. """ run_folder = Path(run_folder) learners = [] folders = [] pipeline = pipeline.copy(cache_type=None, cache_kwargs=None) pipeline._clear_internal_cache() max_digits = len(str(len(sweep) - 1)) for i, inputs in enumerate(sweep): sweep_run = run_folder / f"sweep_{str(i).zfill(max_digits)}" f = _MapWrapper(pipeline, inputs, sweep_run, internal_shapes, parallel, cleanup, resume) learner = SequenceLearner(f, sequence=[None]) learners.append(learner) folders.append(sweep_run) return learners, folders
def _identify_cross_product_axes(pipeline: Pipeline) -> tuple[str, ...]: reduced = _reduced_axes(pipeline) impossible_axes: set[str] = set() # Constructing this as a safety measure (for assert below) for func in pipeline.leaf_nodes: for output_name in pipeline.func_dependencies(func): for name in at_least_tuple(output_name): if name in reduced: impossible_axes.update(reduced[name]) possible_axes: set[str] = set() for func in pipeline.leaf_nodes: axes = pipeline.independent_axes_in_mapspecs(func.output_name) possible_axes.update(axes) assert not (possible_axes & impossible_axes) return tuple(sorted(possible_axes)) def _iterate_axes( independent_axes: tuple[str, ...], inputs: dict[str, Any], mapspec_axes: dict[str, tuple[str, ...]], shapes: dict[OUTPUT_TYPE, ShapeTuple], ) -> Generator[dict[str, Any], None, None]: shape: list[int | Literal["?"]] = [] for axis in independent_axes: parameter, dim = next( (p, axes.index(axis)) for p, axes in mapspec_axes.items() if axis in axes and p in inputs ) shape.append(shapes[parameter][dim]) new_shape = tuple(shape) # We can assert this because the internal_shapes should never appear as independent axes assert shape_is_resolved(new_shape) for indices in iterate_shape_indices(new_shape): yield dict(zip(independent_axes, indices)) def _maybe_iterate_axes( pipeline: Pipeline, inputs: dict[str, Any], fixed_indices: dict[str, int | slice] | None, split_independent_axes: bool, internal_shapes: UserShapeDict | None, ) -> Generator[dict[str, int | slice] | None, None, None]: if fixed_indices: assert not split_independent_axes _validate_fixed_indices(fixed_indices, inputs, pipeline) yield fixed_indices return if not split_independent_axes: yield None return independent_axes = _identify_cross_product_axes(pipeline) axes = pipeline.mapspec_axes shapes = map_shapes(pipeline, inputs, internal_shapes).shapes for _fixed_indices in _iterate_axes(independent_axes, inputs, axes, shapes): _validate_fixed_indices(_fixed_indices, inputs, pipeline) yield _fixed_indices def _adaptive_wrapper( _adaptive_value: float | tuple[float, ...], pipeline: Pipeline, inputs: dict[str, Any], adaptive_dimensions: tuple[str, ...], adaptive_output: str, run_folder_template: str, map_kwargs: dict[str, Any], ) -> float: run_folder = run_folder_template.format(_adaptive_value) values: tuple[float, ...] = at_least_tuple(_adaptive_value) inputs_ = inputs.copy() for dim, val in zip(adaptive_dimensions, values): inputs_[dim] = val results = pipeline.map(inputs_, run_folder=run_folder, show_progress=False, **map_kwargs) return results[adaptive_output].output def _validate_adaptive( pipeline: Pipeline, inputs: dict[str, Any], adaptive_dimensions: dict[str, tuple[float, float]], ) -> None: if invalid := set(adaptive_dimensions) & set(inputs): msg = f"Adaptive dimensions `{invalid}` cannot be in inputs" raise ValueError(msg) if invalid := set(adaptive_dimensions) & set(pipeline.mapspec_names): msg = f"Adaptive dimensions `{invalid}` cannot be in `MapSpec`s" raise ValueError(msg) if not adaptive_dimensions: msg = "`adaptive_dimensions` must be a non-empty dict" raise ValueError(msg)
[docs] def to_adaptive_learner( pipeline: Pipeline, inputs: dict[str, Any], adaptive_dimensions: dict[str, tuple[float, float]], adaptive_output: str, run_folder_template: str = "run_folder_{}", map_kwargs: dict[str, Any] | None = None, loss_function: Callable[..., Any] | None = None, ) -> Learner1D | Learner2D | LearnerND: """Create an adaptive learner in 1D, 2D, or ND from a pipeline.map. Parameters ---------- pipeline The pipeline to create the learner from. inputs The inputs to the pipeline, as passed to `pipeline.map`. Should not contain the adaptive dimensions. adaptive_dimensions A dictionary mapping the adaptive dimensions to their bounds. If the length of the dictionary is 1, a `adaptive.Learner1D` is created. If the length is 2, a `adaptive.Learner2D` is created. If the length is 3 or more, a `adaptive.LearnerND` is created. adaptive_output The output to adapt to. run_folder_template The template for the run folder. Must contain a single `{}` which will be replaced by the adaptive value. For example, ``"data/my_sweep_{}"``. map_kwargs Additional keyword arguments to pass to `pipeline.map`. For example, the `parallel` argument can be passed here. loss_function The loss function to use for the adaptive learner. The ``loss_per_interval`` argument for `adaptive.Learner1D`, the ``loss_per_triangle`` argument for `adaptive.Learner2D`, and the ``loss_per_simplex`` argument for `adaptive.LearnerND`. If not provided, the default loss function is used. Returns ------- A `Learner1D`, `Learner2D`, or `LearnerND` object. """ _validate_adaptive(pipeline, inputs, adaptive_dimensions) dims, bounds = zip(*adaptive_dimensions.items()) function = functools.partial( _adaptive_wrapper, pipeline=pipeline, inputs=inputs, adaptive_dimensions=dims, adaptive_output=adaptive_output, run_folder_template=run_folder_template, map_kwargs=map_kwargs or {}, ) n = len(adaptive_dimensions) if n == 1: return Learner1D(function, bounds[0], loss_per_interval=loss_function) if n == 2: # noqa: PLR2004 return Learner2D(function, bounds, loss_per_triangle=loss_function) return LearnerND(function, bounds, loss_per_simplex=loss_function)