Skip to content

Commit 7357abe

Browse files
committed
Merge changes from base branch (andystaples/add-batch-actions)
2 parents aa035cb + 1791b65 commit 7357abe

File tree

4 files changed

+118
-438
lines changed

4 files changed

+118
-438
lines changed

durabletask/client.py

Lines changed: 102 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,16 @@ def raise_if_failed(self):
5757
self.failure_details)
5858

5959

60+
@dataclass
class PurgeInstancesResult:
    """Result of a purge-instances operation.

    Attributes carry the counts reported by the backend after purging
    orchestration instance metadata.
    """

    # Number of orchestration instances deleted by the purge request.
    deleted_instance_count: int
    # True when the purge finished; False when more instances remain to purge.
    is_complete: bool
6464

6565

66+
@dataclass
class CleanEntityStorageResult:
    """Result of a clean-entity-storage operation.

    Attributes carry the totals reported by the backend after removing
    empty entities and releasing orphaned entity locks.
    """

    # Number of empty (stateless) entities removed from storage.
    empty_entities_removed: int
    # Number of orphaned entity locks that were released.
    orphaned_locks_released: int
7070

7171

7272
class OrchestrationFailedError(Exception):
@@ -153,7 +153,7 @@ def schedule_new_orchestration(self, orchestrator: Union[task.Orchestrator[TInpu
153153
req = pb.CreateInstanceRequest(
154154
name=name,
155155
instanceId=instance_id if instance_id else uuid.uuid4().hex,
156-
input=helpers.get_string_value(shared.to_json(input)),
156+
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
157157
scheduledStartTimestamp=helpers.new_timestamp(start_at) if start_at else None,
158158
version=helpers.get_string_value(version if version else self.default_version),
159159
orchestrationIdReusePolicy=reuse_id_policy,
@@ -189,31 +189,42 @@ def get_orchestration_state_by(self,
189189
_continuation_token: Optional[pb2.StringValue] = None
190190
) -> List[OrchestrationState]:
191191
if max_instance_count is None:
192-
# DTS backend does not behave well with max_instance_count = None, so we set to max 32-bit signed value
192+
# Some backends do not behave well with max_instance_count = None, so we set to max 32-bit signed value
193193
max_instance_count = (1 << 31) - 1
194-
req = pb.QueryInstancesRequest(
195-
query=pb.InstanceQuery(
196-
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
197-
createdTimeFrom=helpers.new_timestamp(created_time_from) if created_time_from else None,
198-
createdTimeTo=helpers.new_timestamp(created_time_to) if created_time_to else None,
199-
maxInstanceCount=max_instance_count,
200-
fetchInputsAndOutputs=fetch_inputs_and_outputs,
201-
continuationToken=_continuation_token
202-
)
203-
)
204-
resp: pb.QueryInstancesResponse = self._stub.QueryInstances(req)
205-
states = [parse_orchestration_state(res) for res in resp.orchestrationState]
206-
# Check the value for continuationToken - none or "0" indicates that there are no more results.
207-
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
208-
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next list of instances...")
209-
states += self.get_orchestration_state_by(
210-
created_time_from,
211-
created_time_to,
212-
runtime_status,
213-
max_instance_count,
214-
fetch_inputs_and_outputs,
215-
_continuation_token=resp.continuationToken
194+
195+
self._logger.info(f"Querying orchestration instances with filters - "
196+
f"created_time_from={created_time_from}, "
197+
f"created_time_to={created_time_to}, "
198+
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
199+
f"max_instance_count={max_instance_count}, "
200+
f"fetch_inputs_and_outputs={fetch_inputs_and_outputs}, "
201+
f"continuation_token={_continuation_token.value if _continuation_token else None}")
202+
203+
states = []
204+
205+
while True:
206+
req = pb.QueryInstancesRequest(
207+
query=pb.InstanceQuery(
208+
runtimeStatus=[status.value for status in runtime_status] if runtime_status else None,
209+
createdTimeFrom=helpers.new_timestamp(created_time_from) if created_time_from else None,
210+
createdTimeTo=helpers.new_timestamp(created_time_to) if created_time_to else None,
211+
maxInstanceCount=max_instance_count,
212+
fetchInputsAndOutputs=fetch_inputs_and_outputs,
213+
continuationToken=_continuation_token
214+
)
216215
)
216+
resp: pb.QueryInstancesResponse = self._stub.QueryInstances(req)
217+
states += [parse_orchestration_state(res) for res in resp.orchestrationState]
218+
# Check the value for continuationToken - none or "0" indicates that there are no more results.
219+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
220+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next list of instances...")
221+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
222+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
223+
break
224+
_continuation_token = resp.continuationToken
225+
else:
226+
break
227+
217228
states = [state for state in states if state is not None] # Filter out any None values
218229
return states
219230

@@ -264,7 +275,8 @@ def raise_orchestration_event(self, instance_id: str, event_name: str, *,
264275
req = pb.RaiseEventRequest(
265276
instanceId=instance_id,
266277
name=event_name,
267-
input=helpers.get_string_value(shared.to_json(data)))
278+
input=helpers.get_string_value(shared.to_json(data) if data is not None else None)
279+
)
268280

269281
self._logger.info(f"Raising event '{event_name}' for instance '{instance_id}'.")
270282
self._stub.RaiseEvent(req)
@@ -274,7 +286,7 @@ def terminate_orchestration(self, instance_id: str, *,
274286
recursive: bool = True):
275287
req = pb.TerminateRequest(
276288
instanceId=instance_id,
277-
output=helpers.get_string_value(shared.to_json(output)),
289+
output=helpers.get_string_value(shared.to_json(output) if output is not None else None),
278290
recursive=recursive)
279291

280292
self._logger.info(f"Terminating instance '{instance_id}'.")
@@ -301,6 +313,11 @@ def purge_orchestrations_by(self,
301313
created_time_to: Optional[datetime] = None,
302314
runtime_status: Optional[List[OrchestrationStatus]] = None,
303315
recursive: bool = False) -> PurgeInstancesResult:
316+
self._logger.info("Purging orchestrations by filter: "
317+
f"created_time_from={created_time_from}, "
318+
f"created_time_to={created_time_to}, "
319+
f"runtime_status={[str(status) for status in runtime_status] if runtime_status else None}, "
320+
f"recursive={recursive}")
304321
resp: pb.PurgeInstancesResponse = self._stub.PurgeInstances(pb.PurgeInstancesRequest(
305322
instanceId=None,
306323
purgeInstanceFilter=pb.PurgeInstanceFilter(
@@ -319,7 +336,7 @@ def signal_entity(self,
319336
req = pb.SignalEntityRequest(
320337
instanceId=str(entity_instance_id),
321338
name=operation_name,
322-
input=helpers.get_string_value(shared.to_json(input)),
339+
input=helpers.get_string_value(shared.to_json(input) if input is not None else None),
323340
requestId=str(uuid.uuid4()),
324341
scheduledTime=None,
325342
parentTraceContext=None,
@@ -362,31 +379,38 @@ def get_entities_by(self,
362379
page_size: Optional[int] = None,
363380
_continuation_token: Optional[pb2.StringValue] = None
364381
) -> List[EntityMetadata]:
365-
self._logger.info("Getting entities")
366-
query_request = pb.QueryEntitiesRequest(
367-
query=pb.EntityQuery(
368-
instanceIdStartsWith=helpers.get_string_value(instance_id_starts_with),
369-
lastModifiedFrom=helpers.new_timestamp(last_modified_from) if last_modified_from else None,
370-
lastModifiedTo=helpers.new_timestamp(last_modified_to) if last_modified_to else None,
371-
includeState=include_state,
372-
includeTransient=include_transient,
373-
pageSize=helpers.get_int_value(page_size),
374-
continuationToken=_continuation_token
375-
)
376-
)
377-
resp: pb.QueryEntitiesResponse = self._stub.QueryEntities(query_request)
378-
entities = [EntityMetadata.from_entity_metadata(entity, query_request.query.includeState) for entity in resp.entities]
379-
if resp.continuationToken and resp.continuationToken.value != "0":
380-
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next page of entities...")
381-
entities += self.get_entities_by(
382-
instance_id_starts_with=instance_id_starts_with,
383-
last_modified_from=last_modified_from,
384-
last_modified_to=last_modified_to,
385-
include_state=include_state,
386-
include_transient=include_transient,
387-
page_size=page_size,
388-
_continuation_token=resp.continuationToken
382+
self._logger.info(f"Retrieving entities by filter: "
383+
f"instance_id_starts_with={instance_id_starts_with}, "
384+
f"last_modified_from={last_modified_from}, "
385+
f"last_modified_to={last_modified_to}, "
386+
f"include_state={include_state}, "
387+
f"include_transient={include_transient}, "
388+
f"page_size={page_size}")
389+
390+
entities = []
391+
392+
while True:
393+
query_request = pb.QueryEntitiesRequest(
394+
query=pb.EntityQuery(
395+
instanceIdStartsWith=helpers.get_string_value(instance_id_starts_with),
396+
lastModifiedFrom=helpers.new_timestamp(last_modified_from) if last_modified_from else None,
397+
lastModifiedTo=helpers.new_timestamp(last_modified_to) if last_modified_to else None,
398+
includeState=include_state,
399+
includeTransient=include_transient,
400+
pageSize=helpers.get_int_value(page_size),
401+
continuationToken=_continuation_token
402+
)
389403
)
404+
resp: pb.QueryEntitiesResponse = self._stub.QueryEntities(query_request)
405+
entities += [EntityMetadata.from_entity_metadata(entity, query_request.query.includeState) for entity in resp.entities]
406+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
407+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, fetching next page of entities...")
408+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
409+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
410+
break
411+
_continuation_token = resp.continuationToken
412+
else:
413+
break
390414
return entities
391415

392416
def clean_entity_storage(self,
@@ -395,23 +419,27 @@ def clean_entity_storage(self,
395419
_continuation_token: Optional[pb2.StringValue] = None
396420
) -> CleanEntityStorageResult:
397421
self._logger.info("Cleaning entity storage")
398-
req = pb.CleanEntityStorageRequest(
399-
removeEmptyEntities=remove_empty_entities,
400-
releaseOrphanedLocks=release_orphaned_locks,
401-
continuationToken=_continuation_token
402-
)
403-
resp: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
404-
empty_entities_removed = resp.emptyEntitiesRemoved
405-
orphaned_locks_released = resp.orphanedLocksReleased
406-
407-
if resp.continuationToken and resp.continuationToken.value != "0":
408-
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, cleaning next page...")
409-
next_result = self.clean_entity_storage(
410-
remove_empty_entities=remove_empty_entities,
411-
release_orphaned_locks=release_orphaned_locks,
412-
_continuation_token=resp.continuationToken
422+
423+
empty_entities_removed = 0
424+
orphaned_locks_released = 0
425+
426+
while True:
427+
req = pb.CleanEntityStorageRequest(
428+
removeEmptyEntities=remove_empty_entities,
429+
releaseOrphanedLocks=release_orphaned_locks,
430+
continuationToken=_continuation_token
413431
)
414-
empty_entities_removed += next_result.empty_entities_removed
415-
orphaned_locks_released += next_result.orphaned_locks_released
432+
resp: pb.CleanEntityStorageResponse = self._stub.CleanEntityStorage(req)
433+
empty_entities_removed += resp.emptyEntitiesRemoved
434+
orphaned_locks_released += resp.orphanedLocksReleased
435+
436+
if resp.continuationToken and resp.continuationToken.value and resp.continuationToken.value != "0":
437+
self._logger.info(f"Received continuation token with value {resp.continuationToken.value}, cleaning next page...")
438+
if _continuation_token and _continuation_token.value and _continuation_token.value == resp.continuationToken.value:
439+
self._logger.warning(f"Received the same continuation token value {resp.continuationToken.value} again, stopping to avoid infinite loop.")
440+
break
441+
_continuation_token = resp.continuationToken
442+
else:
443+
break
416444

417445
return CleanEntityStorageResult(empty_entities_removed, orphaned_locks_released)

durabletask/entities/entity_metadata.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from datetime import datetime, timezone
22
from typing import Any, Optional, Type, TypeVar, Union, overload
3-
from warnings import deprecated
43
from durabletask.entities.entity_instance_id import EntityInstanceId
54

65
import durabletask.internal.orchestrator_service_pb2 as pb
@@ -43,7 +42,6 @@ def __init__(self,
4342
self.includes_state = includes_state
4443
self._state = state
4544

46-
@deprecated("This method is deprecated. Use 'from_entity_metadata' instead.")
4745
@staticmethod
4846
def from_entity_response(entity_response: pb.GetEntityResponse, includes_state: bool):
4947
return EntityMetadata.from_entity_metadata(entity_response.entity, includes_state)

tests/durabletask-azuremanaged/test_dts_batch_actions.py

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -43,20 +43,18 @@ def test_get_all_orchestration_states():
4343
assert this_orch.instance_id == id
4444

4545
assert all_orchestrations is not None
46-
assert len(all_orchestrations) >= 1
47-
print(f"Received {len(all_orchestrations)} orchestrations")
48-
assert len([o for o in all_orchestrations if o.instance_id == id]) == 1
49-
orchestration_state = [o for o in all_orchestrations if o.instance_id == id][0]
46+
matching_orchestrations = [o for o in all_orchestrations if o.instance_id == id]
47+
assert len(matching_orchestrations) == 1
48+
orchestration_state = matching_orchestrations[0]
5049
assert orchestration_state.runtime_status == client.OrchestrationStatus.COMPLETED
5150
assert orchestration_state.serialized_input is None
5251
assert orchestration_state.serialized_output is None
5352
assert orchestration_state.failure_details is None
5453

5554
assert all_orchestrations_with_state is not None
56-
assert len(all_orchestrations_with_state) >= 1
57-
print(f"Received {len(all_orchestrations_with_state)} orchestrations")
58-
assert len([o for o in all_orchestrations_with_state if o.instance_id == id]) == 1
59-
orchestration_state = [o for o in all_orchestrations_with_state if o.instance_id == id][0]
55+
matching_orchestrations = [o for o in all_orchestrations_with_state if o.instance_id == id]
56+
assert len(matching_orchestrations) == 1
57+
orchestration_state = matching_orchestrations[0]
6058
assert orchestration_state.runtime_status == client.OrchestrationStatus.COMPLETED
6159
assert orchestration_state.serialized_input == '"Hello"'
6260
assert orchestration_state.serialized_output == '"Complete"'

0 commit comments

Comments
 (0)