Dynamic chunking in coffea.compute #1483

@nsmith-

@btovar thanks for bringing up dynamic chunking; I now remember you had implemented it at some point in the TaskVine Executor. So we could re-conceptualize Computable not as an iterator but rather as a Generator with a send channel, through which the Backend can request changes to the iterator in flight. Maybe something like:

from typing import Generator, Protocol, TypeAlias

from coffea.compute.protocol import (
    Computable,
    EmptyResult,
    InputT,
    ResultT,
    WorkElement,
)


class SizedWorkElement(WorkElement[InputT, ResultT], Protocol):
    def __len__(self) -> int:
        "Return the size of this work element in some unit (e.g., number of events)"
        ...


NewSizeRequest: TypeAlias = int


class ResizableComputable(Computable[InputT, ResultT], Protocol):
    def generate(
        self,
    ) -> Generator[SizedWorkElement[InputT, ResultT], NewSizeRequest, None]:
        "Generate work elements, possibly adapting their size based on external factors"
        ...


def compute_now(items: ResizableComputable[InputT, ResultT]) -> ResultT | EmptyResult:
    out = EmptyResult()
    work_gen = items.generate()
    # Let it tell us the initial size
    work_element = next(work_gen, None)
    if work_element is None:
        return out
    while True:
        result = work_element()
        out += result
        # Here we could adapt the size of future work elements based on performance metrics
        # For simplicity, we just request the same size
        try:
            work_element = work_gen.send(len(work_element))
        except StopIteration:
            break
    return out

(A real implementation would have to wrap this loop in a Task.)
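A self-contained toy version of the same handshake may help make the send/yield protocol concrete. It does not use coffea: `RangeWorkElement`, `generate_chunks`, `compute_adaptive` and `target_duration` are hypothetical names made up for illustration. The work element merely counts the events in its range, and the driver times each element and passes that timing to a pluggable size policy, which is only one possible way a Backend could decide what to send back. Unlike the sketch above, this generator also treats a plain `next()` (which sends `None`) as "keep the current size".

```python
import time
from dataclasses import dataclass
from typing import Callable, Generator, Optional


@dataclass(frozen=True)
class RangeWorkElement:
    """Hypothetical sized work element covering events [start, stop)."""

    start: int
    stop: int

    def __len__(self) -> int:
        return self.stop - self.start

    def __call__(self) -> int:
        # Toy "analysis": the result is just the number of events processed
        return self.stop - self.start


def generate_chunks(
    total_events: int, initial_size: int
) -> Generator[RangeWorkElement, Optional[int], None]:
    """Yield contiguous chunks; a value sent back sets the size of the next chunk."""
    start, size = 0, max(1, initial_size)
    while start < total_events:
        stop = min(start + size, total_events)
        requested = yield RangeWorkElement(start, stop)
        if requested is not None:  # plain next() sends None: keep the current size
            size = max(1, requested)
        start = stop


# Hypothetical policy type: (finished element, seconds it took) -> next requested size
SizePolicy = Callable[[RangeWorkElement, float], int]


def compute_adaptive(
    gen: Generator[RangeWorkElement, Optional[int], None], policy: SizePolicy
) -> tuple[int, list[int]]:
    """Drive the generator, feeding each element's timing back as a size request."""
    total, sizes = 0, []
    element = next(gen, None)  # prime the generator; it picks the initial size
    while element is not None:
        t0 = time.perf_counter()
        total += element()
        elapsed = time.perf_counter() - t0
        sizes.append(len(element))
        try:
            element = gen.send(policy(element, elapsed))
        except StopIteration:
            element = None
    return total, sizes


def target_duration(seconds: float) -> SizePolicy:
    """Scale the size so the next element should take roughly `seconds` of wall time."""
    return lambda el, elapsed: max(1, int(len(el) * seconds / max(elapsed, 1e-9)))
```

For example, with a policy that doubles the size after every element, `compute_adaptive(generate_chunks(100, 10), lambda el, _: 2 * len(el))` returns `(100, [10, 20, 40, 30])`; the last chunk is truncated at the end of the input. Because the generator owns the chunk boundaries, the Backend only expresses a preference and the Computable decides how to honor it.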

This would require rethinking how FailedTaskElement is implemented: it could no longer just keep track of the index into the iterable, but would have to keep the whole materialized WorkElement for later re-computation.
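A minimal sketch of that bookkeeping, assuming work elements are zero-argument callables and results are combined with a user-supplied function. `FailedWorkElement`, `run_elements` and `retry_failures` are hypothetical names, not the existing coffea FailedTaskElement API. The point is that a retry re-runs the stored element directly, since with resizable chunking the element at position *i* depends on size requests sent earlier and replaying the generator would not reproduce the same boundaries.

```python
from dataclasses import dataclass
from typing import Callable, Generic, Iterable, TypeVar

ResultT = TypeVar("ResultT")
Combine = Callable[[ResultT, ResultT], ResultT]


@dataclass
class FailedWorkElement(Generic[ResultT]):
    """Hypothetical failure record that keeps the materialized element itself.

    With resizable chunking, an index into the generator no longer identifies a
    chunk, because the boundaries depend on size requests sent earlier in the run.
    """

    element: Callable[[], ResultT]
    error: Exception


def run_elements(
    elements: Iterable[Callable[[], ResultT]], combine: Combine, out: ResultT
) -> tuple[ResultT, list[FailedWorkElement[ResultT]]]:
    """Run each element, accumulating results and recording failures for retry."""
    failures: list[FailedWorkElement[ResultT]] = []
    for element in elements:
        try:
            out = combine(out, element())
        except Exception as exc:
            failures.append(FailedWorkElement(element, exc))
    return out, failures


def retry_failures(
    failures: list[FailedWorkElement[ResultT]], combine: Combine, out: ResultT
) -> tuple[ResultT, list[FailedWorkElement[ResultT]]]:
    """Re-run exactly the stored elements; there is no need to replay the generator."""
    return run_elements((f.element for f in failures), combine, out)
```

The trade-off is memory: each failure now holds a full element (for event ranges, at least its start and stop) instead of a single integer, which should be cheap as long as elements are lightweight descriptions of work rather than loaded data.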

Originally posted by @nsmith- in #1470 (comment)
