Commit 5d7a77b
db performance improvements and job resilience (#200)

* replace upsert with separate createMany and raw updateMany calls
* add bulk repo status update and queue addition with priority
* add support for managed Redis
* add a note about updating the raw SQL when the schema changes
1 parent 390d92d commit 5d7a77b

File tree

- Dockerfile
- packages/backend/src/connectionManager.ts
- packages/backend/src/environment.ts
- packages/backend/src/main.ts
- packages/backend/src/repoManager.ts

5 files changed: +166 -37 lines

Dockerfile

Lines changed: 1 addition & 0 deletions

@@ -80,6 +80,7 @@ ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
 ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
 ENV DB_NAME=sourcebot
 ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
+ENV REDIS_URL="redis://localhost:6379"
 ENV SRC_TENANT_ENFORCEMENT_MODE=strict

 ARG SOURCEBOT_VERSION=unknown

packages/backend/src/connectionManager.ts

Lines changed: 120 additions & 14 deletions

@@ -1,12 +1,11 @@
-import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
+import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, Repo } from "@sourcebot/db";
 import { Job, Queue, Worker } from 'bullmq';
 import { Settings, WithRequired } from "./types.js";
 import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
 import { createLogger } from "./logger.js";
 import os from 'os';
 import { Redis } from 'ioredis';
 import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
-import { CONFIG_REPO_UPSERT_TIMEOUT_MS } from "./environment.js";

 interface IConnectionManager {
     scheduleConnectionSync: (connection: Connection) => Promise<void>;
@@ -23,15 +22,18 @@ type JobPayload = {
 };

 export class ConnectionManager implements IConnectionManager {
-    private queue = new Queue<JobPayload>(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue<JobPayload>;
     private logger = createLogger('ConnectionManager');

     constructor(
         private db: PrismaClient,
         private settings: Settings,
         redis: Redis,
     ) {
+        this.queue = new Queue<JobPayload>(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
             connection: redis,
@@ -113,6 +115,7 @@ export class ConnectionManager implements IConnectionManager {
         // appear in the repoData array above, and so the RepoToConnection record won't be re-created.
         // Repos that have no RepoToConnection records are considered orphaned and can be deleted.
         await this.db.$transaction(async (tx) => {
+            const deleteStart = performance.now();
             await tx.connection.update({
                 where: {
                     id: job.data.connectionId,
@@ -123,21 +126,124 @@ export class ConnectionManager implements IConnectionManager {
                     }
                 }
             });
+            const deleteDuration = performance.now() - deleteStart;
+            this.logger.info(`Deleted all RepoToConnection records for connection ${job.data.connectionId} in ${deleteDuration}ms`);

-            await Promise.all(repoData.map((repo) => {
-                return tx.repo.upsert({
-                    where: {
-                        external_id_external_codeHostUrl: {
-                            external_id: repo.external_id,
-                            external_codeHostUrl: repo.external_codeHostUrl,
-                        },
+            const existingRepos: Repo[] = await tx.repo.findMany({
+                where: {
+                    external_id: {
+                        in: repoData.map(repo => repo.external_id),
                     },
-                    create: repo,
-                    update: repo as Prisma.RepoUpdateInput,
+                    external_codeHostUrl: {
+                        in: repoData.map(repo => repo.external_codeHostUrl),
+                    },
+                },
+            });
+            const existingRepoKeys = existingRepos.map(repo => `${repo.external_id}-${repo.external_codeHostUrl}`);
+
+            const existingRepoData = repoData.filter(repo => existingRepoKeys.includes(`${repo.external_id}-${repo.external_codeHostUrl}`));
+            const [toCreate, toUpdate] = repoData.reduce<[Prisma.RepoCreateManyInput[], Prisma.RepoUpdateManyMutationInput[]]>(([toCreate, toUpdate], repo) => {
+                const existingRepo = existingRepoData.find((r: RepoData) => r.external_id === repo.external_id && r.external_codeHostUrl === repo.external_codeHostUrl);
+                if (existingRepo) {
+                    // @note: make sure to reflect any changes here in the raw SQL update below
+                    const updateRepo: Prisma.RepoUpdateManyMutationInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toUpdate.push(updateRepo);
+                } else {
+                    const createRepo: Prisma.RepoCreateManyInput = {
+                        name: repo.name,
+                        cloneUrl: repo.cloneUrl,
+                        imageUrl: repo.imageUrl,
+                        isFork: repo.isFork,
+                        isArchived: repo.isArchived,
+                        metadata: repo.metadata,
+                        orgId: job.data.orgId,
+                        external_id: repo.external_id,
+                        external_codeHostType: repo.external_codeHostType,
+                        external_codeHostUrl: repo.external_codeHostUrl,
+                    }
+                    toCreate.push(createRepo);
+                }
+                return [toCreate, toUpdate];
+            }, [[], []]);
+
+            if (toCreate.length > 0) {
+                const createStart = performance.now();
+                const createdRepos = await tx.repo.createManyAndReturn({
+                    data: toCreate,
                 });
-            }));

-        }, { timeout: parseInt(CONFIG_REPO_UPSERT_TIMEOUT_MS) });
+                await tx.repoToConnection.createMany({
+                    data: createdRepos.map(repo => ({
+                        repoId: repo.id,
+                        connectionId: job.data.connectionId,
+                    })),
+                });
+
+                const createDuration = performance.now() - createStart;
+                this.logger.info(`Created ${toCreate.length} repos in ${createDuration}ms`);
+            }
+
+            if (toUpdate.length > 0) {
+                const updateStart = performance.now();
+
+                // Build values string for update query
+                const updateValues = toUpdate.map(repo => `(
+                    '${repo.name}',
+                    '${repo.cloneUrl}',
+                    ${repo.imageUrl ? `'${repo.imageUrl}'` : 'NULL'},
+                    ${repo.isFork},
+                    ${repo.isArchived},
+                    '${JSON.stringify(repo.metadata)}'::jsonb,
+                    '${repo.external_id}',
+                    '${repo.external_codeHostType}',
+                    '${repo.external_codeHostUrl}'
+                )`).join(',');
+
+                // Update repos and get their IDs in one query
+                const updateSql = `
+                    WITH updated AS (
+                        UPDATE "Repo" r
+                        SET
+                            name = v.name,
+                            "cloneUrl" = v.clone_url,
+                            "imageUrl" = v.image_url,
+                            "isFork" = v.is_fork,
+                            "isArchived" = v.is_archived,
+                            metadata = v.metadata,
+                            "updatedAt" = NOW()
+                        FROM (
+                            VALUES ${updateValues}
+                        ) AS v(name, clone_url, image_url, is_fork, is_archived, metadata, external_id, external_code_host_type, external_code_host_url)
+                        WHERE r.external_id = v.external_id
+                            AND r."external_codeHostUrl" = v.external_code_host_url
+                        RETURNING r.id
+                    )
+                    SELECT id FROM updated
+                `;
+                const updatedRepoIds = await tx.$queryRawUnsafe<{id: number}[]>(updateSql);
+
+                // Insert repo-connection mappings
+                const createConnectionSql = `
+                    INSERT INTO "RepoToConnection" ("repoId", "connectionId", "addedAt")
+                    SELECT id, ${job.data.connectionId}, NOW()
+                    FROM unnest(ARRAY[${updatedRepoIds.map(r => r.id).join(',')}]) AS id
+                `;
+                await tx.$executeRawUnsafe(createConnectionSql);
+
+                const updateDuration = performance.now() - updateStart;
+                this.logger.info(`Updated ${toUpdate.length} repos in ${updateDuration}ms`);
+            }
+        });
     }
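
The substantive change in this file is replacing one `upsert` round trip per repo with a bulk diff-and-write: new repos go through a single `createManyAndReturn`, and existing repos are updated by one `UPDATE ... FROM (VALUES ...)` CTE that also returns the affected ids for the follow-up `RepoToConnection` insert. Below is a minimal, self-contained sketch of that VALUES-join update pattern, assuming the node-postgres (`pg`) client and a simplified two-column `"Repo"` table; unlike the commit, which interpolates values directly into the SQL string, the sketch binds them as parameters — a safer variant of the same technique:

```ts
import { Client } from 'pg';

type RepoRow = { name: string; externalId: string };

// Bulk-update many rows in one statement by joining against a VALUES list,
// mirroring the shape of the commit's updateSql CTE, and return the affected ids.
async function bulkUpdateRepos(client: Client, repos: RepoRow[]): Promise<number[]> {
    if (repos.length === 0) return [];

    // Build "($1::text, $2::text), ($3::text, $4::text), ..." plus a flat
    // parameter list, so values are bound rather than spliced into the SQL.
    const values = repos
        .map((_, i) => `($${2 * i + 1}::text, $${2 * i + 2}::text)`)
        .join(', ');
    const params = repos.flatMap(r => [r.name, r.externalId]);

    const sql = `
        WITH updated AS (
            UPDATE "Repo" r
            SET name = v.name, "updatedAt" = NOW()
            FROM (VALUES ${values}) AS v(name, external_id)
            WHERE r.external_id = v.external_id
            RETURNING r.id
        )
        SELECT id FROM updated
    `;
    const result = await client.query<{ id: number }>(sql, params);
    return result.rows.map(row => row.id);
}
```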

packages/backend/src/environment.ts

Lines changed: 2 additions & 3 deletions

@@ -35,6 +35,5 @@ export const FALLBACK_GITHUB_TOKEN = getEnv(process.env.FALLBACK_GITHUB_TOKEN);
 export const FALLBACK_GITLAB_TOKEN = getEnv(process.env.FALLBACK_GITLAB_TOKEN);
 export const FALLBACK_GITEA_TOKEN = getEnv(process.env.FALLBACK_GITEA_TOKEN);

-export const CONFIG_REPO_UPSERT_TIMEOUT_MS = getEnv(process.env.CONFIG_REPO_UPSERT_TIMEOUT_MS, '15000')!;
-
-export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
+export const INDEX_CONCURRENCY_MULTIPLE = getEnv(process.env.INDEX_CONCURRENCY_MULTIPLE);
+export const REDIS_URL = getEnv(process.env.REDIS_URL, 'redis://localhost:6379')!;
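
`REDIS_URL` follows the same `getEnv` pattern as the other settings: the second argument is a fallback, so the local instance from the Dockerfile is used unless the variable is set, and the trailing `!` asserts the value is always defined. A plausible sketch of the helper's contract, inferred from its call sites (the real `getEnv` lives elsewhere in environment.ts and may differ):

```ts
// Hypothetical sketch of getEnv's behavior: return the environment value
// when present, otherwise the fallback (which may be undefined).
const getEnv = (value: string | undefined, defaultValue?: string): string | undefined => {
    return value ?? defaultValue;
};

// With a default supplied the result can never be undefined,
// which is what the `!` assertion on REDIS_URL relies on.
const REDIS_URL = getEnv(process.env.REDIS_URL, 'redis://localhost:6379')!;
```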

packages/backend/src/main.ts

Lines changed: 2 additions & 4 deletions

@@ -5,14 +5,12 @@ import { DEFAULT_SETTINGS } from './constants.js';
 import { Redis } from 'ioredis';
 import { ConnectionManager } from './connectionManager.js';
 import { RepoManager } from './repoManager.js';
-import { INDEX_CONCURRENCY_MULTIPLE } from './environment.js';
+import { INDEX_CONCURRENCY_MULTIPLE, REDIS_URL } from './environment.js';

 const logger = createLogger('main');

 export const main = async (db: PrismaClient, context: AppContext) => {
-    const redis = new Redis({
-        host: 'localhost',
-        port: 6379,
+    const redis = new Redis(REDIS_URL, {
         maxRetriesPerRequest: null
     });
     redis.ping().then(() => {
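
With the connection string externalized, pointing the backend at a managed Redis becomes a configuration change rather than a code change. A sketch of what that looks like, with a made-up endpoint and credentials: ioredis accepts a URL as its first argument, a `rediss://` scheme enables TLS, and `maxRetriesPerRequest: null` stays because BullMQ requires it on connections that workers block on.

```ts
import { Redis } from 'ioredis';

// Illustrative managed-Redis endpoint (hostname and password are examples);
// the rediss:// scheme tells ioredis to connect over TLS.
const redis = new Redis('rediss://default:s3cret@redis.example.com:6380', {
    maxRetriesPerRequest: null, // BullMQ workers issue blocking commands and require this
});

redis.ping().then(() => {
    console.log('connected');
});
```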

packages/backend/src/repoManager.ts

Lines changed: 41 additions & 16 deletions

@@ -13,7 +13,7 @@ import os from 'os';

 interface IRepoManager {
     blockingPollLoop: () => void;
-    scheduleRepoIndexing: (repo: RepoWithConnections) => Promise<void>;
+    scheduleRepoIndexingBulk: (repos: RepoWithConnections[]) => Promise<void>;
     dispose: () => void;
 }

@@ -25,8 +25,8 @@ type JobPayload = {
 }

 export class RepoManager implements IRepoManager {
-    private queue = new Queue<JobPayload>(QUEUE_NAME);
     private worker: Worker;
+    private queue: Queue<JobPayload>;
     private logger = createLogger('RepoManager');

     constructor(
@@ -35,6 +35,9 @@ export class RepoManager implements IRepoManager {
         redis: Redis,
         private ctx: AppContext,
     ) {
+        this.queue = new Queue<JobPayload>(QUEUE_NAME, {
+            connection: redis,
+        });
         const numCores = os.cpus().length;
         this.worker = new Worker(QUEUE_NAME, this.runIndexJob.bind(this), {
             connection: redis,
@@ -46,26 +49,48 @@

     public async blockingPollLoop() {
         while(true) {
-            this.fetchAndScheduleRepoIndexing();
-            this.garbageCollectRepo();
+            await this.fetchAndScheduleRepoIndexing();
+            await this.garbageCollectRepo();

             await new Promise(resolve => setTimeout(resolve, this.settings.reindexRepoPollingIntervalMs));
         }
     }

-    public async scheduleRepoIndexing(repo: RepoWithConnections) {
+    public async scheduleRepoIndexingBulk(repos: RepoWithConnections[]) {
         await this.db.$transaction(async (tx) => {
-            await tx.repo.update({
-                where: { id: repo.id },
-                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            await tx.repo.updateMany({
+                where: { id: { in: repos.map(repo => repo.id) } },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE }
             });
+
+            const reposByOrg = repos.reduce<Record<number, RepoWithConnections[]>>((acc, repo) => {
+                if (!acc[repo.orgId]) {
+                    acc[repo.orgId] = [];
+                }
+                acc[repo.orgId].push(repo);
+                return acc;
+            }, {});
+
+            for (const orgId in reposByOrg) {
+                const orgRepos = reposByOrg[orgId];
+                // Set priority based on number of repos (more repos = lower priority)
+                // This helps prevent large orgs from overwhelming the queue
+                const priority = Math.min(Math.ceil(orgRepos.length / 10), 2097152);
+
+                await this.queue.addBulk(orgRepos.map(repo => ({
+                    name: 'repoIndexJob',
+                    data: { repo },
+                    opts: {
+                        priority: priority
+                    }
+                })));
+
+                this.logger.info(`Added ${orgRepos.length} jobs to queue for org ${orgId} with priority ${priority}`);
+            }
+

-            await this.queue.add('repoIndexJob', {
-                repo
-            });
-            this.logger.info(`Added job to queue for repo ${repo.id}`);
         }).catch((err: unknown) => {
-            this.logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+            this.logger.error(`Failed to add jobs to queue for repos ${repos.map(repo => repo.id).join(', ')}: ${err}`);
         });
     }

@@ -91,9 +116,9 @@ export class RepoManager implements IRepoManager {
             }
         });

-        for (const repo of repos) {
-            await this.scheduleRepoIndexing(repo);
-        }
+        if (repos.length > 0) {
+            await this.scheduleRepoIndexingBulk(repos);
+        }
     }

     private async garbageCollectRepo() {
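
The priority formula is worth unpacking: each org's jobs get a priority of one per ten repos, capped at 2097152 (2^21), which is BullMQ's maximum priority value. BullMQ treats lower numbers as higher priority, so an org with a handful of repos is indexed ahead of an org backfilling tens of thousands. A worked sketch of the same formula:

```ts
// Same bucketing as scheduleRepoIndexingBulk above.
const BULLMQ_MAX_PRIORITY = 2 ** 21; // 2097152, BullMQ's priority ceiling

const priorityForOrg = (repoCount: number): number =>
    Math.min(Math.ceil(repoCount / 10), BULLMQ_MAX_PRIORITY);

console.log(priorityForOrg(3));     // 1    (highest priority)
console.log(priorityForOrg(100));   // 10
console.log(priorityForOrg(50000)); // 5000 (drained after smaller orgs)
```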
