Skip to content

Multiple streams can now be consumed at the same time #1522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/modern-nails-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@trigger.dev/core": patch
---

Multiple streams can now be consumed simultaneously
80 changes: 52 additions & 28 deletions packages/core/src/v3/apiClient/runStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
IOPacket,
parsePacket,
} from "../utils/ioSerialization.js";
import { ApiError } from "./errors.js";
import { ApiClient } from "./index.js";
import { AsyncIterableStream, createAsyncIterableStream, zodShapeStream } from "./stream.js";
import { EventSourceParserStream } from "eventsource-parser/stream";
Expand Down Expand Up @@ -97,7 +98,7 @@ export function runShapeStream<TRunTypes extends AnyRunTypes>(

// First, define interfaces for the stream handling
export interface StreamSubscription {
subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void>;
subscribe(): Promise<ReadableStream<unknown>>;
}

export interface StreamSubscriptionFactory {
Expand All @@ -111,33 +112,38 @@ export class SSEStreamSubscription implements StreamSubscription {
private options: { headers?: Record<string, string>; signal?: AbortSignal }
) {}

async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
const response = await fetch(this.url, {
async subscribe(): Promise<ReadableStream<unknown>> {
return fetch(this.url, {
headers: {
Accept: "text/event-stream",
...this.options.headers,
},
signal: this.options.signal,
});

if (!response.body) {
throw new Error("No response body");
}

const reader = response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.getReader();

while (true) {
const { done, value } = await reader.read();

if (done) break;
}).then((response) => {
if (!response.ok) {
throw ApiError.generate(
response.status,
{},
"Could not subscribe to stream",
Object.fromEntries(response.headers)
);
}

await onChunk(safeParseJSON(value.data));
}
if (!response.body) {
throw new Error("No response body");
}

return () => reader.cancel();
return response.body
.pipeThrough(new TextDecoderStream())
.pipeThrough(new EventSourceParserStream())
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(safeParseJSON(chunk.data));
},
})
);
});
}
}

Expand Down Expand Up @@ -254,13 +260,31 @@ export class RunSubscription<TRunTypes extends AnyRunTypes> {
this.options.client?.baseUrl
);

await subscription.subscribe(async (chunk) => {
controller.enqueue({
type: streamKey,
chunk: chunk as TStreams[typeof streamKey],
run,
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
});
const stream = await subscription.subscribe();

// Create the pipeline and start it
stream
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue({
type: streamKey,
chunk: chunk as TStreams[typeof streamKey],
run,
} as StreamPartResult<RunShape<TRunTypes>, TStreams>);
},
})
)
.pipeTo(
new WritableStream({
write(chunk) {
controller.enqueue(chunk);
},
})
)
.catch((error) => {
console.error(`Error in stream ${streamKey}:`, error);
});
Comment on lines +263 to +287
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Improve error handling in stream pipeline

The current error handling only logs to console, which could lead to silent failures. Consider:

  1. Propagating errors to the caller
  2. Adding error recovery mechanisms
  3. Providing error events that consumers can listen to
              stream
                .pipeThrough(
                  new TransformStream({
                    transform(chunk, controller) {
                      controller.enqueue({
                        type: streamKey,
                        chunk: chunk as TStreams[typeof streamKey],
                        run,
                      } as StreamPartResult<RunShape<TRunTypes>, TStreams>);
                    },
                  })
                )
                .pipeTo(
                  new WritableStream({
                    write(chunk) {
                      controller.enqueue(chunk);
                    },
+                   abort(reason) {
+                     controller.error(new Error(`Stream ${streamKey} aborted: ${reason}`));
+                   }
                  })
                )
                .catch((error) => {
-                 console.error(`Error in stream ${streamKey}:`, error);
+                 // Propagate error to consumer
+                 controller.error(new Error(`Error in stream ${streamKey}: ${error.message}`));
+                 // Attempt recovery by resubscribing after a delay
+                 setTimeout(() => {
+                   if (!activeStreams.has(streamKey)) return;
+                   subscription.subscribe()
+                     .then(/* ... handle resubscription ... */)
+                     .catch(/* ... handle resubscription failure ... */);
+                 }, 1000);
                });

Committable suggestion skipped: line range outside the PR's diff.

}
}
}
Expand Down
16 changes: 11 additions & 5 deletions packages/core/test/runStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,23 @@ import {
import type { SubscribeRunRawShape } from "../src/v3/schemas/api.js";

// Test implementations
// Update TestStreamSubscription to return a ReadableStream
class TestStreamSubscription implements StreamSubscription {
constructor(private chunks: unknown[]) {}

async subscribe(onChunk: (chunk: unknown) => Promise<void>): Promise<() => void> {
for (const chunk of this.chunks) {
await onChunk(chunk);
}
return () => {};
async subscribe(): Promise<ReadableStream<unknown>> {
return new ReadableStream({
start: async (controller) => {
for (const chunk of this.chunks) {
controller.enqueue(chunk);
}
controller.close();
},
});
}
}

// TestStreamSubscriptionFactory can remain the same
class TestStreamSubscriptionFactory implements StreamSubscriptionFactory {
private streams = new Map<string, unknown[]>();

Expand Down
12 changes: 10 additions & 2 deletions scripts/publish-prerelease.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,17 @@ else
fi

# Run your commands

# Run changeset version command and capture its output
echo "Running: pnpm exec changeset version --snapshot $version"
pnpm exec changeset version --snapshot $version
if output=$(pnpm exec changeset version --snapshot $version 2>&1); then
if echo "$output" | grep -q "No unreleased changesets found"; then
echo "No unreleased changesets found. Exiting."
exit 0
fi
else
echo "Error running changeset version command"
exit 1
fi

echo "Running: pnpm run build --filter \"@trigger.dev/*\" --filter \"trigger.dev\""
pnpm run build --filter "@trigger.dev/*" --filter "trigger.dev"
Expand Down