Multi tenancy support in config syncer #171

Merged: 5 commits, Jan 21, 2025
6 changes: 3 additions & 3 deletions package.json
@@ -6,16 +6,16 @@
"scripts": {
"build": "yarn workspaces run build",
"test": "yarn workspaces run test",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=single npm-run-all --print-label --parallel dev:zoekt dev:backend dev:web dev:redis",
"dev:mt": "yarn workspace @sourcebot/db prisma:migrate:dev && cross-env SOURCEBOT_TENANT_MODE=multi npm-run-all --print-label --parallel dev:zoekt:mt dev:backend dev:web dev:redis",
"dev:zoekt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=none && zoekt-webserver -index .sourcebot/index -rpc",
"dev:zoekt:mt": "export PATH=\"$PWD/bin:$PATH\" && export SRC_TENANT_ENFORCEMENT_MODE=strict && zoekt-webserver -index .sourcebot/index -rpc",
"dev:backend": "yarn workspace @sourcebot/backend dev:watch",
"dev:web": "yarn workspace @sourcebot/web dev",
"dev:redis": "docker ps --filter \"name=redis\" --format \"{{.Names}}\" | grep -q \"^redis$\" && docker rm -f redis; docker run -d --name redis -p 6379:6379 redis"

},
"devDependencies": {
"cross-env": "^7.0.3",
"npm-run-all": "^4.1.5"
}
}
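The two dev scripts now differ only in the tenant mode they pin: `dev` sets `SOURCEBOT_TENANT_MODE=single` (with zoekt tenant enforcement off), while `dev:mt` sets `multi` (with strict enforcement). As a rough illustration of what a consumer of that variable might do, here is a minimal TypeScript sketch that narrows it to a typed union; the `TenantMode` type and `parseTenantMode` helper are hypothetical and not part of this PR:

```ts
// Hypothetical sketch: narrow the SOURCEBOT_TENANT_MODE value set by the
// `dev` / `dev:mt` scripts (via cross-env) into a typed union.
type TenantMode = 'single' | 'multi';

const parseTenantMode = (raw: string | undefined): TenantMode => {
    if (raw === 'single' || raw === 'multi') {
        return raw;
    }
    throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${raw}`);
};

const tenantMode = parseTenantMode(process.env.SOURCEBOT_TENANT_MODE);
console.log(`Running in ${tenantMode} tenant mode`);
```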
6 changes: 4 additions & 2 deletions packages/backend/src/config.ts
@@ -7,7 +7,7 @@ import { SourcebotConfigurationSchema } from "./schemas/v2.js";
import { AppContext } from "./types.js";
import { getTokenFromConfig, isRemotePath, marshalBool } from "./utils.js";

export const syncConfig = async (configPath: string, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
export const fetchConfigFromPath = async (configPath: string, signal: AbortSignal) => {
const configContent = await (async () => {
if (isRemotePath(configPath)) {
const response = await fetch(configPath, {
@@ -25,9 +25,11 @@ }
}
})();

// @todo: we should validate the configuration file's structure here.
const config = JSON.parse(stripJsonComments(configContent)) as SourcebotConfigurationSchema;
return config;
}

export const syncConfig = async (config: SourcebotConfigurationSchema, db: PrismaClient, signal: AbortSignal, ctx: AppContext) => {
for (const repoConfig of config.repos ?? []) {
switch (repoConfig.type) {
case 'github': {
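Splitting the old `syncConfig(configPath, ...)` into `fetchConfigFromPath` (fetch and parse) and `syncConfig` (apply against the database) is what lets the multi-tenant path below feed in a config loaded from the `Config` table rather than from a file. A minimal sketch of how a single-tenant caller composes the two, assuming the signatures shown in this diff (the `syncFromFile` wrapper is hypothetical):

```ts
import { PrismaClient } from "@sourcebot/db";
import { fetchConfigFromPath, syncConfig } from "./config.js";
import { AppContext } from "./types.js";

// Single-tenant flow: read the config from a local path or remote URL,
// parse it, then sync it against the database.
const syncFromFile = async (configPath: string, db: PrismaClient, ctx: AppContext) => {
    const abortController = new AbortController();
    const config = await fetchConfigFromPath(configPath, abortController.signal);
    await syncConfig(config, db, abortController.signal, ctx);
};
```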
1 change: 1 addition & 0 deletions packages/backend/src/constants.ts
@@ -9,4 +9,5 @@ export const DEFAULT_SETTINGS: Settings = {
reindexIntervalMs: 1000 * 60,
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
indexConcurrencyMultiple: 3,
configSyncConcurrencyMultiple: 3,
}
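`configSyncConcurrencyMultiple` follows the same convention as the existing `indexConcurrencyMultiple`: the effective worker concurrency is the CPU core count multiplied by this value, as main.ts does below. A quick sketch of that arithmetic:

```ts
import * as os from 'os';
import { DEFAULT_SETTINGS } from './constants.js';

// e.g. 8 cores * 3 => 24 concurrent config sync jobs.
const numCores = os.cpus().length;
const configSyncConcurrency = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple;
console.log(`Config sync concurrency: ${configSyncConcurrency}`);
```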
8 changes: 7 additions & 1 deletion packages/backend/src/environment.ts
@@ -1,6 +1,10 @@
import dotenv from 'dotenv';

export const getEnv = (env: string | undefined, defaultValue?: string) => {
export const getEnv = (env: string | undefined, defaultValue?: string, required?: boolean) => {
if (required && !env && !defaultValue) {
throw new Error(`Missing required environment variable`);
}

return env ?? defaultValue;
}

@@ -15,6 +19,8 @@ dotenv.config({
path: './.env',
});


export const SOURCEBOT_TENANT_MODE = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true);
export const SOURCEBOT_LOG_LEVEL = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;
export const SOURCEBOT_TELEMETRY_DISABLED = getEnvBoolean(process.env.SOURCEBOT_TELEMETRY_DISABLED, false)!;
export const SOURCEBOT_INSTALL_ID = getEnv(process.env.SOURCEBOT_INSTALL_ID, 'unknown')!;
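With the new `required` flag, `getEnv` throws when neither a value nor a default is supplied, which is how `SOURCEBOT_TENANT_MODE` becomes mandatory at startup. A short usage sketch of the updated signature:

```ts
import { getEnv } from './environment.js';

// Optional with a default: resolves to 'info' when the variable is unset.
const logLevel = getEnv(process.env.SOURCEBOT_LOG_LEVEL, 'info')!;

// Required with no default: throws "Missing required environment variable"
// if SOURCEBOT_TENANT_MODE is not set.
const tenantMode = getEnv(process.env.SOURCEBOT_TENANT_MODE, undefined, true);
```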
7 changes: 4 additions & 3 deletions packages/backend/src/index.ts
@@ -6,6 +6,7 @@ import { isRemotePath } from "./utils.js";
import { AppContext } from "./types.js";
import { main } from "./main.js"
import { PrismaClient } from "@sourcebot/db";
import { SOURCEBOT_TENANT_MODE } from "./environment.js";


const parser = new ArgumentParser({
@@ -19,7 +20,7 @@ type Arguments = {

parser.add_argument("--configPath", {
help: "Path to config file",
required: true,
required: SOURCEBOT_TENANT_MODE === "single",
});

parser.add_argument("--cacheDir", {
@@ -28,8 +29,8 @@ parser.add_argument("--cacheDir", {
});
const args = parser.parse_args() as Arguments;

if (!isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist`);
if (SOURCEBOT_TENANT_MODE === "single" && !isRemotePath(args.configPath) && !existsSync(args.configPath)) {
console.error(`Config file ${args.configPath} does not exist, and is required in single tenant mode`);
process.exit(1);
}

200 changes: 160 additions & 40 deletions packages/backend/src/main.ts
@@ -1,6 +1,6 @@
import { PrismaClient, Repo, RepoIndexingStatus } from '@sourcebot/db';
import { ConfigSyncStatus, PrismaClient, Repo, Config, RepoIndexingStatus, Prisma } from '@sourcebot/db';
import { existsSync, watch } from 'fs';
import { syncConfig } from "./config.js";
import { fetchConfigFromPath, syncConfig } from "./config.js";
import { cloneRepository, fetchRepository } from "./git.js";
import { createLogger } from "./logger.js";
import { captureEvent } from "./posthog.js";
Expand All @@ -11,6 +11,8 @@ import { DEFAULT_SETTINGS } from './constants.js';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';
import { SOURCEBOT_TENANT_MODE } from './environment.js';
import { SourcebotConfigurationSchema } from './schemas/v2.js';

const logger = createLogger('main');

@@ -56,6 +58,23 @@ const syncGitRepository = async (repo: Repo, ctx: AppContext) => {
}
}

async function addConfigsToQueue(db: PrismaClient, queue: Queue, configs: Config[]) {
for (const config of configs) {
await db.$transaction(async (tx) => {
await tx.config.update({
where: { id: config.id },
data: { syncStatus: ConfigSyncStatus.IN_SYNC_QUEUE },
});

// Add the job to the queue
await queue.add('configSyncJob', config);
logger.info(`Added job to queue for config ${config.id}`);
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for config ${config.id}: ${err}`);
});
}
}

async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
for (const repo of repos) {
await db.$transaction(async (tx) => {
@@ -67,7 +86,7 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
// Add the job to the queue
await queue.add('indexJob', repo);
logger.info(`Added job to queue for repo ${repo.id}`);
}).catch((err) => {
}).catch((err: unknown) => {
logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
});
}
@@ -76,66 +95,166 @@ async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
export const main = async (db: PrismaClient, context: AppContext) => {
let abortController = new AbortController();
let isSyncing = false;
const _syncConfig = async () => {
if (isSyncing) {
abortController.abort();
abortController = new AbortController();
}
const _syncConfig = async (dbConfig?: Config | undefined) => {

logger.info(`Syncing configuration file ${context.configPath} ...`);
isSyncing = true;
// Fetch config object and update syncing status
let config: SourcebotConfigurationSchema;
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
logger.info(`Syncing configuration file ${context.configPath} ...`);

if (isSyncing) {
abortController.abort();
abortController = new AbortController();
}
config = await fetchConfigFromPath(context.configPath, abortController.signal);
isSyncing = true;
break;
case 'multi':
if(!dbConfig) {
throw new Error('config object is required in multi tenant mode');
}
config = dbConfig.data as SourcebotConfigurationSchema
await db.config.update({
where: {
id: dbConfig.id,
},
data: {
syncStatus: ConfigSyncStatus.SYNCING,
}
})
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}

// Attempt to sync the config, handle failure cases
try {
const { durationMs } = await measure(() => syncConfig(context.configPath, db, abortController.signal, context))
logger.info(`Synced configuration file ${context.configPath} in ${durationMs / 1000}s`);
const { durationMs } = await measure(() => syncConfig(config, db, abortController.signal, context))
logger.info(`Synced configuration in ${durationMs / 1000}s`);
isSyncing = false;
} catch (err: any) {
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file ${context.configPath} with error:`);
console.log(err);
switch(SOURCEBOT_TENANT_MODE) {
case 'single':
if (err.name === "AbortError") {
// @note: If we're aborting, we don't want to set isSyncing to false
// since it implies another sync is in progress.
} else {
isSyncing = false;
logger.error(`Failed to sync configuration file with error:`);
console.log(err);
}
break;
case 'multi':
if (dbConfig) {
await db.config.update({
where: {
id: dbConfig.id,
},
data: {
syncStatus: ConfigSyncStatus.FAILED,
}
})
logger.error(`Failed to sync configuration ${dbConfig.id} with error: ${err}`);
} else {
logger.error(`DB config undefined. Failed to sync configuration with error: ${err}`);
}
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}
}
}

// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}

// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);

// Sync immediately on startup
await _syncConfig();

/////////////////////////////
// Init Redis
/////////////////////////////
const redis = new Redis({
host: 'localhost',
port: 6379,
maxRetriesPerRequest: null
});
redis.ping().then(() => {
logger.info('Connected to redis');
}).catch((err) => {
}).catch((err: unknown) => {
logger.error('Failed to connect to redis');
console.error(err);
process.exit(1);
});

/////////////////////////////
// Setup config sync watchers
/////////////////////////////
switch (SOURCEBOT_TENANT_MODE) {
case 'single':
// Re-sync on file changes if the config file is local
if (!isRemotePath(context.configPath)) {
watch(context.configPath, () => {
logger.info(`Config file ${context.configPath} changed. Re-syncing...`);
_syncConfig();
});
}

// Re-sync at a fixed interval
setInterval(() => {
_syncConfig();
}, DEFAULT_SETTINGS.resyncIntervalMs);

// Sync immediately on startup
await _syncConfig();
break;
case 'multi':
// Setup config sync queue and workers
const configSyncQueue = new Queue('configSyncQueue');
const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.configSyncConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting config sync max concurrency to ${numWorkers}`);
const configSyncWorker = new Worker('configSyncQueue', async (job: Job) => {
const config = job.data as Config;
await _syncConfig(config);
}, { connection: redis, concurrency: numWorkers });
configSyncWorker.on('completed', async (job: Job) => {
logger.info(`Config sync job ${job.id} completed`);

const config = job.data as Config;
await db.config.update({
where: {
id: config.id,
},
data: {
syncStatus: ConfigSyncStatus.SYNCED,
}
})
});
configSyncWorker.on('failed', (job: Job | undefined, err: unknown) => {
logger.info(`Config sync job failed with error: ${err}`);
});

setInterval(async () => {
const configs = await db.config.findMany({
where: {
syncStatus: ConfigSyncStatus.SYNC_NEEDED,
}
});

logger.info(`Found ${configs.length} configs to sync...`);
addConfigsToQueue(db, configSyncQueue, configs);
}, 1000);
break;
default:
throw new Error(`Invalid SOURCEBOT_TENANT_MODE: ${SOURCEBOT_TENANT_MODE}`);
}


/////////////////////////
// Setup repo indexing
/////////////////////////
const indexQueue = new Queue('indexQueue');

const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job) => {
logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job: Job) => {
const repo = job.data as Repo;

let indexDuration_s: number | undefined;
@@ -166,10 +285,10 @@ export const main = async (db: PrismaClient, context: AppContext) => {
});
}, { connection: redis, concurrency: numWorkers });

worker.on('completed', (job) => {
worker.on('completed', (job: Job) => {
logger.info(`Job ${job.id} completed`);
});
worker.on('failed', async (job: Job | undefined, err) => {
worker.on('failed', async (job: Job | undefined, err: unknown) => {
logger.info(`Job failed with error: ${err}`);
if (job) {
await db.repo.update({
Expand All @@ -183,6 +302,7 @@ export const main = async (db: PrismaClient, context: AppContext) => {
}
});

// Repo indexing loop
while (true) {
const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
const repos = await db.repo.findMany({
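In multi-tenant mode the sync pipeline is queue-driven: a one-second poll collects `Config` rows in `SYNC_NEEDED`, `addConfigsToQueue` marks them `IN_SYNC_QUEUE` and enqueues them, the BullMQ worker marks them `SYNCING` while `_syncConfig` runs, and the completed/failed handlers settle them at `SYNCED` or `FAILED`. A small sketch of how another part of the system could request a re-sync by resetting the status; the `requestConfigResync` helper is hypothetical, but `db.config.update` and `ConfigSyncStatus.SYNC_NEEDED` are the same calls used above:

```ts
import { ConfigSyncStatus, PrismaClient } from "@sourcebot/db";

// Flag a tenant's config for re-sync. The 1s poller in main() will pick it
// up, enqueue it, and the worker drives it through SYNCING -> SYNCED/FAILED.
const requestConfigResync = async (db: PrismaClient, configId: number) => {
    await db.config.update({
        where: { id: configId },
        data: { syncStatus: ConfigSyncStatus.SYNC_NEEDED },
    });
};
```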
4 changes: 4 additions & 0 deletions packages/backend/src/types.ts
@@ -78,6 +78,10 @@ export type Settings = {
* The multiple of the number of CPUs to use for indexing.
*/
indexConcurrencyMultiple: number;
/**
* The multiple of the number of CPUs to use for syncing the configuration.
*/
configSyncConcurrencyMultiple: number;
}

// @see : https://stackoverflow.com/a/61132308
11 changes: 11 additions & 0 deletions (new migration file)
@@ -0,0 +1,11 @@
-- CreateTable
CREATE TABLE "Config" (
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
"data" JSONB NOT NULL,
"createdAt" DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
"updatedAt" DATETIME NOT NULL,
"syncedAt" DATETIME,
"syncStatus" TEXT NOT NULL DEFAULT 'SYNC_NEEDED',
"orgId" INTEGER NOT NULL,
CONSTRAINT "Config_orgId_fkey" FOREIGN KEY ("orgId") REFERENCES "Org" ("id") ON DELETE CASCADE ON UPDATE CASCADE
);
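The migration stores each tenant's configuration as a JSON blob tied to an `Org`, with `syncStatus` defaulting to `SYNC_NEEDED` so a freshly inserted row is picked up by the poller on its next tick. A rough sketch of creating such a row via the Prisma client, assuming the schema behind this migration generates a `config` model with a `create` accessor (the `createTenantConfig` helper and the cast are illustrative):

```ts
import { Prisma, PrismaClient } from "@sourcebot/db";
import { SourcebotConfigurationSchema } from "./schemas/v2.js";

// Insert a tenant config row. syncStatus defaults to SYNC_NEEDED (per the
// migration), so the multi-tenant poller enqueues it within ~1 second.
const createTenantConfig = async (db: PrismaClient, orgId: number, config: SourcebotConfigurationSchema) => {
    return db.config.create({
        data: {
            data: config as unknown as Prisma.InputJsonValue, // JSONB "data" column
            orgId,
        },
    });
};
```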