Skip to content

Commit 4323a4a

Browse files
authored
Merge pull request #162 from import-ai/chore/tasks
Chore/tasks
2 parents 8e82fa1 + 78bdc9f commit 4323a4a

File tree

8 files changed

+161
-62
lines changed

8 files changed

+161
-62
lines changed

src/tasks/dto/task.dto.ts

Lines changed: 64 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,87 @@ function getTaskStatus(task: Task): string {
1717
return 'pending';
1818
}
1919

20-
export class TaskDto {
20+
class TaskDtoBase {
2121
id: string;
2222
namespace_id: string;
2323
user_id: string;
2424
priority: number;
2525
function: string;
26-
input: Record<string, any>;
27-
payload: Record<string, any> | null;
28-
output: Record<string, any> | null;
29-
exception: Record<string, any> | null;
3026
status: string;
3127
created_at: string;
3228
updated_at: string;
3329
started_at: string | null;
3430
ended_at: string | null;
3531
canceled_at: string | null;
3632

33+
protected static setValue(obj: TaskDtoBase, task: Task) {
34+
obj.id = task.id;
35+
obj.namespace_id = task.namespaceId;
36+
obj.user_id = task.userId;
37+
obj.priority = task.priority;
38+
obj.function = task.function;
39+
obj.status = getTaskStatus(task);
40+
obj.created_at = task.createdAt.toISOString();
41+
obj.updated_at = task.updatedAt.toISOString();
42+
obj.started_at = task.startedAt?.toISOString() || null;
43+
obj.ended_at = task.endedAt?.toISOString() || null;
44+
obj.canceled_at = task.canceledAt?.toISOString() || null;
45+
}
46+
}
47+
48+
export class InternalTaskDto extends TaskDtoBase {
49+
payload: Record<string, any> | null;
50+
input: Record<string, any>;
51+
output: Record<string, any> | null;
52+
exception: Record<string, any> | null;
53+
54+
static fromEntity(task: Task): InternalTaskDto {
55+
const dto = new InternalTaskDto();
56+
this.setValue(dto, task);
57+
58+
dto.payload = task.payload;
59+
dto.input = task.input;
60+
dto.output = task.output;
61+
dto.exception = task.exception;
62+
return dto;
63+
}
64+
}
65+
66+
export class TaskMetaDto extends TaskDtoBase {
67+
attrs: Record<string, any> | null;
68+
69+
protected static setValue(obj: TaskMetaDto, task: Task) {
70+
super.setValue(obj, task);
71+
72+
if (task.payload) {
73+
obj.attrs = {};
74+
for (const [key, value] of Object.entries(task.payload)) {
75+
if (key !== 'trace_headers') {
76+
obj.attrs[key] = value;
77+
}
78+
}
79+
}
80+
}
81+
82+
static fromEntity(task: Task): TaskMetaDto {
83+
const dto = new TaskMetaDto();
84+
this.setValue(dto, task);
85+
return dto;
86+
}
87+
}
88+
89+
export class TaskDto extends TaskMetaDto {
90+
input: Record<string, any>;
91+
output: Record<string, any> | null;
92+
exception: Record<string, any> | null;
93+
3794
static fromEntity(task: Task): TaskDto {
3895
const dto = new TaskDto();
39-
dto.id = task.id;
40-
dto.namespace_id = task.namespaceId;
41-
dto.user_id = task.userId;
42-
dto.priority = task.priority;
43-
dto.function = task.function;
96+
this.setValue(dto, task);
97+
4498
dto.input = task.input;
45-
dto.payload = task.payload;
4699
dto.output = task.output;
47100
dto.exception = task.exception;
48-
dto.status = getTaskStatus(task);
49-
dto.created_at = task.createdAt.toISOString();
50-
dto.updated_at = task.updatedAt.toISOString();
51-
dto.started_at = task.startedAt?.toISOString() || null;
52-
dto.ended_at = task.endedAt?.toISOString() || null;
53-
dto.canceled_at = task.canceledAt?.toISOString() || null;
54101
return dto;
55102
}
56103
}

src/tasks/task-pipeline.e2e-spec.ts

Lines changed: 33 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { HttpStatus } from '@nestjs/common';
33
import { Task } from 'omniboxd/tasks/tasks.entity';
44
import { TaskCallbackDto } from 'omniboxd/wizard/dto/task-callback.dto';
55
import { isEmpty } from 'omniboxd/utils/is-empty';
6-
import { TaskDto } from 'omniboxd/tasks/dto/task.dto';
6+
import { TaskDto, TaskMetaDto } from 'omniboxd/tasks/dto/task.dto';
77

88
/**
99
* Mock wizard worker that simulates the wizard worker service behavior
@@ -515,20 +515,22 @@ describe('Task Pipeline (e2e)', () => {
515515
await MockWizardWorker.waitFor(async () => {
516516
// Check if extract_tags task was created and completed
517517
const tasksResponse = await client.get(
518-
`/api/v1/namespaces/${client.namespace.id}/tasks?namespace=${client.namespace.id}`,
518+
`/api/v1/namespaces/${client.namespace.id}/tasks`,
519519
);
520520
if (tasksResponse.status !== 200) return false;
521521

522-
const tasks = tasksResponse.body;
523-
const collectTask = tasks.find((t: any) => t.id === collectTaskId);
524-
const extractTagsTask = tasks.find(
525-
(t: any) =>
522+
const tasks: TaskMetaDto[] = tasksResponse.body;
523+
const collectTask: TaskMetaDto | undefined = tasks.find(
524+
(t: any) => t.id === collectTaskId,
525+
);
526+
const extractTagsTask: TaskMetaDto | undefined = tasks.find(
527+
(t: TaskMetaDto) =>
526528
t.function === 'extract_tags' &&
527-
t.payload?.parent_task_id === collectTaskId,
529+
t.attrs?.parent_task_id === collectTaskId,
528530
);
529531

530-
expect(isEmpty(collectTask?.exception)).toBe(true);
531-
expect(isEmpty(extractTagsTask?.exception)).toBe(true);
532+
expect(collectTask?.status).not.toBe('error');
533+
expect(extractTagsTask?.status).not.toBe('error');
532534

533535
return (
534536
!isEmpty(collectTask?.ended_at) && !isEmpty(extractTagsTask?.ended_at)
@@ -537,25 +539,31 @@ describe('Task Pipeline (e2e)', () => {
537539

538540
// Verify both tasks completed successfully
539541
const tasksResponse = await client.get(
540-
`/api/v1/namespaces/${client.namespace.id}/tasks?namespace=${client.namespace.id}`,
542+
`/api/v1/namespaces/${client.namespace.id}/tasks`,
541543
);
542-
const tasks = tasksResponse.body;
543-
544-
const collectTask = tasks.find((t: any) => t.id === collectTaskId);
545-
const extractTagsTask = tasks.find(
546-
(t: any) =>
544+
const taskMetaList: TaskMetaDto[] = tasksResponse.body;
545+
const extractTagsTaskMeta = taskMetaList.find(
546+
(t: TaskMetaDto) =>
547547
t.function === 'extract_tags' &&
548-
t.payload?.parent_task_id === collectTaskId,
549-
);
548+
t.attrs?.parent_task_id === collectTaskId,
549+
)!;
550+
const collectTask: TaskDto = await client
551+
.get(`/api/v1/namespaces/${client.namespace.id}/tasks/${collectTaskId}`)
552+
.then((res) => res.body);
553+
const extractTagsTask: TaskDto = await client
554+
.get(
555+
`/api/v1/namespaces/${client.namespace.id}/tasks/${extractTagsTaskMeta.id}`,
556+
)
557+
.then((res) => res.body);
550558

551559
expect(collectTask).toBeDefined();
552560
expect(collectTask.ended_at).toBeDefined();
553-
expect(collectTask.output.markdown).toBeDefined();
561+
expect(collectTask.output?.markdown).toBeDefined();
554562

555563
expect(extractTagsTask).toBeDefined();
556564
expect(extractTagsTask.ended_at).toBeDefined();
557-
expect(extractTagsTask.output.tags).toBeDefined();
558-
expect(extractTagsTask.payload.resource_id).toBe(resourceId);
565+
expect(extractTagsTask.output?.tags).toBeDefined();
566+
expect(extractTagsTask.attrs?.resource_id).toBe(resourceId);
559567

560568
const resource = (
561569
await client.get(
@@ -588,16 +596,16 @@ describe('Task Pipeline (e2e)', () => {
588596

589597
await MockWizardWorker.waitFor(async () => {
590598
const tasksResponse = await client.get(
591-
`/api/v1/namespaces/${client.namespace.id}/tasks?namespace=${client.namespace.id}`,
599+
`/api/v1/namespaces/${client.namespace.id}/tasks`,
592600
);
593-
const tasks = tasksResponse.body;
601+
const tasks: TaskMetaDto[] = tasksResponse.body;
594602
const generateTitleTask = tasks.find(
595-
(t: any) =>
603+
(t: TaskMetaDto) =>
596604
t.function === 'generate_title' &&
597-
t.payload.resource_id === resourceId,
605+
t.attrs?.resource_id === resourceId,
598606
);
599607
if (!generateTitleTask) return false;
600-
expect(isEmpty(generateTitleTask.exception)).toBe(true);
608+
expect(generateTitleTask.status).not.toBe('error');
601609
return !isEmpty(generateTitleTask.ended_at);
602610
});
603611

src/tasks/tasks.controller.ts

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ import {
1010
Post,
1111
Query,
1212
} from '@nestjs/common';
13+
import { TaskDto } from 'omniboxd/tasks/dto/task.dto';
1314

1415
@Controller('api/v1/namespaces/:namespaceId/tasks')
1516
export class TasksController {
1617
constructor(private readonly tasksService: TasksService) {}
1718

1819
@Post()
1920
async createTask(@Body() data: Partial<Task>) {
20-
return await this.tasksService.create(data);
21+
return TaskDto.fromEntity(await this.tasksService.create(data));
2122
}
2223

2324
@Get()
@@ -31,7 +32,7 @@ export class TasksController {
3132

3233
@Get(':id')
3334
async getTaskById(@Param('id') id: string) {
34-
return await this.tasksService.get(id);
35+
return TaskDto.fromEntity(await this.tasksService.get(id));
3536
}
3637

3738
@Delete(':id')
@@ -50,3 +51,19 @@ export class TasksController {
5051
return await this.tasksService.rerunTask(id);
5152
}
5253
}
54+
55+
@Controller('api/v1/namespaces/:namespaceId/resources/:resourceId/tasks')
56+
export class ResourceTasksController {
57+
constructor(private readonly tasksService: TasksService) {}
58+
59+
@Get()
60+
async getResourceTasks(
61+
@Param('namespaceId') namespaceId: string,
62+
@Param('resourceId') resourceId: string,
63+
) {
64+
return await this.tasksService.getTasksByResourceId(
65+
namespaceId,
66+
resourceId,
67+
);
68+
}
69+
}

src/tasks/tasks.module.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ import { Module } from '@nestjs/common';
22
import { Task } from 'omniboxd/tasks/tasks.entity';
33
import { TypeOrmModule } from '@nestjs/typeorm';
44
import { TasksService } from 'omniboxd/tasks/tasks.service';
5-
import { TasksController } from 'omniboxd/tasks/tasks.controller';
5+
import {
6+
TasksController,
7+
ResourceTasksController,
8+
} from 'omniboxd/tasks/tasks.controller';
69
import { WizardTaskService } from 'omniboxd/tasks/wizard-task.service';
710

811
@Module({
912
providers: [TasksService, WizardTaskService],
1013
imports: [TypeOrmModule.forFeature([Task])],
11-
controllers: [TasksController],
14+
controllers: [TasksController, ResourceTasksController],
1215
exports: [TasksService, WizardTaskService],
1316
})
1417
export class TasksModule {}

src/tasks/tasks.service.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
Injectable,
77
NotFoundException,
88
} from '@nestjs/common';
9-
import { TaskDto } from './dto/task.dto';
9+
import { TaskDto, TaskMetaDto } from './dto/task.dto';
1010

1111
@Injectable()
1212
export class TasksService {
@@ -24,15 +24,15 @@ export class TasksService {
2424
namespaceId: string,
2525
offset: number,
2626
limit: number,
27-
): Promise<TaskDto[]> {
27+
): Promise<TaskMetaDto[]> {
2828
const tasks = await this.taskRepository.find({
2929
where: { namespaceId },
3030
skip: offset,
3131
take: limit,
3232
order: { createdAt: 'DESC' },
3333
});
3434

35-
return tasks.map((task) => TaskDto.fromEntity(task));
35+
return tasks.map((task) => TaskMetaDto.fromEntity(task));
3636
}
3737

3838
async get(id: string) {
@@ -89,4 +89,18 @@ export class TasksService {
8989

9090
return TaskDto.fromEntity(newTask);
9191
}
92+
93+
async getTasksByResourceId(
94+
namespaceId: string,
95+
resourceId: string,
96+
): Promise<TaskMetaDto[]> {
97+
const tasks = await this.taskRepository
98+
.createQueryBuilder('task')
99+
.where('task.namespaceId = :namespaceId', { namespaceId })
100+
.andWhere("task.payload->>'resource_id' = :resourceId", { resourceId })
101+
.orderBy('task.createdAt', 'DESC')
102+
.getMany();
103+
104+
return tasks.map((task) => TaskMetaDto.fromEntity(task));
105+
}
92106
}

src/tasks/wizard-task.service.ts

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,24 @@ import {
1010
Message,
1111
OpenAIMessageRole,
1212
} from 'omniboxd/messages/entities/message.entity';
13+
import { context, propagation } from '@opentelemetry/api';
1314

1415
@Injectable()
1516
export class WizardTaskService {
1617
constructor(
1718
@InjectRepository(Task) public taskRepository: Repository<Task>,
1819
) {}
1920

21+
injectTraceHeaders(task: Partial<Task>) {
22+
const traceHeaders: Record<string, string> = {};
23+
propagation.inject(context.active(), traceHeaders);
24+
task.payload = { ...(task.payload || {}), trace_headers: traceHeaders };
25+
return task;
26+
}
27+
2028
async create(data: Partial<Task>, repo?: Repository<Task>) {
2129
const repository = repo || this.taskRepository;
22-
const task = repository.create(data);
30+
const task = repository.create(this.injectTraceHeaders(data));
2331
return await repository.save(task);
2432
}
2533

@@ -127,6 +135,7 @@ export class WizardTaskService {
127135
parent_id: resource.parentId,
128136
},
129137
},
138+
payload: { resource_id: resource.id },
130139
namespaceId: resource.namespaceId,
131140
userId: userId,
132141
},
@@ -147,6 +156,7 @@ export class WizardTaskService {
147156
},
148157
namespaceId: resource.namespaceId,
149158
userId,
159+
payload: { resource_id: resource.id },
150160
},
151161
repo,
152162
);
@@ -179,6 +189,7 @@ export class WizardTaskService {
179189
message_id: message.id,
180190
message: message.message,
181191
},
192+
payload: { conversation_id: conversationId, message_id: message.id },
182193
namespaceId,
183194
userId,
184195
},
@@ -197,9 +208,8 @@ export class WizardTaskService {
197208
{
198209
function: 'delete_conversation',
199210
priority,
200-
input: {
201-
conversation_id: conversationId,
202-
},
211+
input: { conversation_id: conversationId },
212+
payload: { conversation_id: conversationId },
203213
namespaceId,
204214
userId,
205215
},

0 commit comments

Comments
 (0)