Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions packages/durabletask-js/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import { callWithMetadata, MetadataGenerator } from "../utils/grpc-helper.util";
import { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "../orchestration/orchestration-query";
import { Page, AsyncPageable, createAsyncPageable } from "../orchestration/page";
import { FailureDetails } from "../task/failure-details";
import { HistoryEvent } from "../orchestration/history-event";
import { convertProtoHistoryEvent } from "../utils/history-event-converter";
import { Logger, ConsoleLogger } from "../types/logger.type";

// Re-export MetadataGenerator for backward compatibility
Expand Down Expand Up @@ -735,6 +737,77 @@ export class TaskHubGrpcClient {
return new Page(instanceIds, lastInstanceKey);
}

/**
* Retrieves the history of the specified orchestration instance as a list of HistoryEvent objects.
*
* This method streams the history events from the backend and returns them as an array.
* The history includes all events that occurred during the orchestration execution,
* such as task scheduling, completion, failure, timer events, and more.
*
* If the orchestration instance does not exist, an empty array is returned.
*
* @param instanceId - The unique identifier of the orchestration instance.
* @returns A Promise that resolves to an array of HistoryEvent objects representing
* the orchestration's history. Returns an empty array if the instance is not found.
* @throws {Error} If the instanceId is null or empty.
* @throws {Error} If the operation is canceled.
* @throws {Error} If an internal error occurs while retrieving the history.
*
* @example
* ```typescript
* const history = await client.getOrchestrationHistory(instanceId);
* for (const event of history) {
* console.log(`Event ${event.eventId}: ${event.type} at ${event.timestamp}`);
* }
* ```
*/
async getOrchestrationHistory(instanceId: string): Promise<HistoryEvent[]> {
if (!instanceId) {
throw new Error("instanceId is required");
}

const req = new pb.StreamInstanceHistoryRequest();
req.setInstanceid(instanceId);
req.setForworkitemprocessing(false);

const metadata = this._metadataGenerator ? await this._metadataGenerator() : new grpc.Metadata();
const stream = this._stub.streamInstanceHistory(req, metadata);

return new Promise<HistoryEvent[]>((resolve, reject) => {
const historyEvents: HistoryEvent[] = [];

stream.on("data", (chunk: pb.HistoryChunk) => {
const protoEvents = chunk.getEventsList();
for (const protoEvent of protoEvents) {
const event = convertProtoHistoryEvent(protoEvent);
if (event) {
historyEvents.push(event);
}
}
});

stream.on("end", () => {
stream.removeAllListeners();
resolve(historyEvents);
});

stream.on("error", (err: grpc.ServiceError) => {
stream.removeAllListeners();
// Return empty array for NOT_FOUND to be consistent with DTS behavior
// (DTS returns empty stream for non-existent instances) and user-friendly
if (err.code === grpc.status.NOT_FOUND) {
resolve([]);
} else if (err.code === grpc.status.CANCELLED) {
reject(new Error(`The getOrchestrationHistory operation was canceled.`));
} else if (err.code === grpc.status.INTERNAL) {
reject(new Error(`An error occurred while retrieving the history for orchestration with instanceId '${instanceId}'.`));
} else {
reject(err);
}
});
});
}

/**
* Helper method to create an OrchestrationState from a protobuf OrchestrationState.
*/
Expand Down
38 changes: 38 additions & 0 deletions packages/durabletask-js/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,44 @@ export { OrchestrationState } from "./orchestration/orchestration-state";
export { OrchestrationQuery, ListInstanceIdsOptions, DEFAULT_PAGE_SIZE } from "./orchestration/orchestration-query";
export { Page, AsyncPageable, createAsyncPageable } from "./orchestration/page";

// History event types
export {
HistoryEvent,
HistoryEventType,
HistoryEventBase,
ExecutionStartedEvent,
ExecutionCompletedEvent,
ExecutionTerminatedEvent,
ExecutionSuspendedEvent,
ExecutionResumedEvent,
ExecutionRewoundEvent,
TaskScheduledEvent,
TaskCompletedEvent,
TaskFailedEvent,
SubOrchestrationInstanceCreatedEvent,
SubOrchestrationInstanceCompletedEvent,
SubOrchestrationInstanceFailedEvent,
TimerCreatedEvent,
TimerFiredEvent,
OrchestratorStartedEvent,
OrchestratorCompletedEvent,
EventSentEvent,
EventRaisedEvent,
GenericEvent,
HistoryStateEvent,
ContinueAsNewEvent,
OrchestrationInstance,
ParentInstanceInfo,
TraceContext,
EntityOperationSignaledEvent,
EntityOperationCalledEvent,
EntityOperationCompletedEvent,
EntityOperationFailedEvent,
EntityLockRequestedEvent,
EntityLockGrantedEvent,
EntityUnlockSentEvent,
} from "./orchestration/history-event";

// Proto types (for advanced usage)
export { OrchestrationStatus as ProtoOrchestrationStatus } from "./proto/orchestrator_service_pb";

Expand Down
Loading
Loading