
Commit 7029aa7

parallelize repo indexing (#163)
* hacked together an example of using the zoekt grpc api
* provide tenant id to zoekt git indexer
* update zoekt version to point to multitenant branch
* pipe tenant id through header to zoekt
* remove incorrect submodule reference and settings typo
* update zoekt commit
* remove unused yarn script
* remove unused grpc client in web server
* remove unneeded deps and improve tenant id log
* pass tenant id when creating repo in db
* add mt yarn script
* add pol of bullmq into backend
* add better error handling and concurrency setting
* spin up redis instance in dockerfile
* cleanup transaction logic when adding repos to index queue
* add NEW index status fetch condition
* move bullmq deps to backend

Co-authored-by: bkellam <bshizzle1234@gmail.com>
1 parent bd027f7 commit 7029aa7

10 files changed: +306 additions, −43 deletions


Dockerfile

Lines changed: 1 addition & 1 deletion

@@ -84,7 +84,7 @@ ENV POSTHOG_PAPIK=$POSTHOG_PAPIK
 # ENV SOURCEBOT_TELEMETRY_DISABLED=1

 # Configure dependencies
-RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq
+RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq redis

 # Configure zoekt
 COPY vendor/zoekt/install-ctags-alpine.sh .

package.json

Lines changed: 5 additions & 3 deletions

@@ -6,12 +6,14 @@
   "scripts": {
     "build": "yarn workspaces run build",
     "test": "yarn workspaces run test",
-    "dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web",
-    "dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web",
+    "dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
+    "dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
     "dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
     "dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
     "dev:backend": "yarn workspace @sourcebot/backend dev:watch",
-    "dev:web": "yarn workspace @sourcebot/web dev"
+    "dev:web": "yarn workspace @sourcebot/web dev",
+    "dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"
+
   },
   "devDependencies": {
     "npm-run-all": "^4.1.5"
packages/backend/package.json

Lines changed: 3 additions & 1 deletion

@@ -34,6 +34,8 @@
     "posthog-node": "^4.2.1",
     "simple-git": "^3.27.0",
     "strip-json-comments": "^5.0.1",
-    "winston": "^3.15.0"
+    "winston": "^3.15.0",
+    "bullmq": "^5.34.10",
+    "ioredis": "^5.4.2"
   }
 }

packages/backend/src/constants.ts

Lines changed: 2 additions & 1 deletion

@@ -6,6 +6,7 @@ import { Settings } from "./types.js";
 export const DEFAULT_SETTINGS: Settings = {
     maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
     autoDeleteStaleRepos: true,
-    reindexIntervalMs: 1000 * 60 * 60, // 1 hour in milliseconds
+    reindexIntervalMs: 1000 * 60,
     resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
+    indexConcurrencyMultiple: 3,
 }
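
Two setting changes here: reindexIntervalMs drops from one hour to 1000 * 60 ms, i.e. one minute (the old explanatory comment was dropped along with it), and the new indexConcurrencyMultiple of 3 sizes the worker pool at three concurrent index jobs per CPU core, as used in main.ts below.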

packages/backend/src/main.ts

Lines changed: 96 additions & 36 deletions

@@ -1,4 +1,4 @@
-import { PrismaClient, Repo } from '@sourcebot/db';
+import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
 import { existsSync, watch } from 'fs';
 import { syncConfig } from "./config.js";
 import { cloneRepository, fetchRepository } from "./git.js";
@@ -8,6 +8,9 @@ import { AppContext } from "./types.js";
 import { getRepoPath, isRemotePath, measure } from "./utils.js";
 import { indexGitRepository } from "./zoekt.js";
 import { DEFAULT_SETTINGS } from './constants.js';
+import { Queue, Worker, Job } from 'bullmq';
+import { Redis } from 'ioredis';
+import * as os from 'os';

 const logger = createLogger('main');

@@ -53,6 +56,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
     }
 }

+async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
+    for (const repo of repos) {
+        await db.$transaction(async (tx) => {
+            await tx.repo.update({
+                where: { id: repo.id },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            });
+
+            // Add the job to the queue
+            await queue.add('indexJob', repo);
+            logger.info(`Added job to queue for repo ${repo.id}`);
+        }).catch((err) => {
+            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+        });
+    }
+}
+
 export const main = async (db: PrismaClient, context: AppContext) => {
     let abortController = new AbortController();
     let isSyncing = false;
@@ -97,50 +117,90 @@ export const main = async (db: PrismaClient, context: AppContext) => {
     // Sync immediately on startup
     await _syncConfig();

-    while (true) {
-        const repos = await db.repo.findMany();
-
-        for (const repo of repos) {
-            const lastIndexed = repo.indexedAt ?? new Date(0);
-
-            if (lastIndexed.getTime() > (Date.now() - DEFAULT_SETTINGS.reindexIntervalMs)) {
-                continue;
-            }
+    const redis = new Redis({
+        host: 'localhost',
+        port: 6379,
+        maxRetriesPerRequest: null
+    });
+    redis.ping().then(() => {
+        logger.info('Connected to redis');
+    }).catch((err) => {
+        logger.error('Failed to connect to redis');
+        console.error(err);
+        process.exit(1);
+    });
+
+    const indexQueue = new Queue('indexQueue');
+
+    const numCores = os.cpus().length;
+    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
+    logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
+    const worker = new Worker('indexQueue', async (job) => {
+        const repo = job.data as Repo;
+
+        let indexDuration_s: number | undefined;
+        let fetchDuration_s: number | undefined;
+        let cloneDuration_s: number | undefined;
+
+        const stats = await syncGitRepository(repo, context);
+        indexDuration_s = stats.indexDuration_s;
+        fetchDuration_s = stats.fetchDuration_s;
+        cloneDuration_s = stats.cloneDuration_s;
+
+        captureEvent('repo_synced', {
+            vcs: 'git',
+            codeHost: repo.external_codeHostType,
+            indexDuration_s,
+            fetchDuration_s,
+            cloneDuration_s,
+        });

-        try {
-            let indexDuration_s: number | undefined;
-            let fetchDuration_s: number | undefined;
-            let cloneDuration_s: number | undefined;
-
-            const stats = await syncGitRepository(repo, context);
-            indexDuration_s = stats.indexDuration_s;
-            fetchDuration_s = stats.fetchDuration_s;
-            cloneDuration_s = stats.cloneDuration_s;
-
-            captureEvent('repo_synced', {
-                vcs: 'git',
-                codeHost: repo.external_codeHostType,
-                indexDuration_s,
-                fetchDuration_s,
-                cloneDuration_s,
-            });
-        } catch (err: any) {
-            // @todo : better error handling here..
-            logger.error(err);
-            continue;
+        await db.repo.update({
+            where: {
+                id: repo.id,
+            },
+            data: {
+                indexedAt: new Date(),
+                repoIndexingStatus: RepoIndexingStatus.INDEXED,
             }
-
+        });
+    }, { connection: redis, concurrency: numWorkers });
+
+    worker.on('completed', (job) => {
+        logger.info(`Job ${job.id} completed`);
+    });
+    worker.on('failed', async (job: Job | undefined, err) => {
+        logger.info(`Job failed with error: ${err}`);
+        if (job) {
             await db.repo.update({
                 where: {
-                    id: repo.id,
+                    id: job.data.id,
                 },
                 data: {
-                    indexedAt: new Date(),
+                    repoIndexingStatus: RepoIndexingStatus.FAILED,
                 }
-            });
+            })
         }
+    });

-        await new Promise(resolve => setTimeout(resolve, 1000));
+    while (true) {
+        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
+        const repos = await db.repo.findMany({
+            where: {
+                repoIndexingStatus: {
+                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
+                },
+                OR: [
+                    { indexedAt: null },
+                    { indexedAt: { lt: thresholdDate } },
+                    { repoIndexingStatus: RepoIndexingStatus.NEW }
+                ]
+            }
+        });
+        logger.info(`Found ${repos.length} repos to index...`);
+        addReposToQueue(db, indexQueue, repos);

+        await new Promise(resolve => setTimeout(resolve, 1000));
     }
 }
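
The structural change above is a standard BullMQ producer/consumer split: the 1-second polling loop now only selects eligible repos, flips them to IN_INDEX_QUEUE, and enqueues a job, while a Worker whose concurrency is cores × indexConcurrencyMultiple drains the queue in parallel. A minimal, self-contained sketch of the same pattern (the IndexJob payload type and repo id are illustrative, not from this commit):

import { Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';

// BullMQ workers require maxRetriesPerRequest: null on their redis connection.
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });

type IndexJob = { repoId: number };

// Producer side: enqueue work instead of indexing inline.
const queue = new Queue<IndexJob>('indexQueue', { connection });
await queue.add('indexJob', { repoId: 42 });

// Consumer side: one worker processes up to `concurrency` jobs at once.
const concurrency = os.cpus().length * 3; // mirrors indexConcurrencyMultiple
const worker = new Worker<IndexJob>('indexQueue', async (job) => {
    // clone/fetch + zoekt indexing would happen here
    console.log(`indexing repo ${job.data.repoId}`);
}, { connection, concurrency });

worker.on('completed', (job) => console.log(`job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`job ${job?.id} failed: ${err}`));

Two details worth noting in the committed version: indexQueue is constructed without an explicit connection, so the producer relies on BullMQ's default local connection while only the worker reuses the ioredis client, and addReposToQueue(db, indexQueue, repos) is called without await, so the poll loop keeps its 1-second cadence regardless of how long enqueueing takes.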

packages/backend/src/types.ts

Lines changed: 4 additions & 0 deletions

@@ -74,6 +74,10 @@ export type Settings = {
      * The interval (in milliseconds) at which the configuration file should be re-synced.
      */
     resyncIntervalMs: number;
+    /**
+     * The multiple of the number of CPUs to use for indexing.
+     */
+    indexConcurrencyMultiple: number;
 }

 // @see : https://stackoverflow.com/a/61132308
packages/db/prisma/migrations/…/migration.sql

Lines changed: 25 additions & 0 deletions

@@ -0,0 +1,25 @@
+-- RedefineTables
+PRAGMA defer_foreign_keys=ON;
+PRAGMA foreign_keys=OFF;
+CREATE TABLE "new_Repo" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "name" TEXT NOT NULL,
+    "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "updatedAt" DATETIME NOT NULL,
+    "indexedAt" DATETIME,
+    "isFork" BOOLEAN NOT NULL,
+    "isArchived" BOOLEAN NOT NULL,
+    "metadata" JSONB NOT NULL,
+    "cloneUrl" TEXT NOT NULL,
+    "tenantId" INTEGER NOT NULL,
+    "repoIndexingStatus" TEXT NOT NULL DEFAULT 'NEW',
+    "external_id" TEXT NOT NULL,
+    "external_codeHostType" TEXT NOT NULL,
+    "external_codeHostUrl" TEXT NOT NULL
+);
+INSERT INTO "new_Repo" ("cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt") SELECT "cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt" FROM "Repo";
+DROP TABLE "Repo";
+ALTER TABLE "new_Repo" RENAME TO "Repo";
+CREATE UNIQUE INDEX "Repo_external_id_external_codeHostUrl_key" ON "Repo"("external_id", "external_codeHostUrl");
+PRAGMA foreign_keys=ON;
+PRAGMA defer_foreign_keys=OFF;
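
The generated SQL follows Prisma's RedefineTables pattern for SQLite: rather than altering Repo in place, it creates new_Repo with the extra repoIndexingStatus column (stored as TEXT, defaulting to 'NEW'), copies every existing row across, swaps the tables, and rebuilds the unique index on (external_id, external_codeHostUrl), with foreign-key enforcement disabled for the duration of the swap.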

packages/db/prisma/schema.prisma

Lines changed: 10 additions & 0 deletions

@@ -10,6 +10,14 @@ datasource db {
     url      = env("DATABASE_URL")
 }

+enum RepoIndexingStatus {
+    NEW
+    IN_INDEX_QUEUE
+    INDEXING
+    INDEXED
+    FAILED
+}
+
 model Repo {
     id         Int    @id @default(autoincrement())
     name       String
@@ -22,6 +30,8 @@ model Repo {
     cloneUrl   String
     tenantId   Int

+    repoIndexingStatus RepoIndexingStatus @default(NEW)
+
     // The id of the repo in the external service
     external_id String
     // The type of the external service (e.g., github, gitlab, etc.)
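
The enum gives every repo an explicit indexing lifecycle: rows start at NEW, move to IN_INDEX_QUEUE when enqueued, and end at INDEXED or FAILED (INDEXING is declared but not yet written anywhere in this diff). With the generated Prisma client, a transition is a plain typed update; a hypothetical example (the repo id is illustrative):

import { PrismaClient, RepoIndexingStatus } from '@sourcebot/db';

const db = new PrismaClient();

// Hypothetical: mark a repo as queued before its job is added to BullMQ.
await db.repo.update({
    where: { id: 42 },
    data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
});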

supervisord.conf

Lines changed: 9 additions & 0 deletions

@@ -29,4 +29,13 @@ autorestart=true
 startretries=3
 stdout_logfile=/dev/fd/1
 stdout_logfile_maxbytes=0
+redirect_stderr=true
+
+[program:redis]
+command=redis-server
+autostart=true
+autorestart=true
+startretries=3
+stdout_logfile=/dev/fd/1
+stdout_logfile_maxbytes=0
 redirect_stderr=true
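
In the packaged image, Redis is not a separate container: supervisord launches redis-server (on its default port) alongside the existing processes, pairing with the redis package added to the Dockerfile's apk install line; the docker-run script in package.json is only for local development.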
