Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ services:
- POSTGRES_PASSWORD=${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
# - ./init.sql:/docker-entrypoint-initdb.d/init.sql
healthcheck:
test: ['CMD', 'pg_isready', '-q', '-d', 'omnibox', '-U', 'omnibox']
test: ["CMD", "pg_isready", "-q", "-d", "${DB_DATABASE}", "-U", "${DB_USERNAME}"]
interval: 30s
timeout: 10s
retries: 5
Expand Down
124 changes: 70 additions & 54 deletions src/wizard/wizard.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,63 @@ import {
Injectable,
NotFoundException,
} from '@nestjs/common';
import { Resource } from 'src/resources/resources.entity';
import { InjectRepository } from '@nestjs/typeorm';
import { Task } from '../tasks/tasks.entity';
import { Task } from 'src/tasks/tasks.entity';
import { Repository } from 'typeorm';
import { NamespacesService } from 'src/namespaces/namespaces.service';
import { ResourcesService } from 'src/resources/resources.service';
import { CreateResourceDto } from 'src/resources/dto/create-resource.dto';
import { CollectRequestDto } from './dto/collect-request.dto';
import { CollectRequestDto } from 'src/wizard/dto/collect-request.dto';
import { User } from 'src/user/user.entity';
import { TaskCallbackDto } from 'src/wizard/dto/task-callback.dto';

abstract class Processor {
abstract process(task: Task): Promise<Record<string, any>>;
}

class CollectProcessor extends Processor {
constructor(private readonly resourcesService: ResourcesService) {
super();
}

async process(task: Task): Promise<Record<string, any>> {
if (!task.payload?.resourceId) {
throw new BadRequestException('Invalid task payload');
}
const resourceId = task.payload.resourceId;
if (task.exception) {
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
content: task.exception.error,
});
return {};
} else if (task.output) {
const { markdown, title, ...attrs } = task.output || {};
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
name: title,
content: markdown,
attrs,
});
return { resourceId };
}
return {};
}
}

@Injectable()
export class WizardService {
private readonly processors: Record<string, Processor>;

constructor(
@InjectRepository(Task) private taskRepository: Repository<Task>,
private readonly namespacesService: NamespacesService,
private readonly resourcesService: ResourcesService,
) {}
) {
this.processors = {
collect: new CollectProcessor(resourcesService),
};
}

async create(partialTask: Partial<Task>) {
const task = this.taskRepository.create(partialTask);
Expand Down Expand Up @@ -87,62 +126,39 @@ export class WizardService {
}

async postprocess(task: Task): Promise<Record<string, any>> {
if (task.function === 'collect') {
return await this.postprocessCollect(task);
}
return {};
}

private async postprocessCollect(task: Task): Promise<Record<string, any>> {
if (!task.payload?.resourceId) {
throw new BadRequestException('Invalid task payload');
}
const resourceId = task.payload.resourceId;
if (task.exception) {
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
content: task.exception.error,
});
return {};
} else if (task.output) {
const { markdown, title, ...attrs } = task.output || {};
await this.resourcesService.update(resourceId, {
namespace: task.namespace.id,
name: title,
content: markdown,
attrs,
});
return { resourceId };
if (task.function in this.processors) {
const processor = this.processors[task.function];
return await processor.process(task);
}
return {};
}

async fetch(): Promise<Task | null> {
const rawQuery = `
WITH running_tasks_sub_query AS (SELECT namespace_id,
COUNT(id) AS running_count
FROM tasks
WHERE started_at IS NOT NULL
AND ended_at IS NULL
AND canceled_at IS NULL
GROUP BY namespace_id),
id_subquery AS (SELECT tasks.id
FROM tasks
LEFT OUTER JOIN running_tasks_sub_query
ON tasks.namespace_id = running_tasks_sub_query.namespace_id
LEFT OUTER JOIN namespaces
ON tasks.namespace_id = namespaces.id
WHERE tasks.started_at IS NULL
AND tasks.canceled_at IS NULL
AND COALESCE(running_tasks_sub_query.running_count, 0) <
COALESCE(namespaces.max_running_tasks, 0)
ORDER BY priority DESC,
tasks.created_at
LIMIT 1)
SELECT *
FROM tasks
WHERE id IN (SELECT id FROM id_subquery)
FOR UPDATE SKIP LOCKED;
WITH running_tasks_sub_query AS (SELECT namespace_id,
COUNT(id) AS running_count
FROM tasks
WHERE started_at IS NOT NULL
AND ended_at IS NULL
AND canceled_at IS NULL
GROUP BY namespace_id),
id_subquery AS (SELECT tasks.id
FROM tasks
LEFT OUTER JOIN running_tasks_sub_query
ON tasks.namespace_id = running_tasks_sub_query.namespace_id
LEFT OUTER JOIN namespaces
ON tasks.namespace_id = namespaces.id
WHERE tasks.started_at IS NULL
AND tasks.canceled_at IS NULL
AND COALESCE(running_tasks_sub_query.running_count, 0) <
COALESCE(namespaces.max_running_tasks, 0)
ORDER BY priority DESC,
tasks.created_at
LIMIT 1)
SELECT *
FROM tasks
WHERE id IN (SELECT id FROM id_subquery)
FOR UPDATE SKIP LOCKED;
`;

const queryResult = await this.taskRepository.query(rawQuery);
Expand Down