Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions CodeEntropy/levels/execution/chunks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
"""Frame chunking helpers for map-reduce execution."""

from __future__ import annotations


def chunk_frame_indices(
    frame_indices: list[int],
    chunk_size: int,
) -> list[tuple[int, ...]]:
    """Partition frame indices into consecutive chunks of bounded length.

    The input is walked from start to end in steps of ``chunk_size``; each
    step yields one tuple. Every chunk holds ``chunk_size`` indices except
    possibly the last, which holds whatever remains. An empty input produces
    an empty list.

    Args:
        frame_indices: Ordered selected frame indices to split.
        chunk_size: Maximum number of frame indices per chunk.

    Returns:
        A list of frame-index tuples, in the same order as the input.

    Raises:
        ValueError: If ``chunk_size`` is less than one.
    """
    if chunk_size < 1:
        raise ValueError("chunk_size must be >= 1")

    # Start offsets of each chunk: 0, chunk_size, 2 * chunk_size, ...
    chunk_starts = range(0, len(frame_indices), chunk_size)

    chunks: list[tuple[int, ...]] = []
    for start in chunk_starts:
        stop = start + chunk_size
        # Slicing past the end is safe and simply truncates the final chunk.
        chunks.append(tuple(frame_indices[start:stop]))
    return chunks
84 changes: 84 additions & 0 deletions CodeEntropy/levels/execution/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
"""Internal policy for hierarchy-level frame map-reduce execution.

Users provide compute resources. CodeEntropy keeps scheduling choices such as
chunk size and in-flight task limits internal so the public configuration remains
stable and simple.
"""

from __future__ import annotations

import math
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class ExecutionPolicy:
    """Internal policy for scalable, deterministic frame execution.

    All results are computed only from the policy fields and the worker
    settings found in ``shared_data["args"]``, so identical inputs always
    produce identical scheduling choices.
    """

    # Number of frame chunks to aim for per requested worker.
    target_frame_chunks_per_worker: int = 2
    # Lower bound applied to the computed frame chunk size.
    min_frame_chunk_size: int = 1
    # Upper bound applied to the computed frame chunk size.
    max_frame_chunk_size: int = 32
    # In-flight task limit expressed as a multiple of the worker count.
    max_frame_in_flight_multiplier: int = 1

    def requested_worker_count(self, shared_data: dict[str, Any]) -> int:
        """Return the requested worker-process count.

        An explicit ``dask_workers`` setting takes precedence. Otherwise, when
        ``hpc`` is truthy, the count is ``hpc_nodes * hpc_processes`` with each
        factor defaulting to one when missing or falsy. With neither setting
        the count is one.

        Args:
            shared_data: Shared workflow data that may contain ``args`` with
                local Dask or HPC worker settings.

        Returns:
            The requested worker count, clamped to at least one.
        """
        # A missing "args" entry yields None; getattr with defaults handles it.
        args = shared_data.get("args")

        dask_workers = getattr(args, "dask_workers", None)
        if dask_workers is not None:
            return max(1, int(dask_workers))

        if bool(getattr(args, "hpc", False)):
            # "or 1" covers attributes that are present but None or zero.
            hpc_nodes = max(1, int(getattr(args, "hpc_nodes", 1) or 1))
            hpc_processes = max(1, int(getattr(args, "hpc_processes", 1) or 1))
            return hpc_nodes * hpc_processes

        return 1

    def frame_chunk_size(self, shared_data: dict[str, Any], *, n_frames: int) -> int:
        """Choose a deterministic frame chunk size.

        The size is the number of frames divided (rounding up) by the target
        chunk count, then clamped to the policy bounds. If the bounds conflict,
        the minimum wins. The result is never below one, even if the policy
        bounds are non-positive, because a chunk size below one cannot be used
        to split frames.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_frames: Number of selected frames for the current run. Values
                below one are treated as one.

        Returns:
            The frame chunk size clamped between the policy minimum and
            maximum, and always at least one.
        """
        n_frames = max(1, int(n_frames))
        workers = self.requested_worker_count(shared_data)
        target_chunks = max(1, workers * int(self.target_frame_chunks_per_worker))
        chunk_size = math.ceil(n_frames / target_chunks)

        bounded = max(
            int(self.min_frame_chunk_size),
            min(int(self.max_frame_chunk_size), int(chunk_size)),
        )
        # Guard against misconfigured (zero or negative) policy bounds.
        return max(1, bounded)

    def max_frame_in_flight_tasks(
        self,
        shared_data: dict[str, Any],
        *,
        n_chunks: int,
    ) -> int:
        """Choose the maximum number of active frame-chunk tasks.

        The limit is the worker count times ``max_frame_in_flight_multiplier``,
        capped at ``n_chunks`` and floored at one.

        Args:
            shared_data: Shared workflow data used to infer requested worker
                count.
            n_chunks: Number of frame chunks available for submission.

        Returns:
            The number of frame-chunk tasks allowed to be active at once.
        """
        workers = self.requested_worker_count(shared_data)
        max_in_flight = workers * int(self.max_frame_in_flight_multiplier)
        return max(1, min(int(n_chunks), int(max_in_flight)))
Loading