pipefunc.map.adaptive module#

Provides adaptive integration for pipefunc.

class pipefunc.map.adaptive.LearnerPipeFunc(learner: SequenceLearner, pipefunc: PipeFunc)[source]#

Bases: NamedTuple

A named tuple pairing a SequenceLearner with its PipeFunc.

learner: SequenceLearner#

Alias for field number 0

pipefunc: PipeFunc#

Alias for field number 1

class pipefunc.map.adaptive.AxisIndex(axis: str, idx: int | slice)[source]#

Bases: NamedTuple

A named tuple to store the axis and index for a fixed axis.

axis: str#

Alias for field number 0

idx: int | slice#

Alias for field number 1

class pipefunc.map.adaptive.LearnersDict(learners_dict=None, run_info=None)[source]#

Bases: UserDict[tuple[AxisIndex, ...] | None, list[list[LearnerPipeFunc]]]

A dictionary of adaptive learners for a pipeline as returned by create_learners.

flatten()[source]#

Flatten the learners into a dictionary with the output names as keys.

Return type:

dict[str | tuple[str, ...], list[SequenceLearner]]

simple_run()[source]#

Run all the learners in the dictionary in order using adaptive.runner.simple.

Return type:

None

to_slurm_run(default_resources=None, *, ignore_resources=False, returns='kwargs', **slurm_run_kwargs)[source]#

Helper for adaptive_scheduler.slurm_run which, depending on returns, produces its keyword arguments or an adaptive_scheduler.RunManager.

Parameters:
  • default_resources (dict | Resources | None) – The default resources to use for the run. Only needed if not all PipeFuncs have resources.

  • ignore_resources (bool) – Whether to ignore the resources of the PipeFuncs and use default_resources for all of them.

  • returns (Literal['run_manager', 'kwargs', 'namedtuple']) – What to return. Can be one of "run_manager", "kwargs", or "namedtuple". If "run_manager", returns an adaptive_scheduler.RunManager. If "kwargs", returns a dictionary that can be passed to adaptive_scheduler.slurm_run. If "namedtuple", returns an AdaptiveSchedulerDetails.

  • slurm_run_kwargs (Any) – Additional keyword arguments to pass to adaptive_scheduler.slurm_run.

Return type:

dict[str, Any] | RunManager | AdaptiveSchedulerDetails

Returns:

The output depends on the value of returns.

pipefunc.map.adaptive.create_learners(pipeline, inputs, run_folder, internal_shapes=None, *, storage='file_array', return_output=False, cleanup=None, resume=False, fixed_indices=None, split_independent_axes=False)[source]#

Create adaptive learners for a single Pipeline.map call.

Creates learner(s) for each function node in the pipeline graph. The number of learners created for each node depends on the fixed_indices and split_independent_axes parameters:

  • If fixed_indices is provided or split_independent_axes is False, a single learner is created for each function node (unless resources_scope="element").

  • If split_independent_axes is True, multiple learners are created for each function node, corresponding to different combinations of the independent axes in the pipeline.

Returns a dictionary where the keys represent specific combinations of indices for the independent axes, and the values are lists of lists of learners:

  • The outer lists represent different stages or generations of the pipeline, where the learners in each stage depend on the outputs of the learners in the previous stage.

  • The inner lists contain learners that can be executed independently within each stage.

When split_independent_axes is True, each key in the dictionary corresponds to a different combination of indices for the independent axes, allowing for parallel execution across different subsets of the input data.

If fixed_indices is None and split_independent_axes is False, the only key in the dictionary is None, indicating that all indices are being processed together.

Parameters:
  • pipeline (Pipeline) – The pipeline to create learners for.

  • inputs (dict[str, Any]) – The inputs to the pipeline, the same as passed to pipeline.map.

  • run_folder (str | Path | None) – The folder to store the run information.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The internal shapes to use for the run.

  • storage (str | dict[str | tuple[str, ...], str]) –

    The storage class to use for storing intermediate and final results. Can be specified as:

    1. A string: Use a single storage class for all outputs.

    2. A dictionary: Specify different storage classes for different outputs.

      • Use output names as keys and storage class names as values.

      • Use an empty string "" as a key to set a default storage class.

    Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict".

  • return_output (bool) – Whether to return the output of the function in the learner.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, it takes priority over the resume parameter: cleanup=True is equivalent to resume=False, and cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

  • fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.

  • split_independent_axes (bool) – Whether to split the independent axes into separate learners. Do not use in conjunction with fixed_indices.

See also

LearnersDict.to_slurm_run

Convert the learners to variables that can be passed to adaptive_scheduler.slurm_run.

Return type:

LearnersDict

Returns:

A dictionary where the keys are the fixed indices, e.g., (("i", 0), ("j", 0)), and the values are lists of lists of learners. The learners in the inner list can be executed in parallel, but the outer lists need to be executed in order. If fixed_indices is None and split_independent_axes is False, then the only key is None.

pipefunc.map.adaptive.create_learners_from_sweep(pipeline, sweep, run_folder, internal_shapes=None, *, parallel=True, cleanup=None, resume=False)[source]#

Create adaptive learners for a sweep.

Creates an adaptive.SequenceLearner for each sweep run. These learners have a single iteration that executes the map in parallel, so we rely on the internal parallelization of the pipeline. Each learner is fully independent of the others, so they can be executed in parallel.

Note that this only parallelizes the nodes with a MapSpec; the remaining nodes are executed in order. Only use this if the sequential execution of those nodes is not a bottleneck.

Parameters:
  • pipeline (Pipeline) – The pipeline to create learners for.

  • sweep (Sweep) – The sweep to create learners for, must generate input dictionaries as expected by pipeline.map.

  • run_folder (str | Path) – The folder to store the run information. Each sweep run will be stored in a subfolder of this folder.

  • internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The internal shapes to use for the run, as expected by pipeline.map.

  • parallel (bool) – Whether to run the map in parallel.

  • cleanup (bool | None) –

    Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.

    Whether to clean up the run_folder before running the pipeline. When set, it takes priority over the resume parameter: cleanup=True is equivalent to resume=False, and cleanup=False is equivalent to resume=True.

  • resume (bool) –

    Whether to resume data from a previous run in the run_folder.

    • False (default): Clean up the run_folder before running (fresh start).

    • True: Attempt to load and resume results from a previous run.

    Note: If cleanup is specified, it takes priority over this parameter.

Return type:

tuple[list[SequenceLearner], list[Path]]

Returns:

A tuple of lists where the first list contains the learners and the second list contains the run folders for each sweep run.

pipefunc.map.adaptive.to_adaptive_learner(pipeline, inputs, adaptive_dimensions, adaptive_output, run_folder_template='run_folder_{}', map_kwargs=None, loss_function=None)[source]#

Create an adaptive learner in 1D, 2D, or ND from a pipeline.map.

Parameters:
  • pipeline (Pipeline) – The pipeline to create the learner from.

  • inputs (dict[str, Any]) – The inputs to the pipeline, as passed to pipeline.map. Should not contain the adaptive dimensions.

  • adaptive_dimensions (dict[str, tuple[float, float]]) – A dictionary mapping the adaptive dimensions to their bounds. If the length of the dictionary is 1, an adaptive.Learner1D is created. If the length is 2, an adaptive.Learner2D is created. If the length is 3 or more, an adaptive.LearnerND is created.

  • adaptive_output (str) – The output to adapt to.

  • run_folder_template (str) – The template for the run folder. Must contain a single {} which will be replaced by the adaptive value. For example, "data/my_sweep_{}".

  • map_kwargs (dict[str, Any] | None) – Additional keyword arguments to pass to pipeline.map. For example, the parallel argument can be passed here.

  • loss_function (Callable[..., Any] | None) – The loss function to use for the adaptive learner. Passed as loss_per_interval for adaptive.Learner1D, loss_per_triangle for adaptive.Learner2D, or loss_per_simplex for adaptive.LearnerND. If not provided, the default loss function is used.

Return type:

Learner1D | Learner2D | LearnerND

Returns:

A Learner1D, Learner2D, or LearnerND object.