Skip to content

Commit c54c386

Browse files
authored
Add new_uuid method to client (#83)
* Add new_uuid method to client * Update entity messages to use UUIDs as requestIds
1 parent 7bdfbcf commit c54c386

File tree

5 files changed

+118
-7
lines changed

5 files changed

+118
-7
lines changed

durabletask/internal/helpers.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,9 +196,14 @@ def new_schedule_task_action(id: int, name: str, encoded_input: Optional[str],
196196
))
197197

198198

199-
def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
199+
def new_call_entity_action(id: int,
200+
parent_instance_id: str,
201+
entity_id: EntityInstanceId,
202+
operation: str,
203+
encoded_input: Optional[str],
204+
request_id: str) -> pb.OrchestratorAction:
200205
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationCalled=pb.EntityOperationCalledEvent(
201-
requestId=f"{parent_instance_id}:{id}",
206+
requestId=request_id,
202207
operation=operation,
203208
scheduledTime=None,
204209
input=get_string_value(encoded_input),
@@ -208,9 +213,13 @@ def new_call_entity_action(id: int, parent_instance_id: str, entity_id: EntityIn
208213
)))
209214

210215

211-
def new_signal_entity_action(id: int, entity_id: EntityInstanceId, operation: str, encoded_input: Optional[str]):
216+
def new_signal_entity_action(id: int,
217+
entity_id: EntityInstanceId,
218+
operation: str,
219+
encoded_input: Optional[str],
220+
request_id: str) -> pb.OrchestratorAction:
212221
return pb.OrchestratorAction(id=id, sendEntityMessage=pb.SendEntityMessageAction(entityOperationSignaled=pb.EntityOperationSignaledEvent(
213-
requestId=f"{entity_id}:{id}",
222+
requestId=request_id,
214223
operation=operation,
215224
scheduledTime=None,
216225
input=get_string_value(encoded_input),

durabletask/task.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,22 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
259259
"""
260260
pass
261261

262+
@abstractmethod
263+
def new_uuid(self) -> str:
264+
"""Create a new UUID that is safe for replay within an orchestration or operation.
265+
266+
The default implementation of this method creates a name-based UUID
267+
using the algorithm from RFC 4122 §4.3. The name input used to generate
268+
this value is a combination of the orchestration instance ID, the current UTC datetime,
269+
and an internally managed counter.
270+
271+
Returns
272+
-------
273+
str
274+
New UUID that is safe for replay within an orchestration or operation.
275+
"""
276+
pass
277+
262278
@abstractmethod
263279
def _exit_critical_section(self) -> None:
264280
pass

durabletask/worker.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from types import GeneratorType
1414
from enum import Enum
1515
from typing import Any, Generator, Optional, Sequence, TypeVar, Union
16+
import uuid
1617
from packaging.version import InvalidVersion, parse
1718

1819
import grpc
@@ -33,6 +34,7 @@
3334

3435
TInput = TypeVar("TInput")
3536
TOutput = TypeVar("TOutput")
37+
DATETIME_STRING_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
3638

3739

3840
class ConcurrencyOptions:
@@ -832,6 +834,7 @@ def __init__(self, instance_id: str, registry: _Registry):
832834
# Maps criticalSectionId to task ID
833835
self._entity_lock_id_map: dict[str, int] = {}
834836
self._sequence_number = 0
837+
self._new_uuid_counter = 0
835838
self._current_utc_datetime = datetime(1000, 1, 1)
836839
self._instance_id = instance_id
837840
self._registry = registry
@@ -1166,7 +1169,7 @@ def call_entity_function_helper(
11661169
raise RuntimeError(error_message)
11671170

11681171
encoded_input = shared.to_json(input) if input is not None else None
1169-
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input)
1172+
action = ph.new_call_entity_action(id, self.instance_id, entity_id, operation, encoded_input, self.new_uuid())
11701173
self._pending_actions[id] = action
11711174

11721175
fn_task = task.CompletableTask()
@@ -1189,7 +1192,7 @@ def signal_entity_function_helper(
11891192

11901193
encoded_input = shared.to_json(input) if input is not None else None
11911194

1192-
action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input)
1195+
action = ph.new_signal_entity_action(id, entity_id, operation, encoded_input, self.new_uuid())
11931196
self._pending_actions[id] = action
11941197

11951198
def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId]) -> None:
@@ -1200,7 +1203,7 @@ def lock_entities_function_helper(self, id: int, entities: list[EntityInstanceId
12001203
if not transition_valid:
12011204
raise RuntimeError(error_message)
12021205

1203-
critical_section_id = f"{self.instance_id}:{id:04x}"
1206+
critical_section_id = self.new_uuid()
12041207

12051208
request, target = self._entity_context.emit_acquire_message(critical_section_id, entities)
12061209

@@ -1252,6 +1255,17 @@ def continue_as_new(self, new_input, *, save_events: bool = False) -> None:
12521255

12531256
self.set_continued_as_new(new_input, save_events)
12541257

1258+
def new_uuid(self) -> str:
1259+
NAMESPACE_UUID: str = "9e952958-5e33-4daf-827f-2fa12937b875"
1260+
1261+
uuid_name_value = \
1262+
f"{self._instance_id}" \
1263+
f"_{self.current_utc_datetime.strftime(DATETIME_STRING_FORMAT)}" \
1264+
f"_{self._new_uuid_counter}"
1265+
self._new_uuid_counter += 1
1266+
namespace_uuid = uuid.uuid5(uuid.NAMESPACE_OID, NAMESPACE_UUID)
1267+
return str(uuid.uuid5(namespace_uuid, uuid_name_value))
1268+
12551269

12561270
class ExecutionResults:
12571271
actions: list[pb.OrchestratorAction]

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import threading
77
from datetime import timedelta
8+
import uuid
89

910
import pytest
1011

@@ -532,3 +533,39 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
532533
assert state.serialized_input is None
533534
assert state.serialized_output is None
534535
assert state.serialized_custom_status == "\"foobaz\""
536+
537+
538+
def test_new_uuid():
539+
def noop(_: task.ActivityContext, _1):
540+
pass
541+
542+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
543+
# Assert that two new_uuid calls return different values
544+
results = [ctx.new_uuid(), ctx.new_uuid()]
545+
yield ctx.call_activity("noop")
546+
# Assert that new_uuid still returns a unique value after replay
547+
results.append(ctx.new_uuid())
548+
return results
549+
550+
# Start a worker, which will connect to the sidecar in a background thread
551+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
552+
taskhub=taskhub_name, token_credential=None) as w:
553+
w.add_orchestrator(empty_orchestrator)
554+
w.add_activity(noop)
555+
w.start()
556+
557+
c = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
558+
taskhub=taskhub_name, token_credential=None)
559+
id = c.schedule_new_orchestration(empty_orchestrator)
560+
state = c.wait_for_orchestration_completion(id, timeout=30)
561+
562+
assert state is not None
563+
assert state.name == task.get_name(empty_orchestrator)
564+
assert state.instance_id == id
565+
assert state.failure_details is None
566+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
567+
results = json.loads(state.serialized_output or "\"\"")
568+
assert isinstance(results, list) and len(results) == 3
569+
assert uuid.UUID(results[0]) != uuid.UUID(results[1])
570+
assert uuid.UUID(results[0]) != uuid.UUID(results[2])
571+
assert uuid.UUID(results[1]) != uuid.UUID(results[2])

tests/durabletask/test_orchestration_e2e.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import threading
66
import time
77
from datetime import timedelta
8+
import uuid
89

910
import pytest
1011

@@ -499,3 +500,37 @@ def empty_orchestrator(ctx: task.OrchestrationContext, _):
499500
assert state.serialized_input is None
500501
assert state.serialized_output is None
501502
assert state.serialized_custom_status == "\"foobaz\""
503+
504+
505+
def test_new_uuid():
506+
def noop(_: task.ActivityContext, _1):
507+
pass
508+
509+
def empty_orchestrator(ctx: task.OrchestrationContext, _):
510+
# Assert that two new_uuid calls return different values
511+
results = [ctx.new_uuid(), ctx.new_uuid()]
512+
yield ctx.call_activity("noop")
513+
# Assert that new_uuid still returns a unique value after replay
514+
results.append(ctx.new_uuid())
515+
return results
516+
517+
# Start a worker, which will connect to the sidecar in a background thread
518+
with worker.TaskHubGrpcWorker() as w:
519+
w.add_orchestrator(empty_orchestrator)
520+
w.add_activity(noop)
521+
w.start()
522+
523+
c = client.TaskHubGrpcClient()
524+
id = c.schedule_new_orchestration(empty_orchestrator)
525+
state = c.wait_for_orchestration_completion(id, timeout=30)
526+
527+
assert state is not None
528+
assert state.name == task.get_name(empty_orchestrator)
529+
assert state.instance_id == id
530+
assert state.failure_details is None
531+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
532+
results = json.loads(state.serialized_output or "\"\"")
533+
assert isinstance(results, list) and len(results) == 3
534+
assert uuid.UUID(results[0]) != uuid.UUID(results[1])
535+
assert uuid.UUID(results[0]) != uuid.UUID(results[2])
536+
assert uuid.UUID(results[1]) != uuid.UUID(results[2])

0 commit comments

Comments
 (0)