Skip to content

Commit 6bb79fc

Browse files
authored
Merge pull request #156 from import-ai/fix/callback
Fix/callback
2 parents b6c4fca + 852bb7c commit 6bb79fc

12 files changed

+307
-158
lines changed

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "omniboxd",
3-
"version": "0.1.3",
3+
"version": "0.1.4",
44
"private": true,
55
"scripts": {
66
"build": "nest build",

src/migrations/1755499552000-update-attachment-urls.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,13 @@ export class UpdateAttachmentUrls1755499552000 implements MigrationInterface {
88
while (true) {
99
const resources = await queryRunner.query(
1010
`
11-
SELECT id, content
12-
FROM resources
13-
WHERE content != '' AND deleted_at IS NULL
14-
ORDER BY id
15-
LIMIT $1 OFFSET $2
16-
`,
11+
SELECT id, content
12+
FROM resources
13+
WHERE content != '' AND deleted_at IS NULL
14+
ORDER BY id
15+
LIMIT $1
16+
OFFSET $2
17+
`,
1718
[batchSize, offset],
1819
);
1920

src/migrations/1755504936756-scan-resource-attachments.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,13 @@ export class ScanResourceAttachments1755504936756
3838
// Check if this resource-attachment relation already exists
3939
const existingRelation = await queryRunner.query(
4040
`
41-
SELECT id FROM resource_attachments
42-
WHERE namespace_id = $1 AND resource_id = $2 AND attachment_id = $3 AND deleted_at IS NULL
43-
`,
41+
SELECT id
42+
FROM resource_attachments
43+
WHERE namespace_id = $1
44+
AND resource_id = $2
45+
AND attachment_id = $3
46+
AND deleted_at IS NULL
47+
`,
4448
[resource.namespace_id, resource.id, attachmentId],
4549
);
4650

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import { Injectable, Logger } from '@nestjs/common';
2+
import { MinioService } from 'omniboxd/minio/minio.service';
3+
import { ConfigService } from '@nestjs/config';
4+
5+
@Injectable()
6+
export class ChunkManagerService {
7+
private readonly logger = new Logger(ChunkManagerService.name);
8+
private readonly cleanupDelay: number;
9+
10+
constructor(
11+
private readonly minioService: MinioService,
12+
private readonly configService: ConfigService,
13+
) {
14+
this.cleanupDelay =
15+
parseInt(this.configService.get('OBB_CHUNK_CLEANUP_DELAY', '60'), 10) *
16+
1000; // Convert to milliseconds
17+
}
18+
19+
async storeChunk(
20+
taskId: string,
21+
chunkIndex: number,
22+
totalChunks: number,
23+
data: string,
24+
): Promise<void> {
25+
const chunkPath = this.getChunkPath(taskId, chunkIndex);
26+
const buffer = Buffer.from(data, 'base64');
27+
28+
try {
29+
await this.minioService.putChunkObject(chunkPath, buffer, buffer.length);
30+
this.logger.debug(
31+
`Stored chunk ${chunkIndex + 1}/${totalChunks} for task ${taskId}`,
32+
);
33+
} catch (error) {
34+
this.logger.error(
35+
`Failed to store chunk ${chunkIndex} for task ${taskId}:`,
36+
error,
37+
);
38+
throw error;
39+
}
40+
}
41+
42+
async assembleChunks(taskId: string, totalChunks: number): Promise<string> {
43+
const assembledPath = this.getAssembledPath(taskId);
44+
const chunkPaths = Array.from({ length: totalChunks }, (_, i) =>
45+
this.getChunkPath(taskId, i),
46+
);
47+
48+
try {
49+
// Use MinIO's composeObject to merge all chunks
50+
await this.minioService.composeObject(assembledPath, chunkPaths);
51+
52+
// Retrieve the assembled data
53+
const stream = await this.minioService.getObject(assembledPath);
54+
const chunks: Buffer[] = [];
55+
56+
return new Promise((resolve, reject) => {
57+
stream.on('data', (chunk) => chunks.push(chunk));
58+
stream.on('end', () => {
59+
const assembledBuffer = Buffer.concat(chunks);
60+
const assembledData = assembledBuffer.toString('utf-8');
61+
resolve(assembledData);
62+
});
63+
stream.on('error', reject);
64+
});
65+
} catch (error) {
66+
this.logger.error(`Failed to assemble chunks for task ${taskId}:`, error);
67+
throw error;
68+
}
69+
}
70+
71+
cleanupChunks(taskId: string, totalChunks: number): void {
72+
// Schedule cleanup after delay to allow for any final processing
73+
setTimeout(() => {
74+
this.performCleanup(taskId, totalChunks).catch((error) => {
75+
this.logger.error(
76+
`Failed to cleanup chunks for task ${taskId}:`,
77+
error,
78+
);
79+
});
80+
}, this.cleanupDelay);
81+
}
82+
83+
private async performCleanup(
84+
taskId: string,
85+
totalChunks: number,
86+
): Promise<void> {
87+
try {
88+
const objectsToRemove: string[] = [];
89+
90+
// Add chunk paths
91+
for (let i = 0; i < totalChunks; i++) {
92+
objectsToRemove.push(this.getChunkPath(taskId, i));
93+
}
94+
95+
// Add assembled path
96+
objectsToRemove.push(this.getAssembledPath(taskId));
97+
98+
// Remove all objects
99+
await Promise.all(
100+
objectsToRemove.map((objectName) =>
101+
this.minioService.removeObject(objectName).catch((error) => {
102+
this.logger.warn(`Failed to remove object ${objectName}:`, error);
103+
}),
104+
),
105+
);
106+
107+
this.logger.debug(`Cleaned up chunks for task ${taskId}`);
108+
} catch (error) {
109+
this.logger.error(`Failed to cleanup chunks for task ${taskId}:`, error);
110+
}
111+
}
112+
113+
private getChunkPath(taskId: string, chunkIndex: number): string {
114+
return `wizard-chunks/${taskId}/chunk-${chunkIndex.toString().padStart(6, '0')}`;
115+
}
116+
117+
private getAssembledPath(taskId: string): string {
118+
return `wizard-chunks/${taskId}/assembled`;
119+
}
120+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import {
2+
IsBoolean,
3+
IsNotEmpty,
4+
IsNumber,
5+
IsString,
6+
Min,
7+
} from 'class-validator';
8+
9+
export class ChunkCallbackDto {
10+
@IsString()
11+
@IsNotEmpty()
12+
id: string;
13+
14+
@IsNumber()
15+
@Min(0)
16+
chunk_index: number;
17+
18+
@IsNumber()
19+
@Min(1)
20+
total_chunks: number;
21+
22+
@IsString()
23+
@IsNotEmpty()
24+
data: string;
25+
26+
@IsBoolean()
27+
is_final_chunk: boolean;
28+
}

src/wizard/internal.wizard.controller.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,16 @@ import { WizardService } from 'omniboxd/wizard/wizard.service';
33
import { Public } from 'omniboxd/auth/decorators/public.auth.decorator';
44
import { transformKeysToSnakeCase } from 'omniboxd/interceptor/utils';
55
import { TaskCallbackDto } from 'omniboxd/wizard/dto/task-callback.dto';
6+
import { ChunkCallbackDto } from 'omniboxd/wizard/dto/chunk-callback.dto';
7+
import { ChunkManagerService } from 'omniboxd/wizard/chunk-manager.service';
68
import { Body, Controller, Get, Post, Res } from '@nestjs/common';
79

810
@Controller('internal/api/v1/wizard')
911
export class InternalWizardController {
10-
constructor(private readonly wizardService: WizardService) {}
12+
constructor(
13+
private readonly wizardService: WizardService,
14+
private readonly chunkManagerService: ChunkManagerService,
15+
) {}
1116

1217
@Public()
1318
@Get('/task')
@@ -23,4 +28,46 @@ export class InternalWizardController {
2328
): Promise<Record<string, any>> {
2429
return await this.wizardService.taskDoneCallback(taskCallback);
2530
}
31+
32+
@Public()
33+
@Post('/callback/chunk')
34+
async handleChunkCallback(
35+
@Body() chunkCallback: ChunkCallbackDto,
36+
): Promise<Record<string, any>> {
37+
const { id, chunk_index, total_chunks, data, is_final_chunk } =
38+
chunkCallback;
39+
40+
// Store the chunk
41+
await this.chunkManagerService.storeChunk(
42+
id,
43+
chunk_index,
44+
total_chunks,
45+
data,
46+
);
47+
48+
if (is_final_chunk) {
49+
try {
50+
// Assemble all chunks
51+
const assembledData = await this.chunkManagerService.assembleChunks(
52+
id,
53+
total_chunks,
54+
);
55+
56+
// Parse the assembled data and call the regular callback
57+
const taskCallback: TaskCallbackDto = JSON.parse(assembledData);
58+
const result = await this.wizardService.taskDoneCallback(taskCallback);
59+
60+
// Schedule cleanup
61+
this.chunkManagerService.cleanupChunks(id, total_chunks);
62+
63+
return result;
64+
} catch (error) {
65+
// Clean up on error
66+
this.chunkManagerService.cleanupChunks(id, total_chunks);
67+
throw error;
68+
}
69+
}
70+
71+
return { message: 'Chunk received', chunk_index, total_chunks };
72+
}
2673
}

0 commit comments

Comments
 (0)