pipefunc.map.adaptive module#
Provides adaptive integration for pipefunc.
- class pipefunc.map.adaptive.LearnerPipeFunc(learner: SequenceLearner, pipefunc: PipeFunc)[source]#
Bases: NamedTuple
A tuple with a SequenceLearner and a PipeFunc.
- learner: SequenceLearner#
Alias for field number 0
- pipefunc: PipeFunc#
Alias for field number 1
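Because LearnerPipeFunc is a NamedTuple, its fields can be read by name, by position, or via tuple unpacking. A minimal stdlib sketch with stand-in values (no real SequenceLearner or PipeFunc is constructed here):

```python
from typing import NamedTuple


# Stand-in sketch: the real LearnerPipeFunc pairs an adaptive.SequenceLearner
# with a pipefunc.PipeFunc; plain strings are used here for illustration.
class LearnerPipeFunc(NamedTuple):
    learner: object
    pipefunc: object


pair = LearnerPipeFunc(learner="a-sequence-learner", pipefunc="a-pipefunc")

# Fields are accessible by name or by tuple position (field 0 and field 1).
assert pair.learner == pair[0]
assert pair.pipefunc == pair[1]

# Tuple unpacking also works, which is convenient when iterating learners.
learner, func = pair
```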
- class pipefunc.map.adaptive.AxisIndex(axis: str, idx: int | slice)[source]#
Bases: NamedTuple
A named tuple storing the axis name and index for a fixed axis.
- class pipefunc.map.adaptive.LearnersDict(learners_dict=None, run_info=None)[source]#
Bases: UserDict[tuple[AxisIndex, ...] | None, list[list[LearnerPipeFunc]]]
A dictionary of adaptive learners for a pipeline, as returned by create_learners.
- simple_run()[source]#
Run all the learners in the dictionary in order using adaptive.runner.simple.
- Return type:
- to_slurm_run(default_resources=None, *, ignore_resources=False, returns='kwargs', **slurm_run_kwargs)[source]#
Helper for adaptive_scheduler.slurm_run which returns an adaptive_scheduler.RunManager.
- Parameters:
default_resources (dict | Resources | None) – The default resources to use for the run. Only needed if not all PipeFuncs have resources.
ignore_resources (bool) – Whether to ignore the resources of the PipeFuncs and use the default_resources for all of them.
returns (Literal['run_manager', 'kwargs', 'namedtuple']) – What to return. If "run_manager", returns an adaptive_scheduler.RunManager. If "kwargs", returns a dictionary that can be passed to adaptive_scheduler.slurm_run. If "namedtuple", returns an AdaptiveSchedulerDetails.
slurm_run_kwargs (Any) – Additional keyword arguments to pass to adaptive_scheduler.slurm_run.
- Return type:
- Returns:
The output depends on the value of returns.
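The effect of the returns parameter can be sketched with a simplified dispatch. This is a stand-in, not the real implementation: the actual method assembles the kwargs from the learners and resources, and AdaptiveSchedulerDetails here is reduced to a single field.

```python
from typing import Any, NamedTuple


class AdaptiveSchedulerDetails(NamedTuple):
    # Simplified stand-in; the real namedtuple carries the slurm_run arguments.
    kwargs: dict[str, Any]


def dispatch_returns(kwargs: dict[str, Any], returns: str) -> Any:
    # Sketch of how `returns` selects the output of `to_slurm_run`.
    if returns == "kwargs":
        return kwargs  # pass these to adaptive_scheduler.slurm_run yourself
    if returns == "namedtuple":
        return AdaptiveSchedulerDetails(kwargs=kwargs)
    if returns == "run_manager":
        # The real method constructs an adaptive_scheduler.RunManager here.
        raise NotImplementedError("requires adaptive_scheduler")
    msg = f"Invalid value for `returns`: {returns!r}"
    raise ValueError(msg)


details = dispatch_returns({"nodes": 1}, "namedtuple")
```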
- pipefunc.map.adaptive.create_learners(pipeline, inputs, run_folder, internal_shapes=None, *, storage='file_array', return_output=False, cleanup=None, resume=False, fixed_indices=None, split_independent_axes=False)[source]#
Create adaptive learners for a single Pipeline.map call.
Creates learner(s) for each function node in the pipeline graph. The number of learners created for each node depends on the fixed_indices and split_independent_axes parameters:
If fixed_indices is provided or split_independent_axes is False, a single learner is created for each function node (unless resources_scope="element").
If split_independent_axes is True, multiple learners are created for each function node, corresponding to different combinations of the independent axes in the pipeline.
Returns a dictionary where the keys represent specific combinations of indices for the independent axes, and the values are lists of lists of learners:
The outer lists represent different stages or generations of the pipeline, where the learners in each stage depend on the outputs of the learners in the previous stage.
The inner lists contain learners that can be executed independently within each stage.
When split_independent_axes is True, each key in the dictionary corresponds to a different combination of indices for the independent axes, allowing for parallel execution across different subsets of the input data.
If fixed_indices is None and split_independent_axes is False, the only key in the dictionary is None, indicating that all indices are being processed together.
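The nested-list structure described above can be sketched with plain callables standing in for learners: outer lists are stages that must run in order, inner lists hold independent learners (run sequentially here for simplicity, but they could run in parallel):

```python
# Sketch of the LearnersDict value structure: a dict mapping a key of fixed
# indices (or None) to stages (outer lists) of independent learners (inner
# lists). Plain callables stand in for the actual SequenceLearners.
results: list[str] = []

learners_dict = {
    None: [
        [lambda: results.append("f1")],     # stage 0
        [lambda: results.append("f2"),      # stage 1: these depend only on
         lambda: results.append("f3")],     # the outputs of stage 0
    ],
}

for stages in learners_dict.values():
    for stage in stages:      # outer lists: must be executed in order
        for run in stage:     # inner lists: independent, could be parallel
            run()
```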
- Parameters:
pipeline (Pipeline) – The pipeline to create learners for.
inputs (dict[str, Any]) – The inputs to the pipeline, the same as passed to pipeline.map.
run_folder (str | Path | None) – The folder to store the run information.
internal_shapes (dict[str, int | Literal['?'] | tuple[int | Literal['?'], ...]] | None) – The internal shapes to use for the run.
storage (str | dict[str | tuple[str, ...], str]) – The storage class to use for storing intermediate and final results. Can be specified as:
A string: Use a single storage class for all outputs.
A dictionary: Specify different storage classes for different outputs. Use output names as keys and storage class names as values. Use an empty string "" as a key to set a default storage class.
Available storage classes are registered in pipefunc.map.storage_registry. Common options include "file_array", "dict", and "shared_memory_dict".
return_output (bool) – Whether to return the output of the function in the learner.
cleanup (bool | None) – Deprecated since version 0.89.0: Use the resume parameter instead. Will be removed in version 1.0.0. Whether to clean up the run_folder before running the pipeline. When set, takes priority over the resume parameter. cleanup=True is equivalent to resume=False; cleanup=False is equivalent to resume=True.
resume (bool) – Whether to resume data from a previous run in the run_folder. False (default): Clean up the run_folder before running (fresh start). True: Attempt to load and resume results from a previous run. Note: If cleanup is specified, it takes priority over this parameter.
fixed_indices (dict[str, int | slice] | None) – A dictionary mapping axis names to indices that should be fixed for the run. If not provided, all indices are iterated over.
split_independent_axes (bool) – Whether to split the independent axes into separate learners. Do not use in conjunction with fixed_indices.
See also
LearnersDict.to_slurm_run – Convert the learners to variables that can be passed to adaptive_scheduler.slurm_run.
- Return type:
- Returns:
A dictionary where the keys are the fixed indices, e.g., (("i", 0), ("j", 0)), and the values are lists of lists of learners. The learners in the inner lists can be executed in parallel, but the outer lists need to be executed in order. If fixed_indices is None and split_independent_axes is False, then the only key is None.
- pipefunc.map.adaptive.create_learners_from_sweep(pipeline, sweep, run_folder, internal_shapes=None, *, parallel=True, cleanup=None, resume=False)[source]#
Create adaptive learners for a sweep.
Creates an adaptive.SequenceLearner for each sweep run. These learners have a single iteration that executes the map in parallel; this means we rely on the internal parallelization of the pipeline. Each learner is fully independent of the others, and they can be executed in parallel.
Note that this only parallelizes the nodes with a MapSpec; the rest of the nodes are executed in order. Only use this if the sequential execution of the nodes is not a bottleneck.
- Parameters:
pipeline (
Pipeline) – The pipeline to create learners for.sweep (
Sweep) – The sweep to create learners for, must generateinputdictionaries as expected by pipeline.map.run_folder (
str|Path) – The folder to store the run information. Each sweep run will be stored in a subfolder of this folder.internal_shapes (
dict[str,int|Literal['?'] |tuple[int|Literal['?'],...]] |None) – The internal shapes to use for the run, as expected by pipeline.map.parallel (
bool) – Whether to run the map in parallel.Deprecated since version 0.89.0: Use resume parameter instead. Will be removed in version 1.0.0.
Whether to clean up the
run_folderbefore running the pipeline. When set, takes priority overresumeparameter.cleanup=Trueis equivalent toresume=False.cleanup=Falseis equivalent toresume=True.resume (
bool) –Whether to resume data from a previous run in the
run_folder.False(default): Clean up therun_folderbefore running (fresh start).True: Attempt to load and resume results from a previous run.
Note: If
cleanupis specified, it takes priority over this parameter.
- Return type:
- Returns:
A tuple of lists where the first list contains the learners and the second list contains the run folders for each sweep run.
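The shape of the return value can be sketched with stand-ins: one single-iteration "learner" per sweep run, plus a parallel list of run folders. The sweep entries and subfolder naming below are hypothetical, chosen only to illustrate the (learners, folders) pairing:

```python
from pathlib import Path

# Hypothetical sweep: each entry is an `inputs` dict as pipeline.map expects.
sweep = [{"x": 1}, {"x": 2}, {"x": 3}]
run_folder = Path("runs")

# One independent, single-iteration "learner" per sweep run; each sweep run
# gets its own subfolder, mirroring the returned tuple of two lists.
learners = [lambda inputs=inputs: {"out": inputs["x"] * 2} for inputs in sweep]
folders = [run_folder / f"sweep_{i}" for i in range(len(sweep))]

# Learners are fully independent, so they can run in any order / in parallel.
outputs = [run() for run in learners]
```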
- pipefunc.map.adaptive.to_adaptive_learner(pipeline, inputs, adaptive_dimensions, adaptive_output, run_folder_template='run_folder_{}', map_kwargs=None, loss_function=None)[source]#
Create an adaptive learner in 1D, 2D, or ND from a pipeline.map.
- Parameters:
pipeline (Pipeline) – The pipeline to create the learner from.
inputs (dict[str, Any]) – The inputs to the pipeline, as passed to pipeline.map. Should not contain the adaptive dimensions.
adaptive_dimensions (dict[str, tuple[float, float]]) – A dictionary mapping the adaptive dimensions to their bounds. If the length of the dictionary is 1, an adaptive.Learner1D is created. If the length is 2, an adaptive.Learner2D is created. If the length is 3 or more, an adaptive.LearnerND is created.
adaptive_output (str) – The output to adapt to.
run_folder_template (str) – The template for the run folder. Must contain a single {} which will be replaced by the adaptive value. For example, "data/my_sweep_{}".
map_kwargs (dict[str, Any] | None) – Additional keyword arguments to pass to pipeline.map. For example, the parallel argument can be passed here.
loss_function (Callable[..., Any] | None) – The loss function to use for the adaptive learner. Passed as the loss_per_interval argument for adaptive.Learner1D, the loss_per_triangle argument for adaptive.Learner2D, and the loss_per_simplex argument for adaptive.LearnerND. If not provided, the default loss function is used.
- Return type:
Learner1D | Learner2D | LearnerND
- Returns:
A Learner1D, Learner2D, or LearnerND object.
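The learner type follows the number of adaptive dimensions. The selection rule stated above can be sketched as follows, with stand-in strings instead of the real adaptive learner classes:

```python
def learner_kind(adaptive_dimensions: dict[str, tuple[float, float]]) -> str:
    # Sketch of the dispatch rule: 1 dimension -> Learner1D, 2 -> Learner2D,
    # 3 or more -> LearnerND. The real function constructs adaptive learners.
    n = len(adaptive_dimensions)
    if n == 1:
        return "Learner1D"
    if n == 2:
        return "Learner2D"
    if n >= 3:
        return "LearnerND"
    msg = "adaptive_dimensions must contain at least one dimension"
    raise ValueError(msg)


kind_1d = learner_kind({"a": (0.0, 1.0)})
kind_2d = learner_kind({"a": (0.0, 1.0), "b": (-1.0, 1.0)})
kind_nd = learner_kind({"a": (0, 1), "b": (0, 1), "c": (0, 1)})
```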