|
1 | 1 | import { Repository } from 'typeorm'; |
2 | 2 | import { Task } from 'src/tasks/tasks.entity'; |
3 | 3 | import { InjectRepository } from '@nestjs/typeorm'; |
4 | | -import { |
5 | | - BadRequestException, |
6 | | - Injectable, |
7 | | - NotFoundException, |
8 | | -} from '@nestjs/common'; |
| 4 | +import { Injectable, NotFoundException } from '@nestjs/common'; |
9 | 5 | import { NamespacesService } from 'src/namespaces/namespaces.service'; |
10 | | -import { ResourcesService } from 'src/resources/resources.service'; |
11 | | -import { CreateResourceDto } from 'src/resources/dto/create-resource.dto'; |
12 | | -import { CollectRequestDto } from 'src/tasks/dto/collect-request.dto'; |
13 | | -import { User } from 'src/user/user.entity'; |
14 | | -import { TaskCallbackDto } from './dto/task-callback.dto'; |
15 | 6 |
|
16 | 7 | @Injectable() |
17 | 8 | export class TasksService { |
18 | 9 | constructor( |
19 | 10 | @InjectRepository(Task) |
20 | 11 | private taskRepository: Repository<Task>, |
21 | 12 | private readonly namespacesService: NamespacesService, |
22 | | - private readonly resourcesService: ResourcesService, // inject ResourcesService |
23 | 13 | ) {} |
24 | 14 |
|
25 | 15 | async create(data: Partial<Task>) { |
26 | 16 | const newTask = this.taskRepository.create(data); |
27 | 17 | return await this.taskRepository.save(newTask); |
28 | 18 | } |
29 | 19 |
|
30 | | - async collect(user: User, data: CollectRequestDto) { |
31 | | - const { html, url, title, namespace, spaceType } = data; |
32 | | - if (!namespace || !spaceType || !url || !html) { |
33 | | - throw new BadRequestException('Missing required fields'); |
34 | | - } |
35 | | - const ns = await this.namespacesService.findByName(namespace); |
36 | | - if (!ns) { |
37 | | - throw new NotFoundException('Namespace not found'); |
38 | | - } |
39 | | - |
40 | | - // Actually create a resource using ResourcesService |
41 | | - const resourceDto: CreateResourceDto = { |
42 | | - name: title || url, |
43 | | - namespace: ns.id, |
44 | | - resourceType: 'link', |
45 | | - spaceType, |
46 | | - parentId: '', |
47 | | - tags: [], |
48 | | - content: 'Processing...', |
49 | | - attrs: { url }, |
50 | | - }; |
51 | | - // You may need to provide a userId, here assumed as 0 or fetch from context if available |
52 | | - const resource = await this.resourcesService.create(user.id, resourceDto); |
53 | | - |
54 | | - // Add resourceId to payload |
55 | | - const payload = { spaceType, namespace, resourceId: resource.id }; |
56 | | - |
57 | | - // Create a new task with function "collect" |
58 | | - const task = this.taskRepository.create({ |
59 | | - function: 'collect', |
60 | | - input: { html, url, title }, |
61 | | - namespace: ns, |
62 | | - payload, |
63 | | - user, |
64 | | - }); |
65 | | - await this.taskRepository.save(task); |
66 | | - return { taskId: task.id, resourceId: resource.id }; |
67 | | - } |
68 | | - |
69 | | - async taskDoneCallback(data: TaskCallbackDto) { |
70 | | - const task = await this.taskRepository.findOne({ |
71 | | - where: { id: data.id }, |
72 | | - relations: ['namespace'], |
73 | | - }); |
74 | | - if (!task) { |
75 | | - throw new NotFoundException(`Task ${data.id} not found`); |
76 | | - } |
77 | | - |
78 | | - const endedAt: Date = new Date(data.endedAt); |
79 | | - |
80 | | - task.endedAt = endedAt; |
81 | | - task.exception = data.exception; |
82 | | - task.output = data.output; |
83 | | - await this.taskRepository.save(task); |
84 | | - |
85 | | - // Calculate cost and wait (if timestamps are present) |
86 | | - const cost: number = task.endedAt.getTime() - task.startedAt.getTime(); |
87 | | - const wait: number = task.startedAt.getTime() - task.createdAt.getTime(); |
88 | | - console.debug(`Task ${task.id} cost: ${cost}ms, wait: ${wait}ms`); |
89 | | - |
90 | | - // Delegate postprocess logic to a separate method |
91 | | - const postprocessResult = await this.postprocess(task); |
92 | | - |
93 | | - return { taskId: task.id, function: task.function, ...postprocessResult }; |
94 | | - } |
95 | | - |
96 | | - async postprocess(task: Task): Promise<Record<string, any>> { |
97 | | - // Dispatch postprocess logic based on task.function |
98 | | - if (task.function === 'collect') { |
99 | | - return await this.postprocessCollect(task); |
100 | | - } |
101 | | - // Add more function types here as needed |
102 | | - return {}; |
103 | | - } |
104 | | - |
105 | | - private async postprocessCollect(task: Task): Promise<Record<string, any>> { |
106 | | - if (!task.payload?.resourceId) { |
107 | | - throw new BadRequestException('Invalid task payload'); |
108 | | - } |
109 | | - const resourceId = task.payload.resourceId; |
110 | | - if (task.exception) { |
111 | | - // If there was an exception, update resource content with error |
112 | | - await this.resourcesService.update(resourceId, { |
113 | | - namespace: task.namespace.id, |
114 | | - content: task.exception.error, |
115 | | - }); |
116 | | - return {}; |
117 | | - } else if (task.output) { |
118 | | - // If successful, update resource with output |
119 | | - const { markdown, title, ...attrs } = task.output || {}; |
120 | | - await this.resourcesService.update(resourceId, { |
121 | | - namespace: task.namespace.id, |
122 | | - name: title, |
123 | | - content: markdown, |
124 | | - attrs, |
125 | | - }); |
126 | | - return { resourceId }; |
127 | | - } |
128 | | - return {}; |
129 | | - } |
130 | | - |
131 | 20 | async list(namespaceId: string, offset: number, limit: number) { |
132 | 21 | const namespace = await this.namespacesService.get(namespaceId); |
133 | 22 |
|
@@ -162,49 +51,4 @@ export class TasksService { |
162 | 51 | } |
163 | 52 | await this.taskRepository.softRemove(task); |
164 | 53 | } |
165 | | - |
166 | | - async fetch(): Promise<Task | null> { |
167 | | - const rawQuery = ` |
168 | | - WITH running_tasks_sub_query AS (SELECT namespace_id, |
169 | | - COUNT(id) AS running_count |
170 | | - FROM tasks |
171 | | - WHERE started_at IS NOT NULL |
172 | | - AND ended_at IS NULL |
173 | | - AND canceled_at IS NULL |
174 | | - GROUP BY namespace_id), |
175 | | - id_subquery AS (SELECT tasks.id |
176 | | - FROM tasks |
177 | | - LEFT OUTER JOIN running_tasks_sub_query |
178 | | - ON tasks.namespace_id = running_tasks_sub_query.namespace_id |
179 | | - LEFT OUTER JOIN namespaces |
180 | | - ON tasks.namespace_id = namespaces.id |
181 | | - WHERE tasks.started_at IS NULL |
182 | | - AND tasks.canceled_at IS NULL |
183 | | - AND COALESCE(running_tasks_sub_query.running_count, 0) < |
184 | | - COALESCE(namespaces.max_running_tasks, 0) |
185 | | - ORDER BY priority DESC, |
186 | | - tasks.created_at |
187 | | - LIMIT 1 |
188 | | - ) |
189 | | - SELECT * |
190 | | - FROM tasks |
191 | | - WHERE id IN (SELECT id FROM id_subquery) |
192 | | - FOR UPDATE SKIP LOCKED; |
193 | | - `; |
194 | | - |
195 | | - const queryResult = await this.taskRepository.query(rawQuery); |
196 | | - |
197 | | - if (queryResult.length > 0) { |
198 | | - const task = this.taskRepository.create({ |
199 | | - ...(queryResult[0] as Task), |
200 | | - startedAt: new Date(), |
201 | | - user: { id: queryResult[0].user_id }, |
202 | | - namespace: { id: queryResult[0].namespace_id }, |
203 | | - }); |
204 | | - await this.taskRepository.save(task); |
205 | | - return task; |
206 | | - } |
207 | | - |
208 | | - return null; |
209 | | - } |
210 | 54 | } |
0 commit comments