Create a dataclass for indexer worker TaskStatus #4601

Merged
merged 4 commits into from
Jul 19, 2024
Changes from 1 commit
32 changes: 22 additions & 10 deletions indexer_worker/indexer_worker/tasks.py
@@ -1,7 +1,8 @@
 """Simple in-memory tracking of executed tasks."""

 import datetime
-
+from dataclasses import dataclass
+from typing import Any

 def _time_fmt(timestamp: int) -> str | None:
     """
@@ -15,6 +16,14 @@ def _time_fmt(timestamp: int) -> str | None:
         return None
     return str(datetime.datetime.utcfromtimestamp(timestamp))

+@dataclass
+class TaskInfo:
+    task: Any
+    start_time: Any
+    model: str
+    target_index: str
+    finish_time: Any
+    progress: Any
Member

The types can be more precise here. Below, the times are passed to the _time_fmt function that accepts integers, and probably progress is also an integer; @stacimc can confirm.

Suggested change
-    task: Any
-    start_time: Any
-    model: str
-    target_index: str
-    finish_time: Any
-    progress: Any
+    task: Any
+    start_time: int
+    model: str
+    target_index: str
+    finish_time: int
+    progress: Any

Contributor Author

Hi @krysal, finish_time and progress have the same type: <class 'multiprocessing.sharedctypes.Synchronized'>.
However, the value of progress in the API response is 100.0, so should it be typed as a float?
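
To make the distinction in the question above concrete: the object stored on the task is the Synchronized wrapper, while the number that reaches the API response is its .value. A minimal sketch, assuming the shared variables are created with multiprocessing.Value and the "d" (C double) typecode; the typecode is an assumption and is not shown in this diff.

```python
import multiprocessing
from multiprocessing.sharedctypes import Synchronized

# Assumption: the worker creates its shared counters roughly like this.
progress = multiprocessing.Value("d", 0.0)

# A worker process would update the shared value while holding its lock.
with progress.get_lock():
    progress.value = 100.0

wrapper_type = type(progress)        # the Synchronized wrapper object
payload_type = type(progress.value)  # the plain Python float it exposes

print(wrapper_type.__name__, payload_type.__name__, progress.value)
```

So the dataclass field holds the wrapper, and float describes only what .value returns.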

Collaborator

float would be appropriate for the timestamps (and you can change the type of the timestamp parameter in _time_fmt to match, actually!)

progress and finish_time are shared variables created with multiprocessing.Value, so I believe the correct type would be Synchronized[float].

Contributor Author

@akshay-km, Jul 14, 2024

Hi @stacimc, when I use Synchronized[float] I get a TypeError: type 'Synchronized' is not subscriptable, because Synchronized cannot be parameterized at runtime. So I have used plain Synchronized as the type of progress and finish_time, and float as the type for the timestamps, including the parameter of the _time_fmt function.
I have made a new commit with these changes.
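
The error described above comes from the annotation being evaluated while the class body runs. A small sketch of that mechanism, using a hypothetical stand-in class named Plain instead of Synchronized so that it does not depend on how a given Python version defines Synchronized:

```python
from dataclasses import dataclass


class Plain:
    """Hypothetical stand-in for a class that defines no __class_getitem__."""


def subscript_fails() -> bool:
    """Return True if evaluating Plain[float] raises TypeError."""
    try:
        Plain[float]  # evaluated eagerly, just like an unquoted annotation
    except TypeError:
        return True
    return False


@dataclass
class Holder:
    # A quoted annotation is stored as a string and is not evaluated here,
    # so the class body runs even though Plain[float] is invalid at runtime.
    item: "Plain[float]"


print(subscript_fails(), Holder.__annotations__["item"])
```

Static type checkers still read the quoted form, which is why deferring evaluation keeps the precise type without the runtime failure.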

Collaborator

Oh, interesting! It looks like the subscriptable (generic) form of Synchronized is only defined in the typeshed stubs and is not available at runtime. I think the fix should be to add from __future__ import annotations to the top of the file.
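
A minimal sketch of that fix, assuming the field list from the diff above with the types discussed in this thread. With the future import, every annotation in the module is stored as a string, so Synchronized[float] is never subscripted when the class is defined.

```python
from __future__ import annotations  # must come before any other statement

from dataclasses import dataclass
from multiprocessing.sharedctypes import Synchronized
from typing import Any


@dataclass
class TaskInfo:
    task: Any
    start_time: float
    model: str
    target_index: str
    # Stored as the string "Synchronized[float]"; not evaluated at runtime.
    finish_time: Synchronized[float]
    progress: Synchronized[float]


print(TaskInfo.__annotations__["progress"])
```

One caveat worth keeping in mind: anything that resolves the annotations at runtime, such as typing.get_type_hints, would evaluate the string and could hit the original TypeError again.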

Contributor Author

Great!
Learned something new :)


 class TaskTracker:
     def __init__(self):
@@ -27,10 +36,13 @@ def add_task(self, task_id: str, **kwargs):
         :param task: the task being performed
         :param task_id: the UUID of the task
         """
-        self.tasks[task_id] = {
-            "start_time": datetime.datetime.utcnow().timestamp(),
-        } | kwargs
+        task_info = TaskInfo(
+            start_time=datetime.datetime.utcnow().timestamp(),
+            **kwargs
+        )
+
+        self.tasks[task_id] = task_info

     def get_task_status(self, task_id: str) -> dict:
         """
         Get the status of a single task with the given task ID.
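
One practical consequence of the add_task change above: the old dict merge accepted any keyword arguments, while a dataclass constructor rejects unexpected or missing ones. The sketch below uses a trimmed, hypothetical TaskInfo, a hypothetical build helper, made-up argument values, and time.time() as a stand-in for the timestamp call.

```python
import time
from dataclasses import dataclass


@dataclass
class TaskInfo:
    # Trimmed, hypothetical version of the dataclass added in this PR.
    start_time: float
    model: str
    target_index: str


def build(**kwargs) -> TaskInfo:
    """Mirror the add_task pattern: fixed start_time plus caller kwargs."""
    return TaskInfo(start_time=time.time(), **kwargs)


ok = build(model="image", target_index="image-abc123")

try:
    build(model="image", target_indx="image-abc123")  # misspelled key
except TypeError as exc:
    error = str(exc)
else:
    error = None

print(ok.model, error)
```

With the dict version the misspelled key would have been stored silently and surfaced later as a KeyError in get_task_status.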
@@ -39,12 +51,12 @@ def get_task_status(self, task_id: str) -> dict:
         :return: response dictionary containing all relevant info about the task
         """
         task_info = self.tasks[task_id]
-        active = task_info["task"].is_alive()
-        model = task_info["model"]
-        target_index = task_info["target_index"]
-        start_time = task_info["start_time"]
-        finish_time = task_info["finish_time"].value
-        progress = task_info["progress"].value
+        active = task_info.task.is_alive()
+        model = task_info.model
+        target_index = task_info.target_index
+        start_time = task_info.start_time
+        finish_time = task_info.finish_time.value
+        progress = task_info.progress.value

         return {
             "task_id": task_id,
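
Putting the pieces of this diff together, the tracker after the change might behave roughly as in the sketch below. It is an illustration under stated assumptions, not the merged code: a never-started threading.Thread stands in for the worker process, SimpleNamespace objects stand in for the Synchronized values, and every response key other than task_id is a guess, since the diff is collapsed after that line.

```python
import threading
import time
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any


@dataclass
class TaskInfo:
    task: Any
    start_time: float
    model: str
    target_index: str
    finish_time: Any  # stand-in for the Synchronized wrapper
    progress: Any     # stand-in for the Synchronized wrapper


class TaskTracker:
    def __init__(self):
        self.tasks = {}

    def add_task(self, task_id: str, **kwargs):
        self.tasks[task_id] = TaskInfo(start_time=time.time(), **kwargs)

    def get_task_status(self, task_id: str) -> dict:
        info = self.tasks[task_id]
        return {
            "task_id": task_id,
            # Keys below are assumptions about the collapsed response dict.
            "active": info.task.is_alive(),
            "model": info.model,
            "target_index": info.target_index,
            "progress": info.progress.value,
            "finish_time": info.finish_time.value,
        }


worker = threading.Thread(target=lambda: None)  # never started, so not alive
tracker = TaskTracker()
tracker.add_task(
    "task-1",
    task=worker,
    model="image",
    target_index="image-abc123",
    finish_time=SimpleNamespace(value=0.0),
    progress=SimpleNamespace(value=100.0),
)
status = tracker.get_task_status("task-1")
print(status)
```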