Skip to content

Commit d8f31f3

Browse files
authored
Merge pull request #165 from import-ai/feat/trace
feat(trace): add trace module
2 parents 02a371d + c3bcc00 commit d8f31f3

File tree

9 files changed

+163
-0
lines changed

9 files changed

+163
-0
lines changed

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
"cookie-parser": "^1.4.7",
7878
"express": "^5.1.0",
7979
"handlebars": "^4.7.8",
80+
"kafkajs": "^2.2.4",
8081
"lodash": "^4.17.21",
8182
"meilisearch": "^0.50.0",
8283
"minio": "^8.0.5",

pnpm-lock.yaml

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/app/app.module.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import { UpdateAttachmentUrls1755499552000 } from 'omniboxd/migrations/175549955
4747
import { ScanResourceAttachments1755504936756 } from 'omniboxd/migrations/1755504936756-scan-resource-attachments';
4848
import { SharesAllResources1754471311959 } from 'omniboxd/migrations/1754471311959-shares-all-resources';
4949
import { ResourcesModule } from 'omniboxd/resources/resources.module';
50+
import { TraceModule } from 'omniboxd/trace/trace.module';
5051

5152
@Module({})
5253
export class AppModule implements NestModule {
@@ -93,6 +94,7 @@ export class AppModule implements NestModule {
9394
AttachmentsModule,
9495
SharesModule,
9596
SharedResourcesModule,
97+
TraceModule,
9698
// CacheModule.registerAsync({
9799
// imports: [ConfigModule],
98100
// inject: [ConfigService],

src/trace/dto/trace-event.dto.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { IsNotEmpty, IsString, IsObject } from 'class-validator';
2+
3+
export class TraceEventDto {
4+
@IsString()
5+
@IsNotEmpty()
6+
name: string;
7+
8+
@IsObject()
9+
props: Record<string, any>;
10+
}

src/trace/dto/trace-message.dto.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import { Expose } from 'class-transformer';
2+
import { TraceEventDto } from './trace-event.dto';
3+
4+
export class TraceMessageDto {
5+
@Expose()
6+
timestamp: number;
7+
8+
@Expose({ name: 'event_name' })
9+
eventName: string;
10+
11+
@Expose({ name: 'event_props' })
12+
eventProps?: string;
13+
14+
@Expose({ name: 'user_id' })
15+
userId?: string;
16+
17+
@Expose({ name: 'user_agent' })
18+
userAgent?: string;
19+
20+
static fromEvent(event: TraceEventDto, userId?: string, userAgent?: string) {
21+
const message = new TraceMessageDto();
22+
message.timestamp = Date.now();
23+
message.eventName = event.name;
24+
message.eventProps = JSON.stringify(event.props);
25+
message.userId = userId;
26+
message.userAgent = userAgent;
27+
return message;
28+
}
29+
}

src/trace/dto/trace-req.dto.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import { IsArray, ValidateNested } from 'class-validator';
2+
import { Type } from 'class-transformer';
3+
import { TraceEventDto } from 'omniboxd/trace/dto/trace-event.dto';
4+
5+
export class TraceReqDto {
6+
@IsArray()
7+
@ValidateNested({ each: true })
8+
@Type(() => TraceEventDto)
9+
events: TraceEventDto[];
10+
}

src/trace/trace.controller.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
import { Body, Controller, Post, Req } from '@nestjs/common';
2+
import { TraceService } from 'omniboxd/trace/trace.service';
3+
import { TraceReqDto } from './dto/trace-req.dto';
4+
import { CookieAuth } from 'omniboxd/auth/decorators';
5+
import { UserId } from 'omniboxd/decorators/user-id.decorator';
6+
7+
@Controller('api/v1/trace')
8+
export class TraceController {
9+
constructor(private traceService: TraceService) {}
10+
11+
@Post()
12+
@CookieAuth({ onAuthFail: 'continue' })
13+
async trace(
14+
@Req() req: Request,
15+
@Body() traceReq: TraceReqDto,
16+
@UserId({ optional: true }) userId?: string,
17+
): Promise<void> {
18+
const userAgent = req.headers['user-agent'];
19+
await this.traceService.emitTraceEvents(traceReq.events, userId, userAgent);
20+
}
21+
}

src/trace/trace.module.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import { Module } from '@nestjs/common';
2+
import { ConfigModule } from '@nestjs/config';
3+
import { TraceController } from 'omniboxd/trace/trace.controller';
4+
import { TraceService } from 'omniboxd/trace/trace.service';
5+
6+
@Module({
7+
imports: [ConfigModule],
8+
controllers: [TraceController],
9+
providers: [TraceService],
10+
exports: [TraceService],
11+
})
12+
export class TraceModule {}

src/trace/trace.service.ts

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import {
2+
Injectable,
3+
Logger,
4+
OnModuleDestroy,
5+
OnModuleInit,
6+
} from '@nestjs/common';
7+
import { ConfigService } from '@nestjs/config';
8+
import { Kafka, Producer } from 'kafkajs';
9+
import { TraceEventDto } from './dto/trace-event.dto';
10+
import { instanceToPlain } from 'class-transformer';
11+
import { TraceMessageDto } from './dto/trace-message.dto';
12+
13+
@Injectable()
14+
export class TraceService implements OnModuleInit, OnModuleDestroy {
15+
private readonly logger = new Logger(TraceService.name);
16+
private readonly topic?: string;
17+
private readonly kafka?: Kafka;
18+
private readonly producer?: Producer;
19+
20+
constructor(private readonly configService: ConfigService) {
21+
const brokerUrl = this.configService.get<string>('OBB_KAFKA_BROKER');
22+
const topic = this.configService.get<string>('OBB_KAFKA_TOPIC');
23+
const clientId = this.configService.get<string>('OBB_KAFKA_CLIENT_ID');
24+
if (!brokerUrl || !topic || !clientId) {
25+
return;
26+
}
27+
const brokers = brokerUrl.split(',');
28+
this.topic = topic;
29+
this.kafka = new Kafka({
30+
clientId,
31+
brokers,
32+
});
33+
this.producer = this.kafka.producer();
34+
}
35+
36+
async onModuleInit() {
37+
if (this.producer) {
38+
await this.producer.connect();
39+
this.logger.log('Kafka producer connected successfully');
40+
}
41+
}
42+
43+
async onModuleDestroy() {
44+
if (this.producer) {
45+
await this.producer.disconnect();
46+
this.logger.log('Kafka producer disconnected');
47+
}
48+
}
49+
50+
async emitTraceEvents(
51+
events: TraceEventDto[],
52+
userId?: string,
53+
userAgent?: string,
54+
): Promise<void> {
55+
if (!this.producer || !this.topic) {
56+
return;
57+
}
58+
const messages = events.map((event) => {
59+
const dto = TraceMessageDto.fromEvent(event, userId, userAgent);
60+
return {
61+
value: JSON.stringify(instanceToPlain(dto)),
62+
};
63+
});
64+
await this.producer.send({
65+
topic: this.topic,
66+
messages,
67+
});
68+
}
69+
}

0 commit comments

Comments
 (0)