Skip to content

Commit ae20682

Browse files
committed
Add non-dts tests, update pagination test
1 parent 867bf3b commit ae20682

File tree

2 files changed

+116
-4
lines changed

2 files changed

+116
-4
lines changed

tests/durabletask-azuremanaged/test_dts_batch_actions.py

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

2+
import logging
23
import os
34
import time
45
from datetime import datetime, timedelta, timezone
@@ -140,14 +141,24 @@ def test_get_orchestration_state_by_time_range():
140141
assert len([o for o in orchestrations_outside_range if o.instance_id == id]) == 0
141142

142143

143-
def test_get_orchestration_state_by_max_instance_count():
144+
def test_get_orchestration_state_pagination_succeeds():
145+
# Create a custom handler to capture log messages
146+
log_records = []
147+
148+
class ListHandler(logging.Handler):
149+
def emit(self, record):
150+
log_records.append(record)
151+
152+
handler = ListHandler()
153+
144154
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
145155
taskhub=taskhub_name, token_credential=None) as w:
146156
w.add_orchestrator(empty_orchestrator)
147157
w.start()
148158

149159
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
150-
taskhub=taskhub_name, token_credential=None)
160+
taskhub=taskhub_name, token_credential=None,
161+
log_handler=handler)
151162

152163
# Create at least 3 orchestrations to test the limit
153164
ids = []
@@ -162,8 +173,11 @@ def test_get_orchestration_state_by_max_instance_count():
162173
# Query with max_instance_count=2
163174
orchestrations = c.get_orchestration_state_by(max_instance_count=2)
164175

165-
# Should return exactly 2 instances since we created at least 3
166-
assert len(orchestrations) == 2
176+
# Should return more than 2 instances since we created at least 3
177+
assert len(orchestrations) > 2
178+
# Verify the pagination loop ran by checking for the continuation token log message
179+
assert any("Received continuation token" in record.getMessage() for record in log_records), \
180+
"Expected pagination loop to execute with continuation token"
167181

168182

169183
def test_purge_orchestration():

tests/durabletask/test_batch_actions.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

2+
from datetime import datetime, timedelta, timezone
23
from grpc._channel import _InactiveRpcError
34
import pytest
45
from durabletask import client, task, worker
@@ -13,6 +14,10 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
1314
return "Complete"
1415

1516

17+
def failing_orchestrator(ctx: task.OrchestrationContext, _):
18+
raise Exception("Orchestration failed")
19+
20+
1621
def test_get_all_orchestration_states():
1722
# Start a worker, which will connect to the sidecar in a background thread
1823
with worker.TaskHubGrpcWorker() as w:
@@ -26,3 +31,96 @@ def test_get_all_orchestration_states():
2631
with pytest.raises(_InactiveRpcError) as exec_info:
2732
c.get_all_orchestration_states()
2833
assert "unimplemented" in str(exec_info.value)
34+
35+
36+
def test_get_all_entities():
37+
# Start a worker, which will connect to the sidecar in a background thread
38+
with worker.TaskHubGrpcWorker() as w:
39+
w.add_orchestrator(empty_orchestrator)
40+
w.start()
41+
42+
c = client.TaskHubGrpcClient()
43+
with pytest.raises(_InactiveRpcError) as exec_info:
44+
c.get_all_entities()
45+
assert "unimplemented" in str(exec_info.value)
46+
47+
48+
def test_clean_entity_storage():
49+
# Start a worker, which will connect to the sidecar in a background thread
50+
with worker.TaskHubGrpcWorker() as w:
51+
w.add_orchestrator(empty_orchestrator)
52+
w.start()
53+
54+
c = client.TaskHubGrpcClient()
55+
with pytest.raises(_InactiveRpcError) as exec_info:
56+
c.clean_entity_storage()
57+
assert "unimplemented" in str(exec_info.value)
58+
59+
60+
def test_purge_orchestrations_by_status():
61+
with worker.TaskHubGrpcWorker() as w:
62+
w.add_orchestrator(failing_orchestrator)
63+
w.start()
64+
65+
c = client.TaskHubGrpcClient()
66+
67+
# Schedule and let it fail
68+
failed_id = c.schedule_new_orchestration(failing_orchestrator)
69+
try:
70+
c.wait_for_orchestration_completion(failed_id, timeout=30)
71+
except client.OrchestrationFailedError:
72+
pass # Expected failure
73+
74+
# Verify it exists and is failed
75+
state_before = c.get_orchestration_state(failed_id)
76+
assert state_before is not None
77+
assert state_before.runtime_status == client.OrchestrationStatus.FAILED
78+
79+
# Purge failed orchestrations
80+
result = c.purge_orchestrations_by(
81+
runtime_status=[client.OrchestrationStatus.FAILED],
82+
recursive=True
83+
)
84+
85+
# Verify purge result
86+
assert result.deleted_instance_count >= 1
87+
88+
# Verify the failed orchestration no longer exists
89+
state_after = c.get_orchestration_state(failed_id)
90+
assert state_after is None
91+
92+
93+
def test_purge_orchestrations_by_time_range():
94+
with worker.TaskHubGrpcWorker() as w:
95+
w.add_orchestrator(empty_orchestrator)
96+
w.start()
97+
98+
c = client.TaskHubGrpcClient()
99+
100+
# Get current time
101+
before_creation = datetime.now(timezone.utc) - timedelta(seconds=5)
102+
103+
# Schedule orchestration
104+
id = c.schedule_new_orchestration(empty_orchestrator, input="ToPurgeByTime")
105+
c.wait_for_orchestration_completion(id, timeout=30)
106+
107+
after_creation = datetime.now(timezone.utc) + timedelta(seconds=5)
108+
109+
# Verify it exists
110+
state_before = c.get_orchestration_state(id)
111+
assert state_before is not None
112+
113+
# Purge by time range
114+
result = c.purge_orchestrations_by(
115+
created_time_from=before_creation,
116+
created_time_to=after_creation,
117+
runtime_status=[client.OrchestrationStatus.COMPLETED],
118+
recursive=True
119+
)
120+
121+
# Verify purge result
122+
assert result.deleted_instance_count >= 1
123+
124+
# Verify it no longer exists
125+
state_after = c.get_orchestration_state(id)
126+
assert state_after is None

0 commit comments

Comments
 (0)