parallelize repo indexing #163

Merged · 23 commits · Jan 15, 2025

Commits:
9e5a6fd  hacked together a example of using zoekt grpc api (brendan-kellam, Jan 9, 2025)
9a77ee0  provide tenant id to zoekt git indexer (brendan-kellam, Jan 9, 2025)
8112153  update zoekt version to point to multitenant branch (brendan-kellam, Jan 9, 2025)
b153dd2  Merge branch 'main' into multi_tenancy_experiments (msukkari, Jan 10, 2025)
f6a3c00  merge main into mt branch (msukkari, Jan 14, 2025)
338a3b6  pipe tenant id through header to zoekt (msukkari, Jan 14, 2025)
f0408c0  remove incorrect submodule reference and settings typo (msukkari, Jan 14, 2025)
f5a602f  update zoekt commit (msukkari, Jan 14, 2025)
668e21b  remove unused yarn script (msukkari, Jan 14, 2025)
8f38153  remove unused grpc client in web server (msukkari, Jan 14, 2025)
fa29817  remove unneeded deps and improve tenant id log (msukkari, Jan 14, 2025)
fdc4aac  merge v3 into mt branch (msukkari, Jan 14, 2025)
3c9d36e  pass tenant id when creating repo in db (msukkari, Jan 14, 2025)
3b6bfd3  Merge branch 'v3' into multi_tenancy_experiments (msukkari, Jan 14, 2025)
4fad4c5  add mt yarn script (msukkari, Jan 14, 2025)
8d52006  merge v3 changes (msukkari, Jan 15, 2025)
a576b0a  add pol of bullmq into backend (msukkari, Jan 15, 2025)
1b06328  add better error handling and concurrency setting (msukkari, Jan 15, 2025)
39896b8  spin up redis instance in dockerfile (msukkari, Jan 15, 2025)
d95bc1c  cleanup transaction logic when adding repos to index queue (msukkari, Jan 15, 2025)
1cec06c  add NEW index status fetch condition (msukkari, Jan 15, 2025)
b4527e3  Merge branch 'v3' into decouple_indexing (msukkari, Jan 15, 2025)
0456833  move bullmq deps to backend (msukkari, Jan 15, 2025)

2 changes: 1 addition & 1 deletion Dockerfile

@@ -84,7 +84,7 @@ ENV POSTHOG_PAPIK=$POSTHOG_PAPIK
 # ENV SOURCEBOT_TELEMETRY_DISABLED=1

 # Configure dependencies
-RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq
+RUN apk add --no-cache git ca-certificates bind-tools tini jansson wget supervisor uuidgen curl perl jq redis

 # Configure zoekt
 COPY vendor/zoekt/install-ctags-alpine.sh .

8 changes: 5 additions & 3 deletions package.json

@@ -6,12 +6,14 @@
   "scripts": {
     "build": "yarn workspaces run build",
     "test": "yarn workspaces run test",
-    "dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web",
-    "dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web",
+    "dev": "npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
+    "dev:mt": "npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
     "dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
     "dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
     "dev:backend": "yarn workspace @sourcebot/backend dev:watch",
-    "dev:web": "yarn workspace @sourcebot/web dev"
+    "dev:web": "yarn workspace @sourcebot/web dev",
+    "dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"
+
   },
   "devDependencies": {
     "npm-run-all": "^4.1.5"

4 changes: 3 additions & 1 deletion packages/backend/package.json

@@ -34,6 +34,8 @@
     "posthog-node": "^4.2.1",
     "simple-git": "^3.27.0",
     "strip-json-comments": "^5.0.1",
-    "winston": "^3.15.0"
+    "winston": "^3.15.0",
+    "bullmq": "^5.34.10",
+    "ioredis": "^5.4.2"
   }
 }

3 changes: 2 additions & 1 deletion packages/backend/src/constants.ts

@@ -6,6 +6,7 @@ import { Settings } from "./types.js";
 export const DEFAULT_SETTINGS: Settings = {
     maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
     autoDeleteStaleRepos: true,
-    reindexIntervalMs: 1000 * 60 * 60, // 1 hour in milliseconds
+    reindexIntervalMs: 1000 * 60,
     resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
+    indexConcurrencyMultiple: 3,
 }

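The new indexConcurrencyMultiple setting scales indexing concurrency with the host's CPU count, as main.ts does below. A minimal sketch of that calculation, assuming Node's os module; the 8-core figure is purely a worked example:

```typescript
import * as os from 'os';

// With the default indexConcurrencyMultiple of 3, an 8-core host
// would run up to 8 * 3 = 24 index jobs concurrently.
const indexConcurrencyMultiple = 3;
const numWorkers = os.cpus().length * indexConcurrencyMultiple;
console.log(`max index concurrency: ${numWorkers}`);
```
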
132 changes: 96 additions & 36 deletions packages/backend/src/main.ts

@@ -1,4 +1,4 @@
-import { PrismaClient, Repo } from '@sourcebot/db';
+import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
 import { existsSync, watch } from 'fs';
 import { syncConfig } from "./config.js";
 import { cloneRepository, fetchRepository } from "./git.js";
@@ -8,6 +8,9 @@
 import { AppContext } from "./types.js";
 import { getRepoPath, isRemotePath, measure } from "./utils.js";
 import { indexGitRepository } from "./zoekt.js";
 import { DEFAULT_SETTINGS } from './constants.js';
+import { Queue, Worker, Job } from 'bullmq';
+import { Redis } from 'ioredis';
+import * as os from 'os';

 const logger = createLogger('main');

@@ -53,6 +56,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
     }
 }

+async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
+    for (const repo of repos) {
+        await db.$transaction(async (tx) => {
+            await tx.repo.update({
+                where: { id: repo.id },
+                data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
+            });
+
+            // Add the job to the queue
+            await queue.add('indexJob', repo);
+            logger.info(`Added job to queue for repo ${repo.id}`);
+        }).catch((err) => {
+            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
+        });
+    }
+}
+
 export const main = async (db: PrismaClient, context: AppContext) => {
     let abortController = new AbortController();
     let isSyncing = false;
@@ -97,50 +117,90 @@ export const main = async (db: PrismaClient, context: AppContext) => {
     // Sync immediately on startup
     await _syncConfig();

-    while (true) {
-        const repos = await db.repo.findMany();
-
-        for (const repo of repos) {
-            const lastIndexed = repo.indexedAt ?? new Date(0);
-
-            if (lastIndexed.getTime() > (Date.now() - DEFAULT_SETTINGS.reindexIntervalMs)) {
-                continue;
-            }
-
-            try {
-                let indexDuration_s: number | undefined;
-                let fetchDuration_s: number | undefined;
-                let cloneDuration_s: number | undefined;
-
-                const stats = await syncGitRepository(repo, context);
-                indexDuration_s = stats.indexDuration_s;
-                fetchDuration_s = stats.fetchDuration_s;
-                cloneDuration_s = stats.cloneDuration_s;
-
-                captureEvent('repo_synced', {
-                    vcs: 'git',
-                    codeHost: repo.external_codeHostType,
-                    indexDuration_s,
-                    fetchDuration_s,
-                    cloneDuration_s,
-                });
-            } catch (err: any) {
-                // @todo : better error handling here..
-                logger.error(err);
-                continue;
-            }
-
-            await db.repo.update({
-                where: {
-                    id: repo.id,
-                },
-                data: {
-                    indexedAt: new Date(),
-                }
-            });
-        }
-
-        await new Promise(resolve => setTimeout(resolve, 1000));
-    }
-}
+    const redis = new Redis({
+        host: 'localhost',
+        port: 6379,
+        maxRetriesPerRequest: null
+    });
+    redis.ping().then(() => {
+        logger.info('Connected to redis');
+    }).catch((err) => {
+        logger.error('Failed to connect to redis');
+        console.error(err);
+        process.exit(1);
+    });
+
+    const indexQueue = new Queue('indexQueue');
+
+    const numCores = os.cpus().length;
+    const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
+    logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
+    const worker = new Worker('indexQueue', async (job) => {
+        const repo = job.data as Repo;
+
+        let indexDuration_s: number | undefined;
+        let fetchDuration_s: number | undefined;
+        let cloneDuration_s: number | undefined;
+
+        const stats = await syncGitRepository(repo, context);
+        indexDuration_s = stats.indexDuration_s;
+        fetchDuration_s = stats.fetchDuration_s;
+        cloneDuration_s = stats.cloneDuration_s;
+
+        captureEvent('repo_synced', {
+            vcs: 'git',
+            codeHost: repo.external_codeHostType,
+            indexDuration_s,
+            fetchDuration_s,
+            cloneDuration_s,
+        });
+
+        await db.repo.update({
+            where: {
+                id: repo.id,
+            },
+            data: {
+                indexedAt: new Date(),
+                repoIndexingStatus: RepoIndexingStatus.INDEXED,
+            }
+        });
+    }, { connection: redis, concurrency: numWorkers });
+
+    worker.on('completed', (job) => {
+        logger.info(`Job ${job.id} completed`);
+    });
+    worker.on('failed', async (job: Job | undefined, err) => {
+        logger.info(`Job failed with error: ${err}`);
+        if (job) {
+            await db.repo.update({
+                where: {
+                    id: job.data.id,
+                },
+                data: {
+                    indexedAt: new Date(),
+                    repoIndexingStatus: RepoIndexingStatus.FAILED,
+                }
+            });
+        }
+    });
+
+    while (true) {
+        const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
+        const repos = await db.repo.findMany({
+            where: {
+                repoIndexingStatus: {
+                    notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
+                },
+                OR: [
+                    { indexedAt: null },
+                    { indexedAt: { lt: thresholdDate } },
+                    { repoIndexingStatus: RepoIndexingStatus.NEW }
+                ]
+            }
+        });
+        logger.info(`Found ${repos.length} repos to index...`);
+        addReposToQueue(db, indexQueue, repos);
+
+        await new Promise(resolve => setTimeout(resolve, 1000));
+    }
+}

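Taken together, main.ts now runs a producer/consumer in a single process: the polling loop marks repos IN_INDEX_QUEUE and enqueues them, while a BullMQ Worker drains the queue with numWorkers concurrency. A stripped-down sketch of the same pattern, assuming only a local Redis on the default port; the job payload here is illustrative, not the real Repo record:

```typescript
import { Queue, Worker } from 'bullmq';
import { Redis } from 'ioredis';

// BullMQ requires maxRetriesPerRequest: null on blocking connections.
const connection = new Redis({ host: 'localhost', port: 6379, maxRetriesPerRequest: null });

// Producer: enqueue one job per repo that needs (re)indexing.
const queue = new Queue('indexQueue', { connection });
await queue.add('indexJob', { id: 1, name: 'example/repo' });

// Consumer: a single Worker processes up to `concurrency` jobs in parallel.
const worker = new Worker('indexQueue', async (job) => {
    console.log(`indexing repo ${job.data.id}`);
}, { connection, concurrency: 4 });

worker.on('completed', (job) => console.log(`job ${job.id} completed`));
worker.on('failed', (job, err) => console.error(`job ${job?.id} failed: ${err}`));
```

One design note visible in the diff: addReposToQueue flips the status to IN_INDEX_QUEUE inside the same transaction that wraps queue.add, so a repo whose enqueue fails is rolled back to its previous status and picked up again on the next poll rather than being stranded.
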
4 changes: 4 additions & 0 deletions packages/backend/src/types.ts

@@ -74,6 +74,10 @@
      * The interval (in milliseconds) at which the configuration file should be re-synced.
      */
     resyncIntervalMs: number;
+    /**
+     * The multiple of the number of CPUs to use for indexing.
+     */
+    indexConcurrencyMultiple: number;
 }

 // @see : https://stackoverflow.com/a/61132308

25 changes: 25 additions & 0 deletions packages/db/prisma/migrations/…/migration.sql (new file)

@@ -0,0 +1,25 @@
+-- RedefineTables
+PRAGMA defer_foreign_keys=ON;
+PRAGMA foreign_keys=OFF;
+CREATE TABLE "new_Repo" (
+    "id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    "name" TEXT NOT NULL,
+    "createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    "updatedAt" DATETIME NOT NULL,
+    "indexedAt" DATETIME,
+    "isFork" BOOLEAN NOT NULL,
+    "isArchived" BOOLEAN NOT NULL,
+    "metadata" JSONB NOT NULL,
+    "cloneUrl" TEXT NOT NULL,
+    "tenantId" INTEGER NOT NULL,
+    "repoIndexingStatus" TEXT NOT NULL DEFAULT 'NEW',
+    "external_id" TEXT NOT NULL,
+    "external_codeHostType" TEXT NOT NULL,
+    "external_codeHostUrl" TEXT NOT NULL
+);
+INSERT INTO "new_Repo" ("cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt") SELECT "cloneUrl", "createdAt", "external_codeHostType", "external_codeHostUrl", "external_id", "id", "indexedAt", "isArchived", "isFork", "metadata", "name", "tenantId", "updatedAt" FROM "Repo";
+DROP TABLE "Repo";
+ALTER TABLE "new_Repo" RENAME TO "Repo";
+CREATE UNIQUE INDEX "Repo_external_id_external_codeHostUrl_key" ON "Repo"("external_id", "external_codeHostUrl");
+PRAGMA foreign_keys=ON;
+PRAGMA defer_foreign_keys=OFF;

10 changes: 10 additions & 0 deletions packages/db/prisma/schema.prisma

@@ -10,6 +10,14 @@ datasource db {
   url = env("DATABASE_URL")
 }

+enum RepoIndexingStatus {
+  NEW
+  IN_INDEX_QUEUE
+  INDEXING
+  INDEXED
+  FAILED
+}
+
 model Repo {
   id   Int    @id @default(autoincrement())
   name String
@@ -22,6 +30,8 @@ model Repo {
   cloneUrl String
   tenantId Int

+  repoIndexingStatus RepoIndexingStatus @default(NEW)
+
   // The id of the repo in the external service
   external_id String
   // The type of the external service (e.g., github, gitlab, etc.)

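The enum gives each repo an explicit indexing lifecycle. As far as this diff goes, repos move NEW → IN_INDEX_QUEUE → INDEXED, or to FAILED from the worker's failure handler; INDEXING is declared but never assigned here. A small sketch of driving one transition through the Prisma client, mirroring addReposToQueue above (the repo id is illustrative):

```typescript
import { PrismaClient, RepoIndexingStatus } from '@sourcebot/db';

const db = new PrismaClient();

// Mark a repo as queued before adding its index job to the queue.
await db.repo.update({
    where: { id: 1 }, // illustrative repo id
    data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
});
```
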
9 changes: 9 additions & 0 deletions supervisord.conf

@@ -29,4 +29,13 @@ autorestart=true
 startretries=3
 stdout_logfile=/dev/fd/1
 stdout_logfile_maxbytes=0
 redirect_stderr=true
+
+[program:redis]
+command=redis-server
+autostart=true
+autorestart=true
+startretries=3
+stdout_logfile=/dev/fd/1
+stdout_logfile_maxbytes=0
+redirect_stderr=true