Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 0 additions & 155 deletions apps/job-service/src/scheduled-jobs/scheduled-jobs.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,36 +6,30 @@ import { Queue } from "bullmq";
import { Test } from "@nestjs/testing";
import { getQueueToken } from "@nestjs/bullmq";
import { Op } from "sequelize";
import { Transaction } from "sequelize";
import { ScheduledJobsService } from "./scheduled-jobs.service";
import {
Framework,
FundingProgramme,
Notification,
PasswordReset,
ScheduledJob,
Task,
Verification
} from "@terramatch-microservices/database/entities";
import { ScheduledJobFactory } from "@terramatch-microservices/database/factories/scheduled-job.factory";
import { REPORT_REMINDER_EVENT, SITE_AND_NURSERY_REMINDER_EVENT, TASK_DUE_EVENT } from "./scheduled-jobs.processor";
import { WeeklyPolygonUpdateEmail } from "@terramatch-microservices/common/email/weekly-polygon-update.email";
import { FrameworkFactory, FundingProgrammeFactory } from "@terramatch-microservices/database/factories";
import { CACHED_EXPORT_ENTITY_TYPES } from "@terramatch-microservices/database/constants/entities";
import { TaskDigestEmail } from "@terramatch-microservices/common/email/task-digest.email";

describe("ScheduledJobsService", () => {
let service: ScheduledJobsService;
let queue: DeepMocked<Queue>;
let emailQueue: DeepMocked<Queue>;
let entitiesQueue: DeepMocked<Queue>;

beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
ScheduledJobsService,
{ provide: getQueueToken("scheduled-jobs"), useValue: (queue = createMock<Queue>()) },
{ provide: getQueueToken("email"), useValue: (emailQueue = createMock<Queue>()) },
{ provide: getQueueToken("entities"), useValue: (entitiesQueue = createMock<Queue>()) }
]
}).compile();
Expand Down Expand Up @@ -134,124 +128,6 @@ describe("ScheduledJobsService", () => {
});
});

describe("enqueueTaskDigestEmails", () => {
let transactionSpy: jest.SpyInstance | undefined;
let querySpy: jest.SpyInstance | undefined;

beforeEach(() => {
const sequelize = Task.sequelize;
if (sequelize == null) {
throw new Error("Task.sequelize is not initialized");
}
const mockTransaction = async (arg?: unknown) => {
if (typeof arg === "function") {
return await (arg as (t: Transaction) => Promise<void>)({} as Transaction);
}
return {} as Transaction;
};
const mockQuery = async (sql: string | { query: string }) => {
const q = typeof sql === "string" ? sql : sql.query;
if (q.includes("GET_LOCK")) {
return [{ got: 1 }];
}
if (q.includes("RELEASE_LOCK")) {
return [{ rel: 1 }];
}
return [];
};
transactionSpy = jest.spyOn(sequelize, "transaction").mockImplementation(mockTransaction as never);
querySpy = jest.spyOn(sequelize, "query").mockImplementation(mockQuery as never);
});

afterEach(() => {
transactionSpy?.mockRestore();
querySpy?.mockRestore();
});

it("should not enqueue when no incomplete tasks", async () => {
const countSpy = jest.spyOn(Task, "count").mockResolvedValue(0);
await service.enqueueTaskDigestEmails();
expect(emailQueue.add).not.toHaveBeenCalled();
countSpy.mockRestore();
});

it("should enqueue task digest jobs using paginated batches", async () => {
const countSpy = jest.spyOn(Task, "count").mockResolvedValue(3);
const findAllSpy = jest.spyOn(Task, "findAll").mockResolvedValue([{ id: 10 }, { id: 20 }, { id: 30 }] as Task[]);

await service.enqueueTaskDigestEmails();

expect(emailQueue.add).toHaveBeenCalledTimes(1);
expect(emailQueue.add).toHaveBeenCalledWith(TaskDigestEmail.NAME, { taskIds: [10, 20, 30] });
countSpy.mockRestore();
findAllSpy.mockRestore();
});
});

describe("enqueueWeeklyPolygonUpdateEmails", () => {
let transactionSpy: jest.SpyInstance | undefined;
let querySpy: jest.SpyInstance | undefined;

beforeEach(() => {
const sequelize = Task.sequelize;
if (sequelize == null) {
throw new Error("Task.sequelize is not initialized");
}
const mockTransaction = async (arg?: unknown) => {
if (typeof arg === "function") {
return await (arg as (t: Transaction) => Promise<void>)({} as Transaction);
}
return {} as Transaction;
};
const mockQuery = async (sql: string | { query: string }) => {
const q = typeof sql === "string" ? sql : sql.query;
if (q.includes("GET_LOCK")) {
return [{ got: 1 }];
}
if (q.includes("RELEASE_LOCK")) {
return [{ rel: 1 }];
}
return [];
};
transactionSpy = jest.spyOn(sequelize, "transaction").mockImplementation(mockTransaction as never);
querySpy = jest.spyOn(sequelize, "query").mockImplementation(mockQuery as never);
});

afterEach(() => {
transactionSpy?.mockRestore();
querySpy?.mockRestore();
});

it("should not enqueue when no polygon UUIDs in window", async () => {
const loadSpy = jest.spyOn(WeeklyPolygonUpdateEmail, "loadRecentSitePolygonUuids").mockResolvedValue([]);
await service.enqueueWeeklyPolygonUpdateEmails();
expect(emailQueue.add).not.toHaveBeenCalled();
loadSpy.mockRestore();
});

it("should enqueue polygon digest jobs in uuid chunks", async () => {
const uuids = ["uuid-a", "uuid-b"];
const loadSpy = jest.spyOn(WeeklyPolygonUpdateEmail, "loadRecentSitePolygonUuids").mockResolvedValue(uuids);
await service.enqueueWeeklyPolygonUpdateEmails();
expect(emailQueue.add).toHaveBeenCalledWith(WeeklyPolygonUpdateEmail.NAME, { sitePolygonUuids: uuids });
loadSpy.mockRestore();
});

it("should split polygon UUIDs into multiple queue jobs when above chunk size", async () => {
const uuids = Array.from({ length: 51 }, (_, i) => `uuid-${i}`);
const loadSpy = jest.spyOn(WeeklyPolygonUpdateEmail, "loadRecentSitePolygonUuids").mockResolvedValue(uuids);
await service.enqueueWeeklyPolygonUpdateEmails();
expect(emailQueue.add).toHaveBeenCalledTimes(2);
expect(emailQueue.add).toHaveBeenNthCalledWith(1, WeeklyPolygonUpdateEmail.NAME, {
sitePolygonUuids: uuids.slice(0, 50)
});
expect(emailQueue.add).toHaveBeenNthCalledWith(2, WeeklyPolygonUpdateEmail.NAME, {
sitePolygonUuids: uuids.slice(50, 51)
});
loadSpy.mockRestore();
});
});

it("queues export generation for every framework and entity type", async () => {
await Promise.all((await Framework.findAll()).map(framework => framework.destroy()));
const frameworks = await FrameworkFactory.createMany(2);
Expand Down Expand Up @@ -284,47 +160,16 @@ describe("ScheduledJobsService", () => {

describe("ScheduledJobsService maintenance cleanup", () => {
let service: ScheduledJobsService;
let transactionSpy: jest.SpyInstance | undefined;
let querySpy: jest.SpyInstance | undefined;

beforeEach(async () => {
const module = await Test.createTestingModule({
providers: [
ScheduledJobsService,
{ provide: getQueueToken("scheduled-jobs"), useValue: createMock<Queue>() },
{ provide: getQueueToken("email"), useValue: createMock<Queue>() },
{ provide: getQueueToken("entities"), useValue: createMock<Queue>() }
]
}).compile();
service = module.get(ScheduledJobsService);

const sequelize = Task.sequelize;
if (sequelize == null) {
throw new Error("Task.sequelize is not initialized");
}
const mockTransaction = async (arg?: unknown) => {
if (typeof arg === "function") {
return await (arg as (t: Transaction) => Promise<void>)({} as Transaction);
}
return {} as Transaction;
};
const mockQuery = async (sql: string | { query: string }) => {
const q = typeof sql === "string" ? sql : sql.query;
if (q.includes("GET_LOCK")) {
return [{ got: 1 }];
}
if (q.includes("RELEASE_LOCK")) {
return [{ rel: 1 }];
}
return [];
};
transactionSpy = jest.spyOn(sequelize, "transaction").mockImplementation(mockTransaction as never);
querySpy = jest.spyOn(sequelize, "query").mockImplementation(mockQuery as never);
});

afterEach(() => {
transactionSpy?.mockRestore();
querySpy?.mockRestore();
});

it("removeStaleVerifications deletes rows past retention", async () => {
Expand Down
90 changes: 1 addition & 89 deletions apps/job-service/src/scheduled-jobs/scheduled-jobs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import {
Notification,
PasswordReset,
ScheduledJob,
Task,
Verification
} from "@terramatch-microservices/database/entities";
import { ENTERPRISES, LANDSCAPES } from "@terramatch-microservices/database/constants";
import { APPROVED, AWAITING_APPROVAL } from "@terramatch-microservices/database/constants/status";
import { Op, QueryTypes, Transaction } from "sequelize";
import { Op, Transaction } from "sequelize";
import type { TaskDue } from "@terramatch-microservices/database/constants/scheduled-jobs";
import {
REPORT_REMINDER,
Expand All @@ -23,19 +21,8 @@ import { Queue } from "bullmq";
import { InjectQueue } from "@nestjs/bullmq";
import { DateTime } from "luxon";
import { REPORT_REMINDER_EVENT, SITE_AND_NURSERY_REMINDER_EVENT, TASK_DUE_EVENT } from "./scheduled-jobs.processor";
import { WeeklyPolygonUpdateEmail } from "@terramatch-microservices/common/email/weekly-polygon-update.email";
import { batchFindAll } from "@terramatch-microservices/common/util/batch-find-all";
import { PaginatedQueryBuilder } from "@terramatch-microservices/common/util/paginated-query.builder";
import { CACHED_EXPORT_ENTITY_TYPES } from "@terramatch-microservices/database/constants/entities";
import { isNotNull } from "@terramatch-microservices/database/types/array";
import { TaskDigestEmail } from "@terramatch-microservices/common/email/task-digest.email";

const TASK_DIGEST_CHUNK_SIZE = 100;
const POLYGON_DIGEST_CHUNK_SIZE = 50;

/** MySQL/MariaDB named locks so only one job-service replica runs each digest cron at a time. */
const MYSQL_LOCK_TASK_DIGEST_WEEKLY = "tm_job_svc_task_digest_wk";
const MYSQL_LOCK_POLYGON_UPDATES_WEEKLY = "tm_job_svc_polygon_update_wk";

const VERIFICATION_RETENTION_HOURS = 48;
const PASSWORD_RESET_RETENTION_DAYS = 7;
Expand All @@ -47,7 +34,6 @@ export class ScheduledJobsService {

constructor(
@InjectQueue("scheduled-jobs") private readonly scheduledJobsQueue: Queue,
@InjectQueue("email") private readonly emailQueue: Queue,
@InjectQueue("entities") private readonly entitiesQueue: Queue
) {}

Expand Down Expand Up @@ -133,49 +119,6 @@ export class ScheduledJobsService {
}
}

/**
* Weekly incomplete-task digest. Uses a MySQL named lock so only one replica enqueues when
* multiple job-service instances are deployed.
*/
@Cron("0 17 * * 1", { name: "taskDigestWeekly" })
async enqueueTaskDigestEmails() {
await Task.sql.transaction(async transaction => {
await this.runWithMysqlNamedLock(MYSQL_LOCK_TASK_DIGEST_WEEKLY, transaction, async () => {
this.logger.log("Enqueueing task digest email jobs (incomplete tasks)");
let total = 0;
const builder = new PaginatedQueryBuilder(Task, TASK_DIGEST_CHUNK_SIZE).attributes(["id"]).where({
status: { [Op.notIn]: [AWAITING_APPROVAL, APPROVED] }
});
for await (const page of batchFindAll(builder)) {
const taskIds = page.map(({ id }) => id);
total += taskIds.length;
await new TaskDigestEmail({ taskIds }).sendLater(this.emailQueue);
}
this.logger.log(`Task digest: enqueued chunks covering up to ${total} incomplete tasks`);
});
});
}

/**
* Weekly polygon update digest. Uses a MySQL named lock so only one replica enqueues when
* multiple job-service instances are deployed.
*/
@Cron("0 0 * * 1", { name: "weeklyPolygonUpdates" })
async enqueueWeeklyPolygonUpdateEmails() {
await Task.sql.transaction(async transaction => {
await this.runWithMysqlNamedLock(MYSQL_LOCK_POLYGON_UPDATES_WEEKLY, transaction, async () => {
this.logger.log("Enqueueing weekly polygon update email jobs");
const weekAgo = DateTime.now().minus({ days: 7 }).toJSDate();
const uuids = await WeeklyPolygonUpdateEmail.loadRecentSitePolygonUuids(weekAgo);
for (let i = 0; i < uuids.length; i += POLYGON_DIGEST_CHUNK_SIZE) {
const sitePolygonUuids = uuids.slice(i, i + POLYGON_DIGEST_CHUNK_SIZE);
await new WeeklyPolygonUpdateEmail({ sitePolygonUuids }).sendLater(this.emailQueue);
}
this.logger.log(`Weekly polygon updates: enqueued ${uuids.length} UUIDs in chunks`);
});
});
}

@Cron(CronExpression.EVERY_5_MINUTES, { name: "removeStaleVerifications" })
async removeStaleVerifications() {
const cutoff = DateTime.utc().minus({ hours: VERIFICATION_RETENTION_HOURS }).toJSDate();
Expand Down Expand Up @@ -229,35 +172,4 @@ export class ScheduledJobsService {
await this.entitiesQueue.add("generateApplicationExport", { fundingProgrammeId });
}
}

/**
* Runs `fn` while holding a MySQL/MariaDB named lock on the current transaction connection.
* If the lock is not acquired (another node holds it), `fn` is skipped.
*/
private async runWithMysqlNamedLock(
lockName: string,
transaction: Transaction,
fn: () => Promise<void>
): Promise<void> {
const sequelize = Task.sql;
const rows = await sequelize.query<{ got: number }>("SELECT GET_LOCK(:name, 0) AS got", {
replacements: { name: lockName },
transaction,
type: QueryTypes.SELECT
});
const first = rows[0];
if (first == null || Number(first.got) !== 1) {
this.logger.debug(`Cron skipped: could not acquire named lock [${lockName}]`);
return;
}
try {
await fn();
} finally {
await sequelize.query("SELECT RELEASE_LOCK(:name) AS rel", {
replacements: { name: lockName },
transaction,
type: QueryTypes.SELECT
});
}
}
}
3 changes: 2 additions & 1 deletion libs/common/src/lib/email/entity-status-update.email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ export class EntityStatusUpdateEmail extends EmailSender<SpecificEntityData> {
return [];
}

// Project managers perform approvals; PDs (non-managers) receive status update emails.
return await User.findAll({
where: { id: { [Op.in]: ProjectUser.projectUsersSubquery(projectId) } },
where: { id: { [Op.in]: ProjectUser.projectNonManagersSubquery(projectId) } },
attributes: ["emailAddress", "locale"]
});
}
Expand Down
5 changes: 2 additions & 3 deletions libs/common/src/lib/email/task-digest.email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ export class TaskDigestEmail extends EmailSender<TaskDigestEmailData> {
}

async send(emailService: EmailService) {
for (const taskId of this.data.taskIds) {
await this.sendForTask(taskId, emailService);
}
void emailService;
this.logger.log("Task digest emails are disabled");
}

private async sendForTask(taskId: number, emailService: EmailService) {
Expand Down
7 changes: 2 additions & 5 deletions libs/common/src/lib/email/weekly-polygon-update.email.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { TMLogger } from "../util/tm-logger";
import { Dictionary, groupBy } from "lodash";
import { ValidLocale } from "@terramatch-microservices/database/constants/locale";
import { Op, col, fn } from "sequelize";
import { DateTime } from "luxon";

export type WeeklyPolygonUpdateEmailData = {
sitePolygonUuids: string[];
Expand Down Expand Up @@ -49,10 +48,8 @@ export class WeeklyPolygonUpdateEmail extends EmailSender<WeeklyPolygonUpdateEma
}

async send(emailService: EmailService) {
const weekAgo = DateTime.now().minus({ days: 7 }).toJSDate();
for (const sitePolygonUuid of this.data.sitePolygonUuids) {
await this.sendForPolygon(sitePolygonUuid, weekAgo, emailService);
}
void emailService;
this.logger.log("Weekly polygon digest emails are disabled");
}

private async sendForPolygon(sitePolygonUuid: string, weekAgo: Date, emailService: EmailService) {
Expand Down
4 changes: 4 additions & 0 deletions libs/database/src/lib/entities/project-user.entity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ export class ProjectUser extends Model<InferAttributes<ProjectUser>, InferCreati
return Subquery.select(ProjectUser, "userId").eq("projectId", projectId).eq("isManaging", true).literal;
}

static projectNonManagersSubquery(projectId: number) {
return Subquery.select(ProjectUser, "userId").eq("projectId", projectId).eq("isManaging", false).literal;
}

@PrimaryKey
@AutoIncrement
@Column(BIGINT.UNSIGNED)
Expand Down
Loading