Skip to content

Commit 00e4f93

Browse files
authored
Merge pull request #157 from import-ai/feature/task_management
Feature/task management
2 parents 6bb79fc + 16cf74d commit 00e4f93

File tree

8 files changed

+170
-27
lines changed

8 files changed

+170
-27
lines changed

src/namespaces/namespaces.e2e-spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ describe('NamespacesController (e2e)', () => {
444444

445445
it('should handle concurrent namespace operations', async () => {
446446
// Create multiple namespaces concurrently
447-
const createPromises = Array.from({ length: 5 }, (_, i) =>
447+
const createPromises = Array.from({ length: 2 }, (_, i) =>
448448
client
449449
.post('/api/v1/namespaces')
450450
.send({ name: `Concurrent Test Workspace ${i}` }),

src/tasks/dto/task.dto.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import { Task } from 'omniboxd/tasks/tasks.entity';
2+
import { isEmpty } from 'omniboxd/utils/is-empty';
3+
4+
function getTaskStatus(task: Task): string {
5+
if (task.canceledAt) {
6+
return 'canceled';
7+
}
8+
if (!isEmpty(task.exception)) {
9+
return 'error';
10+
}
11+
if (task.endedAt) {
12+
return 'finished';
13+
}
14+
if (task.startedAt) {
15+
return 'running';
16+
}
17+
return 'pending';
18+
}
19+
20+
export class TaskDto {
21+
id: string;
22+
namespace_id: string;
23+
user_id: string;
24+
priority: number;
25+
function: string;
26+
input: Record<string, any>;
27+
payload: Record<string, any> | null;
28+
output: Record<string, any> | null;
29+
exception: Record<string, any> | null;
30+
status: string;
31+
created_at: string;
32+
updated_at: string;
33+
started_at: string | null;
34+
ended_at: string | null;
35+
canceled_at: string | null;
36+
37+
static fromEntity(task: Task): TaskDto {
38+
const dto = new TaskDto();
39+
dto.id = task.id;
40+
dto.namespace_id = task.namespaceId;
41+
dto.user_id = task.userId;
42+
dto.priority = task.priority;
43+
dto.function = task.function;
44+
dto.input = task.input;
45+
dto.payload = task.payload;
46+
dto.output = task.output;
47+
dto.exception = task.exception;
48+
dto.status = getTaskStatus(task);
49+
dto.created_at = task.createdAt.toISOString();
50+
dto.updated_at = task.updatedAt.toISOString();
51+
dto.started_at = task.startedAt?.toISOString() || null;
52+
dto.ended_at = task.endedAt?.toISOString() || null;
53+
dto.canceled_at = task.canceledAt?.toISOString() || null;
54+
return dto;
55+
}
56+
}

src/tasks/task-pipeline.e2e-spec.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { HttpStatus } from '@nestjs/common';
33
import { Task } from 'omniboxd/tasks/tasks.entity';
44
import { TaskCallbackDto } from 'omniboxd/wizard/dto/task-callback.dto';
55
import { isEmpty } from 'omniboxd/utils/is-empty';
6+
import { TaskDto } from 'omniboxd/tasks/dto/task.dto';
67

78
/**
89
* Mock wizard worker that simulates the wizard worker service behavior
@@ -46,8 +47,10 @@ class MockWizardWorker {
4647
async pollOnce(): Promise<void> {
4748
const task = await this.fetchTask();
4849
if (task) {
49-
const result = this.processTask(task);
50-
await this.sendCallback(task.id, result);
50+
if (task.namespace_id === this.client.namespace.id) {
51+
const result = this.processTask(task);
52+
await this.sendCallback(task.id, result);
53+
}
5154
}
5255
}
5356

@@ -77,7 +80,7 @@ class MockWizardWorker {
7780
/**
7881
* Fetches a task from the backend (simulates wizard worker fetching)
7982
*/
80-
private async fetchTask(): Promise<Task | null> {
83+
private async fetchTask(): Promise<TaskDto | null> {
8184
try {
8285
const response = await this.makeRequest()
8386
.get('/internal/api/v1/wizard/task')
@@ -91,7 +94,7 @@ class MockWizardWorker {
9194
throw new Error(`Failed to fetch task: ${response.status}`);
9295
}
9396

94-
return response.body as Task;
97+
return response.body as TaskDto;
9598
} catch (error) {
9699
if (error.code === 'ECONNRESET' || error.timeout) {
97100
console.warn('Connection issue when fetching task, retrying...');
@@ -104,7 +107,7 @@ class MockWizardWorker {
104107
/**
105108
* Processes a task based on its function type
106109
*/
107-
private processTask(task: Task): { output?: any; exception?: string } {
110+
private processTask(task: TaskDto): { output?: any; exception?: string } {
108111
try {
109112
switch (task.function) {
110113
case 'collect':
@@ -130,7 +133,7 @@ class MockWizardWorker {
130133
/**
131134
* Simulates collect task processing
132135
*/
133-
private processCollectTask(task: Task): { output: any } {
136+
private processCollectTask(task: TaskDto): { output: any } {
134137
const input = task.input as { html: string; url: string; title?: string };
135138

136139
return {
@@ -145,7 +148,7 @@ class MockWizardWorker {
145148
/**
146149
* Simulates extract_tags task processing
147150
*/
148-
private processExtractTagsTask(task: Task): { output: any } {
151+
private processExtractTagsTask(task: TaskDto): { output: any } {
149152
console.log({ taskId: task.id, function: 'extractTags' });
150153
return {
151154
output: {
@@ -157,7 +160,7 @@ class MockWizardWorker {
157160
/**
158161
* Simulates generate_title task processing
159162
*/
160-
private processGenerateTitleTask(task: Task): { output: any } {
163+
private processGenerateTitleTask(task: TaskDto): { output: any } {
161164
console.log({ taskId: task.id, function: 'generateTitle' });
162165
return {
163166
output: {
@@ -170,7 +173,7 @@ class MockWizardWorker {
170173
/**
171174
* Simulates file_reader task processing
172175
*/
173-
private processFileReaderTask(task: Task): { output: any } {
176+
private processFileReaderTask(task: TaskDto): { output: any } {
174177
const input = task.input as {
175178
title: string;
176179
original_name?: string;
@@ -194,7 +197,7 @@ class MockWizardWorker {
194197
/**
195198
* Simulates upsert_index task processing
196199
*/
197-
private processUpsertIndexTask(task: Task): { output: any } {
200+
private processUpsertIndexTask(task: TaskDto): { output: any } {
198201
console.log({ taskId: task.id, function: 'upsertIndex' });
199202
return {
200203
output: {

src/tasks/tasks.controller.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
Delete,
77
Get,
88
Param,
9+
Patch,
910
Post,
1011
Query,
1112
} from '@nestjs/common';
@@ -21,11 +22,11 @@ export class TasksController {
2122

2223
@Get()
2324
async listTasks(
24-
@Param('namespace') namespace: string,
25+
@Param('namespaceId') namespaceId: string,
2526
@Query('offset') offset: number = 0,
2627
@Query('limit') limit: number = 10,
2728
) {
28-
return await this.tasksService.list(namespace, offset, limit);
29+
return await this.tasksService.list(namespaceId, offset, limit);
2930
}
3031

3132
@Get(':id')
@@ -38,4 +39,14 @@ export class TasksController {
3839
await this.tasksService.delete(id);
3940
return { detail: 'Task deleted' };
4041
}
42+
43+
@Patch(':id/cancel')
44+
async cancelTask(@Param('id') id: string) {
45+
return await this.tasksService.cancelTask(id);
46+
}
47+
48+
@Post(':id/rerun')
49+
async rerunTask(@Param('id') id: string) {
50+
return await this.tasksService.rerunTask(id);
51+
}
4152
}

src/tasks/tasks.service.ts

Lines changed: 50 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import { Repository } from 'typeorm';
22
import { Task } from 'omniboxd/tasks/tasks.entity';
33
import { InjectRepository } from '@nestjs/typeorm';
4-
import { Injectable, NotFoundException } from '@nestjs/common';
4+
import {
5+
BadRequestException,
6+
Injectable,
7+
NotFoundException,
8+
} from '@nestjs/common';
9+
import { TaskDto } from './dto/task.dto';
510

611
@Injectable()
712
export class TasksService {
@@ -15,12 +20,19 @@ export class TasksService {
1520
return await this.taskRepository.save(newTask);
1621
}
1722

18-
async list(namespaceId: string, offset: number, limit: number) {
19-
return this.taskRepository.find({
23+
async list(
24+
namespaceId: string,
25+
offset: number,
26+
limit: number,
27+
): Promise<TaskDto[]> {
28+
const tasks = await this.taskRepository.find({
2029
where: { namespaceId },
2130
skip: offset,
2231
take: limit,
32+
order: { createdAt: 'DESC' },
2333
});
34+
35+
return tasks.map((task) => TaskDto.fromEntity(task));
2436
}
2537

2638
async get(id: string) {
@@ -42,4 +54,39 @@ export class TasksService {
4254
}
4355
await this.taskRepository.softRemove(task);
4456
}
57+
58+
async cancelTask(id: string): Promise<TaskDto> {
59+
const task = await this.get(id);
60+
61+
if (task.canceledAt) {
62+
throw new BadRequestException('Task is already canceled');
63+
}
64+
if (task.endedAt) {
65+
throw new BadRequestException('Cannot cancel a finished task');
66+
}
67+
68+
task.canceledAt = new Date();
69+
const updatedTask = await this.taskRepository.save(task);
70+
71+
return TaskDto.fromEntity(updatedTask);
72+
}
73+
74+
async rerunTask(id: string): Promise<TaskDto> {
75+
const originalTask = await this.get(id);
76+
77+
if (!originalTask.canceledAt) {
78+
throw new BadRequestException('Can only rerun canceled tasks');
79+
}
80+
81+
const newTask = await this.create({
82+
namespaceId: originalTask.namespaceId,
83+
userId: originalTask.userId,
84+
priority: originalTask.priority,
85+
function: originalTask.function,
86+
input: originalTask.input,
87+
payload: originalTask.payload,
88+
});
89+
90+
return TaskDto.fromEntity(newTask);
91+
}
4592
}

src/wizard/processors/reader.processor.spec.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ describe('ReaderProcessor', () => {
2424
update: jest.fn(),
2525
};
2626

27-
2827
const module: TestingModule = await Test.createTestingModule({
2928
providers: [
3029
{
@@ -215,10 +214,11 @@ describe('ReaderProcessor', () => {
215214

216215
const result = await processor.process(task);
217216

218-
expect(task.output!.markdown).toBe('![Image](attachments/attachment-id)');
217+
expect(task.output!.markdown).toBe(
218+
'![Image](attachments/attachment-id)',
219+
);
219220
expect(result).toEqual({ resourceId: 'test-resource-id' });
220221
});
221-
222222
});
223223

224224
describe('inheritance from CollectProcessor', () => {
@@ -295,7 +295,9 @@ describe('ReaderProcessor', () => {
295295
await processor.process(task);
296296

297297
// Markdown should remain unchanged since no image links found
298-
expect(task.output!.markdown).toBe('# Test Document\n\nNo images here.');
298+
expect(task.output!.markdown).toBe(
299+
'# Test Document\n\nNo images here.',
300+
);
299301
expect(task.output!.images).toBeUndefined();
300302
});
301303

src/wizard/wizard.service.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { ResourceType } from 'omniboxd/resources/resources.entity';
2020
import { AttachmentsService } from 'omniboxd/attachments/attachments.service';
2121
import { WizardTaskService } from 'omniboxd/tasks/wizard-task.service';
2222
import { Image, ProcessedImage } from 'omniboxd/wizard/types/wizard.types';
23+
import { TaskDto } from 'omniboxd/tasks/dto/task.dto';
2324

2425
@Injectable()
2526
export class WizardService {
@@ -108,6 +109,11 @@ export class WizardService {
108109
const wait: number = task.startedAt.getTime() - task.createdAt.getTime();
109110
this.logger.debug({ taskId: task.id, cost, wait });
110111

112+
if (task.canceledAt) {
113+
this.logger.warn(`Task ${task.id} was canceled.`);
114+
return { taskId: task.id, function: task.function, status: 'canceled' };
115+
}
116+
111117
const postprocessResult = await this.postprocess(task);
112118

113119
return { taskId: task.id, function: task.function, ...postprocessResult };
@@ -223,7 +229,7 @@ export class WizardService {
223229
task.output.images = processedImages;
224230
}
225231

226-
async fetchTask(): Promise<Task | null> {
232+
async fetchTask(): Promise<TaskDto | null> {
227233
const rawQuery = `
228234
WITH running_tasks_sub_query AS (SELECT namespace_id,
229235
COUNT(id) AS running_count
@@ -257,14 +263,17 @@ export class WizardService {
257263
await this.wizardTaskService.taskRepository.query(rawQuery);
258264

259265
if (queryResult.length > 0) {
266+
const record = queryResult[0];
260267
const task = this.wizardTaskService.taskRepository.create({
261-
...(queryResult[0] as Task),
268+
...(record as Task),
269+
createdAt: record.created_at,
270+
updatedAt: record.updated_at,
262271
startedAt: new Date(),
263-
userId: queryResult[0].user_id,
264-
namespaceId: queryResult[0].namespace_id,
272+
userId: record.user_id,
273+
namespaceId: record.namespace_id,
265274
});
266-
await this.wizardTaskService.taskRepository.save(task);
267-
return task;
275+
const newTask = await this.wizardTaskService.taskRepository.save(task);
276+
return TaskDto.fromEntity(newTask);
268277
}
269278

270279
return null;

test/jest-e2e-setup.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { GenericContainer, StartedTestContainer } from 'testcontainers';
1+
import { GenericContainer, StartedTestContainer, Wait } from 'testcontainers';
22

33
let postgresContainer: StartedTestContainer;
44
let minioContainer: StartedTestContainer;
@@ -13,6 +13,14 @@ export default async () => {
1313
POSTGRES_USER: 'omnibox',
1414
POSTGRES_PASSWORD: 'omnibox',
1515
})
16+
.withHealthCheck({
17+
test: ['CMD', 'pg_isready', '-q', '-d', 'omnibox', '-U', 'omnibox'],
18+
interval: 30000,
19+
timeout: 3000,
20+
retries: 5,
21+
startPeriod: 5000,
22+
})
23+
.withWaitStrategy(Wait.forHealthCheck())
1624
.start();
1725
console.log('PostgreSQL container started');
1826

@@ -25,6 +33,13 @@ export default async () => {
2533
MINIO_ROOT_PASSWORD: 'minioadmin',
2634
})
2735
.withCommand(['server', '/data'])
36+
.withHealthCheck({
37+
test: ['CMD', 'curl', '-I', 'http://127.0.0.1:9000/minio/health/live'],
38+
interval: 5000,
39+
timeout: 3000,
40+
retries: 5,
41+
})
42+
.withWaitStrategy(Wait.forHealthCheck())
2843
.start();
2944
console.log('MinIO container started');
3045

0 commit comments

Comments
 (0)