# Source code for pipefunc.map.adaptive_scheduler

"""Provides `adaptive_scheduler` integration for `pipefunc`."""

from __future__ import annotations

import functools
from collections import defaultdict
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, NamedTuple

from pipefunc._utils import at_least_tuple, requires
from pipefunc.map._shapes import shape_is_resolved
from pipefunc.resources import Resources

from ._run import _func_kwargs, _load_data, _select_kwargs

if TYPE_CHECKING:
    from collections.abc import Callable

    import adaptive_scheduler
    from adaptive import SequenceLearner
    from adaptive_scheduler.utils import EXECUTOR_TYPES

    from pipefunc._pipefunc import PipeFunc

    from ._run_info import RunInfo
    from .adaptive import LearnersDict


class AdaptiveSchedulerDetails(NamedTuple):
    """Details for the adaptive scheduler.

    Each per-learner field is a tuple aligned with ``learners`` (one entry per
    learner); entries may be lazy callables that are evaluated by the scheduler.
    """

    learners: list[SequenceLearner]
    fnames: list[Path]
    dependencies: dict[int, list[int]]
    nodes: tuple[int | None | Callable[[], int | None], ...] | None
    cores_per_node: tuple[int | None | Callable[[], int | None], ...] | None
    extra_scheduler: tuple[list[str] | Callable[[], list[str]], ...] | None
    partition: tuple[str | None | Callable[[], str | None], ...] | None
    executor_type: tuple[EXECUTOR_TYPES | Callable[[], EXECUTOR_TYPES], ...] | None = None

    def kwargs(self) -> dict[str, Any]:
        """Get keyword arguments for `adaptive_scheduler.slurm_run`.

        Examples
        --------
        >>> learners = pipefunc.map.adaptive.create_learners(pipeline, ...)
        >>> info = learners.to_slurm_run(...)
        >>> kwargs = info.kwargs()
        >>> adaptive_scheduler.slurm_run(**kwargs)

        """
        dct = self._asdict()
        # Drop fields that are None (or tuples of only Nones) so that
        # `slurm_run` falls back to its own defaults for them.
        return {k: v for k, v in dct.items() if not _is_none(v)}

    def run_manager(
        self,
        # Fixed: the parameter was annotated `Any | None` but had no default,
        # forcing callers to pass it explicitly. It now defaults to None
        # (backward compatible), meaning "use self.kwargs()".
        kwargs: dict[str, Any] | None = None,
    ) -> adaptive_scheduler.RunManager:  # pragma: no cover
        """Get a `RunManager` for the adaptive scheduler.

        Parameters
        ----------
        kwargs
            Keyword arguments passed to `adaptive_scheduler.slurm_run`.
            Defaults to ``self.kwargs()``.

        """
        requires("adaptive_scheduler", reason="adaptive_scheduler", extras="adaptive")
        import adaptive_scheduler

        return adaptive_scheduler.slurm_run(**(kwargs or self.kwargs()))
def _is_none(value: Any) -> bool: if value is None: return True if isinstance(value, tuple): return all(_is_none(x) for x in value) return False def _fname(run_folder: Path, func: PipeFunc, index: int) -> Path: output_name = "-".join(at_least_tuple(func.output_name)) return run_folder / "adaptive_scheduler" / output_name / f"{index}.pickle"
def slurm_run_setup(
    learners_dict: LearnersDict,
    default_resources: dict | Resources | None = None,
    *,
    ignore_resources: bool = False,
) -> AdaptiveSchedulerDetails:
    """Set up the arguments for `adaptive_scheduler.slurm_run`.

    Parameters
    ----------
    learners_dict
        The learners created by `pipefunc.map.adaptive.create_learners`;
        must carry a ``run_info`` with a ``run_folder``.
    default_resources
        Fallback resources (``dict`` or `Resources`) applied where a
        `PipeFunc` does not define its own.
    ignore_resources
        If ``True``, per-function resources are ignored and only
        ``default_resources`` are used.

    Returns
    -------
    AdaptiveSchedulerDetails
        Learners, filenames, dependency graph, and per-learner resource
        columns ready to splat into `adaptive_scheduler.slurm_run`.

    Raises
    ------
    ValueError
        If a function has no resources and ``default_resources`` is not given
        (raised from `_ResourcesContainer.update`).

    """
    # Fixed: the original asserted `learners_dict.run_info is not None` twice;
    # one assertion (plus the run_folder one) is sufficient.
    assert learners_dict.run_info is not None
    assert learners_dict.run_info.run_folder is not None
    default_resources = Resources.maybe_from_dict(default_resources)  # type: ignore[assignment]
    assert isinstance(default_resources, Resources) or default_resources is None
    tracker = _ResourcesContainer(default_resources)
    run_folder = Path(learners_dict.run_info.run_folder)

    learners: list[SequenceLearner] = []
    fnames: list[Path] = []
    dependencies: dict[int, list[int]] = {}
    for learners_lists in learners_dict.data.values():
        prev_indices: list[int] = []
        for learner_list in learners_lists:
            indices: list[int] = []
            for learner in learner_list:
                # Global index of this learner; each learner in a generation
                # depends on every learner of the previous generation.
                i = len(learners)
                indices.append(i)
                dependencies[i] = prev_indices
                learners.append(learner.learner)
                fnames.append(_fname(run_folder, learner.pipefunc, i))
                tracker.update(
                    resources=learner.pipefunc.resources  # type: ignore[has-type]
                    if not ignore_resources
                    else None,
                    func=learner.pipefunc,
                    learner=learner.learner,
                    run_info=learners_dict.run_info,
                )
            prev_indices = indices

    if not any(tracker.data["extra_scheduler"]):  # all are empty
        del tracker.data["extra_scheduler"]

    # Combine cores_per_node and cpus: `cpus_per_node` wins per entry,
    # falling back to `cpus` (see `_or`).
    cores_per_node = tracker.get("cpus_per_node")
    cpus = tracker.get("cpus")
    if cores_per_node is not None and cpus is not None:
        assert len(cores_per_node) == len(cpus)
        cores_per_node = tuple(_or(cpn, _cpus) for cpn, _cpus in zip(cores_per_node, cpus))
    elif cpus is not None:
        cores_per_node = cpus

    return AdaptiveSchedulerDetails(
        learners=learners,
        fnames=fnames,
        dependencies=dependencies,
        nodes=tracker.get("nodes"),
        cores_per_node=cores_per_node,
        extra_scheduler=tracker.get("extra_scheduler"),
        partition=tracker.get("partition"),
        executor_type=tracker.get("executor_type"),
    )
@dataclass(frozen=True, slots=True) class _ResourcesContainer: default_resources: Resources | None data: dict[str, list[Any]] = field(default_factory=lambda: defaultdict(list)) def get(self, key: str) -> tuple | None: if key not in self.data: return None value = tuple(self.data[key]) if all(x is None for x in value): return None return value def update( self, resources: Resources | Callable[[dict[str, Any]], Resources] | None, func: PipeFunc, learner: SequenceLearner, run_info: RunInfo, ) -> None: if resources is None and self.default_resources is None: msg = "Either all `PipeFunc`s must have resources or `default_resources` must be provided." raise ValueError(msg) r: Resources | Callable[[dict[str, Any]], Resources] if resources is None: assert self.default_resources is not None r = self.default_resources elif self.default_resources is None: r = resources elif callable(resources): r = Resources.maybe_with_defaults(resources, self.default_resources) # type: ignore[assignment] assert r is not None else: r = resources.with_defaults(self.default_resources) index = _get_index(learner, func) for name in ["cpus_per_node", "cpus", "nodes", "partition"]: if callable(r): # Note: we don't know if `resources.{name}` returns None or not value = functools.partial( _getattr_from_resources, name=name, index=index, resources=r, func=func, run_info=run_info, ) else: value = getattr(r, name) self.data[name].append(value) self.data["executor_type"].append(_executor_type(index, r, func, run_info)) self.data["extra_scheduler"].append(_extra_scheduler(index, r, func, run_info)) def _get_index(learner: SequenceLearner, func: PipeFunc) -> int | None: if func.resources_scope == "element" and func.mapspec is not None: # Assumes that the learner is already split up assert len(learner.sequence) == 1 return learner.sequence[0] return None def _eval_resources( index: int | None, resources: Callable[[dict[str, Any]], Resources], func: PipeFunc, run_info: RunInfo, ) -> Resources: store = 
run_info.init_store() kwargs = _func_kwargs(func, run_info, store) _load_data(kwargs) if index is not None: shape = run_info.resolved_shapes[func.output_name] assert shape_is_resolved(shape) shape_mask = run_info.shape_masks[func.output_name] kwargs = _select_kwargs(func, kwargs, shape, shape_mask, index) return resources(kwargs) def _getattr_from_resources( *, name: str, index: int | None, resources: Callable[[dict[str, Any]], Resources], func: PipeFunc, run_info: RunInfo, ) -> Any | None: resources_instance = _eval_resources(index, resources, func, run_info) return getattr(resources_instance, name) def _extra_scheduler( index: int | None, resources: Resources | Callable[[dict[str, Any]], Resources], func: PipeFunc, run_info: RunInfo, ) -> list[str] | Callable[[], list[str]]: if callable(resources): def _fn() -> list[str]: resources_instance = _eval_resources(index, resources, func, run_info) return _extra_scheduler(index, resources_instance, func, run_info) # type: ignore[return-value] return _fn return __extra_scheduler(resources) def __extra_scheduler(resources: Resources) -> list[str]: extra_scheduler = [] if resources.memory: extra_scheduler.append(f"--mem={resources.memory}") if resources.gpus: extra_scheduler.append(f"--gres=gpu:{resources.gpus}") if resources.time: extra_scheduler.append(f"--time={resources.time}") if resources.extra_args: for key, value in resources.extra_args.items(): if key == "executor_type": # This is handled separately in _executor_type continue extra_scheduler.append(f"--{key}={value}") return extra_scheduler def _executor_type( index: int | None, resources: Resources | Callable[[dict[str, Any]], Resources], func: PipeFunc, run_info: RunInfo, ) -> EXECUTOR_TYPES | Callable[[], EXECUTOR_TYPES]: if callable(resources): def _fn() -> EXECUTOR_TYPES: resources_instance = _eval_resources(index, resources, func, run_info) return _executor_type(index, resources_instance, func, run_info) return _fn return __executor_type(resources) def 
__executor_type(resources: Resources) -> EXECUTOR_TYPES: executor_type = resources.extra_args.get("executor_type") if executor_type is not None: return resources.extra_args["executor_type"] if resources.parallelization_mode == "internal": return "sequential" return None def _or( value_1: int | Callable[[], int | None] | None, value_2: int | Callable[[], int | None] | None, ) -> int | Callable[[], int | None] | None: if value_1 is None: return value_2 if value_2 is None: return value_1 if callable(value_1) and callable(value_2): return lambda: value_1() or value_2() if callable(value_1) and not callable(value_2): return lambda: value_1() or value_2 if not callable(value_1) and callable(value_2): return lambda: value_1 or value_2() return value_1 or value_2