Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions sdks/typescript/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdks/typescript/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@
"peerDependencies": {
"better-sqlite3": "^12.5.0"
},
"dependencies": {
"temporal-polyfill": "^0.3.0"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.13",
"@types/node": "^22.18.0",
Expand Down
43 changes: 21 additions & 22 deletions sdks/typescript/src/absurd.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* Modifications Copyright (c) absurd-sqlite contributors.
*/
import * as os from "node:os";
import { Temporal } from "temporal-polyfill";

/**
* Minimal query interface compatible with Absurd's database operations.
Expand Down Expand Up @@ -39,8 +40,8 @@ export interface RetryStrategy {
}

export interface CancellationPolicy {
maxDuration?: number;
maxDelay?: number;
maxDuration?: Temporal.Duration;
maxDelay?: Temporal.Duration;
}

export interface SpawnOptions {
Expand Down Expand Up @@ -315,27 +316,29 @@ export class TaskContext {
}

/**
* Suspends the task until the given duration (seconds) elapses.
* Suspends the task until the given duration elapses.
* @param stepName Checkpoint name for this wait.
* @param duration Duration to wait in seconds.
* @param duration Duration to wait.
*/
async sleepFor(stepName: string, duration: number): Promise<void> {
return await this.sleepUntil(stepName, new Date(Date.now() + duration * 1000));
async sleepFor(stepName: string, duration: Temporal.Duration): Promise<void> {
const now = Temporal.Now.instant();
const wakeAt = now.add(duration);
return await this.sleepUntil(stepName, wakeAt);
}

/**
* Suspends the task until the specified time.
* @param stepName Checkpoint name for this wait.
* @param wakeAt Absolute time when the task should resume.
*/
async sleepUntil(stepName: string, wakeAt: Date): Promise<void> {
async sleepUntil(stepName: string, wakeAt: Temporal.Instant): Promise<void> {
const checkpointName = this.getCheckpointName(stepName);
const state = await this.lookupCheckpoint(checkpointName);
const actualWakeAt = typeof state === "string" ? new Date(state) : wakeAt;
const actualWakeAt = typeof state === "string" ? Temporal.Instant.from(state) : wakeAt;
if (!state) {
await this.persistCheckpoint(checkpointName, wakeAt.toISOString());
await this.persistCheckpoint(checkpointName, wakeAt.toString());
}
if (Date.now() < actualWakeAt.getTime()) {
if (Temporal.Instant.compare(Temporal.Now.instant(), actualWakeAt) < 0) {
await this.scheduleRun(actualWakeAt);
throw new SuspendTask();
}
Expand Down Expand Up @@ -389,7 +392,7 @@ export class TaskContext {
this.recordLeaseExtension(this.claimTimeout);
}

private async scheduleRun(wakeAt: Date): Promise<void> {
private async scheduleRun(wakeAt: Temporal.Instant): Promise<void> {
await this.con.query(`SELECT absurd.schedule_run($1, $2, $3)`, [
this.queueName,
this.task.run_id,
Expand All @@ -398,24 +401,20 @@ export class TaskContext {
}

/**
* Waits for an event by name and returns its payload; optionally sets a custom step name and timeout (seconds).
* Waits for an event by name and returns its payload; optionally sets a custom step name and timeout.
* @param eventName Event identifier to wait for.
* @param options.stepName Optional checkpoint name (defaults to $awaitEvent:<eventName>).
* @param options.timeout Optional timeout in seconds.
* @param options.timeout Optional timeout duration.
* @throws TimeoutError If the event is not received before the timeout.
*/
async awaitEvent(
eventName: string,
options?: { stepName?: string; timeout?: number }
options?: { stepName?: string; timeout?: Temporal.Duration }
): Promise<JsonValue> {
const stepName = options?.stepName || `$awaitEvent:${eventName}`;
let timeout: number | null = null;
if (
options?.timeout !== undefined &&
Number.isFinite(options?.timeout) &&
options?.timeout >= 0
) {
timeout = Math.floor(options?.timeout);
if (options?.timeout !== undefined) {
timeout = Math.floor(options.timeout.total("seconds"));
}
const checkpointName = this.getCheckpointName(stepName);
const cached = await this.lookupCheckpoint(checkpointName);
Expand Down Expand Up @@ -1022,10 +1021,10 @@ function normalizeCancellation(
}
const normalized: JsonObject = {};
if (policy.maxDuration !== undefined) {
normalized.max_duration = policy.maxDuration;
normalized.max_duration = Math.floor(policy.maxDuration.total("seconds"));
}
if (policy.maxDelay !== undefined) {
normalized.max_delay = policy.maxDelay;
normalized.max_delay = Math.floor(policy.maxDelay.total("seconds"));
}
return Object.keys(normalized).length > 0 ? normalized : undefined;
}
3 changes: 3 additions & 0 deletions sdks/typescript/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import {
} from "./absurd";
import { SQLiteConnection } from "./sqlite-connection";

// Re-export Temporal from temporal-polyfill
export { Temporal } from "temporal-polyfill";

export type { Queryable } from "./absurd";
export {
CancelledTask,
Expand Down
28 changes: 23 additions & 5 deletions sdks/typescript/src/sqlite-connection.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Temporal } from "temporal-polyfill";
import type { Queryable } from "./absurd";
import type {
SQLiteColumnDefinition,
Expand Down Expand Up @@ -228,19 +229,36 @@ function decodeColumnValue<V = any>(args: {
}

if (columnTypeName === "datetime") {
if (typeof value !== "number") {
throw new Error(
`Expected datetime column ${columnName} to be a number, got ${typeof value}`
);
// SQLite stores datetimes as strings but may return them in different formats
// depending on how they were inserted. Support both ISO strings (from
// Temporal.Instant.toString() or Date.toISOString()) and epoch milliseconds
// (from numeric timestamps).
if (typeof value === "string") {
// Handle ISO string format (e.g., "2024-01-01T00:00:00Z")
return Temporal.Instant.from(value) as V;
}
return new Date(value) as V;
if (typeof value === "number") {
// Handle epoch milliseconds format
return Temporal.Instant.fromEpochMilliseconds(value) as V;
}
throw new Error(
`Expected datetime column ${columnName} to be a string or number, got ${typeof value}`
);
}

// For other types, return as is
return value as V;
}

function encodeColumnValue(value: any): any {
// Encode Temporal types to ISO string format for SQLite storage
if (value instanceof Temporal.Instant) {
return value.toString();
}
if (value instanceof Temporal.Duration) {
return value.toString();
}
// Legacy support for Date objects
if (value instanceof Date) {
return value.toISOString();
}
Expand Down
12 changes: 6 additions & 6 deletions sdks/typescript/test/basic.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
vi,
} from "vitest";
import { createTestAbsurd, randomName, type TestContext } from "./setup.js";
import type { Absurd } from "../src/index.js";
import { Temporal, type Absurd } from "../src/index.js";
import { EventEmitter, once } from "events";

describe("Basic SDK Operations", () => {
Expand Down Expand Up @@ -170,7 +170,7 @@ describe("Basic SDK Operations", () => {
const scheduledRun = await ctx.getRun(runID);
expect(scheduledRun).toMatchObject({
state: "sleeping",
available_at: wakeAt,
available_at: Temporal.Instant.fromEpochMilliseconds(wakeAt.getTime()),
wake_event: null,
});

Expand All @@ -188,7 +188,7 @@ describe("Basic SDK Operations", () => {
const resumedRun = await ctx.getRun(runID);
expect(resumedRun).toMatchObject({
state: "running",
started_at: wakeAt,
started_at: Temporal.Instant.fromEpochMilliseconds(wakeAt.getTime()),
});
});

Expand All @@ -215,7 +215,7 @@ describe("Basic SDK Operations", () => {
expect(running).toMatchObject({
state: "running",
claimed_by: "worker-a",
claim_expires_at: new Date(baseTime.getTime() + 30 * 1000),
claim_expires_at: Temporal.Instant.fromEpochMilliseconds(baseTime.getTime() + 30 * 1000),
});

await ctx.setFakeNow(new Date(baseTime.getTime() + 5 * 60 * 1000));
Expand Down Expand Up @@ -274,7 +274,7 @@ describe("Basic SDK Operations", () => {
const runRow = await ctx.getRun(runID);
expect(runRow).toMatchObject({
claimed_by: "worker-clean",
claim_expires_at: new Date(base.getTime() + 60 * 1000),
claim_expires_at: Temporal.Instant.fromEpochMilliseconds(base.getTime() + 60 * 1000),
});

const beforeTTL = new Date(finishTime.getTime() + 30 * 60 * 1000);
Expand Down Expand Up @@ -481,7 +481,7 @@ describe("Basic SDK Operations", () => {

const getExpiresAt = async (runID: string) => {
const run = await ctx.getRun(runID);
return run?.claim_expires_at ? run.claim_expires_at.getTime() : 0;
return run?.claim_expires_at ? run.claim_expires_at.epochMilliseconds : 0;
};

absurd.workBatch("test-worker", claimTimeout);
Expand Down
12 changes: 6 additions & 6 deletions sdks/typescript/test/events.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, test, expect, beforeAll, afterEach } from "vitest";
import { createTestAbsurd, randomName, type TestContext } from "./setup.js";
import type { Absurd } from "../src/index.js";
import { TimeoutError } from "../src/index.js";
import { TimeoutError, Temporal } from "../src/index.js";

describe("Event system", () => {
let ctx: TestContext;
Expand All @@ -21,7 +21,7 @@ describe("Event system", () => {
const eventName = randomName("test_event");

absurd.registerTask({ name: "waiter" }, async (params, ctx) => {
const payload = await ctx.awaitEvent(eventName, { timeout: 60 });
const payload = await ctx.awaitEvent(eventName, { timeout: Temporal.Duration.from({ seconds: 60 }) });
return { received: payload };
});

Expand Down Expand Up @@ -86,7 +86,7 @@ describe("Event system", () => {
absurd.registerTask({ name: "timeout-waiter" }, async (_params, ctx) => {
try {
const payload = await ctx.awaitEvent(eventName, {
timeout: timeoutSeconds,
timeout: Temporal.Duration.from({ seconds: timeoutSeconds }),
});
return { timedOut: false, result: payload };
} catch (err) {
Expand All @@ -109,7 +109,7 @@ describe("Event system", () => {
wake_event: eventName,
});
const expectedWake = new Date(baseTime.getTime() + timeoutSeconds * 1000);
expect(sleepingRun?.available_at?.getTime()).toBe(expectedWake.getTime());
expect(sleepingRun?.available_at?.epochMilliseconds).toBe(expectedWake.getTime());

await ctx.setFakeNow(new Date(expectedWake.getTime() + 1000));
await absurd.workBatch("worker1", 120, 1);
Expand Down Expand Up @@ -170,13 +170,13 @@ describe("Event system", () => {

absurd.registerTask({ name: "timeout-no-loop" }, async (_params, ctx) => {
try {
await ctx.awaitEvent(eventName, { stepName: "wait", timeout: 10 });
await ctx.awaitEvent(eventName, { stepName: "wait", timeout: Temporal.Duration.from({ seconds: 10 }) });
return { stage: "unexpected" };
} catch (err) {
if (err instanceof TimeoutError) {
const payload = await ctx.awaitEvent(eventName, {
stepName: "wait",
timeout: 10,
timeout: Temporal.Duration.from({ seconds: 10 }),
});
return { stage: "resumed", payload };
}
Expand Down
10 changes: 5 additions & 5 deletions sdks/typescript/test/retry.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { describe, test, expect, beforeAll, afterEach } from "vitest";
import { createTestAbsurd, randomName, type TestContext } from "./setup.js";
import type { Absurd } from "../src/index.js";
import { Temporal, type Absurd } from "../src/index.js";

describe("Retry and cancellation", () => {
let ctx: TestContext;
Expand Down Expand Up @@ -159,7 +159,7 @@ describe("Retry and cancellation", () => {
const { taskID } = await absurd.spawn("duration-cancel", undefined, {
maxAttempts: 4,
retryStrategy: { kind: "fixed", baseSeconds: 30 },
cancellation: { maxDuration: 90 },
cancellation: { maxDuration: Temporal.Duration.from({ seconds: 90 }) },
});

await absurd.workBatch("worker1", 60, 1);
Expand All @@ -185,7 +185,7 @@ describe("Retry and cancellation", () => {
});

const { taskID } = await absurd.spawn("delay-cancel", undefined, {
cancellation: { maxDelay: 60 },
cancellation: { maxDelay: Temporal.Duration.from({ seconds: 60 }) },
});

await ctx.setFakeNow(new Date(baseTime.getTime() + 61 * 1000));
Expand Down Expand Up @@ -312,8 +312,8 @@ describe("Retry and cancellation", () => {

await absurd.cancelTask(taskID);
const second = await ctx.getTask(taskID);
expect(second?.cancelled_at?.getTime()).toBe(
first?.cancelled_at?.getTime(),
expect(second?.cancelled_at?.epochMilliseconds).toBe(
first?.cancelled_at?.epochMilliseconds,
);
});

Expand Down
Loading