Skip to content

Commit

Permalink
Issue #150/#156 integrate DeepGraphSplitter in AggregatorBatchJobs._c…
Browse files Browse the repository at this point in the history
…reate_crossbackend_job

and integrate it in test coverage of TestCrossBackendSplitting
  • Loading branch information
soxofaan committed Sep 19, 2024
1 parent 468b2a9 commit 1c82648
Show file tree
Hide file tree
Showing 5 changed files with 187 additions and 102 deletions.
82 changes: 59 additions & 23 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
streaming_flask_response,
)
from openeo_aggregator.constants import (
CROSSBACKEND_GRAPH_SPLIT_METHOD,
JOB_OPTION_FORCE_BACKEND,
JOB_OPTION_SPLIT_STRATEGY,
JOB_OPTION_TILE_GRID,
Expand All @@ -102,6 +103,7 @@
from openeo_aggregator.partitionedjobs import PartitionedJob
from openeo_aggregator.partitionedjobs.crossbackend import (
CrossBackendJobSplitter,
DeepGraphSplitter,
LoadCollectionGraphSplitter,
)
from openeo_aggregator.partitionedjobs.splitting import FlimsySplitter, TileGridSplitter
Expand Down Expand Up @@ -806,25 +808,33 @@ def create_job(
if "process_graph" not in process:
raise ProcessGraphMissingException()

# Coverage of messy "split_strategy" job option https://github.com/Open-EO/openeo-aggregator/issues/156
# TODO: better, more generic/specific job_option(s)?
if job_options and (job_options.get(JOB_OPTION_SPLIT_STRATEGY) or job_options.get(JOB_OPTION_TILE_GRID)):
if job_options.get(JOB_OPTION_SPLIT_STRATEGY) == "crossbackend":
# TODO this is temporary feature flag to trigger "crossbackend" splitting
return self._create_crossbackend_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
else:
return self._create_partitioned_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
# TODO: this job option "tile_grid" is quite generic and not very explicit about being a job splitting approach
tile_grid = (job_options or {}).get(JOB_OPTION_TILE_GRID)

crossbackend_mode = (
split_strategy == "crossbackend" or isinstance(split_strategy, dict) and "crossbackend" in split_strategy
)
spatial_split_mode = tile_grid or split_strategy == "flimsy"

if crossbackend_mode:
return self._create_crossbackend_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
elif spatial_split_mode:
return self._create_partitioned_job(
user_id=user_id,
process=process,
api_version=api_version,
metadata=metadata,
job_options=job_options,
)
else:
return self._create_job_standard(
user_id=user_id,
Expand Down Expand Up @@ -939,16 +949,42 @@ def _create_crossbackend_job(
if not self.partitioned_job_tracker:
raise FeatureUnsupportedException(message="Partitioned job tracking is not supported")

def backend_for_collection(collection_id) -> str:
return self._catalog.get_backends_for_collection(cid=collection_id)[0]
split_strategy = (job_options or {}).get(JOB_OPTION_SPLIT_STRATEGY)
if split_strategy == "crossbackend":
graph_split_method = CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
elif isinstance(split_strategy, dict) and isinstance(split_strategy.get("crossbackend"), dict):
graph_split_method = split_strategy.get("crossbackend", {}).get(
"method", CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE
)
else:
raise ValueError(f"Invalid split strategy {split_strategy!r}")

_log.info(f"_create_crossbackend_job: {graph_split_method=} from {split_strategy=}")
if graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.SIMPLE:

def backend_for_collection(collection_id) -> str:
return self._catalog.get_backends_for_collection(cid=collection_id)[0]

splitter = CrossBackendJobSplitter(
graph_splitter=LoadCollectionGraphSplitter(
graph_splitter = LoadCollectionGraphSplitter(
backend_for_collection=backend_for_collection,
# TODO: job option for `always_split` feature?
always_split=True,
)
)
elif graph_split_method == CROSSBACKEND_GRAPH_SPLIT_METHOD.DEEP:

def supporting_backends(node_id: str, node: dict) -> Union[List[str], None]:
if node["process_id"] == "load_collection":
collection_id = node["arguments"]["id"]
return self._catalog.get_backends_for_collection(cid=collection_id)

graph_splitter = DeepGraphSplitter(
supporting_backends=supporting_backends,
primary_backend=split_strategy.get("crossbackend", {}).get("primary_backend"),
)
else:
raise ValueError(f"Invalid graph split strategy {graph_split_method!r}")

splitter = CrossBackendJobSplitter(graph_splitter=graph_splitter)

pjob_id = self.partitioned_job_tracker.create_crossbackend_pjob(
user_id=user_id, process=process, metadata=metadata, job_options=job_options, splitter=splitter
Expand Down
6 changes: 6 additions & 0 deletions src/openeo_aggregator/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@

# Experimental feature to force a certain upstream back-end through job options
JOB_OPTION_FORCE_BACKEND = "_agg_force_backend"


class CROSSBACKEND_GRAPH_SPLIT_METHOD:
# Poor-man's StrEnum
SIMPLE = "simple"
DEEP = "deep"
80 changes: 43 additions & 37 deletions src/openeo_aggregator/partitionedjobs/crossbackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ def next_nodes(node_id: NodeId) -> Iterable[NodeId]:

return up, down

def produce_split_locations(self, limit: int = 4) -> Iterator[List[NodeId]]:
def produce_split_locations(self, limit: int = 10) -> Iterator[List[NodeId]]:
"""
Produce disjoint subgraphs that can be processed independently.
Expand Down Expand Up @@ -811,49 +811,55 @@ class DeepGraphSplitter(ProcessGraphSplitterInterface):
More advanced graph splitting (compared to just splitting off `load_collection` nodes)
"""

def __init__(self, supporting_backends: SupportingBackendsMapper):
def __init__(self, supporting_backends: SupportingBackendsMapper, primary_backend: Optional[BackendId] = None):
self._supporting_backends_mapper = supporting_backends
self._primary_backend = primary_backend

def split(self, process_graph: FlatPG) -> _PGSplitResult:
graph = _GraphViewer.from_flat_graph(
flat_graph=process_graph, supporting_backends=self._supporting_backends_mapper
)

# TODO: make picking "optimal" split location set a bit more deterministic (e.g. sort first)
(split_nodes,) = graph.produce_split_locations(limit=1)
_log.debug(f"DeepGraphSplitter.split: split nodes: {split_nodes=}")

secondary_graphs: List[_SubGraphData] = []
graph_to_split = graph
for split_node_id in split_nodes:
up, down = graph_to_split.split_at(split_node_id=split_node_id)
# Use upstream graph as secondary graph
node_ids = set(nid for nid, _ in up.iter_nodes())
backend_candidates = up.get_backend_candidates_for_node_set(node_ids)
# TODO: better backend selection?
# TODO handle case where backend_candidates is None?
backend_id = sorted(backend_candidates)[0]
_log.debug(f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}")
secondary_graphs.append(
_SubGraphData(
split_node=split_node_id,
node_ids=node_ids,
backend_id=backend_id,
for split_nodes in graph.produce_split_locations():
_log.debug(f"DeepGraphSplitter.split: evaluating split nodes: {split_nodes=}")

secondary_graphs: List[_SubGraphData] = []
graph_to_split = graph
for split_node_id in split_nodes:
up, down = graph_to_split.split_at(split_node_id=split_node_id)
# Use upstream graph as secondary graph
node_ids = set(nid for nid, _ in up.iter_nodes())
backend_candidates = up.get_backend_candidates_for_node_set(node_ids)
# TODO: better backend selection?
# TODO handle case where backend_candidates is None?
backend_id = sorted(backend_candidates)[0]
_log.debug(
f"DeepGraphSplitter.split: secondary graph: from {split_node_id=}: {backend_id=} {node_ids=}"
)
secondary_graphs.append(
_SubGraphData(
split_node=split_node_id,
node_ids=node_ids,
backend_id=backend_id,
)
)
)

# Prepare for next split (if any)
graph_to_split = down

# Remaining graph is primary graph
primary_graph = graph_to_split
primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
primary_backend_id = sorted(backend_candidates)[0]
_log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")
# Prepare for next split (if any)
graph_to_split = down

# Remaining graph is primary graph
primary_graph = graph_to_split
primary_node_ids = set(n for n, _ in primary_graph.iter_nodes())
backend_candidates = primary_graph.get_backend_candidates_for_node_set(primary_node_ids)
primary_backend_id = sorted(backend_candidates)[0]
_log.debug(f"DeepGraphSplitter.split: primary graph: {primary_backend_id=} {primary_node_ids=}")

if self._primary_backend is None or primary_backend_id == self._primary_backend:
_log.debug(f"DeepGraphSplitter.split: current split matches constraints")
return _PGSplitResult(
primary_node_ids=primary_node_ids,
primary_backend_id=primary_backend_id,
secondary_graphs=secondary_graphs,
)

return _PGSplitResult(
primary_node_ids=primary_node_ids,
primary_backend_id=primary_backend_id,
secondary_graphs=secondary_graphs,
)
raise GraphSplitException("DeepGraphSplitter.split: No matching split found.")
2 changes: 2 additions & 0 deletions src/openeo_aggregator/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ def approx_now(abs=10):
class ApproxStr:
"""Pytest helper in style of `pytest.approx`, but for string checking, based on prefix, body and or suffix"""

# TODO: port to dirty_equals

def __init__(
self,
prefix: Optional[str] = None,
Expand Down
Loading

0 comments on commit 1c82648

Please sign in to comment.