diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 011b8737..dc354371 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -262,6 +262,51 @@ jobs: run: npm run dev & working-directory: examples/agents-researcher + nestjs-local-build: + needs: + - local-tests + runs-on: ubuntu-latest + name: Nestjs Local Build + steps: + - name: Setup repo + uses: actions/checkout@v4 + + - name: Setup Bun + uses: oven-sh/setup-bun@v1 + with: + bun-version: latest + + - name: Install Dependencies + run: bun install + + - name: Build + run: bun run build + + - name: Install Node.js + uses: actions/setup-node@v4 + with: + node-version: 20 + + - name: Install Dependencies + run: npm install + working-directory: examples/nestjs + + - name: Install local package + run: npm install @upstash/workflow@file:../../dist + working-directory: examples/nestjs + + - name: Build Project + run: npm run build + working-directory: examples/nestjs + + - name: Run example + run: npm run dev & + working-directory: examples/nestjs + + - name: Run local tests + run: bun test . + working-directory: examples/nestjs + nextjs-local-build: needs: - local-tests @@ -624,7 +669,7 @@ jobs: - name: Setup nodejs uses: actions/setup-node@v3 with: - node-version: 18 + node-version: 20 - name: Setup Bun uses: oven-sh/setup-bun@v1 diff --git a/.husky/pre-push b/.husky/pre-push index 55bdc789..1f9223c3 100755 --- a/.husky/pre-push +++ b/.husky/pre-push @@ -1 +1 @@ -bun run test && bun run build +bun run build diff --git a/examples/README.md b/examples/README.md index 656d0a36..0ee1cf59 100644 --- a/examples/README.md +++ b/examples/README.md @@ -3,36 +3,3 @@ This directory has example projects for Upstash Workflow with different frameworks. Each project has an interface where you can enter the deployment URL, pick a workflow endpoint, enter a payload and finally call the picked workflow endpoint. - -## How to Run - -There are three alternatives: -1. Deploy the app and use the interface to call it -2. Run the app locally and create a local tunnel with Ngrok so that QStash can call it. Doing this is simplified through the `bootstrap.sh` script. -3. If you have access to the QStash development server, run both the development server and the example workflow project locally. Unfortunetly, local QStash development server is not public. - -### `bootstrap.sh` Script - -First, set the environment variables `QSTASH_TOKEN`, `QSTASH_URL`, `QSTASH_CURRENT_SIGNING_KEY` and `QSTASH_NEXT_SIGNING_KEY`. - -The `bootstrap.sh` script makes it possible to start an examplew workflow project and create a Ngrok tunnel in one script. To run it, simply choose the framework and the endpoint you would like to choose as default: - -``` -bash bootstrap.sh -``` - -Here is an example call: - -``` -bash bootstrap.sh nextjs -``` - -Here is what the script does in a nutshell: -- create a Ngrok tunnel from `localhost:3001` -- Public URL of the tunnel is inferred from Ngrok logs. This URL is set to the `UPSTASH_WORKFLOW_URL` environment variable. -- `npm install` and `npm run dev` are executed in the example directory -- a web browser is opened with the picked endpoint - -To use the app, simply send a request through the opened interface. - -You will be able to see the workflow executing in the console logs. You can also monitor the events in [the QStash tab of Upstash Console](https://console.upstash.com/qstash?tab=workflow). diff --git a/examples/astro/README.md b/examples/astro/README.md index 2eb4028e..a0288afc 100644 --- a/examples/astro/README.md +++ b/examples/astro/README.md @@ -7,36 +7,28 @@ This is an example of how to use Upstash Workflow with Astro. You can learn more ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh astro -> ``` - 1. Install the dependencies ```bash npm install ``` -1. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.env` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/cloudflare-workers-hono/README.md b/examples/cloudflare-workers-hono/README.md index 84432b99..3ed0db30 100644 --- a/examples/cloudflare-workers-hono/README.md +++ b/examples/cloudflare-workers-hono/README.md @@ -7,37 +7,28 @@ This is an example of how to use Upstash Workflow with Cloudflare Workers with H ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh cloudflare-workers-hono -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.dev.vars` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_URL= -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.dev.vars` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/cloudflare-workers-hono/ci.test.ts b/examples/cloudflare-workers-hono/ci.test.ts index d730d2e9..9921b3ee 100644 --- a/examples/cloudflare-workers-hono/ci.test.ts +++ b/examples/cloudflare-workers-hono/ci.test.ts @@ -135,9 +135,9 @@ describe("cloudflare workers", () => { token: "mock" }) - // @ts-expect-error mocking publish - qstashClient.publish = async () => { - return { messageId: "msgId" } + //mocking batch + qstashClient.batch = async () => { + return [{ messageId: "msgId" }] } const { POST: serveHandler } = serve( diff --git a/examples/cloudflare-workers/README.md b/examples/cloudflare-workers/README.md index d6769d4e..ee1911ff 100644 --- a/examples/cloudflare-workers/README.md +++ b/examples/cloudflare-workers/README.md @@ -7,37 +7,28 @@ This is an example of how to use Upstash Workflow with Cloudflare Workers. You c ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh cloudflare-workers -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.dev.vars` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_URL= -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.dev.vars` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/express/README.md b/examples/express/README.md index 605997b4..c20bb26d 100644 --- a/examples/express/README.md +++ b/examples/express/README.md @@ -4,38 +4,28 @@ This is an example of how to use Upstash Workflow in a Express.js project. You c ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/qstash-js/tree/main/examples/workflow) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `qstash-js/examples/workflow` directory: -> -> ``` -> bash bootstrap.sh express -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.env` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_TOKEN= - -UPSTASH_WORKFLOW_URL= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/fastify/README.md b/examples/fastify/README.md new file mode 100644 index 00000000..ecc6778b --- /dev/null +++ b/examples/fastify/README.md @@ -0,0 +1,57 @@ +# Upstash Workflow & Fastify Example + +This is an example of how to use [Upstash Workflow](https://upstash.com/docs/workflow/getstarted) with a Fastify server using TypeScript and ES modules. + +## Getting Started + +### 1. Install the dependencies + +```bash +bun install +``` + +### 2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): + +```bash +npx @upstash/qstash-cli dev +``` + +### 3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env` file: + +```bash +QSTASH_URL="***" +QSTASH_TOKEN="***" +``` + +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). + +### 4. Run your app: + +```bash +bun start +``` + +For production builds: + +```bash +bun run build +bun run start:prod +``` + +### 5. Send a `POST` request to the workflow endpoint. + +```bash +curl -X POST http://localhost:3001/workflow -d '{"hello": "world"}' -H "content-type:application/json" +``` + +## Project Structure + +- `src/index.ts`: Entry point for the application +- `src/server.ts`: Fastify server configuration and workflow implementation +- `tsconfig.json`: TypeScript configuration +- `package.json`: Project dependencies and scripts + +## Scripts + +- `bun start`: Start the development server using Bun +- `bun run build`: Build the TypeScript files diff --git a/examples/fastify/package.json b/examples/fastify/package.json new file mode 100644 index 00000000..2a811ae5 --- /dev/null +++ b/examples/fastify/package.json @@ -0,0 +1,26 @@ +{ + "name": "fastify", + "version": "1.0.0", + "description": "", + "main": "dist/index.js", + "type": "module", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1", + "build": "tsc", + "start": "bun src/index.ts", + "start:prod": "bun dist/index.js" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "@fastify/env": "^5.0.2", + "@upstash/workflow": "file:../../dist", + "fastify": "^5.3.3", + "fastify-cli": "^7.4.0" + }, + "devDependencies": { + "@types/node": "^22.15.23", + "typescript": "^5.8.3" + } +} diff --git a/examples/fastify/src/index.ts b/examples/fastify/src/index.ts new file mode 100644 index 00000000..0a16bbfa --- /dev/null +++ b/examples/fastify/src/index.ts @@ -0,0 +1,19 @@ +// src/index.ts +import server from './server.js'; + +// Run the server! +const start = async (): Promise => { + try { + await server.listen({ port: 3001, host: '0.0.0.0' }); + const address = server.server.address(); + const port = typeof address === 'string' ? address : address?.port; + server.log.info(`Server is running on http://localhost:${port}`); + } catch (err) { + server.log.error(err); + console.log(err); + console.error('Server failed to start'); + process.exit(1); + } +}; + +start(); diff --git a/examples/fastify/src/server.ts b/examples/fastify/src/server.ts new file mode 100644 index 00000000..7a247ac4 --- /dev/null +++ b/examples/fastify/src/server.ts @@ -0,0 +1,109 @@ +// src/server.ts +import Fastify, { FastifyRequest, FastifyReply } from 'fastify'; +import fastifyEnv from '@fastify/env'; +import { serve } from '@upstash/workflow'; + +const fastify = Fastify({ + logger: false +}); + +const schema = { + type: 'object', + required: ['QSTASH_URL', 'QSTASH_TOKEN'], + properties: { + QSTASH_URL: { + type: 'string', + }, + QSTASH_TOKEN: { + type: 'string', + } + } +} + +const options = { + confKey: 'config', // optional, default: 'config' + schema: schema, + dotenv: true +} + +interface FastifyEnvConfig { + QSTASH_URL: string; + QSTASH_TOKEN: string; +} + +// Extend FastifyInstance to include the config property +declare module 'fastify' { + interface FastifyInstance { + config: FastifyEnvConfig; + } +} + +fastify.register(fastifyEnv, options).after(() => { + // Register the /upstash route as a regular fastify route + const { handler } = serve(async (context) => { + const input = context.requestPayload; + const result1 = await context.run('step1', async () => { + const output = someWork(input); + console.log('step 1 input', input, 'output', output); + return output; + }); + + await context.run('step2', async () => { + const output = someWork(result1); + console.log('step 2 input', result1, 'output', output); + }); + }); + + fastify.route({ + method: ['POST'], + url: '/workflow', + handler: async (request: FastifyRequest, reply: FastifyReply) => { + // Convert Fastify request to Web Request + const { url, headers } = request; + + // @ts-expect-error getting encrypted property from raw socket + const encrypted: boolean = request.raw.socket.encrypted; + + // Get full URL including protocol and host + const protocol = request.protocol || (encrypted ? 'https' : 'http'); + const host = headers['host'] as string; + const fullUrl = `${protocol}://${host}${url}`; + + // Read body as a Buffer + let body: Buffer | null = null; + if (request.body) { + if (Buffer.isBuffer(request.body)) { + body = request.body; + } else if (typeof request.body === 'string') { + body = Buffer.from(request.body); + } else { + // Assume JSON + body = Buffer.from(JSON.stringify(request.body)); + } + } + + // Construct Web Request + const webRequest = new Request(fullUrl, { + method: "POST", + headers: headers as HeadersInit, + body + }); + + // Call the handler + const webResponse = await handler(webRequest); + + // Set status and headers + reply.code(webResponse.status); + + // Send body + const responseBody = await webResponse.arrayBuffer(); + reply.send(Buffer.from(responseBody)); + } + }); +}); + +const someWork = (input: unknown): string => { + return `processed '${JSON.stringify(input)}'`; +}; + +export default fastify; diff --git a/examples/fastify/tsconfig.json b/examples/fastify/tsconfig.json new file mode 100644 index 00000000..12ca9704 --- /dev/null +++ b/examples/fastify/tsconfig.json @@ -0,0 +1,15 @@ +{ + "compilerOptions": { + "target": "ESNext", + "module": "NodeNext", + "moduleResolution": "NodeNext", + "esModuleInterop": true, + "strict": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*"], + "exclude": ["node_modules"] +} diff --git a/examples/hono/README.md b/examples/hono/README.md index 50446cf9..e608afe7 100644 --- a/examples/hono/README.md +++ b/examples/hono/README.md @@ -4,37 +4,28 @@ This is an example of how to use Upstash Workflow in a Hono project. You can lea ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh hono -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.env` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_URL= -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/nestjs/.gitignore b/examples/nestjs/.gitignore new file mode 100644 index 00000000..4b56acfb --- /dev/null +++ b/examples/nestjs/.gitignore @@ -0,0 +1,56 @@ +# compiled output +/dist +/node_modules +/build + +# Logs +logs +*.log +npm-debug.log* +pnpm-debug.log* +yarn-debug.log* +yarn-error.log* +lerna-debug.log* + +# OS +.DS_Store + +# Tests +/coverage +/.nyc_output + +# IDEs and editors +/.idea +.project +.classpath +.c9/ +*.launch +.settings/ +*.sublime-workspace + +# IDE - VSCode +.vscode/* +!.vscode/settings.json +!.vscode/tasks.json +!.vscode/launch.json +!.vscode/extensions.json + +# dotenv environment variable files +.env +.env.development.local +.env.test.local +.env.production.local +.env.local + +# temp directory +.temp +.tmp + +# Runtime data +pids +*.pid +*.seed +*.pid.lock + +# Diagnostic reports (https://nodejs.org/api/report.html) +report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json diff --git a/examples/nestjs/.prettierrc b/examples/nestjs/.prettierrc new file mode 100644 index 00000000..dcb72794 --- /dev/null +++ b/examples/nestjs/.prettierrc @@ -0,0 +1,4 @@ +{ + "singleQuote": true, + "trailingComma": "all" +} \ No newline at end of file diff --git a/examples/nestjs/README.md b/examples/nestjs/README.md new file mode 100644 index 00000000..b0c2d25c --- /dev/null +++ b/examples/nestjs/README.md @@ -0,0 +1,42 @@ +# Upstash Workflow Nestjs Example + +This is an example of how to use Upstash Workflow in a Nestjs project. You can learn more in [Getting Started with Upstash Workflow](https://upstash.com/docs/workflow/getstarted). + +## Development + +1. Install the dependencies + +```bash +npm install +``` + +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): + +```bash +npx @upstash/qstash-cli dev +``` + +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env` file: + +```bash +QSTASH_URL="***" +QSTASH_TOKEN="***" +``` + +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). + +4. Run your app: + +```bash +npm run start +``` + +5. Send a `POST` request to the `/workflow` endpoint. + +**NOTE**: Workflow on Nestjs only works with `Content-Type: application/json` header. + +```bash +curl -X POST http://localhost:3001/workflow \ + -H "Content-Type: application/json" \ + -d '{"message": "Hello from the workflow!"}' +``` diff --git a/examples/nestjs/app.module.ts b/examples/nestjs/app.module.ts new file mode 100644 index 00000000..3f9d6175 --- /dev/null +++ b/examples/nestjs/app.module.ts @@ -0,0 +1,7 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule } from '@nestjs/config'; + +@Module({ + imports: [ConfigModule.forRoot()], +}) +export class AppModule {} diff --git a/examples/nestjs/eslint.config.mjs b/examples/nestjs/eslint.config.mjs new file mode 100644 index 00000000..caebf6e7 --- /dev/null +++ b/examples/nestjs/eslint.config.mjs @@ -0,0 +1,34 @@ +// @ts-check +import eslint from '@eslint/js'; +import eslintPluginPrettierRecommended from 'eslint-plugin-prettier/recommended'; +import globals from 'globals'; +import tseslint from 'typescript-eslint'; + +export default tseslint.config( + { + ignores: ['eslint.config.mjs'], + }, + eslint.configs.recommended, + ...tseslint.configs.recommendedTypeChecked, + eslintPluginPrettierRecommended, + { + languageOptions: { + globals: { + ...globals.node, + ...globals.jest, + }, + sourceType: 'commonjs', + parserOptions: { + projectService: true, + tsconfigRootDir: import.meta.dirname, + }, + }, + }, + { + rules: { + '@typescript-eslint/no-explicit-any': 'off', + '@typescript-eslint/no-floating-promises': 'warn', + '@typescript-eslint/no-unsafe-argument': 'warn' + }, + }, +); \ No newline at end of file diff --git a/examples/nestjs/nest-cli.json b/examples/nestjs/nest-cli.json new file mode 100644 index 00000000..f9aa683b --- /dev/null +++ b/examples/nestjs/nest-cli.json @@ -0,0 +1,8 @@ +{ + "$schema": "https://json.schemastore.org/nest-cli", + "collection": "@nestjs/schematics", + "sourceRoot": "src", + "compilerOptions": { + "deleteOutDir": true + } +} diff --git a/examples/nestjs/package.json b/examples/nestjs/package.json new file mode 100644 index 00000000..9936b637 --- /dev/null +++ b/examples/nestjs/package.json @@ -0,0 +1,76 @@ +{ + "name": "nestjs", + "version": "0.0.1", + "description": "", + "author": "", + "private": true, + "license": "UNLICENSED", + "scripts": { + "build": "nest build", + "format": "prettier --write \"src/**/*.ts\" \"test/**/*.ts\"", + "start": "nest start", + "start:dev": "nest start --watch", + "start:debug": "nest start --debug --watch", + "start:prod": "node dist/main", + "lint": "eslint \"{src,apps,libs,test}/**/*.ts\" --fix", + "test": "jest", + "test:watch": "jest --watch", + "test:cov": "jest --coverage", + "test:debug": "node --inspect-brk -r tsconfig-paths/register -r ts-node/register node_modules/.bin/jest --runInBand", + "test:e2e": "jest --config ./test/jest-e2e.json" + }, + "dependencies": { + "@nestjs/common": "^11.0.1", + "@nestjs/config": "^4.0.2", + "@nestjs/core": "^11.0.1", + "@nestjs/platform-express": "^11.0.1", + "@upstash/qstash": "^2.8.1", + "@upstash/workflow": "latest", + "reflect-metadata": "^0.2.2", + "rxjs": "^7.8.1" + }, + "devDependencies": { + "@eslint/eslintrc": "^3.2.0", + "@eslint/js": "^9.18.0", + "@nestjs/cli": "^11.0.0", + "@nestjs/schematics": "^11.0.0", + "@nestjs/testing": "^11.0.1", + "@swc/cli": "^0.6.0", + "@swc/core": "^1.10.7", + "@types/express": "^5.0.0", + "@types/jest": "^29.5.14", + "@types/node": "^22.10.7", + "@types/supertest": "^6.0.2", + "eslint": "^9.18.0", + "eslint-config-prettier": "^10.0.1", + "eslint-plugin-prettier": "^5.2.2", + "globals": "^16.0.0", + "jest": "^29.7.0", + "prettier": "^3.4.2", + "source-map-support": "^0.5.21", + "supertest": "^7.0.0", + "ts-jest": "^29.2.5", + "ts-loader": "^9.5.2", + "ts-node": "^10.9.2", + "tsconfig-paths": "^4.2.0", + "typescript": "^5.7.3", + "typescript-eslint": "^8.20.0" + }, + "jest": { + "moduleFileExtensions": [ + "js", + "json", + "ts" + ], + "rootDir": "src", + "testRegex": ".*\\.spec\\.ts$", + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + }, + "collectCoverageFrom": [ + "**/*.(t|j)s" + ], + "coverageDirectory": "../coverage", + "testEnvironment": "node" + } +} diff --git a/examples/nestjs/src/app.controller.ts b/examples/nestjs/src/app.controller.ts new file mode 100644 index 00000000..56c5a665 --- /dev/null +++ b/examples/nestjs/src/app.controller.ts @@ -0,0 +1,58 @@ +import { Controller, Get, Post, Req, Res, Next } from '@nestjs/common'; +import { AppService } from './app.service'; +import { serve } from '@upstash/workflow/express'; +import { Request, Response, NextFunction } from 'express'; +import { ConfigService } from '@nestjs/config'; +import { Client } from '@upstash/qstash'; + +const someWork = (input: string) => { + return `processed '${JSON.stringify(input)}'`; +}; + +@Controller() +export class AppController { + private qstashClient: Client; + + constructor( + private readonly appService: AppService, + private configService: ConfigService, + ) { + const qstashUrl = this.configService.get('QSTASH_URL'); + const qstashToken = this.configService.get('QSTASH_TOKEN'); + this.qstashClient = new Client({ + baseUrl: qstashUrl, + token: qstashToken, + }); + } + + @Get() + getHello(): string { + return this.configService.get('QSTASH_URL') || 'missing'; + } + + @Post('workflow') + async upstashWorkflow( + @Req() req: Request, + @Res() res: Response, + @Next() next: NextFunction, + ) { + return serve<{ message: string }>( + async (context) => { + const input = context.requestPayload.message; + const result1 = await context.run('step1', () => { + const output = someWork(input); + console.log('step 1 input', input, 'output', output); + return output; + }); + + await context.run('step2', () => { + const output = someWork(result1); + console.log('step 2 input', result1, 'output', output); + }); + }, + { + qstashClient: this.qstashClient, + }, + )(req, res, next); + } +} diff --git a/examples/nestjs/src/app.module.ts b/examples/nestjs/src/app.module.ts new file mode 100644 index 00000000..f2664dc9 --- /dev/null +++ b/examples/nestjs/src/app.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { AppController } from './app.controller'; +import { AppService } from './app.service'; +import { ConfigModule } from '@nestjs/config'; + +@Module({ + imports: [ConfigModule.forRoot()], + controllers: [AppController], + providers: [AppService], +}) +export class AppModule {} diff --git a/examples/nestjs/src/app.service.ts b/examples/nestjs/src/app.service.ts new file mode 100644 index 00000000..927d7cca --- /dev/null +++ b/examples/nestjs/src/app.service.ts @@ -0,0 +1,8 @@ +import { Injectable } from '@nestjs/common'; + +@Injectable() +export class AppService { + getHello(): string { + return 'Hello World!'; + } +} diff --git a/examples/nestjs/src/main.ts b/examples/nestjs/src/main.ts new file mode 100644 index 00000000..7bde003d --- /dev/null +++ b/examples/nestjs/src/main.ts @@ -0,0 +1,8 @@ +import { NestFactory } from '@nestjs/core'; +import { AppModule } from './app.module'; + +async function bootstrap() { + const app = await NestFactory.create(AppModule); + await app.listen(process.env.PORT ?? 3001); +} +bootstrap(); diff --git a/examples/nestjs/test/app.e2e-spec.ts b/examples/nestjs/test/app.e2e-spec.ts new file mode 100644 index 00000000..4df6580c --- /dev/null +++ b/examples/nestjs/test/app.e2e-spec.ts @@ -0,0 +1,25 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { INestApplication } from '@nestjs/common'; +import * as request from 'supertest'; +import { App } from 'supertest/types'; +import { AppModule } from './../src/app.module'; + +describe('AppController (e2e)', () => { + let app: INestApplication; + + beforeEach(async () => { + const moduleFixture: TestingModule = await Test.createTestingModule({ + imports: [AppModule], + }).compile(); + + app = moduleFixture.createNestApplication(); + await app.init(); + }); + + it('/ (GET)', () => { + return request(app.getHttpServer()) + .get('/') + .expect(200) + .expect('Hello World!'); + }); +}); diff --git a/examples/nestjs/test/jest-e2e.json b/examples/nestjs/test/jest-e2e.json new file mode 100644 index 00000000..e9d912f3 --- /dev/null +++ b/examples/nestjs/test/jest-e2e.json @@ -0,0 +1,9 @@ +{ + "moduleFileExtensions": ["js", "json", "ts"], + "rootDir": ".", + "testEnvironment": "node", + "testRegex": ".e2e-spec.ts$", + "transform": { + "^.+\\.(t|j)s$": "ts-jest" + } +} diff --git a/examples/nestjs/tsconfig.build.json b/examples/nestjs/tsconfig.build.json new file mode 100644 index 00000000..64f86c6b --- /dev/null +++ b/examples/nestjs/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["node_modules", "test", "dist", "**/*spec.ts"] +} diff --git a/examples/nestjs/tsconfig.json b/examples/nestjs/tsconfig.json new file mode 100644 index 00000000..e4dbf2e6 --- /dev/null +++ b/examples/nestjs/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "module": "commonjs", + "declaration": true, + "removeComments": true, + "emitDecoratorMetadata": true, + "experimentalDecorators": true, + "allowSyntheticDefaultImports": true, + "target": "ES2023", + "sourceMap": true, + "outDir": "./dist", + "baseUrl": "./", + "incremental": true, + "skipLibCheck": true, + "strictNullChecks": true, + "forceConsistentCasingInFileNames": true, + "noImplicitAny": false, + "strictBindCallApply": false, + "noFallthroughCasesInSwitch": false + } +} diff --git a/examples/nextjs-12/README.md b/examples/nextjs-12/README.md index e873fa2c..0e7c10e9 100644 --- a/examples/nextjs-12/README.md +++ b/examples/nextjs-12/README.md @@ -4,37 +4,28 @@ This example is for testing the compatibility of Upstash Workflow with older ver ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh cloudflare-workers-hono -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.env.local` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_URL= -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env.local` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/nextjs-12/ci.mjs b/examples/nextjs-12/ci.mjs index d1654df1..6f3c8d99 100644 --- a/examples/nextjs-12/ci.mjs +++ b/examples/nextjs-12/ci.mjs @@ -13,8 +13,9 @@ const qstashClient = new Client({ token: "mock", }) -qstashClient.publish = async () => { - return { messageId: "msgId" } +// mocking batch +qstashClient.batch = async () => { + return [{ messageId: "msgId" }] } console.log(">>> TESTING INITIAL INVOCATION") diff --git a/examples/nextjs-pages/README.md b/examples/nextjs-pages/README.md index b18d8c6f..83527702 100644 --- a/examples/nextjs-pages/README.md +++ b/examples/nextjs-pages/README.md @@ -8,37 +8,28 @@ In the `src/pages/api/path.sh` file, you can find out how one can define endpoin ## Development -> [!TIP] -> You can use [the `bootstrap.sh` script](https://github.com/upstash/workflow-js/tree/main/examples) to run this example with a local tunnel. -> -> Simply set the environment variables as explained below and run the following command in the `workflow-js/examples` directory: -> -> ``` -> bash bootstrap.sh cloudflare-workers-hono -> ``` - 1. Install the dependencies ```bash npm install ``` -2. Get the credentials from the [Upstash Console](https://console.upstash.com/qstash) and add them to the `.env.local` file. +2. [Start the QStash development server](https://upstash.com/docs/workflow/howto/local-development): ```bash -QSTASH_URL= -QSTASH_TOKEN= +npx @upstash/qstash-cli dev ``` -3. Open a local tunnel to port of the development server +3. Once you run the development server, you will see `QSTASH_URL` and `QSTASH_TOKEN` environment variables for the local development server. Add these to the `.env.local` file: ```bash -ngrok http 3001 +QSTASH_URL="***" +QSTASH_TOKEN="***" ``` -Also, set the `UPSTASH_WORKLFOW_URL` environment variable to the public url provided by ngrok. +When you are deploying your app to production, you don't need to set `QSTASH_URL`. You should only set the `QSTASH_TOKEN` environment variable to the token you get from [Upstash Console](https://console.upstash.com/qstash). -4. Run the development server +4. Run your app: ```bash npm run dev diff --git a/examples/nextjs-pages/ci.test.ts b/examples/nextjs-pages/ci.test.ts index 7fc06e5c..8c03e758 100644 --- a/examples/nextjs-pages/ci.test.ts +++ b/examples/nextjs-pages/ci.test.ts @@ -146,8 +146,8 @@ describe("nextjs-pages", () => { token: "mock" }) - // @ts-expect-error mocking publish - qstashClient.publish = async () => { + // @ts-expect-error mocking batch + qstashClient.batch = async () => { return { messageId: "msgId" } } diff --git a/examples/nextjs/README.md b/examples/nextjs/README.md index ae466c7d..a21968db 100644 --- a/examples/nextjs/README.md +++ b/examples/nextjs/README.md @@ -20,7 +20,7 @@ To deploy the project at vercel and try the endpoints, you should start with set vercel ``` -Next, you shoud go to vercel.com, find your project and add `QSTASH_TOKEN`, to the project as environment variables. You can find this env variables from the [Upstash Console](https://console.upstash.com/qstash). To learn more about other env variables and their use in the context of Upstash Workflow, you can read [the Secure your Endpoint in our documentation](https://upstash.com/docs/qstash/workflow/howto/security#using-qstashs-built-in-request-verification-recommended). +Next, you shoud go to vercel.com, find your project and add `QSTASH_TOKEN`, to the project as environment variables. You can find this env variables from the [Upstash Console](https://console.upstash.com/workflow). To learn more about other env variables and their use in the context of Upstash Workflow, you can read [the Secure your Endpoint in our documentation](https://upstash.com/docs/qstash/workflow/howto/security#using-qstashs-built-in-request-verification-recommended). Once you add the env variables, you can deploy the project with: @@ -32,7 +32,7 @@ Note that the project won't work in preview. It should be deployed to production Once you have the app deployed, you can go to the deployment and call the endpoints using the form on the page. -You can observe the logs at [Upstash console under the Worfklow tab](https://console.upstash.com/qstash?tab=workflow) or vercel.com to see your workflow operate. +You can observe the logs at [Upstash console under the Worfklow tab](https://console.upstash.com/workflow) or vercel.com to see your workflow operate. ## Local Development diff --git a/examples/nextjs/ci.test.ts b/examples/nextjs/ci.test.ts index 54b7e1cb..ec3ceb75 100644 --- a/examples/nextjs/ci.test.ts +++ b/examples/nextjs/ci.test.ts @@ -8,9 +8,9 @@ const qstashClient = new Client({ token: "mock" }) -// @ts-expect-error mocking publish -qstashClient.publish = async () => { - return { messageId: "msgId" } +// mocking batch +qstashClient.batch = async () => { + return [{ messageId: "msgId" }] } const { POST: serveHandler } = serve( diff --git a/src/client/index.test.ts b/src/client/index.test.ts index 7603102c..f570a4ee 100644 --- a/src/client/index.test.ts +++ b/src/client/index.test.ts @@ -214,26 +214,188 @@ describe("workflow client", () => { }, responseFields: { status: 200, - body: "msgId", + body: [{ messageId: "msgId" }], }, receivesRequest: { method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, - body, - headers: { - "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-user-header": "user-header-value", - "upstash-method": "POST", - "upstash-retries": "15", - "upstash-workflow-init": "true", - "upstash-workflow-runid": `wfr_${myWorkflowRunId}`, - "upstash-workflow-url": "https://requestcatcher.com/api", - "upstash-delay": "1s", - }, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-forward-user-header": "user-header-value", + "upstash-method": "POST", + "upstash-retries": "15", + "upstash-workflow-init": "true", + "upstash-workflow-runid": `wfr_${myWorkflowRunId}`, + "upstash-workflow-url": "https://requestcatcher.com/api", + "upstash-delay": "1s", + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-telemetry-runtime": expect.stringMatching(/bun@/), + "upstash-telemetry-sdk": expect.stringMatching(/upstash-qstash-js@/), + "upstash-workflow-sdk-version": "1", + }, + body, + }, + ], + }, + }); + }); + + test("should trigger multiple workflow runs", async () => { + const myWorkflowRunId = `mock-${getWorkflowRunId()}`; + const myWorkflowRunId2 = `mock-${getWorkflowRunId()}`; + const body = "request-body"; + const body2 = "request-body-2"; + await mockQStashServer({ + execute: async () => { + const result = await client.trigger([ + { + url: WORKFLOW_ENDPOINT, + body, + headers: { "user-header": "user-header-value" }, + workflowRunId: myWorkflowRunId, + retries: 15, + delay: 1, + }, + { + url: WORKFLOW_ENDPOINT, + body: body2, + headers: { "user-header": "user-header-value" }, + workflowRunId: myWorkflowRunId2, + retries: 15, + delay: 1, + useFailureFunction: true, + }, + ]); + expect(result).toEqual([ + { workflowRunId: `wfr_${myWorkflowRunId}` }, + { workflowRunId: `wfr_${myWorkflowRunId2}` }, + ]); + }, + responseFields: { + status: 200, + body: [{ messageId: "msgId" }], + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-forward-user-header": "user-header-value", + "upstash-method": "POST", + "upstash-retries": "15", + "upstash-workflow-init": "true", + "upstash-workflow-runid": `wfr_${myWorkflowRunId}`, + "upstash-workflow-url": "https://requestcatcher.com/api", + "upstash-delay": "1s", + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-telemetry-runtime": expect.stringMatching(/bun@/), + "upstash-telemetry-sdk": expect.stringMatching(/upstash-qstash-js@/), + "upstash-workflow-sdk-version": "1", + }, + body, + }, + { + destination: WORKFLOW_ENDPOINT, + headers: { + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-forward-user-header": "user-header-value", + "upstash-method": "POST", + "upstash-retries": "15", + "upstash-workflow-init": "true", + "upstash-workflow-runid": `wfr_${myWorkflowRunId2}`, + "upstash-workflow-url": "https://requestcatcher.com/api", + "upstash-delay": "1s", + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-telemetry-runtime": expect.stringMatching(/bun@/), + "upstash-telemetry-sdk": expect.stringMatching(/upstash-qstash-js@/), + "upstash-workflow-sdk-version": "1", + "upstash-failure-callback": "https://requestcatcher.com/api", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-forward-upstash-workflow-failure-callback": "true", + "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-failure-callback-forward-user-header": "user-header-value", + "upstash-failure-callback-retries": "15", + "upstash-failure-callback-workflow-calltype": "failureCall", + "upstash-failure-callback-workflow-init": "false", + "upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId2}`, + "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", + }, + body: body2, + }, + ], + }, + }); + }); + + test("should trigger workflow run with failure callback", async () => { + const myWorkflowRunId = `mock-${getWorkflowRunId()}`; + const body = "request-body"; + await mockQStashServer({ + execute: async () => { + await client.trigger({ + url: WORKFLOW_ENDPOINT, + body, + headers: { "user-header": "user-header-value" }, + workflowRunId: myWorkflowRunId, + retries: 15, + delay: 1, + failureUrl: "https://requestcatcher.com/some-failure-callback", + }); + }, + responseFields: { + status: 200, + body: [{ messageId: "msgId" }], + }, + receivesRequest: { + method: "POST", + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, + token, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-forward-user-header": "user-header-value", + "upstash-method": "POST", + "upstash-retries": "15", + "upstash-workflow-init": "true", + "upstash-workflow-runid": `wfr_${myWorkflowRunId}`, + "upstash-workflow-url": "https://requestcatcher.com/api", + "upstash-delay": "1s", + "upstash-failure-callback": "https://requestcatcher.com/some-failure-callback", + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-feature-set": "LazyFetch,InitialBody", + "upstash-failure-callback-forward-upstash-workflow-failure-callback": "true", + "upstash-failure-callback-forward-upstash-workflow-is-failure": "true", + "upstash-failure-callback-forward-user-header": "user-header-value", + "upstash-failure-callback-retries": "15", + "upstash-failure-callback-workflow-calltype": "failureCall", + "upstash-failure-callback-workflow-init": "false", + "upstash-failure-callback-workflow-runid": `wfr_${myWorkflowRunId}`, + "upstash-failure-callback-workflow-url": "https://requestcatcher.com/api", + "upstash-telemetry-runtime": expect.stringMatching(/bun@/), + "upstash-telemetry-sdk": expect.stringMatching(/upstash-qstash-js@/), + "upstash-workflow-sdk-version": "1", + }, + body, + }, + ], }, }); }); + describe("logs", () => { test("should send logs request", async () => { const count = 10; @@ -330,6 +492,7 @@ describe("workflow client", () => { state: "STEP_SUCCESS", stepName: "init", stepType: "Initial", + retries: 3, }, ], type: "sequential", @@ -372,6 +535,7 @@ describe("workflow client", () => { state: "STEP_SUCCESS", stepName: "init", stepType: "Initial", + retries: 3, }, ], type: "sequential", @@ -430,14 +594,15 @@ describe("workflow client", () => { workflowState: "RUN_FAILED", workflowRunCreatedAt: expect.any(Number), workflowRunCompletedAt: expect.any(Number), + dlqId: expect.any(String), failureFunction: { messageId: expect.any(String), - dlqId: expect.any(String), failResponse: "400 Bad Request", failStatus: 400, url: "https://httpstat.us/400", state: "DELIVERED", failHeaders: expect.any(Object), + dlqId: expect.any(String), }, steps: [ { @@ -452,6 +617,7 @@ describe("workflow client", () => { }, messageId: expect.any(String), out: "some-body", + retries: 0, state: "STEP_SUCCESS", stepName: "init", stepType: "Initial", @@ -464,6 +630,15 @@ describe("workflow client", () => { { state: "STEP_FAILED", messageId: expect.any(String), + retries: 0, + errors: [ + { + error: "400 Bad Request", + headers: expect.any(Object), + status: 400, + time: expect.any(Number), + }, + ], }, ], type: "next", @@ -471,7 +646,7 @@ describe("workflow client", () => { ], }); }, - { timeout: 30_000, interval: 100 } + { timeout: 30_000, interval: 1000 } ); }, { diff --git a/src/client/index.ts b/src/client/index.ts index 8c6084e5..a4a75f4f 100644 --- a/src/client/index.ts +++ b/src/client/index.ts @@ -1,10 +1,10 @@ import { NotifyResponse, Waiter } from "../types"; -import { FlowControl, PublishRequest, Client as QStashClient } from "@upstash/qstash"; +import { Client as QStashClient } from "@upstash/qstash"; import { makeGetWaitersRequest, makeNotifyRequest } from "./utils"; import { getWorkflowRunId } from "../utils"; import { triggerFirstInvocation } from "../workflow-requests"; import { WorkflowContext } from "../context"; -import { WorkflowRunLog, WorkflowRunLogs } from "./types"; +import { TriggerOptions, WorkflowRunLog, WorkflowRunLogs } from "./types"; type ClientConfig = ConstructorParameters[0]; @@ -165,8 +165,9 @@ export class Client { } /** - * Trigger new workflow run and returns the workflow run id + * Trigger new workflow run and returns the workflow run id or an array of workflow run ids * + * trigger a single workflow run: * ```ts * const { workflowRunId } = await client.trigger({ * url: "https://workflow-endpoint.com", @@ -179,6 +180,31 @@ export class Client { * console.log(workflowRunId) * // wfr_my-workflow * ``` + * trigger multiple workflow runs: + * ```ts + * const result = await client.trigger([ + * { + * url: "https://workflow-endpoint.com", + * body: "hello there!", // Optional body + * headers: { ... }, // Optional headers + * workflowRunId: "my-workflow", // Optional workflow run ID + * retries: 3 // Optional retries for the initial request + * }, + * { + * url: "https://workflow-endpoint-2.com", + * body: "hello world!", // Optional body + * headers: { ... }, // Optional headers + * workflowRunId: "my-workflow-2", // Optional workflow run ID + * retries: 5 // Optional retries for the initial request + * }, + * ]); + * + * console.log(result) + * // [ + * // { workflowRunId: "wfr_my-workflow" }, + * // { workflowRunId: "wfr_my-workflow-2" }, + * // ] + * ``` * * @param url URL of the workflow * @param body body to start the workflow with @@ -195,45 +221,52 @@ export class Client { * @param delay Delay for the workflow run. This is used to delay the * execution of the workflow run. The delay is in seconds or can be passed * as a string with a time unit (e.g. "1h", "30m", "15s"). - * @returns workflow run id + * @returns workflow run id or an array of workflow run ids */ - public async trigger({ - url, - body, - headers, - workflowRunId, - retries, - flowControl, - delay, - }: { - url: string; - body?: unknown; - headers?: Record; - workflowRunId?: string; - retries?: number; - flowControl?: FlowControl; - delay?: PublishRequest["delay"]; - }): Promise<{ workflowRunId: string }> { - const finalWorkflowRunId = getWorkflowRunId(workflowRunId); - const context = new WorkflowContext({ - qstashClient: this.client, - // @ts-expect-error headers type mismatch - headers: new Headers(headers ?? {}), - initialPayload: body, - steps: [], - url, - workflowRunId: finalWorkflowRunId, - retries, - telemetry: undefined, // can't know workflow telemetry here - flowControl, - }); - const result = await triggerFirstInvocation({ - workflowContext: context, - telemetry: undefined, // can't know workflow telemetry here - delay, + + public async trigger(params: TriggerOptions): Promise<{ workflowRunId: string }>; + public async trigger(params: TriggerOptions[]): Promise<{ workflowRunId: string }[]>; + + public async trigger( + params: TriggerOptions | TriggerOptions[] + ): Promise<{ workflowRunId: string } | { workflowRunId: string }[]> { + const isBatchInput = Array.isArray(params); + const options = isBatchInput ? params : [params]; + + const invocations = options.map((option) => { + const failureUrl = option.useFailureFunction ? option.url : option.failureUrl; + const finalWorkflowRunId = getWorkflowRunId(option.workflowRunId); + + const context = new WorkflowContext({ + qstashClient: this.client, + // @ts-expect-error headers type mismatch + headers: new Headers(option.headers ?? {}), + initialPayload: option.body, + steps: [], + url: option.url, + workflowRunId: finalWorkflowRunId, + retries: option.retries, + telemetry: undefined, // can't know workflow telemetry here + flowControl: option.flowControl, + failureUrl, + }); + + return { + workflowContext: context, + telemetry: undefined, // can't know workflow telemetry here + delay: option.delay, + }; }); + const result = await triggerFirstInvocation(invocations); + + const workflowRunIds: string[] = invocations.map( + (invocation) => invocation.workflowContext.workflowRunId + ); + if (result.isOk()) { - return { workflowRunId: finalWorkflowRunId }; + return isBatchInput + ? workflowRunIds.map((id) => ({ workflowRunId: id })) + : { workflowRunId: workflowRunIds[0] }; } else { throw result.error; } diff --git a/src/client/types.ts b/src/client/types.ts index 3e4b990d..c7438c4f 100644 --- a/src/client/types.ts +++ b/src/client/types.ts @@ -1,4 +1,4 @@ -import { HTTPMethods, State } from "@upstash/qstash"; +import { FlowControl, HTTPMethods, PublishRequest, State } from "@upstash/qstash"; import { RawStep, StepType } from "../types"; type BaseStepLog = { @@ -49,6 +49,10 @@ type BaseStepLog = { * headers */ headers: Record; + /** + * retries + */ + retries: number; }; type CallUrlGroup = { @@ -165,6 +169,19 @@ type StepLogGroup = steps: { messageId: string; state: "STEP_PROGRESS" | "STEP_RETRY" | "STEP_FAILED" | "STEP_CANCELED"; + /** + * retries + */ + retries: number; + /** + * errors which occured in the step + */ + errors?: { + error: string; + headers: Record; + status: number; + time: number; + }[]; }[]; /** * Log which belongs to the next step @@ -197,6 +214,9 @@ type FailureFunctionLog = { * Response body of the step which caused the workflow to fail */ failResponse: string; + /** + * @deprecated use dlqId field of the workflow run itself + */ dlqId: string; }; @@ -264,9 +284,75 @@ export type WorkflowRunLog = { */ workflowRunCreatedAt: number; }; + /** + * If the workflow run has failed, id of the run in DLQ + */ + dlqId?: string; }; export type WorkflowRunLogs = { cursor: string; runs: WorkflowRunLog[]; }; + +export type TriggerOptions = { + /** + * URL of the workflow to trigger + */ + url: string; + /** + * Body to send to the workflow + */ + body?: unknown; + /** + * Headers to send to the workflow + */ + headers?: Record; + /** + * Workflow run id to use for the workflow run. + * If not provided, a random workflow run id will be generated. + */ + workflowRunId?: string; + /** + * Number of retries to perform if the request fails. + * + * @default 3 + */ + retries?: number; + /** + * Flow control to use for the workflow run. + * If not provided, no flow control will be used. + */ + flowControl?: FlowControl; + /** + * Delay to apply before triggering the workflow. + */ + delay?: PublishRequest["delay"]; +} & ( + | { + /** + * URL to call if the first request to the workflow endpoint fails + */ + failureUrl?: never; + /** + * Whether the workflow endpoint has a failure function + * defined to be invoked if the first request fails. + * + * If true, the failureUrl will be ignored. + */ + useFailureFunction?: true; + } + | { + /** + * URL to call if the first request to the workflow endpoint fails + */ + failureUrl?: string; + /** + * Whether the workflow endpoint has a failure function + * defined to be invoked if the first request fails. + * + * If true, the failureUrl will be ignored. + */ + useFailureFunction?: never; + } +); diff --git a/src/constants.ts b/src/constants.ts index 02bfe9d9..31779b1f 100644 --- a/src/constants.ts +++ b/src/constants.ts @@ -16,7 +16,7 @@ export const NO_CONCURRENCY = 1; export const NOT_SET = "not-set"; export const DEFAULT_RETRIES = 3; -export const VERSION = "v0.2.13"; +export const VERSION = "v0.2.14"; export const SDK_TELEMETRY = `@upstash/workflow@${VERSION}`; export const TELEMETRY_HEADER_SDK = "Upstash-Telemetry-Sdk" as const; diff --git a/src/context/steps.ts b/src/context/steps.ts index 5f951851..543bb6d8 100644 --- a/src/context/steps.ts +++ b/src/context/steps.ts @@ -393,7 +393,7 @@ export class LazyCallStep extends BaseLazySt } } - private static applicationHeaders = new Set([ + private static applicationContentTypes = [ "application/json", "application/xml", "application/javascript", @@ -402,13 +402,13 @@ export class LazyCallStep extends BaseLazySt "application/ld+json", "application/rss+xml", "application/atom+xml", - ]); + ]; private static isText = (contentTypeHeader: string | null) => { if (!contentTypeHeader) { return false; } - if (LazyCallStep.applicationHeaders.has(contentTypeHeader)) { + if (LazyCallStep.applicationContentTypes.some((type) => contentTypeHeader.includes(type))) { return true; } if (contentTypeHeader.startsWith("text/")) { diff --git a/src/receiver.test.ts b/src/receiver.test.ts index b0938000..39161bdd 100644 --- a/src/receiver.test.ts +++ b/src/receiver.test.ts @@ -196,12 +196,34 @@ describe("receiver", () => { const response = await endpoint(requestWithHeader); expect(response.status).toBe(200); }, - responseFields: { body: "msgId", status: 200 }, + responseFields: { body: [{ messageId: "msgId" }], status: 200 }, receivesRequest: { method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, - body, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-forward-authorization": expect.stringMatching(/Bearer /), + "upstash-forward-upstash-signature": expect.any(String), + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": expect.stringMatching(/unknown, bun@/), + "upstash-telemetry-sdk": expect.stringMatching( + /@upstash\/workflow@.*upstash-qstash-js@/ + ), + "upstash-workflow-init": "true", + "upstash-workflow-runid": expect.any(String), + "upstash-workflow-sdk-version": "1", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + body: JSON.stringify(body), + }, + ], }, }); expect(called).toBeTrue(); diff --git a/src/serve/serve.test.ts b/src/serve/serve.test.ts index 4a3aaf86..f2199aff 100644 --- a/src/serve/serve.test.ts +++ b/src/serve/serve.test.ts @@ -65,20 +65,33 @@ describe("serve", () => { const response = await endpoint(request); expect(response.status).toBe(200); }, - responseFields: { body: "msgId", status: 200 }, + responseFields: { body: [{ messageId: "msgId" }], status: 200 }, receivesRequest: { method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/${WORKFLOW_ENDPOINT}`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, - body: initialPayload, - headers: { - [WORKFLOW_INIT_HEADER]: "true", - [WORKFLOW_PROTOCOL_VERSION_HEADER]: "1", - [`Upstash-Forward-${WORKFLOW_PROTOCOL_VERSION_HEADER}`]: "1", - "upstash-retries": "1", - "Upstash-Flow-Control-Key": "my-key", - "Upstash-Flow-Control-Value": "parallelism=1", - }, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-flow-control-key": "my-key", + "upstash-flow-control-value": "parallelism=1", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-retries": "1", + "upstash-telemetry-framework": "unknown", + "upstash-telemetry-runtime": "unknown", + "upstash-telemetry-sdk": "@upstash/workflow@v0.2.13", + "upstash-workflow-init": "true", + "upstash-workflow-runid": expect.stringMatching(/^wfr_/), + "upstash-workflow-sdk-version": "1", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + body: initialPayload, + }, + ], }, }); }); @@ -148,25 +161,32 @@ describe("serve", () => { { stepsToAdd: [], responseFields: { - body: { messageId: "some-message-id" }, + body: [{ messageId: "some-message-id" }], status: 200, }, receivesRequest: { method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/https://requestcatcher.com/api`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, - body: initialPayload, - headers: { - "upstash-workflow-sdk-version": "1", - "upstash-feature-set": "LazyFetch,InitialBody", - "upstash-forward-upstash-workflow-sdk-version": "1", - "upstash-forward-upstash-workflow-invoke-count": "2", - "upstash-method": "POST", - "upstash-workflow-init": "true", - "upstash-workflow-url": WORKFLOW_ENDPOINT, - "upstash-flow-control-key": "my-key", - "upstash-flow-control-value": "rate=3", - }, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "upstash-workflow-sdk-version": "1", + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-forward-upstash-workflow-invoke-count": "2", + "upstash-method": "POST", + "upstash-workflow-init": "true", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + "upstash-flow-control-key": "my-key", + "upstash-flow-control-value": "rate=3", + "upstash-workflow-runid": expect.stringMatching(/^wfr/), + }, + body: initialPayload, + }, + ], }, }, { @@ -1020,7 +1040,9 @@ describe("serve", () => { qstashClient.batch = jest .fn() .mockReturnValue([{ deduplicatedId: false, messageId: "some-message-id" }]); - qstashClient.publish = jest.fn({ deduplicatedId: false, messageId: "some-message-id" }); + qstashClient.publish = jest + .fn() + .mockReturnValue({ deduplicatedId: false, messageId: "some-message-id" }); const client = new WorkflowClient({ token: process.env.QSTASH_TOKEN! }); test("allow http://", async () => { diff --git a/src/workflow-requests.test.ts b/src/workflow-requests.test.ts index 0b4b35bd..415cb95c 100644 --- a/src/workflow-requests.test.ts +++ b/src/workflow-requests.test.ts @@ -56,20 +56,32 @@ describe("Workflow Requests", () => { expect(result.isOk()).toBeTrue(); }, responseFields: { - body: { messageId: "msgId" }, + body: [{ messageId: "msgId" }], status: 200, }, receivesRequest: { method: "POST", - url: `${MOCK_QSTASH_SERVER_URL}/v2/publish/https://requestcatcher.com/api`, + url: `${MOCK_QSTASH_SERVER_URL}/v2/batch`, token, - body: initialPayload, - headers: { - "upstash-retries": "0", - "Upstash-Workflow-Init": "true", - "Upstash-Workflow-RunId": workflowRunId, - "Upstash-Workflow-Url": WORKFLOW_ENDPOINT, - }, + body: [ + { + destination: WORKFLOW_ENDPOINT, + headers: { + "content-type": "application/json", + "upstash-feature-set": "LazyFetch,InitialBody", + "upstash-forward-upstash-workflow-sdk-version": "1", + "upstash-method": "POST", + "upstash-retries": "0", + "upstash-telemetry-runtime": expect.stringMatching(/bun@/), + "upstash-telemetry-sdk": expect.stringMatching(/upstash-qstash-js@/), + "upstash-workflow-init": "true", + "upstash-workflow-runid": workflowRunId, + "upstash-workflow-sdk-version": "1", + "upstash-workflow-url": WORKFLOW_ENDPOINT, + }, + body: initialPayload, + }, + ], }, }); }); diff --git a/src/workflow-requests.ts b/src/workflow-requests.ts index df7ec320..a6d739d6 100644 --- a/src/workflow-requests.ts +++ b/src/workflow-requests.ts @@ -20,79 +20,105 @@ import type { } from "./types"; import { StepTypes } from "./types"; import type { WorkflowLogger } from "./logger"; -import { FlowControl, PublishRequest, QstashError } from "@upstash/qstash"; +import { FlowControl, PublishBatchRequest, PublishRequest, QstashError } from "@upstash/qstash"; import { getSteps } from "./client/utils"; import { getHeaders } from "./qstash/headers"; +import { PublishToUrlResponse } from "@upstash/qstash"; -export const triggerFirstInvocation = async ({ - workflowContext, - useJSONContent, - telemetry, - debug, - invokeCount, - delay, -}: { +type TriggerFirstInvocationParams = { workflowContext: WorkflowContext; useJSONContent?: boolean; telemetry?: Telemetry; debug?: WorkflowLogger; invokeCount?: number; delay?: PublishRequest["delay"]; -}): Promise | Err> => { - const { headers } = getHeaders({ - initHeaderValue: "true", - workflowConfig: { - workflowRunId: workflowContext.workflowRunId, - workflowUrl: workflowContext.url, - failureUrl: workflowContext.failureUrl, - retries: workflowContext.retries, - telemetry, - flowControl: workflowContext.flowControl, - useJSONContent: useJSONContent ?? false, - }, - invokeCount: invokeCount ?? 0, - userHeaders: workflowContext.headers, - }); +}; - // QStash doesn't forward content-type when passed in `upstash-forward-content-type` - // so we need to pass it in the headers - if (workflowContext.headers.get("content-type")) { - headers["content-type"] = workflowContext.headers.get("content-type")!; - } +export const triggerFirstInvocation = async ( + params: + | TriggerFirstInvocationParams + | TriggerFirstInvocationParams[] +): Promise | Err> => { + const firstInvocationParams = Array.isArray(params) ? params : [params]; + const workflowContextClient = firstInvocationParams[0].workflowContext.qstashClient; - if (useJSONContent) { - headers["content-type"] = "application/json"; - } + const invocationBatch = firstInvocationParams.map( + ({ workflowContext, useJSONContent, telemetry, invokeCount, delay }) => { + const { headers } = getHeaders({ + initHeaderValue: "true", + workflowConfig: { + workflowRunId: workflowContext.workflowRunId, + workflowUrl: workflowContext.url, + failureUrl: workflowContext.failureUrl, + retries: workflowContext.retries, + telemetry: telemetry, + flowControl: workflowContext.flowControl, + useJSONContent: useJSONContent ?? false, + }, + invokeCount: invokeCount ?? 0, + userHeaders: workflowContext.headers, + }); - try { - const body = - typeof workflowContext.requestPayload === "string" - ? workflowContext.requestPayload - : JSON.stringify(workflowContext.requestPayload); - const result = await workflowContext.qstashClient.publish({ - headers, - method: "POST", - body, - url: workflowContext.url, - delay, - }); + // QStash doesn't forward content-type when passed in `upstash-forward-content-type` + // so we need to pass it in the headers + if (workflowContext.headers.get("content-type")) { + headers["content-type"] = workflowContext.headers.get("content-type")!; + } - if (result.deduplicated) { - await debug?.log("WARN", "SUBMIT_FIRST_INVOCATION", { - message: `Workflow run ${workflowContext.workflowRunId} already exists. A new one isn't created.`, + if (useJSONContent) { + headers["content-type"] = "application/json"; + } + + const body = + typeof workflowContext.requestPayload === "string" + ? workflowContext.requestPayload + : JSON.stringify(workflowContext.requestPayload); + + return { headers, - requestPayload: workflowContext.requestPayload, + method: "POST", + body, url: workflowContext.url, - messageId: result.messageId, - }); + delay: delay, + } as PublishBatchRequest; + } + ); + + try { + const results = (await workflowContextClient.batch(invocationBatch)) as PublishToUrlResponse[]; + + const invocationStatuses: ("success" | "workflow-run-already-exists")[] = []; + + for (let i = 0; i < results.length; i++) { + const result = results[i]; + const invocationParams = firstInvocationParams[i]; + if (result.deduplicated) { + await invocationParams.debug?.log("WARN", "SUBMIT_FIRST_INVOCATION", { + message: `Workflow run ${invocationParams.workflowContext.workflowRunId} already exists. A new one isn't created.`, + headers: invocationBatch[i].headers, + requestPayload: invocationParams.workflowContext.requestPayload, + url: invocationParams.workflowContext.url, + messageId: result.messageId, + }); + invocationStatuses.push("workflow-run-already-exists"); + } else { + await invocationParams.debug?.log("SUBMIT", "SUBMIT_FIRST_INVOCATION", { + headers: invocationBatch[i].headers, + requestPayload: invocationParams.workflowContext.requestPayload, + url: invocationParams.workflowContext.url, + messageId: result.messageId, + }); + invocationStatuses.push("success"); + } + } + + const hasAnyDeduplicated = invocationStatuses.some( + (status) => status === "workflow-run-already-exists" + ); + + if (hasAnyDeduplicated) { return ok("workflow-run-already-exists"); } else { - await debug?.log("SUBMIT", "SUBMIT_FIRST_INVOCATION", { - headers, - requestPayload: workflowContext.requestPayload, - url: workflowContext.url, - messageId: result.messageId, - }); return ok("success"); } } catch (error) {