Skip to content

Commit 03cb002

Browse files
authored
Merge branch 'main' into andystaples/add-async-client
2 parents e915ce3 + 9313d6f commit 03cb002

File tree

8 files changed

+744
-242
lines changed

8 files changed

+744
-242
lines changed

durabletask/client.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,26 @@ def resume_orchestration(self, instance_id: str) -> None:
268268
self._logger.info(f"Resuming instance '{instance_id}'.")
269269
self._stub.ResumeInstance(req)
270270

271+
def restart_orchestration(self, instance_id: str, *,
272+
restart_with_new_instance_id: bool = False) -> str:
273+
"""Restarts an existing orchestration instance.
274+
275+
Args:
276+
instance_id: The ID of the orchestration instance to restart.
277+
restart_with_new_instance_id: If True, the restarted orchestration will use a new instance ID.
278+
If False (default), the restarted orchestration will reuse the same instance ID.
279+
280+
Returns:
281+
The instance ID of the restarted orchestration.
282+
"""
283+
req = pb.RestartInstanceRequest(
284+
instanceId=instance_id,
285+
restartWithNewInstanceId=restart_with_new_instance_id)
286+
287+
self._logger.info(f"Restarting instance '{instance_id}'.")
288+
res: pb.RestartInstanceResponse = self._stub.RestartInstance(req)
289+
return res.instanceId
290+
271291
def purge_orchestration(self, instance_id: str, recursive: bool = True) -> PurgeInstancesResult:
272292
req = pb.PurgeInstancesRequest(instanceId=instance_id, recursive=recursive)
273293
self._logger.info(f"Purging instance '{instance_id}'.")
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
443b333f4f65a438dc9eb4f090560d232afec4b7
22
fd9369c6a03d6af4e95285e432b7c4e943c06970
3+
026329c53fe6363985655857b9ca848ec7238bd2

durabletask/internal/orchestrator_service_pb2.py

Lines changed: 262 additions & 220 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

durabletask/internal/orchestrator_service_pb2.pyi

Lines changed: 186 additions & 22 deletions
Large diffs are not rendered by default.

durabletask/internal/orchestrator_service_pb2_grpc.py

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ def __init__(self, channel):
6060
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceRequest.SerializeToString,
6161
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceResponse.FromString,
6262
_registered_method=True)
63+
self.RestartInstance = channel.unary_unary(
64+
'/TaskHubSidecarService/RestartInstance',
65+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.SerializeToString,
66+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.FromString,
67+
_registered_method=True)
6368
self.WaitForInstanceStart = channel.unary_unary(
6469
'/TaskHubSidecarService/WaitForInstanceStart',
6570
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceRequest.SerializeToString,
@@ -95,6 +100,11 @@ def __init__(self, channel):
95100
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesRequest.SerializeToString,
96101
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesResponse.FromString,
97102
_registered_method=True)
103+
self.ListInstanceIds = channel.unary_unary(
104+
'/TaskHubSidecarService/ListInstanceIds',
105+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.SerializeToString,
106+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.FromString,
107+
_registered_method=True)
98108
self.PurgeInstances = channel.unary_unary(
99109
'/TaskHubSidecarService/PurgeInstances',
100110
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.PurgeInstancesRequest.SerializeToString,
@@ -170,6 +180,11 @@ def __init__(self, channel):
170180
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskRequest.SerializeToString,
171181
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskResponse.FromString,
172182
_registered_method=True)
183+
self.SkipGracefulOrchestrationTerminations = channel.unary_unary(
184+
'/TaskHubSidecarService/SkipGracefulOrchestrationTerminations',
185+
request_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.SerializeToString,
186+
response_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.FromString,
187+
_registered_method=True)
173188

174189

175190
class TaskHubSidecarServiceServicer(object):
@@ -203,6 +218,13 @@ def RewindInstance(self, request, context):
203218
context.set_details('Method not implemented!')
204219
raise NotImplementedError('Method not implemented!')
205220

221+
def RestartInstance(self, request, context):
222+
"""Restarts an orchestration instance.
223+
"""
224+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
225+
context.set_details('Method not implemented!')
226+
raise NotImplementedError('Method not implemented!')
227+
206228
def WaitForInstanceStart(self, request, context):
207229
"""Waits for an orchestration instance to reach a running or completion state.
208230
"""
@@ -253,6 +275,12 @@ def QueryInstances(self, request, context):
253275
context.set_details('Method not implemented!')
254276
raise NotImplementedError('Method not implemented!')
255277

278+
def ListInstanceIds(self, request, context):
279+
"""Missing associated documentation comment in .proto file."""
280+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
281+
context.set_details('Method not implemented!')
282+
raise NotImplementedError('Method not implemented!')
283+
256284
def PurgeInstances(self, request, context):
257285
"""Missing associated documentation comment in .proto file."""
258286
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
@@ -353,6 +381,14 @@ def AbandonTaskEntityWorkItem(self, request, context):
353381
context.set_details('Method not implemented!')
354382
raise NotImplementedError('Method not implemented!')
355383

384+
def SkipGracefulOrchestrationTerminations(self, request, context):
385+
""""Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
386+
Note that a maximum of 500 orchestrations can be terminated at a time using this method.
387+
"""
388+
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
389+
context.set_details('Method not implemented!')
390+
raise NotImplementedError('Method not implemented!')
391+
356392

357393
def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
358394
rpc_method_handlers = {
@@ -376,6 +412,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
376412
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceRequest.FromString,
377413
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RewindInstanceResponse.SerializeToString,
378414
),
415+
'RestartInstance': grpc.unary_unary_rpc_method_handler(
416+
servicer.RestartInstance,
417+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.FromString,
418+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.SerializeToString,
419+
),
379420
'WaitForInstanceStart': grpc.unary_unary_rpc_method_handler(
380421
servicer.WaitForInstanceStart,
381422
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.GetInstanceRequest.FromString,
@@ -411,6 +452,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
411452
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesRequest.FromString,
412453
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.QueryInstancesResponse.SerializeToString,
413454
),
455+
'ListInstanceIds': grpc.unary_unary_rpc_method_handler(
456+
servicer.ListInstanceIds,
457+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.FromString,
458+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.SerializeToString,
459+
),
414460
'PurgeInstances': grpc.unary_unary_rpc_method_handler(
415461
servicer.PurgeInstances,
416462
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.PurgeInstancesRequest.FromString,
@@ -486,6 +532,11 @@ def add_TaskHubSidecarServiceServicer_to_server(servicer, server):
486532
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskRequest.FromString,
487533
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.AbandonEntityTaskResponse.SerializeToString,
488534
),
535+
'SkipGracefulOrchestrationTerminations': grpc.unary_unary_rpc_method_handler(
536+
servicer.SkipGracefulOrchestrationTerminations,
537+
request_deserializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.FromString,
538+
response_serializer=durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.SerializeToString,
539+
),
489540
}
490541
generic_handler = grpc.method_handlers_generic_handler(
491542
'TaskHubSidecarService', rpc_method_handlers)
@@ -605,6 +656,33 @@ def RewindInstance(request,
605656
metadata,
606657
_registered_method=True)
607658

659+
@staticmethod
660+
def RestartInstance(request,
661+
target,
662+
options=(),
663+
channel_credentials=None,
664+
call_credentials=None,
665+
insecure=False,
666+
compression=None,
667+
wait_for_ready=None,
668+
timeout=None,
669+
metadata=None):
670+
return grpc.experimental.unary_unary(
671+
request,
672+
target,
673+
'/TaskHubSidecarService/RestartInstance',
674+
durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceRequest.SerializeToString,
675+
durabletask_dot_internal_dot_orchestrator__service__pb2.RestartInstanceResponse.FromString,
676+
options,
677+
channel_credentials,
678+
insecure,
679+
call_credentials,
680+
compression,
681+
wait_for_ready,
682+
timeout,
683+
metadata,
684+
_registered_method=True)
685+
608686
@staticmethod
609687
def WaitForInstanceStart(request,
610688
target,
@@ -794,6 +872,33 @@ def QueryInstances(request,
794872
metadata,
795873
_registered_method=True)
796874

875+
@staticmethod
876+
def ListInstanceIds(request,
877+
target,
878+
options=(),
879+
channel_credentials=None,
880+
call_credentials=None,
881+
insecure=False,
882+
compression=None,
883+
wait_for_ready=None,
884+
timeout=None,
885+
metadata=None):
886+
return grpc.experimental.unary_unary(
887+
request,
888+
target,
889+
'/TaskHubSidecarService/ListInstanceIds',
890+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsRequest.SerializeToString,
891+
durabletask_dot_internal_dot_orchestrator__service__pb2.ListInstanceIdsResponse.FromString,
892+
options,
893+
channel_credentials,
894+
insecure,
895+
call_credentials,
896+
compression,
897+
wait_for_ready,
898+
timeout,
899+
metadata,
900+
_registered_method=True)
901+
797902
@staticmethod
798903
def PurgeInstances(request,
799904
target,
@@ -1198,3 +1303,30 @@ def AbandonTaskEntityWorkItem(request,
11981303
timeout,
11991304
metadata,
12001305
_registered_method=True)
1306+
1307+
@staticmethod
1308+
def SkipGracefulOrchestrationTerminations(request,
1309+
target,
1310+
options=(),
1311+
channel_credentials=None,
1312+
call_credentials=None,
1313+
insecure=False,
1314+
compression=None,
1315+
wait_for_ready=None,
1316+
timeout=None,
1317+
metadata=None):
1318+
return grpc.experimental.unary_unary(
1319+
request,
1320+
target,
1321+
'/TaskHubSidecarService/SkipGracefulOrchestrationTerminations',
1322+
durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsRequest.SerializeToString,
1323+
durabletask_dot_internal_dot_orchestrator__service__pb2.SkipGracefulOrchestrationTerminationsResponse.FromString,
1324+
options,
1325+
channel_credentials,
1326+
insecure,
1327+
call_credentials,
1328+
compression,
1329+
wait_for_ready,
1330+
timeout,
1331+
metadata,
1332+
_registered_method=True)

durabletask/internal/proto_task_hub_sidecar_service_stub.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ class ProtoTaskHubSidecarServiceStub(Protocol):
99
StartInstance: Callable[..., Any]
1010
GetInstance: Callable[..., Any]
1111
RewindInstance: Callable[..., Any]
12+
RestartInstance: Callable[..., Any]
1213
WaitForInstanceStart: Callable[..., Any]
1314
WaitForInstanceCompletion: Callable[..., Any]
1415
RaiseEvent: Callable[..., Any]
1516
TerminateInstance: Callable[..., Any]
1617
SuspendInstance: Callable[..., Any]
1718
ResumeInstance: Callable[..., Any]
1819
QueryInstances: Callable[..., Any]
20+
ListInstanceIds: Callable[..., Any]
1921
PurgeInstances: Callable[..., Any]
2022
GetWorkItems: Callable[..., Any]
2123
CompleteActivityTask: Callable[..., Any]
@@ -31,3 +33,4 @@ class ProtoTaskHubSidecarServiceStub(Protocol):
3133
AbandonTaskActivityWorkItem: Callable[..., Any]
3234
AbandonTaskOrchestratorWorkItem: Callable[..., Any]
3335
AbandonTaskEntityWorkItem: Callable[..., Any]
36+
SkipGracefulOrchestrationTerminations: Callable[..., Any]

tests/durabletask-azuremanaged/test_dts_orchestration_e2e.py

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@
2222
endpoint = os.getenv("ENDPOINT", "http://localhost:8080")
2323

2424

25+
def _get_credential():
26+
"""Returns DefaultAzureCredential if endpoint is https, otherwise None (for emulator)."""
27+
if endpoint.startswith("https://"):
28+
from azure.identity import DefaultAzureCredential
29+
return DefaultAzureCredential()
30+
return None
31+
32+
2533
def test_empty_orchestration():
2634

2735
invoked = False
@@ -371,6 +379,75 @@ def child(ctx: task.OrchestrationContext, _):
371379
assert state is None
372380

373381

382+
def test_restart_with_same_instance_id():
383+
def orchestrator(ctx: task.OrchestrationContext, _):
384+
result = yield ctx.call_activity(say_hello, input="World")
385+
return result
386+
387+
def say_hello(ctx: task.ActivityContext, input: str):
388+
return f"Hello, {input}!"
389+
390+
credential = _get_credential()
391+
392+
# Start a worker, which will connect to the sidecar in a background thread
393+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
394+
taskhub=taskhub_name, token_credential=credential) as w:
395+
w.add_orchestrator(orchestrator)
396+
w.add_activity(say_hello)
397+
w.start()
398+
399+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
400+
taskhub=taskhub_name, token_credential=credential)
401+
id = task_hub_client.schedule_new_orchestration(orchestrator)
402+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
403+
assert state is not None
404+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
405+
assert state.serialized_output == json.dumps("Hello, World!")
406+
407+
# Restart the orchestration with the same instance ID
408+
restarted_id = task_hub_client.restart_orchestration(id)
409+
assert restarted_id == id
410+
411+
state = task_hub_client.wait_for_orchestration_completion(restarted_id, timeout=30)
412+
assert state is not None
413+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
414+
assert state.serialized_output == json.dumps("Hello, World!")
415+
416+
417+
def test_restart_with_new_instance_id():
418+
def orchestrator(ctx: task.OrchestrationContext, _):
419+
result = yield ctx.call_activity(say_hello, input="World")
420+
return result
421+
422+
def say_hello(ctx: task.ActivityContext, input: str):
423+
return f"Hello, {input}!"
424+
425+
credential = _get_credential()
426+
427+
# Start a worker, which will connect to the sidecar in a background thread
428+
with DurableTaskSchedulerWorker(host_address=endpoint, secure_channel=True,
429+
taskhub=taskhub_name, token_credential=credential) as w:
430+
w.add_orchestrator(orchestrator)
431+
w.add_activity(say_hello)
432+
w.start()
433+
434+
task_hub_client = DurableTaskSchedulerClient(host_address=endpoint, secure_channel=True,
435+
taskhub=taskhub_name, token_credential=credential)
436+
id = task_hub_client.schedule_new_orchestration(orchestrator)
437+
state = task_hub_client.wait_for_orchestration_completion(id, timeout=30)
438+
assert state is not None
439+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
440+
441+
# Restart the orchestration with a new instance ID
442+
restarted_id = task_hub_client.restart_orchestration(id, restart_with_new_instance_id=True)
443+
assert restarted_id != id
444+
445+
state = task_hub_client.wait_for_orchestration_completion(restarted_id, timeout=30)
446+
assert state is not None
447+
assert state.runtime_status == client.OrchestrationStatus.COMPLETED
448+
assert state.serialized_output == json.dumps("Hello, World!")
449+
450+
374451
# def test_continue_as_new():
375452
# all_results = []
376453

0 commit comments

Comments
 (0)