Skip to content

Commit

Permalink
Reproduce incorrect dataset timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
tillprochaska committed Jun 28, 2024
1 parent 8b522b7 commit 4f7ce2c
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pytest-env==1.1.3
pytest-cov==5.0.0
pytest-mock==3.14.0
wheel==0.43.0
time-machine==2.14.1
89 changes: 89 additions & 0 deletions tests/test_taskqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from unittest.mock import patch
import json
from random import randrange
import time_machine

import pika

Expand Down Expand Up @@ -183,6 +184,94 @@ def did_nack():
assert stage["running"] == 0
assert dataset.is_task_tracked(Task(**body))

def test_timestamps(self):
    """Check the dataset status timestamps across a full task lifecycle.

    Walks through adding, checking out and completing two tasks (and then
    adding a third one) while freezing the clock at a different day for
    each step, and asserts the expected `start_time`, `last_update` and
    `end_time` values reported by `Dataset.get_status()` after every step.
    """
    conn = get_fakeredis()
    conn.flushdb()

    dataset = Dataset(conn=conn, name=dataset_from_collection_id(123))
    status = dataset.get_status()

    # A dataset without any tasks has no timestamps at all
    assert status["start_time"] is None
    assert status["last_update"] is None
    assert status["end_time"] is None

    task_one = Task(
        task_id="1",
        job_id="abc",
        delivery_tag="",
        operation="ingest",
        context={},
        payload={},
        priority=5,
        collection_id="1",
    )

    task_two = Task(
        task_id="2",
        job_id="abc",
        delivery_tag="",
        operation="ingest",
        context={},
        payload={},
        priority=5,
        collection_id="1",
    )

    # Adding a task updates `start_time` and `last_update`
    with time_machine.travel("2024-01-01T00:00:00"):
        dataset.add_task(task_one.task_id, task_one.operation)

    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-01T00:00:00")
    assert status["last_update"].startswith("2024-01-01T00:00:00")
    assert status["end_time"] is None

    # Once a worker starts processing a task, only `last_update` is updated
    with time_machine.travel("2024-01-02T00:00:00"):
        dataset.checkout_task(task_one.task_id, task_one.operation)

    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-01T00:00:00")
    assert status["last_update"].startswith("2024-01-02T00:00:00")
    assert status["end_time"] is None

    # When another task is added, only `last_update` is updated
    with time_machine.travel("2024-01-03T00:00:00"):
        dataset.add_task(task_two.task_id, task_two.operation)

    # Re-read the status; otherwise the assertions below would run against
    # the snapshot taken after the previous step.
    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-01T00:00:00")
    assert status["last_update"].startswith("2024-01-03T00:00:00")
    assert status["end_time"] is None

    # When the first task has been processed, `last_update` is updated
    with time_machine.travel("2024-01-04T00:00:00"):
        dataset.mark_done(task_one)

    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-01T00:00:00")
    assert status["last_update"].startswith("2024-01-04T00:00:00")
    assert status["end_time"] is None

    # Once all tasks have been processed, `last_update` is updated and
    # `end_time` is set
    with time_machine.travel("2024-01-05T00:00:00"):
        dataset.mark_done(task_two)

    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-01T00:00:00")
    assert status["last_update"].startswith("2024-01-05T00:00:00")
    assert status["end_time"].startswith("2024-01-05T00:00:00")

    # Adding a new task to an inactive dataset sets `start_time` and
    # resets `end_time`
    with time_machine.travel("2024-01-06T00:00:00"):
        dataset.add_task("3", "analyze")

    status = dataset.get_status()
    assert status["start_time"].startswith("2024-01-06T00:00:00")
    assert status["last_update"].startswith("2024-01-06T00:00:00")
    assert status["end_time"] is None


def test_get_priority_bucket():
redis = get_fakeredis()
Expand Down

0 comments on commit 4f7ce2c

Please sign in to comment.