Skip to content

Commit

Permalink
feat(web): send trace events via Redis instead of Rest (langfuse#2579)
Browse files Browse the repository at this point in the history
  • Loading branch information
maxdeichmann authored Jul 24, 2024
1 parent 90fc9fb commit 6554c5e
Show file tree
Hide file tree
Showing 28 changed files with 239 additions and 165 deletions.
6 changes: 6 additions & 0 deletions .env.prod.example
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,11 @@ LANGFUSE_CSP_ENFORCE_HTTPS="true"
# Admin API
# ADMIN_API_KEY=

# Redis
# REDIS_HOST=
# REDIS_PORT=
# REDIS_AUTH=
# REDIS_CONNECTION_STRING=


### END Langfuse Cloud Config
4 changes: 3 additions & 1 deletion .github/workflows/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
fi
tests-web:
timeout-minutes: 20
runs-on: ubuntu-latest
name: tests-web (node${{ matrix.node-version }}, pg${{ matrix.postgres-version }})
strategy:
Expand Down Expand Up @@ -93,7 +94,7 @@ jobs:
- name: Load default env
run: |
cp .env.dev.example .env
cp .env.dev.example web/.env
grep -v '^REDIS_HOST=' .env.dev.example > web/.env
- name: Run + migrate
run: |
Expand All @@ -118,6 +119,7 @@ jobs:
run: pnpm --filter=web run test

tests-worker:
timeout-minutes: 20
runs-on: ubuntu-latest
name: tests-worker (node${{ matrix.node-version }}, pg${{ matrix.postgres-version }})
strategy:
Expand Down
2 changes: 2 additions & 0 deletions packages/shared/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
"@react-email/render": "^0.0.15",
"@types/bcryptjs": "^2.4.6",
"bcryptjs": "^2.4.3",
"bullmq": "^5.4.2",
"ioredis": "^5.4.1",
"kysely": "^0.27.3",
"langchain": "^0.2.6",
"lodash": "^4.17.21",
Expand Down
5 changes: 5 additions & 0 deletions packages/shared/prisma/seed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { v4 } from "uuid";
import { ModelUsageUnit } from "../src";
import { getDisplaySecretKey, hashSecretKey } from "../src/server";
import { encrypt } from "../src/encryption";
import { redis } from "../src/server/redis/redis";

const LOAD_TRACE_VOLUME = 10_000;

Expand Down Expand Up @@ -354,10 +355,14 @@ async function main() {
main()
.then(async () => {
await prisma.$disconnect();
redis?.disconnect();
console.log("Disconnected from postgres and redis");
})
.catch(async (e) => {
console.error(e);
await prisma.$disconnect();
redis?.disconnect();
console.log("Disconnected from postgres and redis");
process.exit(1);
});

Expand Down
15 changes: 15 additions & 0 deletions packages/shared/src/env.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,21 @@
import { z } from "zod";

const EnvSchema = z.object({
NODE_ENV: z
.enum(["development", "test", "production"])
.default("development"),
REDIS_HOST: z.string().nullish(),
REDIS_PORT: z.coerce
.number({
description:
".env files convert numbers to strings, therefoore we have to enforce them to be numbers",
})
.positive()
.max(65536, `options.port should be >= 0 and < 65536`)
.default(6379)
.nullable(),
REDIS_AUTH: z.string().nullish(),
REDIS_CONNECTION_STRING: z.string().nullish(),
ENCRYPTION_KEY: z
.string()
.length(
Expand Down
2 changes: 2 additions & 0 deletions packages/shared/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ export * from "./llm/fetchLLMCompletion";
export * from "./llm/types";
export * from "./utils/DatabaseReadStream";
export * from "./utils/transforms";
export * from "./redis/redis";
export * from "./redis/trace-upsert";
20 changes: 10 additions & 10 deletions worker/src/redis.ts → packages/shared/src/server/redis/redis.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import Redis from "ioredis";
import { env } from "./env";
import logger from "./logger";
import { env } from "../../env";

const createRedisClient = () => {
try {
return env.REDIS_CONNECTION_STRING
? new Redis(env.REDIS_CONNECTION_STRING, { maxRetriesPerRequest: null })
: new Redis({
host: String(env.REDIS_HOST),
port: Number(env.REDIS_PORT),
password: String(env.REDIS_AUTH),
maxRetriesPerRequest: null, // Set to `null` to disable retrying
});
: env.REDIS_HOST
? new Redis({
host: String(env.REDIS_HOST),
port: Number(env.REDIS_PORT),
password: String(env.REDIS_AUTH),
maxRetriesPerRequest: null, // Set to `null` to disable retrying
})
: null;
} catch (e) {
logger.error(e, "Failed to connect to redis");
console.error(e, "Failed to connect to redis");
return null;
}
};

declare global {
// eslint-disable-next-line no-var
var redis: undefined | ReturnType<typeof createRedisClient>;
Expand Down
56 changes: 56 additions & 0 deletions packages/shared/src/server/redis/trace-upsert.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { randomUUID } from "crypto";
import {
QueueJobs,
QueueName,
TQueueJobTypes,
TraceUpsertEventType,
} from "../../queues";
import { Queue } from "bullmq";
import { redis } from "./redis";

export const traceUpsertQueue = redis
? new Queue<TQueueJobTypes[QueueName.TraceUpsert]>(QueueName.TraceUpsert, {
connection: redis,
})
: null;

export function convertTraceUpsertEventsToRedisEvents(
events: TraceUpsertEventType[]
) {
const uniqueTracesPerProject = events.reduce((acc, event) => {
if (!acc.get(event.projectId)) {
acc.set(event.projectId, new Set());
}
acc.get(event.projectId)?.add(event.traceId);
return acc;
}, new Map<string, Set<string>>());

const jobs = [...uniqueTracesPerProject.entries()]
.map((tracesPerProject) => {
const [projectId, traceIds] = tracesPerProject;

return [...traceIds].map((traceId) => ({
name: QueueJobs.TraceUpsert,
data: {
payload: {
projectId,
traceId,
},
id: randomUUID(),
timestamp: new Date(),
name: QueueJobs.TraceUpsert as const,
},
opts: {
removeOnFail: 10000,
removeOnComplete: true,
attempts: 5,
backoff: {
type: "exponential",
delay: 1000,
},
},
}));
})
.flat();
return jobs;
}
33 changes: 11 additions & 22 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions web/jest.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,16 @@ const clientTestConfig = {
displayName: "client",
testMatch: ["/**/*.clienttest.[jt]s?(x)"],
testEnvironment: "jest-environment-jsdom",
// setupFilesAfterEnv: ["<rootDir>/src/__tests__/after-teardown.ts"],
// globalTeardown: "<rootDir>/src/__tests__/teardown.ts",
};

const serverTestConfig = {
displayName: "server",
testMatch: ["/**/*.servertest.[jt]s?(x)"],
testEnvironment: "jest-environment-node",
setupFilesAfterEnv: ["<rootDir>/src/__tests__/after-teardown.ts"],
globalTeardown: "<rootDir>/src/__tests__/teardown.ts",
};

// To avoid the "Cannot use import statement outside a module" errors while transforming ESM.
Expand Down
2 changes: 1 addition & 1 deletion web/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"prettier": "prettier --write ./src *.{ts,js}",
"clean": "rm -rf node_modules",
"start": "dotenv -e ../.env -- sh -c 'NEXT_MANUAL_SIG_HANDLE=true next start'",
"test": "dotenv -e ../.env -- jest --runInBand",
"test": "dotenv -e ../.env -- jest --runInBand --detectOpenHandles",
"test:watch": "dotenv -e ../.env -- jest --watch --runInBand",
"test:e2e": "dotenv -e ../.env -- playwright test",
"models:migrate": "dotenv -e ../.env -- tsx scripts/model-match.ts",
Expand Down
5 changes: 5 additions & 0 deletions web/src/__tests__/after-teardown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import teardown from "@/src/__tests__/teardown";

afterAll(async () => {
await teardown();
});
13 changes: 13 additions & 0 deletions web/src/__tests__/teardown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
export default async function teardown() {
const { redis } = await import("@langfuse/shared/src/server");
console.log(`Redis status ${redis?.status}`);
if (!redis) {
return;
}
if (redis.status === "end") {
console.log("Redis connection already closed");
return;
}
redis?.disconnect();
console.log("Teardown complete");
}
18 changes: 17 additions & 1 deletion web/src/env.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,21 @@ export const env = createEnv({
.string()
.length(
64,
"ENCRYPTION_KEY must be 256 bits, 64 string characters in hex format, generate via: openssl rand -hex 32"
"ENCRYPTION_KEY must be 256 bits, 64 string characters in hex format, generate via: openssl rand -hex 32",
)
.optional(),
REDIS_HOST: z.string().nullish(),
REDIS_PORT: z.coerce
.number({
description:
".env files convert numbers to strings, therefoore we have to enforce them to be numbers",
})
.positive()
.max(65536, `options.port should be >= 0 and < 65536`)
.default(6379)
.nullable(),
REDIS_AUTH: z.string().nullish(),
REDIS_CONNECTION_STRING: z.string().nullish(),
},

/**
Expand Down Expand Up @@ -239,6 +251,10 @@ export const env = createEnv({
LANGFUSE_EE_LICENSE_KEY: process.env.LANGFUSE_EE_LICENSE_KEY,
ADMIN_API_KEY: process.env.ADMIN_API_KEY,
ENCRYPTION_KEY: process.env.ENCRYPTION_KEY,
REDIS_HOST: process.env.REDIS_HOST,
REDIS_PORT: process.env.REDIS_PORT,
REDIS_AUTH: process.env.REDIS_AUTH,
REDIS_CONNECTION_STRING: process.env.REDIS_CONNECTION_STRING,
},
// Skip validation in Docker builds
// DOCKER_BUILD is set in Dockerfile
Expand Down
9 changes: 0 additions & 9 deletions web/src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,8 @@
// See: https://vercel.com/docs/observability/otel-overview

import { shutdown } from "@/src/utils/shutdown";
import prexit from "prexit";

export async function register() {
if (process.env.NEXT_RUNTIME === "nodejs") {
await import("./sentry.server.config");
if (process.env.NEXT_MANUAL_SIG_HANDLE) {
prexit(async (signal) => {
console.log("Signal: ", signal);
return await shutdown(signal);
});
}
}

if (process.env.NEXT_RUNTIME === "edge") {
Expand Down
Loading

0 comments on commit 6554c5e

Please sign in to comment.