Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/app/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { UserRoleModule } from 'src/user-role/user-role.module';
import { NamespacesModule } from 'src/namespaces/namespaces.module';
import { ResourcesModule } from 'src/resources/resources.module';
import { TasksModule } from 'src/tasks/tasks.module';
import { WizardModule } from 'src/wizard/wizard.module';

@Module({
controllers: [AppController],
Expand All @@ -23,6 +24,7 @@ import { TasksModule } from 'src/tasks/tasks.module';
NamespacesModule,
ResourcesModule,
TasksModule,
WizardModule,
ConfigModule.forRoot({
cache: true,
isGlobal: true,
Expand Down
6 changes: 5 additions & 1 deletion src/resources/resources.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ import { TypeOrmModule } from '@nestjs/typeorm';
import { Resource } from 'src/resources/resources.entity';
import { ResourcesService } from 'src/resources/resources.service';
import { ResourcesController } from 'src/resources/resources.controller';
import { Task } from 'src/tasks/tasks.entity';

@Module({
exports: [ResourcesService],
providers: [ResourcesService],
controllers: [ResourcesController],
imports: [TypeOrmModule.forFeature([Resource])],
imports: [
TypeOrmModule.forFeature([Resource]),
TypeOrmModule.forFeature([Task]),
],
})
export class ResourcesModule {}
64 changes: 53 additions & 11 deletions src/resources/resources.service.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { InjectRepository } from '@nestjs/typeorm';
import { In, FindOptionsWhere, Repository } from 'typeorm';
import { FindOptionsWhere, In, Repository } from 'typeorm';
import { Resource } from 'src/resources/resources.entity';
import { CreateResourceDto } from 'src/resources/dto/create-resource.dto';
import { UpdateResourceDto } from 'src/resources/dto/update-resource.dto';
import {
BadRequestException,
Injectable,
NotFoundException,
BadRequestException,
} from '@nestjs/common';
import { Task } from 'src/tasks/tasks.entity';

export interface IQuery {
namespace: string;
Expand All @@ -22,6 +23,8 @@ export class ResourcesService {
constructor(
@InjectRepository(Resource)
private readonly resourceRepository: Repository<Resource>,
@InjectRepository(Task)
private readonly taskRepository: Repository<Task>,
) {}

async create(userId: string, data: CreateResourceDto) {
Expand Down Expand Up @@ -54,7 +57,7 @@ export class ResourcesService {
...data,
user: { id: userId },
namespace: { id: data.namespace },
parentId: parentResource ? parentResource.id : 0,
parentId: parentResource ? parentResource.id : '',
});

if (parentResource) {
Expand All @@ -63,7 +66,43 @@ export class ResourcesService {
await this.resourceRepository.save(parentResourceRepo);
}

return await this.resourceRepository.save(resource);
const savedResource = await this.resourceRepository.save(resource);
await this.index(savedResource);
return savedResource;
}

async index(resource: Resource) {
if (resource.resourceType === 'folder' || !resource.content) {
return;
}
const task = this.taskRepository.create({
function: 'create_or_update_index',
input: {
title: resource.name,
content: resource.content,
meta_info: {
user_id: resource.user.id,
space_type: resource.spaceType,
resource_id: resource.id,
parent_id: resource.parentId,
},
},
namespace: resource.namespace,
user: resource.user,
});
return await this.taskRepository.save(task);
}

async deleteIndex(resource: Resource) {
const task = this.taskRepository.create({
function: 'delete_index',
input: {
resource_id: resource.id,
},
namespace: resource.namespace,
user: resource.user,
});
return await this.taskRepository.save(task);
}

async getRoot(namespace: string, spaceType: string, userId: string) {
Expand Down Expand Up @@ -133,9 +172,10 @@ export class ResourcesService {
}

async update(id: string, data: UpdateResourceDto) {
console.debug({ id, data });
const resource = await this.resourceRepository.findOne({
where: { id, namespace: { id: data.namespace } },
relations: ['namespace'],
relations: ['namespace', 'user'],
});

if (!resource) {
Expand All @@ -147,7 +187,9 @@ export class ResourcesService {
...data,
namespace: { id: data.namespace },
});
return await this.resourceRepository.save(newResource);
const savedNewResource = await this.resourceRepository.save(newResource);
await this.index(savedNewResource);
return savedNewResource;
}

async deleteChildren(id: string) {
Expand All @@ -166,7 +208,7 @@ export class ResourcesService {
}

async delete(id: string) {
// 更新父级 childCount
// Update parent's childCount
const resource = await this.get(id);
const parent = await this.resourceRepository.findOne({
where: {
Expand All @@ -178,9 +220,9 @@ export class ResourcesService {
const parentResource = this.resourceRepository.create(parent);
await this.resourceRepository.save(parentResource);
}
// 删除自身
await this.resourceRepository.softDelete(id);
// 递归删除子级
await this.deleteChildren(id);
await this.resourceRepository.softDelete(id); // Delete itself
await this.deleteChildren(id); // Delete its children

await this.deleteIndex(resource);
}
}
7 changes: 0 additions & 7 deletions src/tasks/tasks.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import {
Param,
Post,
Query,
Req,
} from '@nestjs/common';
import { CollectRequestDto } from 'src/tasks/dto/collect-request.dto';

@Controller('api/v1/tasks')
export class TasksController {
Expand All @@ -21,11 +19,6 @@ export class TasksController {
return await this.tasksService.create(data);
}

@Post('collect')
async collect(@Req() req, @Body() data: CollectRequestDto) {
return await this.tasksService.collect(req.user, data);
}

@Get()
async listTasks(
@Query('namespace') namespace: string,
Expand Down
10 changes: 2 additions & 8 deletions src/tasks/tasks.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,11 @@ import { Task } from 'src/tasks/tasks.entity';
import { TypeOrmModule } from '@nestjs/typeorm';
import { TasksService } from 'src/tasks/tasks.service';
import { TasksController } from 'src/tasks/tasks.controller';
import { InternalTasksController } from 'src/tasks/internal.tasks.controller';
import { NamespacesModule } from 'src/namespaces/namespaces.module';
import { ResourcesModule } from 'src/resources/resources.module';

@Module({
providers: [TasksService],
imports: [
NamespacesModule,
TypeOrmModule.forFeature([Task]),
ResourcesModule,
],
controllers: [TasksController, InternalTasksController],
imports: [NamespacesModule, TypeOrmModule.forFeature([Task])],
controllers: [TasksController],
})
export class TasksModule {}
158 changes: 1 addition & 157 deletions src/tasks/tasks.service.ts
Original file line number Diff line number Diff line change
@@ -1,133 +1,22 @@
import { Repository } from 'typeorm';
import { Task } from 'src/tasks/tasks.entity';
import { InjectRepository } from '@nestjs/typeorm';
import {
BadRequestException,
Injectable,
NotFoundException,
} from '@nestjs/common';
import { Injectable, NotFoundException } from '@nestjs/common';
import { NamespacesService } from 'src/namespaces/namespaces.service';
import { ResourcesService } from 'src/resources/resources.service';
import { CreateResourceDto } from 'src/resources/dto/create-resource.dto';
import { CollectRequestDto } from 'src/tasks/dto/collect-request.dto';
import { User } from 'src/user/user.entity';
import { TaskCallbackDto } from './dto/task-callback.dto';

@Injectable()
export class TasksService {
constructor(
@InjectRepository(Task)
private taskRepository: Repository<Task>,
private readonly namespacesService: NamespacesService,
private readonly resourcesService: ResourcesService, // inject ResourcesService
) {}

async create(data: Partial<Task>) {
const newTask = this.taskRepository.create(data);
return await this.taskRepository.save(newTask);
}

async collect(user: User, data: CollectRequestDto) {
const { html, url, title, namespace, spaceType } = data;
if (!namespace || !spaceType || !url || !html) {
throw new BadRequestException('Missing required fields');
}
const ns = await this.namespacesService.findByName(namespace);
if (!ns) {
throw new NotFoundException('Namespace not found');
}

// Actually create a resource using ResourcesService
const resourceDto: CreateResourceDto = {
name: title || url,
namespace: ns.id,
resourceType: 'link',
spaceType,
parentId: '',
tags: [],
content: 'Processing...',
attrs: { url },
};
// You may need to provide a userId, here assumed as 0 or fetch from context if available
const resource = await this.resourcesService.create(user.id, resourceDto);

// Add resourceId to payload
const payload = { spaceType, namespace, resourceId: resource.id };

// Create a new task with function "collect"
const task = this.taskRepository.create({
function: 'collect',
input: { html, url, title },
namespace: ns,
payload,
user,
});
await this.taskRepository.save(task);
return { taskId: task.id, resourceId: resource.id };
}

async taskDoneCallback(data: TaskCallbackDto) {
const task = await this.taskRepository.findOne({
where: { id: data.id },
relations: ['namespace'],
});
if (!task) {
throw new NotFoundException(`Task ${data.id} not found`);
}

const endedAt: Date = new Date(data.endedAt);

task.endedAt = endedAt;
task.exception = data.exception;
task.output = data.output;
await this.taskRepository.save(task);

// Calculate cost and wait (if timestamps are present)
const cost: number = task.endedAt.getTime() - task.startedAt.getTime();
const wait: number = task.startedAt.getTime() - task.createdAt.getTime();
console.debug(`Task ${task.id} cost: ${cost}ms, wait: ${wait}ms`);

// Delegate postprocess logic to a separate method
const postprocessResult = await this.postprocess(task);

return { taskId: task.id, function: task.function, ...postprocessResult };
}

async postprocess(task: Task): Promise<Record<string, any>> {
// Dispatch postprocess logic based on task.function
if (task.function === 'collect') {
return await this.postprocessCollect(task);
}
// Add more function types here as needed
return {};
}

private async postprocessCollect(task: Task): Promise<Record<string, any>> {
if (!task.payload?.resourceId) {
throw new BadRequestException('Invalid task payload');
}
const resourceId = task.payload.resourceId;
if (task.exception) {
// If there was an exception, update resource content with error
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
content: task.exception.error,
});
return {};
} else if (task.output) {
// If successful, update resource with output
const { markdown, title, ...attrs } = task.output || {};
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
name: title,
content: markdown,
attrs,
});
return { resourceId };
}
return {};
}

async list(namespaceId: string, offset: number, limit: number) {
const namespace = await this.namespacesService.get(namespaceId);

Expand Down Expand Up @@ -162,49 +51,4 @@ export class TasksService {
}
await this.taskRepository.softRemove(task);
}

async fetch(): Promise<Task | null> {
const rawQuery = `
WITH running_tasks_sub_query AS (SELECT namespace_id,
COUNT(id) AS running_count
FROM tasks
WHERE started_at IS NOT NULL
AND ended_at IS NULL
AND canceled_at IS NULL
GROUP BY namespace_id),
id_subquery AS (SELECT tasks.id
FROM tasks
LEFT OUTER JOIN running_tasks_sub_query
ON tasks.namespace_id = running_tasks_sub_query.namespace_id
LEFT OUTER JOIN namespaces
ON tasks.namespace_id = namespaces.id
WHERE tasks.started_at IS NULL
AND tasks.canceled_at IS NULL
AND COALESCE(running_tasks_sub_query.running_count, 0) <
COALESCE(namespaces.max_running_tasks, 0)
ORDER BY priority DESC,
tasks.created_at
LIMIT 1
)
SELECT *
FROM tasks
WHERE id IN (SELECT id FROM id_subquery)
FOR UPDATE SKIP LOCKED;
`;

const queryResult = await this.taskRepository.query(rawQuery);

if (queryResult.length > 0) {
const task = this.taskRepository.create({
...(queryResult[0] as Task),
startedAt: new Date(),
user: { id: queryResult[0].user_id },
namespace: { id: queryResult[0].namespace_id },
});
await this.taskRepository.save(task);
return task;
}

return null;
}
}
File renamed without changes.
Loading