Skip to content

Commit

Permalink
feat: allows for post service to start/stop posts and update cron sch…
Browse files Browse the repository at this point in the history
…edule (#280)
  • Loading branch information
mvdicarlo authored Apr 12, 2024
1 parent 37d5a0e commit d045f37
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
14 changes: 5 additions & 9 deletions apps/client-server/src/app/post/post-manager.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* eslint-disable no-param-reassign */
import { Loaded, wrap } from '@mikro-orm/core';
import { InjectRepository } from '@mikro-orm/nestjs';
import { BadRequestException, Injectable } from '@nestjs/common';
import { Inject, Injectable, forwardRef } from '@nestjs/common';
import { Logger } from '@postybirb/logger';
import {
EntityId,
Expand Down Expand Up @@ -64,6 +64,7 @@ export class PostManagerService {
private readonly postRepository: PostyBirbRepository<PostRecord>,
@InjectRepository(WebsitePostRecord)
private readonly websitePostRecordRepository: PostyBirbRepository<WebsitePostRecord>,
@Inject(forwardRef(() => PostService))
private readonly postService: PostService,
private readonly websiteRegistry: WebsiteRegistryService,
private readonly resizerService: PostFileResizerService,
Expand All @@ -79,7 +80,7 @@ export class PostManagerService {
if (!IsTestEnvironment()) {
const nextToPost = await this.postService.getNext();
if (nextToPost && this.currentPost?.id !== nextToPost.id) {
this.logger.info(`Found next post to post: ${nextToPost.id}`);
this.logger.info(`Found next to post: ${nextToPost.id}`);
this.startPost(nextToPost);
}
}
Expand Down Expand Up @@ -115,7 +116,7 @@ export class PostManagerService {
this.logger.withMetadata(entity.toJSON()).info(`Initializing post`);
this.currentPost = entity;
await this.postRepository.update(entity.id, {
state: PostRecordState.PROCESSING,
state: PostRecordState.RUNNING,
});

// Ensure parent (submission) is loaded
Expand Down Expand Up @@ -150,11 +151,6 @@ export class PostManagerService {
}

private async finishPost(entity: LoadedPostRecord) {
if (this.currentPost) {
this.currentPost = null;
this.cancelToken = null;
}

this.currentPost = null;
this.cancelToken = null;

Expand Down Expand Up @@ -612,7 +608,7 @@ export class PostManagerService {
}

if (result?.errors?.length) {
throw new BadRequestException('Submission contains validation errors');
throw new Error('Submission contains validation errors');
}
}
}
25 changes: 24 additions & 1 deletion apps/client-server/src/app/post/post.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { InjectRepository } from '@mikro-orm/nestjs';
import { Injectable, Optional } from '@nestjs/common';
import { Inject, Injectable, Optional, forwardRef } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { SubmissionId } from '@postybirb/types';
import { Cron as CronGenerator } from 'croner';
import { uniq } from 'lodash';
import { PostyBirbService } from '../common/service/postybirb-service';
import { PostRecord, Submission } from '../database/entities';
Expand All @@ -10,6 +11,7 @@ import { IsTestEnvironment } from '../utils/test.util';
import { WSGateway } from '../web-socket/web-socket-gateway';
import { WebsiteOptionsService } from '../website-options/website-options.service';
import { QueuePostRecordRequestDto } from './dtos/queue-post-record.dto';
import { PostManagerService } from './post-manager.service';

/**
* Handles enqueue and dequeue of post records.
Expand All @@ -18,6 +20,8 @@ import { QueuePostRecordRequestDto } from './dtos/queue-post-record.dto';
@Injectable()
export class PostService extends PostyBirbService<PostRecord> {
constructor(
@Inject(forwardRef(() => PostManagerService))
private readonly postManagerService: PostManagerService,
@InjectRepository(PostRecord)
repository: PostyBirbRepository<PostRecord>,
@InjectRepository(Submission)
Expand Down Expand Up @@ -46,6 +50,17 @@ export class PostService extends PostyBirbService<PostRecord> {
new Date(b.schedule.scheduledFor).getTime()
); // Sort by oldest first.
this.enqueue({ ids: sorted.map((s) => s.id) });

sorted
.filter((s) => s.schedule.cron)
.forEach((s) => {
const next = CronGenerator(s.schedule.cron).nextRun()?.toISOString();
if (next) {
// eslint-disable-next-line no-param-reassign
s.schedule.scheduledFor = next;
this.submissionRepository.persistAndFlush(s);
}
});
}
}

Expand All @@ -61,6 +76,7 @@ export class PostService extends PostyBirbService<PostRecord> {
});

// Filter out any already queued that are not in a completed state.
// It may be better to move completed to a separate table to avoid this check.
const unqueued = uniq(
request.ids.filter(
(id) => !existing.some((e) => e.parent.id === id && !e.completedAt)
Expand All @@ -81,6 +97,11 @@ export class PostService extends PostyBirbService<PostRecord> {
}
}

if (created.length > 0) {
// Attempt to start the post manager if it is not already running.
this.postManagerService.startPost(await this.getNext());
}

return created;
}

Expand All @@ -99,6 +120,8 @@ export class PostService extends PostyBirbService<PostRecord> {
const incomplete = existing.filter(
(e: PostRecord) => e.completedAt !== undefined
);

request.ids.forEach(this.postManagerService.cancelIfRunning);
await this.repository.removeAndFlush(incomplete);
}

Expand Down
2 changes: 1 addition & 1 deletion libs/types/src/enums/post-record-state.enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
export enum PostRecordState {
PENDING = 'PENDING',
PROCESSING = 'PROCESSING',
RUNNING = 'RUNNING',
DONE = 'DONE',
FAILED = 'FAILED',
}

0 comments on commit d045f37

Please sign in to comment.