166 changes: 166 additions & 0 deletions apps/dbagent/dev/langfuse/docker-compose.yml
@@ -0,0 +1,166 @@
# Make sure to update the credential placeholders with your own secrets.
# They are marked with # CHANGEME in the file below.
# In addition, we recommend restricting inbound traffic on the host to langfuse-web (port 3000) only
# (and to minio on port 9090, if you uncomment its port mapping below).
# The langfuse-worker port is bound to localhost (127.0.0.1) and the remaining services expose no
# host ports at all, so external connections from other machines cannot reach them directly.
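#
# Illustrative way to generate replacements for the CHANGEME values (assumes openssl and a POSIX
# shell; adapt to your own secret-management workflow):
#   openssl rand -hex 32   # ENCRYPTION_KEY (must be a 64-character hex string)
#   openssl rand -hex 16   # SALT, NEXTAUTH_SECRET, REDIS_AUTH, Postgres / ClickHouse / MinIO passwords
#
# To start the stack from the repository root (illustrative):
#   docker compose -f apps/dbagent/dev/langfuse/docker-compose.yml up -d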
services:
langfuse-worker:
image: langfuse/langfuse-worker:3
restart: always
depends_on: &langfuse-depends-on
postgres:
condition: service_healthy
minio:
condition: service_healthy
redis:
condition: service_healthy
clickhouse:
condition: service_healthy
ports:
- 127.0.0.1:3030:3030
environment: &langfuse-worker-env
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres # CHANGEME
SALT: 'mysalt' # CHANGEME
ENCRYPTION_KEY: '0000000000000000000000000000000000000000000000000000000000000000' # CHANGEME: generate via `openssl rand -hex 32`
TELEMETRY_ENABLED: ${TELEMETRY_ENABLED:-true}
LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: ${LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES:-true}
CLICKHOUSE_MIGRATION_URL: ${CLICKHOUSE_MIGRATION_URL:-clickhouse://clickhouse:9000}
CLICKHOUSE_URL: ${CLICKHOUSE_URL:-http://clickhouse:8123}
CLICKHOUSE_USER: ${CLICKHOUSE_USER:-clickhouse}
CLICKHOUSE_PASSWORD: ${CLICKHOUSE_PASSWORD:-clickhouse} # CHANGEME
CLICKHOUSE_CLUSTER_ENABLED: ${CLICKHOUSE_CLUSTER_ENABLED:-false}
LANGFUSE_S3_EVENT_UPLOAD_BUCKET: ${LANGFUSE_S3_EVENT_UPLOAD_BUCKET:-langfuse}
LANGFUSE_S3_EVENT_UPLOAD_REGION: ${LANGFUSE_S3_EVENT_UPLOAD_REGION:-auto}
LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_EVENT_UPLOAD_ACCESS_KEY_ID:-minio}
LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_EVENT_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} # CHANGEME
LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT: ${LANGFUSE_S3_EVENT_UPLOAD_ENDPOINT:-http://minio:9000}
LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_EVENT_UPLOAD_FORCE_PATH_STYLE:-true}
LANGFUSE_S3_EVENT_UPLOAD_PREFIX: ${LANGFUSE_S3_EVENT_UPLOAD_PREFIX:-events/}
LANGFUSE_S3_MEDIA_UPLOAD_BUCKET: ${LANGFUSE_S3_MEDIA_UPLOAD_BUCKET:-langfuse}
LANGFUSE_S3_MEDIA_UPLOAD_REGION: ${LANGFUSE_S3_MEDIA_UPLOAD_REGION:-auto}
LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID: ${LANGFUSE_S3_MEDIA_UPLOAD_ACCESS_KEY_ID:-minio}
LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY: ${LANGFUSE_S3_MEDIA_UPLOAD_SECRET_ACCESS_KEY:-miniosecret} # CHANGEME
LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT: ${LANGFUSE_S3_MEDIA_UPLOAD_ENDPOINT:-http://localhost:9090}
LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE: ${LANGFUSE_S3_MEDIA_UPLOAD_FORCE_PATH_STYLE:-true}
LANGFUSE_S3_MEDIA_UPLOAD_PREFIX: ${LANGFUSE_S3_MEDIA_UPLOAD_PREFIX:-media/}
LANGFUSE_S3_BATCH_EXPORT_ENABLED: ${LANGFUSE_S3_BATCH_EXPORT_ENABLED:-false}
LANGFUSE_S3_BATCH_EXPORT_BUCKET: ${LANGFUSE_S3_BATCH_EXPORT_BUCKET:-langfuse}
LANGFUSE_S3_BATCH_EXPORT_PREFIX: ${LANGFUSE_S3_BATCH_EXPORT_PREFIX:-exports/}
LANGFUSE_S3_BATCH_EXPORT_REGION: ${LANGFUSE_S3_BATCH_EXPORT_REGION:-auto}
LANGFUSE_S3_BATCH_EXPORT_ENDPOINT: ${LANGFUSE_S3_BATCH_EXPORT_ENDPOINT:-http://minio:9000}
LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT: ${LANGFUSE_S3_BATCH_EXPORT_EXTERNAL_ENDPOINT:-http://localhost:9090}
LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID: ${LANGFUSE_S3_BATCH_EXPORT_ACCESS_KEY_ID:-minio}
LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY: ${LANGFUSE_S3_BATCH_EXPORT_SECRET_ACCESS_KEY:-miniosecret} # CHANGEME
LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE: ${LANGFUSE_S3_BATCH_EXPORT_FORCE_PATH_STYLE:-true}
LANGFUSE_INGESTION_QUEUE_DELAY_MS: ${LANGFUSE_INGESTION_QUEUE_DELAY_MS:-}
LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS: ${LANGFUSE_INGESTION_CLICKHOUSE_WRITE_INTERVAL_MS:-}
REDIS_HOST: ${REDIS_HOST:-redis}
REDIS_PORT: ${REDIS_PORT:-6379}
REDIS_AUTH: ${REDIS_AUTH:-myredissecret} # CHANGEME
REDIS_TLS_ENABLED: ${REDIS_TLS_ENABLED:-false}
REDIS_TLS_CA: ${REDIS_TLS_CA:-/certs/ca.crt}
REDIS_TLS_CERT: ${REDIS_TLS_CERT:-/certs/redis.crt}
REDIS_TLS_KEY: ${REDIS_TLS_KEY:-/certs/redis.key}

langfuse-web:
image: langfuse/langfuse:3
restart: always
depends_on: *langfuse-depends-on
ports:
- 3000:3000
environment:
<<: *langfuse-worker-env
NEXTAUTH_URL: http://localhost:3000
NEXTAUTH_SECRET: mysecret # CHANGEME
LANGFUSE_INIT_ORG_ID: ${LANGFUSE_INIT_ORG_ID:-}
LANGFUSE_INIT_ORG_NAME: ${LANGFUSE_INIT_ORG_NAME:-}
LANGFUSE_INIT_PROJECT_ID: ${LANGFUSE_INIT_PROJECT_ID:-}
LANGFUSE_INIT_PROJECT_NAME: ${LANGFUSE_INIT_PROJECT_NAME:-}
LANGFUSE_INIT_PROJECT_PUBLIC_KEY: ${LANGFUSE_INIT_PROJECT_PUBLIC_KEY:-}
LANGFUSE_INIT_PROJECT_SECRET_KEY: ${LANGFUSE_INIT_PROJECT_SECRET_KEY:-}
LANGFUSE_INIT_USER_EMAIL: ${LANGFUSE_INIT_USER_EMAIL:-}
LANGFUSE_INIT_USER_NAME: ${LANGFUSE_INIT_USER_NAME:-}
LANGFUSE_INIT_USER_PASSWORD: ${LANGFUSE_INIT_USER_PASSWORD:-}

clickhouse:
image: clickhouse/clickhouse-server
restart: always
user: '101:101'
environment:
CLICKHOUSE_DB: default
CLICKHOUSE_USER: clickhouse
CLICKHOUSE_PASSWORD: clickhouse # CHANGEME
volumes:
- langfuse_clickhouse_data:/var/lib/clickhouse
- langfuse_clickhouse_logs:/var/log/clickhouse-server
#ports:
# - 127.0.0.1:8123:8123
# - 127.0.0.1:9000:9000
healthcheck:
test: wget --no-verbose --tries=1 --spider http://localhost:8123/ping || exit 1
interval: 5s
timeout: 5s
retries: 10
start_period: 1s

minio:
image: minio/minio
restart: always
entrypoint: sh
# create the 'langfuse' bucket before starting the service
command: -c 'mkdir -p /data/langfuse && minio server --address ":9000" --console-address ":9001" /data'
environment:
MINIO_ROOT_USER: minio
MINIO_ROOT_PASSWORD: miniosecret # CHANGEME
#ports:
# - 9090:9000
# - 127.0.0.1:9091:9001
volumes:
- langfuse_minio_data:/data
healthcheck:
test: ['CMD', 'mc', 'ready', 'local']
interval: 1s
timeout: 5s
retries: 5
start_period: 1s

redis:
image: redis:7
restart: always
    # CHANGEME: set a strong Redis password in the command below
command: >
--requirepass ${REDIS_AUTH:-myredissecret}
#ports:
# - 127.0.0.1:6379:6379
healthcheck:
test: ['CMD', 'redis-cli', 'ping']
interval: 3s
timeout: 10s
retries: 10

postgres:
image: postgres:${POSTGRES_VERSION:-latest}
restart: always
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U postgres']
interval: 3s
timeout: 3s
retries: 10
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres # CHANGEME
POSTGRES_DB: postgres
#ports:
# - 127.0.0.1:5432:5432
volumes:
- langfuse_postgres_data:/var/lib/postgresql/data

volumes:
langfuse_postgres_data:
driver: local
langfuse_clickhouse_data:
driver: local
langfuse_clickhouse_logs:
driver: local
langfuse_minio_data:
driver: local
134 changes: 134 additions & 0 deletions apps/dbagent/instrumentation-node.ts
@@ -0,0 +1,134 @@
import { diag, DiagConsoleLogger, DiagLogLevel } from '@opentelemetry/api';
import { getNodeAutoInstrumentations } from '@opentelemetry/auto-instrumentations-node';
import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks';
import { ExportResult } from '@opentelemetry/core';
import { OTLPTraceExporter as OTLPGrpcTraceExporter } from '@opentelemetry/exporter-trace-otlp-grpc';
import { OTLPTraceExporter as OTLPHttpJsonTraceExporter } from '@opentelemetry/exporter-trace-otlp-http';
import { OTLPTraceExporter as OTLPHttpProtoTraceExporter } from '@opentelemetry/exporter-trace-otlp-proto';
import { NodeSDK } from '@opentelemetry/sdk-node';
import { BatchSpanProcessor, ReadableSpan, SpanExporter } from '@opentelemetry/sdk-trace-base';
import { ConsoleSpanExporter } from '@opentelemetry/sdk-trace-node';
import { LangfuseExporter } from 'langfuse-vercel';
import { z } from 'zod';

const schema = z.object({
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().optional(),
OTEL_EXPORTER_OTLP_PROTOCOL: z.enum(['http/json', 'http/protobuf', 'grpc']).optional(),
OTEL_EXPORTER_OTLP_HEADERS: z.record(z.string(), z.string()).optional(),
OTEL_EXPORTER_OTLP_KEY: z.string().optional(),
OTEL_SERVICE_NAME: z.string().default('xata-agent'),
  OTEL_DEBUG: z.enum(['true', 'false']).default('false'),

LANGFUSE_HOST: z.string().optional(),
LANGFUSE_PUBLIC_KEY: z.string().optional(),
LANGFUSE_SECRET_KEY: z.string().optional(),
  LANGFUSE_DEBUG: z.enum(['true', 'false']).default('false')
});
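
// Illustrative environment values (not part of this file) for the two exporter backends; if neither
// is configured, the code below falls back to a console exporter.
//
//   # Langfuse (for example, the local docker-compose stack under dev/langfuse):
//   LANGFUSE_HOST=http://localhost:3000
//   LANGFUSE_PUBLIC_KEY=pk-lf-...   # placeholder keys, taken from your Langfuse project settings
//   LANGFUSE_SECRET_KEY=sk-lf-...
//
//   # Generic OTLP collector:
//   OTEL_EXPORTER_OTLP_ENDPOINT=https://collector.example.com/v1/traces   # exact path depends on the collector
//   OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf
//   OTEL_EXPORTER_OTLP_KEY=my-bearer-token   # sent as an Authorization: Bearer header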

/* eslint-disable no-process-env */
const env = schema.parse(process.env);

function otelExporter(
protocol: 'http/json' | 'http/protobuf' | 'grpc',
config: {
url: string;
headers?: Record<string, string>;
}
): SpanExporter {
switch (protocol) {
case 'http/json':
return new OTLPHttpJsonTraceExporter(config);
case 'http/protobuf':
return new OTLPHttpProtoTraceExporter(config);
case 'grpc':
return new OTLPGrpcTraceExporter(config);
default:
throw new Error(`Unsupported protocol: ${protocol}`);
}
}

function createExporter(): SpanExporter {
const exporters: SpanExporter[] = [];

const level = env.OTEL_DEBUG === 'true' ? DiagLogLevel.DEBUG : DiagLogLevel.ERROR;
  diag.setLogger(new DiagConsoleLogger(), level); // set the diag log level to DEBUG when debugging

if (env.OTEL_EXPORTER_OTLP_ENDPOINT) {
let headers: Record<string, string> | undefined = env.OTEL_EXPORTER_OTLP_HEADERS;
if (env.OTEL_EXPORTER_OTLP_KEY) {
if (!headers) {
headers = {};
}
headers['Authorization'] = `Bearer ${env.OTEL_EXPORTER_OTLP_KEY}`;
}

console.log('OTEL exporter is enabled');
exporters.push(
otelExporter(env.OTEL_EXPORTER_OTLP_PROTOCOL || 'http/json', {
url: env.OTEL_EXPORTER_OTLP_ENDPOINT,
headers
})
);
}

if (env.LANGFUSE_HOST && env.LANGFUSE_PUBLIC_KEY && env.LANGFUSE_SECRET_KEY) {
console.log('Langfuse exporter is enabled');
exporters.push(
new LangfuseExporter({
baseUrl: env.LANGFUSE_HOST,
publicKey: env.LANGFUSE_PUBLIC_KEY,
secretKey: env.LANGFUSE_SECRET_KEY,
debug: env.LANGFUSE_DEBUG === 'true'
})
);
}

if (exporters.length === 0) {
return new ConsoleSpanExporter();
}
if (exporters.length === 1) {
return exporters[0] as SpanExporter;
}

return {
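    // Minimal fan-out exporter: forward each span batch to every configured exporter.
    // Note: resultCallback is invoked once per underlying exporter, so the span processor may
    // receive more than one result for the same batch.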
export(spans: ReadableSpan[], resultCallback: (result: ExportResult) => void): void {
for (const exporter of exporters) {
exporter.export(spans, resultCallback);
}
console.log('Exported spans', spans.length);
},

shutdown(): Promise<void> {
return Promise.all(exporters.map((exporter) => exporter.shutdown())).then(() => undefined);
},

forceFlush(): Promise<void> {
return Promise.all(exporters.map((exporter) => exporter.forceFlush?.())).then(() => undefined);
}
} as SpanExporter;
}

console.log('Initializing OTel SDK');
const exporter = createExporter();
const contextManager = new AsyncLocalStorageContextManager();
contextManager.enable();

const sdk = new NodeSDK({
contextManager,
traceExporter: exporter,
spanProcessor: new BatchSpanProcessor(exporter),
instrumentations: [getNodeAutoInstrumentations()]
});

console.log('Starting OTel SDK');
sdk.start();

process.on('SIGTERM', () => {
sdk
.shutdown()
.then(
() => console.log('OTel shutdown complete'),
(error) => console.error('OTel shutdown error', error)
)
.finally(() => process.exit(0));
});
6 changes: 6 additions & 0 deletions apps/dbagent/instrumentation.ts
@@ -0,0 +1,6 @@
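// Next.js instrumentation hook: register() runs once when the server process starts.
// The Node-specific OpenTelemetry setup is only loaded in the Node.js runtime and skipped on the Edge runtime.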
export async function register() {
/* eslint-disable no-process-env */
if (process.env.NEXT_RUNTIME === 'nodejs') {
await import('./instrumentation-node.ts');
}
}
18 changes: 18 additions & 0 deletions apps/dbagent/package.json
@@ -38,6 +38,21 @@
"@mastra/core": "^0.9.3",
"@mastra/evals": "^0.1.22",
"@modelcontextprotocol/sdk": "^1.11.1",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/api-logs": "^0.200.0",
"@opentelemetry/auto-instrumentations-node": "^0.58.0",
"@opentelemetry/context-async-hooks": "^2.0.0",
"@opentelemetry/core": "^2.0.0",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.200.0",
"@opentelemetry/exporter-trace-otlp-http": "^0.200.0",
"@opentelemetry/exporter-trace-otlp-proto": "^0.200.0",
"@opentelemetry/instrumentation": "^0.200.0",
"@opentelemetry/resources": "^2.0.0",
"@opentelemetry/sdk-logs": "^0.200.0",
"@opentelemetry/sdk-node": "^0.200.0",
"@opentelemetry/sdk-trace-base": "^2.0.0",
"@opentelemetry/sdk-trace-node": "^2.0.0",
"@opentelemetry/semantic-conventions": "^1.32.0",
"@tailwindcss/postcss": "^4.1.6",
"@tanstack/react-query": "^5.75.7",
"@vercel/functions": "^2.0.3",
@@ -55,7 +70,9 @@
"fast-deep-equal": "^3.1.3",
"framer-motion": "^12.10.5",
"geist": "^1.4.2",
"import-in-the-middle": "^1.13.1",
"kysely": "^0.28.2",
"langfuse-vercel": "^3.37.2",
"litellm-api": "^0.0.3",
"lucide-react": "^0.509.0",
"next": "^15.3.2",
@@ -79,6 +96,7 @@
"react-markdown": "^10.1.0",
"react-syntax-highlighter": "^15.6.1",
"remark-gfm": "^4.0.1",
"require-in-the-middle": "^7.5.2",
"server-only": "^0.0.1",
"tailwind-merge": "^3.3.0",
"tailwindcss-animate": "^1.0.7",
@@ -8,6 +8,12 @@ import { getUserSessionDBAccess } from '~/lib/db/db';
export async function generateTitleFromUserMessage({ message }: { message: Message }) {
const { text: title } = await generateText({
model: await getModelInstance('title'),
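    // Tag internal title-generation calls so they can be filtered out in the tracing backend.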
experimental_telemetry: {
isEnabled: true,
metadata: {
tags: ['internal', 'chat', 'title']
}
},
system: `\n
- you will generate a short title based on the first message a user begins a conversation with
- ensure it is not more than 80 characters long
12 changes: 12 additions & 0 deletions apps/dbagent/src/app/api/chat/route.ts
@@ -74,6 +74,18 @@ export async function POST(request: Request) {
toolCallStreaming: true,
experimental_transform: smoothStream({ chunking: 'word' }),
experimental_generateMessageId: generateUUID,
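    // AI SDK telemetry: these metadata fields are attached to the spans emitted for this call and
    // surface in whichever exporter instrumentation-node.ts configured (Langfuse and/or OTLP).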
experimental_telemetry: {
isEnabled: true,
metadata: {
projectId: connection.projectId,
connectionId: connectionId,
sessionId: id,
model: model.info().id,
userId,
cloudProvider: project.cloudProvider,
tags: ['chat']
}
},
tools,
onFinish: async ({ response }) => {
try {