Skip to content

Commit fde51e3

Browse files
committed
[wip] initial mt (multi-tenant) support in config syncer
1 parent 5d253ff commit fde51e3

File tree

10 files changed

+190
-43
lines changed

10 files changed

+190
-43
lines changed

package.json

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,16 @@
66
"scripts": {
77
"build": "yarn workspaces run build",
88
"test": "yarn workspaces run test",
9-
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
10-
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
9+
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=single npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
10+
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=multi npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
1111
"dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
1212
"dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
1313
"dev:backend": "yarn workspace @sourcebot/backend dev:watch",
1414
"dev:web": "yarn workspace @sourcebot/web dev",
1515
"dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"
16-
1716
},
1817
"devDependencies": {
18+
"cross-env": "^7.0.3",
1919
"npm-run-all": "^4.1.5"
2020
}
2121
}

packages/backend/src/config.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { SourcebotConfigurationSchema } from "./schemas/v2.js";
77
import { AppContext } from "./types.js";
88
import { getTokenFromConfig, isRemotePath, marshalBool } from "./utils.js";
99

10-
export const syncConfig = async (configPath: string, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
10+
export const fetchConfigFromPath = async (configPath: string, signal: AbortSignal) => {
1111
const configContent = await (async () => {
1212
if (isRemotePath(configPath)) {
1313
const response = await fetch(configPath, {
@@ -25,9 +25,11 @@ export const syncConfig = async (configPath: string, db: PrismaClient, signal: A
2525
}
2626
})();
2727

28-
// @todo: we should validate the configuration file's structure here.
2928
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema;
29+
return config;
30+
}
3031

32+
export const syncConfig = async (config: SourcebotConfigurationSchema, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
3133
for (const repoConfig of config.repos ?? []) {
3234
switch (repoConfig.type) {
3335
case 'github': {

packages/backend/src/constants.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ export const DEFAULT_SETTINGS: Settings = {
99
reindexIntervalMs: 1000 * 60,
1010
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
1111
indexConcurrencyMultiple: 3,
12+
configSyncConcurrencyMultiple: 3,
1213
}

packages/backend/src/environment.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import dotenv from 'dotenv';
22

3-
export const getEnv = (env: string | undefined, defaultValue?: string) => {
3+
export const getEnv = (env: string | undefined, defaultValue?: string, required?: boolean) => {
4+
if (required && !env && !defaultValue) {
5+
throw new Error(`Missing required environment variable`);
6+
}
7+
48
return env ?? defaultValue;
59
}
610

@@ -15,6 +19,8 @@ dotenv.config({
1519
path: './.env',
1620
});
1721

22+
23+
export const SOURCEBOT_TENANT_MODE = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true);
1824
export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;
1925
export const SOURCEBOT_TELEMETRY_DISABLED = getEnvBoolean(process.env.SOURCEBOT_TELEMETRY_DISABLED, false)!;
2026
export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'unknown')!;

packages/backend/src/index.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { isRemotePath } from "./utils.js";
66
import { AppContext } from "./types.js";
77
import { main } from "./main.js"
88
import { PrismaClient } from "@sourcebot/db";
9+
import { SOURCEBOT_TENANT_MODE } from "./environment.js";
910

1011

1112
const parser = new ArgumentParser({
@@ -19,7 +20,7 @@ type Arguments = {
1920

2021
parser.add_argument("--configPath", {
2122
help: "Path to config file",
22-
required: true,
23+
required: false,
2324
});
2425

2526
parser.add_argument("--cacheDir", {
@@ -28,8 +29,8 @@ parser.add_argument("--cacheDir", {
2829
});
2930
const args = parser.parse_args() as Arguments;
3031

31-
if (!isRemotePath(args.configPath) && !existsSync(args.configPath)) {
32-
console.error(`Config file ${args.configPath} does not exist`);
32+
if (SOURCEBOT_TENANT_MODE === "single" && !isRemotePath(args.configPath) && !existsSync(args.configPath)) {
33+
console.error(`Config file ${args.configPath} does not exist, and is required in single tenant mode`);
3334
process.exit(1);
3435
}
3536

packages/backend/src/main.ts

Lines changed: 107 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
1+
import { ConfigSyncStatus, PrismaClient, Repo, Config, RepoIndexingStatus, Prisma } from '@sourcebot/db';
22
import { existsSync, watch } from 'fs';
3-
import { syncConfig } from "./config.js";
3+
import { fetchConfigFromPath, syncConfig } from "./config.js";
44
import { cloneRepository, fetchRepository } from "./git.js";
55
import { createLogger } from "./logger.js";
66
import { captureEvent } from "./posthog.js";
@@ -11,6 +11,8 @@ import { DEFAULT_SETTINGS } from './constants.js';
1111
import { Queue, Worker, Job } from 'bullmq';
1212
import { Redis } from 'ioredis';
1313
import * as os from 'os';
14+
import { SOURCEBOT_TENANT_MODE } from './environment.js';
15+
import { SourcebotConfigurationSchema } from './schemas/v2.js';
1416

1517
const logger = createLogger('main');
1618

@@ -56,6 +58,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
5658
}
5759
}
5860

61+
async function addConfigsToQueue(db: PrismaClient, queue: Queue, configs: Config[]) {
62+
for (const config of configs) {
63+
await db.$transaction(async (tx) => {
64+
await tx.config.update({
65+
where: { id: config.id },
66+
data: { syncStatus: ConfigSyncStatus.IN_SYNC_QUEUE },
67+
});
68+
69+
// Add the job to the queue
70+
await queue.add('configSyncJob', config);
71+
logger.info(`Added job to queue for config ${config.id}`);
72+
}).catch((err: unknown) => {
73+
logger.error(`Failed to add job to queue for config ${config.id}: ${err}`);
74+
});
75+
}
76+
}
77+
5978
async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
6079
for (const repo of repos) {
6180
await db.$transaction(async (tx) => {
@@ -67,7 +86,7 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
6786
// Add the job to the queue
6887
await queue.add('indexJob', repo);
6988
logger.info(`Added job to queue for repo ${repo.id}`);
70-
}).catch((err) => {
89+
}).catch((err: unknown) => {
7190
logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
7291
});
7392
}
@@ -76,66 +95,123 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
7695
export const main = async (db: PrismaClient, context: AppContext) => {
7796
let abortController = new AbortController();
7897
let isSyncing = false;
79-
const _syncConfig = async () => {
98+
const _syncConfig = async (dbConfig?: Prisma.JsonValue | undefined) => {
8099
if (isSyncing) {
81100
abortController.abort();
82101
abortController = new AbortController();
83102
}
103+
104+
let config: SourcebotConfigurationSchema;
105+
switch (SOURCEBOT_TENANT_MODE) {
106+
case 'single':
107+
logger.info(`Syncing configuration file ${context.configPath} ...`);
108+
config = await fetchConfigFromPath(context.configPath, abortController.signal);
109+
break;
110+
case 'multi':
111+
if(!dbConfig) {
112+
throw new Error('config object is required in multi tenant mode');
113+
}
114+
config = dbConfig as SourcebotConfigurationSchema
115+
break;
116+
default:
117+
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
118+
}
84119

85-
logger.info(`Syncing configuration file ${context.configPath} ...`);
86120
isSyncing = true;
87-
88121
try {
89-
const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context))
90-
logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
122+
const { durationMs } = await measure(() => syncConfig(config, db, abortController.signal, context))
123+
logger.info(`Synced configuration file in ${durationMs / 1000}s`);
91124
isSyncing = false;
92125
} catch (err: any) {
93126
if (err.name === "AbortError") {
94127
// @note: If we're aborting, we don't want to set isSyncing to false
95128
// since it implies another sync is in progress.
96129
} else {
97130
isSyncing = false;
98-
logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
131+
logger.error(`Failed to sync configuration file with error:`);
99132
console.log(err);
100133
}
101134
}
102135
}
103136

104-
// Re-sync on file changes if the config file is local
105-
if (!isRemotePath(context.configPath)) {
106-
watch(context.configPath, () => {
107-
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
108-
_syncConfig();
109-
});
110-
}
111-
112-
// Re-sync at a fixed interval
113-
setInterval(() => {
114-
_syncConfig();
115-
}, DEFAULT_SETTINGS.resyncIntervalMs);
116-
117-
// Sync immediately on startup
118-
await _syncConfig();
119-
137+
/////////////////////////////
138+
// Init Redis
139+
/////////////////////////////
120140
const redis = new Redis({
121141
host: 'localhost',
122142
port: 6379,
123143
maxRetriesPerRequest: null
124144
});
125145
redis.ping().then(() => {
126146
logger.info('Connected to redis');
127-
}).catch((err) => {
147+
}).catch((err: unknown) => {
128148
logger.error('Failed to connect to redis');
129149
console.error(err);
130150
process.exit(1);
131151
});
132152

153+
/////////////////////////////
154+
// Setup config sync watchers
155+
/////////////////////////////
156+
switch (SOURCEBOT_TENANT_MODE) {
157+
case 'single':
158+
// Re-sync on file changes if the config file is local
159+
if (!isRemotePath(context.configPath)) {
160+
watch(context.configPath, () => {
161+
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
162+
_syncConfig();
163+
});
164+
}
165+
166+
// Re-sync at a fixed interval
167+
setInterval(() => {
168+
_syncConfig();
169+
}, DEFAULT_SETTINGS.resyncIntervalMs);
170+
171+
// Sync immediately on startup
172+
await _syncConfig();
173+
break;
174+
case 'multi':
175+
const configSyncQueue = new Queue('configSyncQueue');
176+
const numCores = os.cpus().length;
177+
const numWorkers = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple;
178+
logger.info(`Detected ${numCores} cores. Setting config sync max concurrency to ${numWorkers}`);
179+
const configSyncWorker = new Worker('configSyncQueue', async (job: Job) => {
180+
const config = job.data as Config;
181+
await _syncConfig(config.data);
182+
}, { connection: redis, concurrency: numWorkers });
183+
configSyncWorker.on('completed', (job: Job) => {
184+
logger.info(`Config sync job ${job.id} completed`);
185+
});
186+
configSyncWorker.on('failed', (job: Job | undefined, err: unknown) => {
187+
logger.info(`Config sync job failed with error: ${err}`);
188+
});
189+
190+
setInterval(async () => {
191+
const configs = await db.config.findMany({
192+
where: {
193+
syncStatus: ConfigSyncStatus.SYNC_NEEDED,
194+
}
195+
});
196+
197+
logger.info(`Found ${configs.length} configs to sync...`);
198+
addConfigsToQueue(db, configSyncQueue, configs);
199+
}, 1000);
200+
break;
201+
default:
202+
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
203+
}
204+
205+
206+
/////////////////////////
207+
// Setup repo indexing
208+
/////////////////////////
133209
const indexQueue = new Queue('indexQueue');
134210

135211
const numCores = os.cpus().length;
136212
const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
137-
logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
138-
const worker = new Worker('indexQueue', async (job) => {
213+
logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
214+
const worker = new Worker('indexQueue', async (job: Job) => {
139215
const repo = job.data as Repo;
140216

141217
let indexDuration_s: number | undefined;
@@ -166,10 +242,10 @@ export const main = async (db: PrismaClient, context: AppContext) => {
166242
});
167243
}, { connection: redis, concurrency: numWorkers });
168244

169-
worker.on('completed', (job) => {
245+
worker.on('completed', (job: Job) => {
170246
logger.info(`Job ${job.id} completed`);
171247
});
172-
worker.on('failed', async (job: Job | undefined, err) => {
248+
worker.on('failed', async (job: Job | undefined, err: unknown) => {
173249
logger.info(`Job failed with error: ${err}`);
174250
if (job) {
175251
await db.repo.update({
@@ -183,6 +259,7 @@ export const main = async (db: PrismaClient, context: AppContext) => {
183259
}
184260
});
185261

262+
// Repo indexing loop
186263
while (true) {
187264
const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
188265
const repos = await db.repo.findMany({

packages/backend/src/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ export type Settings = {
7878
* The multiple of the number of CPUs to use for indexing.
7979
*/
8080
indexConcurrencyMultiple: number;
81+
/**
82+
* The multiple of the number of CPUs to use for syncing the configuration.
83+
*/
84+
configSyncConcurrencyMultiple: number;
8185
}
8286

8387
// @see : https://stackoverflow.com/a/61132308
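
packages/db/prisma/migrations/<timestamp>_<name>/migration.sql [file path missing from this extract; inferred from the Prisma schema location — confirm]

Lines changed: 11 additions & 0 deletions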
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- CreateTable
2+
CREATE TABLE "Config" (
3+
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
4+
"data" JSONB NOT NULL,
5+
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
6+
"updatedAt" DATETIME NOT NULL,
7+
"syncedAt" DATETIME,
8+
"syncStatus" TEXT NOT NULL DEFAULT 'SYNC_NEEDED',
9+
"orgId" INTEGER NOT NULL,
10+
CONSTRAINT "Config_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org" ("id") ON DELETE CASCADE ON UPDATE CASCADE
11+
);

packages/db/prisma/schema.prisma

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ enum RepoIndexingStatus {
1818
FAILED
1919
}
2020

21+
enum ConfigSyncStatus {
22+
SYNC_NEEDED
23+
IN_SYNC_QUEUE
24+
SYNCING
25+
SYNCED
26+
FAILED
27+
}
28+
2129
model Repo {
2230
id Int @id @default(autoincrement())
2331
name String
@@ -42,12 +50,27 @@ model Repo {
4250
@@unique([external_id, external_codeHostUrl])
4351
}
4452

53+
model Config {
54+
id Int @id @default(autoincrement())
55+
data Json
56+
createdAt DateTime @default(now())
57+
updatedAt DateTime @updatedAt
58+
syncedAt DateTime?
59+
60+
syncStatus ConfigSyncStatus @default(SYNC_NEEDED)
61+
62+
// The organization that owns this config
63+
org Org @relation(fields: [orgId], references: [id], onDelete: Cascade)
64+
orgId Int
65+
}
66+
4567
model Org {
4668
id Int @id @default(autoincrement())
4769
name String
4870
createdAt DateTime @default(now())
4971
updatedAt DateTime @updatedAt
5072
members UserToOrg[]
73+
configs Config[]
5174
}
5275

5376
model UserToOrg {

0 commit comments

Comments
 (0)