-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathcontext.ts
676 lines (629 loc) · 22.1 KB
/
context.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
/*
* Copyright (c) 2023-2024 - Restate Software, Inc., Restate GmbH
*
* This file is part of the Restate SDK for Node.js/TypeScript,
* which is released under the MIT license.
*
* You can find a copy of the license in file LICENSE in the root
* directory of this repository or package, or at
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
*/
import type { Client, SendClient } from "./types/rpc.js";
import type {
RestateContext,
RestateObjectContext,
RestateObjectSharedContext,
RestateWorkflowContext,
RestateWorkflowSharedContext,
Service,
ServiceDefinitionFrom,
VirtualObject,
VirtualObjectDefinitionFrom,
Workflow,
WorkflowDefinitionFrom,
} from "@restatedev/restate-sdk-core";
import { ContextImpl } from "./context_impl.js";
/**
* Represents the original request as sent to this handler.
*
* A request object includes the invocation id, the request headers, the
* attempt-specific headers, and the raw unparsed request body.
*/
export interface Request {
/**
* The unique id that identifies the current function invocation. This id is guaranteed to be
* unique across invocations, but constant across retries and suspensions.
*/
readonly id: Uint8Array;
/**
* Request headers - the following headers capture the original invocation headers, as provided to
* the ingress.
*/
readonly headers: ReadonlyMap<string, string>;
/**
* Attempt headers - the following headers are sent by the restate runtime.
* These headers are attempt specific, generated by the restate runtime uniquely for each attempt.
* These headers might contain information such as the W3C trace context, and attempt specific information.
* A header value may be a single string, a list of strings, or undefined.
*/
readonly attemptHeaders: ReadonlyMap<string, string | string[] | undefined>;
/**
* Raw unparsed request body.
*/
readonly body: Uint8Array;
}
/**
* Key value store operations. Only keyed services have an attached key-value store.
*/
export interface KeyValueStore {
/**
* Get/retrieve state from the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
* and deserialized with `JSON.parse(value.toString()) as T`.
*
* @param name key of the state to retrieve
* @returns a Promise that is resolved with the value of the state key, or with `null`
* (NOTE(review): presumably when nothing is stored under that key - confirm)
*
* @example
* const state = await ctx.get<string>("STATE");
*/
get<T>(name: string): Promise<T | null>;
/**
* Retrieve all the state keys.
*
* @returns a Promise that is resolved with the list of state key names
*
* @example
* const keys = await ctx.stateKeys();
*/
stateKeys(): Promise<Array<string>>;
/**
* Set/store state in the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
* and deserialized with `JSON.parse(value.toString()) as T`.
*
* @param name key of the state to set
* @param value value to set
*
* @example
* ctx.set("STATE", "Hello");
*/
set<T>(name: string, value: T): void;
/**
* Clear/delete state in the Restate runtime.
* @param name key of the state to delete
*
* @example
* ctx.clear("STATE");
*/
clear(name: string): void;
/**
* Clear/delete all the state entries in the Restate runtime.
*
* @example
* ctx.clearAll();
*/
clearAll(): void;
}
/**
* Options accepted by the one-way (send) client factories, such as
* `serviceSendClient` and `objectSendClient`.
*/
export interface SendOptions {
/**
* Optional delay, in milliseconds, after which the one-way RPC is made to the target service.
* This is like setting up a fault-tolerant cron job that enqueues the message in a
* message queue.
* The handler calling this function does not have to stay active for the delay time.
*
* Both the delay timer and the message are durably stored in Restate and guaranteed to be reliably
* delivered. The delivery happens no earlier than specified through the delay, but may happen
* later, if the target service is down, or backpressuring the system.
*
* The delay message is journaled for durable execution and will thus not be duplicated when the
* handler is re-invoked for retries or after suspending.
*
* This call will return immediately; the message sending happens asynchronously in the background.
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
* triggers the send (calls this function) happens logically after the sending. That means that any
* failure where the message does not reach Restate also cannot complete this invocation, and will
* hence recover this handler and (through the durable execution) recover the message to be sent.
*
* @example
* ```ts
* ctx.serviceSendClient(Service, {delay: 60_000}).anotherAction(1337);
* ```
*/
delay?: number;
}
/**
* Deterministic date operations exposed on the context as `ctx.date`.
* Unlike their `Date` counterparts, both methods return their result wrapped in a Promise.
*/
export interface ContextDate {
/**
* Returns the number of milliseconds elapsed since midnight, January 1, 1970 Universal Coordinated Time (UTC).
* This is the equivalent of `Date.now()`, delivered through a Promise.
*/
now(): Promise<number>;
/**
* Returns the JSON representation of the current date.
* This is the equivalent of `new Date().toJSON()`, delivered through a Promise.
*/
toJSON(): Promise<string>;
}
/**
* A function that can be run and its result durably persisted by Restate.
*
* The function takes no arguments and may either return its result directly
* or return a Promise of it.
*/
export type RunAction<T> = (() => Promise<T>) | (() => T);
/**
* The context that gives access to all Restate-backed operations, for example
* - sending reliable messages / RPC through Restate
* - execute non-deterministic closures and memoize their result
* - sleeps and delayed calls
* - awakeables
* - ...
*
* Virtual objects can also access their key-value store using the {@link ObjectContext}.
*
*/
export interface Context extends RestateContext {
/**
* Deterministic random methods; these are inherently predictable (seeded on the invocation ID, which is not secret)
* and so should not be used for any cryptographic purposes. They are useful for identifiers, idempotency keys,
* and for uniform sampling from a set of options. If a cryptographically secure value is needed, please generate that
* externally and capture the result with a side effect.
*
* Calls to these methods from inside `ctx.run` are disallowed and will fail - side effects must be idempotent, and
* these calls are not.
*/
rand: Rand;
/**
* Console to use for logging. It attaches to each log message some contextual information,
* such as invoked service method and invocation id, and automatically excludes logs during replay.
*/
console: Console;
/**
* Deterministic date.
*/
date: ContextDate;
/**
* Run an operation and store the result in Restate. The operation will thus not
* be re-run during a later replay, but take the durable result from Restate.
*
* This lets you capture potentially non-deterministic computation and interaction
* with external systems in a safe way.
*
* Failure semantics are:
* - If an operation has run and persisted before, the result (value or Error) will be
* taken from the Restate journal.
* - There is a small window where an action may be re-run, if a failure
* occurred between a successful run and persisting the result.
* - No second action will be run while a previous run's result is not
* yet durable. That way, effects that build on top of each other can assume
* deterministic results from previous runs, and at most one run will be
* re-executed on replay (the latest, if the failure happened in the small window
* described above).
*
* @example
* ```ts
* const result = await ctx.run(someExternalAction)
* ```
* @example
* ```ts
* await ctx.run("payment action", async () => {
*   const result = await paymentProvider.charge(txId, paymentInfo);
*   if (result.paymentRejected) {
*     // this action will not be retried anymore
*     throw new TerminalError("Payment failed");
*   } else if (result.paymentGatewayBusy) {
*     // restate will retry automatically
*     throw new Error("Payment gateway busy");
*   } else {
*     // success!
*   }
* });
* ```
*
* @param action The function to run.
* @returns a Promise of the action's result
*/
run<T>(action: RunAction<T>): Promise<T>;
/**
* Run an operation and store the result in Restate. The operation will thus not
* be re-run during a later replay, but take the durable result from Restate.
*
* This is the named variant of `run`; the failure semantics are the ones described
* on the overload that takes only the action.
*
* @param name the action's name
* @param action the action to run.
* @returns a Promise of the action's result
*/
run<T>(name: string, action: RunAction<T>): Promise<T>;
/**
* Register an awakeable and pause the processing until the awakeable ID (and optional payload) have been returned to the service
* (via `ctx.resolveAwakeable(...)` or `ctx.rejectAwakeable(...)`). The SDK deserializes the payload with `JSON.parse(result.toString()) as T`.
* @returns
* - id: the string ID that has to be used to complete the awakeable by some external service
* - promise: the Promise that needs to be awaited and that is resolved with the payload that was supplied by the service which completed the awakeable
*
* @example
* const awakeable = ctx.awakeable<string>();
*
* // send the awakeable ID to some external service that will wake this one back up
* // The ID can be retrieved by:
* const id = awakeable.id;
*
* // ... send to external service ...
*
* // Wait for the external service to wake this service back up
* const result = await awakeable.promise;
*/
awakeable<T>(): { id: string; promise: CombineablePromise<T> };
/**
* Resolve an awakeable.
* @param id the string ID of the awakeable.
* This is supplied by the service that needs to be woken up.
* @param payload the payload to pass to the service that is woken up.
* The SDK serializes the payload with `Buffer.from(JSON.stringify(payload))`
* and deserializes it in the receiving service with `JSON.parse(result.toString()) as T`.
*
* @example
* // The sleeping service should have sent the awakeableIdentifier string to this service.
* ctx.resolveAwakeable(awakeableIdentifier, "hello");
*/
resolveAwakeable<T>(id: string, payload?: T): void;
/**
* Reject an awakeable. When rejecting, the service waiting on this awakeable will be woken up with a terminal error with the provided reason.
* @param id the string ID of the awakeable.
* This is supplied by the service that needs to be woken up.
* @param reason the reason of the rejection.
*
* @example
* // The sleeping service should have sent the awakeableIdentifier string to this service.
* ctx.rejectAwakeable(awakeableIdentifier, "super bad error");
*/
rejectAwakeable(id: string, reason: string): void;
/**
* Sleep until a timeout has passed.
* @param millis duration of the sleep in millis.
* This is a lower-bound.
* @returns a {@link CombineablePromise} to await for the sleep to complete
*
* @example
* await ctx.sleep(1000);
*/
sleep(millis: number): CombineablePromise<void>;
/**
* Makes a type-safe request/response RPC to the specified target service.
*
* The RPC goes through Restate and is guaranteed to be reliably delivered. The RPC is also
* journaled for durable execution and will thus not be duplicated when the handler is re-invoked
* for retries or after suspending.
*
* This call will return the result produced by the target handler, or the Error, if the target
* handler finishes with a Terminal Error.
*
* This call is a suspension point: The handler might suspend while awaiting the response and
* resume once the response is available.
*
* @example
* **Service side:**
* ```ts
* const service = restate.service({
*   name: "myservice",
*   handlers: {
*     someAction: async(ctx: restate.Context, req: string) => { ... },
*     anotherAction: async(ctx: restate.Context, count: number) => { ... }
*   }
* });
*
* // option 1: export only the type signature
* export type Service = typeof service;
*
* // option 2: export the API definition with type and name
* const MyService: Service = { name: "myservice" };
*
* restate.endpoint().bind(service).listen(9080);
* ```
* **Client side:**
* ```ts
* // option 1: use only types and supply service name separately
* const result1 = await ctx.serviceClient<Service>({name: "myservice"}).someAction("hello!");
*
* // option 2: use full API spec
* const result2 = await ctx.serviceClient(MyService).anotherAction(1337);
* ```
*
* @param opts the definition of the target service
*/
serviceClient<D>(opts: ServiceDefinitionFrom<D>): Client<Service<D>>;
/**
* Same as {@link serviceClient} but for virtual objects.
*
* @param opts the definition of the target virtual object
* @param key the virtual object key
*/
objectClient<D>(
opts: VirtualObjectDefinitionFrom<D>,
key: string
): Client<VirtualObject<D>>;
/**
* Same as {@link serviceClient} but for workflows.
*
* @param opts the definition of the target workflow
* @param key the workflow key
*/
workflowClient<D>(
opts: WorkflowDefinitionFrom<D>,
key: string
): Client<Workflow<D>>;
/**
* Same as {@link objectSendClient} but for workflows.
* Note that, unlike {@link objectSendClient}, this signature does not take {@link SendOptions}.
*
* @param opts the definition of the target workflow
* @param key the workflow key
*/
workflowSendClient<D>(
opts: WorkflowDefinitionFrom<D>,
key: string
): SendClient<Workflow<D>>;
/**
* Makes a type-safe one-way RPC to the specified target service. This method effectively behaves
* like enqueuing the message in a message queue.
*
* The message goes through Restate and is guaranteed to be reliably delivered. The RPC is also
* journaled for durable execution and will thus not be duplicated when the handler is re-invoked
* for retries or after suspending.
*
* This call will return immediately; the message sending happens asynchronously in the background.
* Despite that, the message is guaranteed to be sent, because the completion of the invocation that
* triggers the send (calls this function) happens logically after the sending. That means that any
* failure where the message does not reach Restate also cannot complete this invocation, and will
* hence recover this handler and (through the durable execution) recover the message to be sent.
*
* @example
* **Service side:**
* ```ts
* const service = restate.service({
*   name: "myservice",
*   handlers: {
*     someAction: async(ctx: restate.Context, req: string) => { ... },
*     anotherAction: async(ctx: restate.Context, count: number) => { ... }
*   }
* });
*
* // option 1: export only the type signature of the router
* export type MyApi = typeof service;
*
* // option 2: export the API definition with type and name
* const MyService: MyApi = { name: "myservice" };
*
* restate.endpoint().bind(service).listen(9080);
* ```
* **Client side:**
* ```ts
* // option 1: use only types and supply service name separately
* ctx.serviceSendClient<MyApi>({name: "myservice"}).someAction("hello!");
*
* // option 2: use full API spec
* ctx.serviceSendClient(MyService).anotherAction(1337);
* ```
*
* @param service the definition of the target service
* @param opts Send options
*/
serviceSendClient<D>(
service: ServiceDefinitionFrom<D>,
opts?: SendOptions
): SendClient<Service<D>>;
/**
* Same as {@link serviceSendClient} but for virtual objects.
*
* @param obj the definition of the target virtual object
* @param key the virtual object key
* @param opts Send options
*/
objectSendClient<D>(
obj: VirtualObjectDefinitionFrom<D>,
key: string,
opts?: SendOptions
): SendClient<VirtualObject<D>>;
/**
* Returns the raw request that triggered this handler.
* Use that object to inspect the original request headers.
*/
request(): Request;
}
/**
* The context that gives access to all Restate-backed operations, for example
* - sending reliable messages / RPC through Restate
* - access/update state
* - execute non-deterministic closures and memoize their result
* - sleeps and delayed calls
* - awakeables
* - ...
*
* This context can be used only within virtual objects.
*
* It combines {@link Context} with the read/write {@link KeyValueStore} operations.
*/
export interface ObjectContext
extends Context,
KeyValueStore,
RestateObjectContext {
/**
* The virtual object key.
* NOTE(review): presumably the key of the object this handler invocation is bound to - confirm.
*/
key: string;
}
/**
* The context that gives access to all Restate-backed operations, for example
* - sending reliable messages / RPC through Restate
* - execute non-deterministic closures and memoize their result
* - sleeps and delayed calls
* - awakeables
* - ...
*
* This context can be used only within shared virtual objects.
*
* Of the state operations, this interface itself declares only the read ones
* (`get` and `stateKeys`).
*/
export interface ObjectSharedContext
extends Context,
RestateObjectSharedContext {
/**
* The virtual object key.
* NOTE(review): presumably the key of the object this handler invocation is bound to - confirm.
*/
key: string;
/**
* Get/retrieve state from the Restate runtime.
* Note that state objects are serialized with `Buffer.from(JSON.stringify(theObject))`
* and deserialized with `JSON.parse(value.toString()) as T`.
*
* @param name key of the state to retrieve
* @returns a Promise that is resolved with the value of the state key, or with `null`
* (NOTE(review): presumably when nothing is stored under that key - confirm)
*
* @example
* const state = await ctx.get<string>("STATE");
*/
get<T>(name: string): Promise<T | null>;
/**
* Retrieve all the state keys for this object.
*
* @returns a Promise that is resolved with the list of state key names
*/
stateKeys(): Promise<Array<string>>;
}
/**
* Deterministic pseudorandom source, seeded by the invocation ID of the current invocation.
* The output is predictable by design and must not be used for cryptographic purposes.
*/
export interface Rand {
/**
* Equivalent of JS `Math.random()` but deterministic; seeded by the invocation ID of the current invocation,
* each call will return a new pseudorandom float within the range [0,1)
*/
random(): number;
/**
* Using the same random source and seed as random(), produce a UUID version 4 string. This is inherently predictable
* based on the invocation ID and should not be used in cryptographic contexts
*/
uuidv4(): string;
}
/**
* A promise that can be combined using Promise combinators in RestateContext.
*
* It is a regular `Promise<T>` extended with an `orTimeout` method.
*/
export type CombineablePromise<T> = Promise<T> & {
/**
* Creates a promise that awaits for the current promise up to the specified timeout duration.
* If the timeout is fired, this Promise will be rejected with a {@link TimeoutError}.
*
* @param millis duration of the timeout in millis.
* This is a lower-bound.
* @returns a plain Promise (not a CombineablePromise) of the same result type
*/
orTimeout(millis: number): Promise<T>;
};
/**
 * Combinators over {@link CombineablePromise}s, mirroring the native
 * `Promise.all` / `Promise.race` / `Promise.any` / `Promise.allSettled` family.
 *
 * Every combinator follows the same shape: an empty input is handed straight to the
 * native `Promise` combinator, any other input goes through `ContextImpl.createCombinator`
 * together with the matching native combinator bound to `Promise`.
 */
export const CombineablePromise = {
  /**
   * Builds a Promise that resolves with the array of all results once every provided
   * Promise has resolved, or rejects as soon as any of them rejects.
   *
   * See {@link Promise.all} for more details.
   *
   * @param values The Promises to combine.
   * @returns A new Promise.
   */
  all<T extends readonly CombineablePromise<unknown>[] | []>(
    values: T
  ): Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }> {
    return values.length === 0
      ? Promise.all(values)
      : (ContextImpl.createCombinator(
          Promise.all.bind(Promise),
          values
        ) as Promise<{ -readonly [P in keyof T]: Awaited<T[P]> }>);
  },

  /**
   * Builds a Promise that settles the same way as the first of the provided Promises
   * to resolve or reject.
   *
   * See {@link Promise.race} for more details.
   *
   * @param values The Promises to combine.
   * @returns A new Promise.
   */
  race<T extends readonly CombineablePromise<unknown>[] | []>(
    values: T
  ): Promise<Awaited<T[number]>> {
    return values.length === 0
      ? Promise.race(values)
      : (ContextImpl.createCombinator(
          Promise.race.bind(Promise),
          values
        ) as Promise<Awaited<T[number]>>);
  },

  /**
   * Builds a Promise that fulfills with the value of the first provided Promise to fulfill.
   * It rejects with an AggregateError holding the rejection reasons when all of the
   * provided Promises reject (which includes the case of an empty input).
   *
   * See {@link Promise.any} for more details.
   *
   * @param values The Promises to combine.
   * @returns A new Promise.
   */
  any<T extends readonly CombineablePromise<unknown>[] | []>(
    values: T
  ): Promise<Awaited<T[number]>> {
    return values.length === 0
      ? Promise.any(values)
      : (ContextImpl.createCombinator(
          Promise.any.bind(Promise),
          values
        ) as Promise<Awaited<T[number]>>);
  },

  /**
   * Builds a Promise that fulfills once all provided Promises have settled (which includes
   * the case of an empty input), with an array of objects describing each outcome.
   *
   * See {@link Promise.allSettled} for more details.
   *
   * @param values The Promises to combine.
   * @returns A new Promise.
   */
  allSettled<T extends readonly CombineablePromise<unknown>[] | []>(
    values: T
  ): Promise<{
    -readonly [P in keyof T]: PromiseSettledResult<Awaited<T[P]>>;
  }> {
    return values.length === 0
      ? Promise.allSettled(values)
      : (ContextImpl.createCombinator(
          Promise.allSettled.bind(Promise),
          values
        ) as Promise<{
          -readonly [P in keyof T]: PromiseSettledResult<Awaited<T[P]>>;
        }>);
  },
};
/**
* Workflow bound durable promise.
*
* It is a regular `Promise<T>` extended with methods to peek at, resolve and reject it.
*
* See {@link WorkflowSharedContext.promise}.
*/
export type DurablePromise<T> = Promise<T> & {
/**
* Returns the value of the promise, if it has been resolved.
*
* @returns a Promise of the value, or of `undefined`
* (NOTE(review): presumably when the promise has not been resolved yet - confirm)
*/
peek(): Promise<T | undefined>;
/**
* Resolve the promise with the given value.
* @param value the value to resolve the promise with
* @returns a Promise to await for the resolution to be recorded
*/
resolve(value?: T): Promise<void>;
/**
* Reject the promise with the given error message.
* @param errorMsg the error message to use for rejection.
* @returns a Promise to await for the rejection to be recorded
*/
reject(errorMsg: string): Promise<void>;
/**
* Obtain a {@link CombineablePromise} variant of this promise.
*/
get(): CombineablePromise<T>;
};
/**
* The shared context for workflows: everything from {@link ObjectSharedContext},
* plus access to workflow-bound {@link DurablePromise}s.
*/
export interface WorkflowSharedContext
extends ObjectSharedContext,
RestateWorkflowSharedContext {
/**
* Create a durable promise that can be resolved or rejected during the workflow execution.
* The promise is bound to the workflow and will be persisted across suspensions and retries.
*
* @example
* ```ts
* const wf = restate.workflow({
*   name: "myWorkflow",
*   handlers: {
*     run: async (ctx: restate.WorkflowContext) => {
*       // ... do some work ...
*       const payment = await ctx.promise<Payment>("payment.succeeded");
*       // ... do some more work ...
*     },
*
*     onPaymentSucceeded: async (ctx: restate.WorkflowContext, payment) => {
*       // ... handle payment succeeded ...
*       await ctx.promise("payment.succeeded").resolve(payment);
*     }
*   }
* });
* ```
*
* @param name the name of the durable promise
* @returns the durable promise registered under that name
*/
promise<T = void>(name: string): DurablePromise<T>;
}
/**
* The full context for workflows: the union of {@link WorkflowSharedContext}
* (durable promises, read access to state) and {@link ObjectContext}
* (read/write key-value store). It declares no members of its own.
*/
export interface WorkflowContext
extends WorkflowSharedContext,
ObjectContext,
RestateWorkflowContext {}