Skip to content

Commit 7cccf56

Browse files
committed
Format
1 parent 94ec1e8 commit 7cccf56

File tree

3 files changed

+7
-5
lines changed

3 files changed

+7
-5
lines changed

mars/services/task/execution/mars/executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ def __init__(
9595
self._meta_api = meta_api
9696

9797
self._stage_processors = []
98-
self._stage_id_to_processor = weakref.WeakValueDictionary()
98+
self._stage_tile_progresses = []
99+
self._stage_id_to_processor = weakref.WeakValueDictionary()
99100
self._cur_stage_processor = None
100101
self._result_tileables_lifecycle = None
101102
self._subtask_decref_events = dict()

mars/services/task/execution/ray/executor.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1074,8 +1074,8 @@ def gc():
10741074
last_check_slow_time = curr_time
10751075
# Fast to next loop and give it a chance to update object_ref_to_subtask.
10761076
await asyncio.sleep(interval_seconds if len(ready_objects) == 0 else 0)
1077-
1078-
def get_stage_generation_order(self, stage_id: str):
1077+
1078+
def get_stage_generation_order(self, stage_id: str):
10791079
raise NotImplementedError(
10801080
"RayTaskExecutor doesn't support stage generation order."
10811081
)

mars/services/task/supervisor/processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
from ....typing import TileableType, ChunkType
3131
from ....utils import Timer
3232
from ...context import FailOverContext
33-
from ...subtask import SubtaskResult, Subtask
33+
from ...subtask import SubtaskResult, SubtaskGraph, Subtask
3434
from ..core import Task, TaskResult, TaskStatus, new_task_id
3535
from ..execution.api import TaskExecutor, ExecutionChunkResult
3636
from .preprocessor import TaskPreprocessor
@@ -481,8 +481,9 @@ def _finish(self):
481481

482482
def is_done(self) -> bool:
483483
return self.done.is_set()
484+
484485
def get_generation_order(self, stage_id: str):
485486
return self._executor.get_stage_generation_order(stage_id)
486487

487488
def get_subtask(self, chunk_data_key: str):
488-
return self._chunk_data_key_to_subtask.get(chunk_data_key)
489+
return self._chunk_data_key_to_subtask.get(chunk_data_key)

0 commit comments

Comments
 (0)