Skip to content

Commit

Permalink
feat: use asset embedded in events for task processing (#1104)
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandrePicosson authored Jun 14, 2022
1 parent c218ee6 commit a5ae5bd
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 11 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- Filtering on compute plan metadata (#1043)

### Changed

- use tasks from event for task processing (#1104)

## [0.18.0] 2022-06-14

### Removed
Expand Down
17 changes: 6 additions & 11 deletions backend/events/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,19 @@ def on_computetask_event(payload):
asset_key = payload["asset_key"]
channel_name = payload["channel"]
event_kind = payload["event_kind"]
metadata = payload["metadata"]
targeted_organisation = metadata["worker"]
task = payload["compute_task"]
targeted_organisation = task["worker"]

logger.info("Processing task", asset_key=asset_key, kind=event_kind, status=metadata["status"])
logger.info("Processing task", asset_key=asset_key, kind=event_kind, status=task["status"])

event_task_status = computetask_pb2.ComputeTaskStatus.Value(metadata["status"])
event_task_status = computetask_pb2.ComputeTaskStatus.Value(task["status"])

if event_task_status in [
computetask_pb2.STATUS_DONE,
computetask_pb2.STATUS_CANCELED,
computetask_pb2.STATUS_FAILED,
]:
with get_orchestrator_client(channel_name) as client:
task = client.query_task(asset_key)

# Handle intermediary models
models = []
for parent_key in task["parent_task_keys"]:
Expand Down Expand Up @@ -88,15 +86,12 @@ def on_computetask_event(payload):
"Skipping task: this organisation is not the targeted organisation",
my_organisation=my_organisation,
targeted_organisation=targeted_organisation,
assert_key=asset_key,
asset_key=asset_key,
kind=event_kind,
status=metadata["status"],
status=task["status"],
)
return

with get_orchestrator_client(channel_name) as client:
task = client.query_task(asset_key)

queue_prepare_task(channel_name, task=task)


Expand Down

0 comments on commit a5ae5bd

Please sign in to comment.