Skip to content

Declarative connection cleanup + improvements #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ ENV NEXT_TELEMETRY_DISABLED=1
ENV DATA_DIR=/data
ENV DATA_CACHE_DIR=$DATA_DIR/.sourcebot
ENV DB_DATA_DIR=$DATA_CACHE_DIR/db
ENV REDIS_DATA_DIR=$DATA_CACHE_DIR/redis
ENV DB_NAME=sourcebot
ENV DATABASE_URL="postgresql://postgres@localhost:5432/sourcebot"
ENV DATABASE_URL="postgresql://postgres@localhost:5432/$DB_NAME"
ENV REDIS_URL="redis://localhost:6379"
ENV SRC_TENANT_ENFORCEMENT_MODE=strict

Expand Down
5 changes: 5 additions & 0 deletions entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ if [ ! -d "$DB_DATA_DIR" ]; then
su postgres -c "initdb -D $DB_DATA_DIR"
fi

# Create the redis data directory if it doesn't exist
if [ ! -d "$REDIS_DATA_DIR" ]; then
mkdir -p $REDIS_DATA_DIR
fi

if [ -z "$SOURCEBOT_ENCRYPTION_KEY" ]; then
echo -e "\e[33m[Warning] SOURCEBOT_ENCRYPTION_KEY is not set.\e[0m"

Expand Down
10 changes: 4 additions & 6 deletions packages/backend/src/connectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma, RepoIndexingStatus } from "@sourcebot/db";
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
import { Job, Queue, Worker } from 'bullmq';
import { Settings } from "./types.js";
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
import { createLogger } from "./logger.js";
import os from 'os';
import { Redis } from 'ioredis';
import { RepoData, compileGithubConfig, compileGitlabConfig, compileGiteaConfig, compileGerritConfig } from "./repoCompileUtils.js";
import { BackendError, BackendException } from "@sourcebot/error";
Expand Down Expand Up @@ -41,10 +40,9 @@ export class ConnectionManager implements IConnectionManager {
this.queue = new Queue<JobPayload>(QUEUE_NAME, {
connection: redis,
});
const numCores = os.cpus().length;
this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
connection: redis,
concurrency: numCores * this.settings.configSyncConcurrencyMultiple,
concurrency: this.settings.maxConnectionSyncJobConcurrency,
});
this.worker.on('completed', this.onSyncJobCompleted.bind(this));
this.worker.on('failed', this.onSyncJobFailed.bind(this));
Expand Down Expand Up @@ -261,11 +259,11 @@ export class ConnectionManager implements IConnectionManager {
});
}

private async onSyncJobFailed(job: Job | undefined, err: unknown) {
private async onSyncJobFailed(job: Job<JobPayload> | undefined, err: unknown) {
this.logger.info(`Connection sync job failed with error: ${err}`);
Sentry.captureException(err, {
tags: {
repoId: job?.data.repo.id,
connectionid: job?.data.connectionId,
jobId: job?.id,
queue: QUEUE_NAME,
}
Expand Down
14 changes: 7 additions & 7 deletions packages/backend/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import { Settings } from "./types.js";
*/
export const DEFAULT_SETTINGS: Settings = {
maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
maxTrigramCount: 20000,
reindexIntervalMs: 1000 * 60 * 60, // 1 hour
resyncConnectionPollingIntervalMs: 1000,
reindexRepoPollingIntervalMs: 1000,
indexConcurrencyMultiple: 3,
configSyncConcurrencyMultiple: 3,
gcConcurrencyMultiple: 1,
gcGracePeriodMs: 10 * 1000, // 10 seconds
resyncConnectionPollingIntervalMs: 1000 * 1, // 1 second
reindexRepoPollingIntervalMs: 1000 * 1, // 1 second
maxConnectionSyncJobConcurrency: 8,
maxRepoIndexingJobConcurrency: 8,
maxRepoGarbageCollectionJobConcurrency: 8,
repoGarbageCollectionGracePeriodMs: 10 * 1000, // 10 seconds
repoIndexTimeoutMs: 1000 * 60 * 60 * 2, // 2 hours
maxTrigramCount: 20000,
}
3 changes: 3 additions & 0 deletions packages/backend/src/github.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ const getReposOwnedByUsers = async (users: string[], isAuthenticated: boolean, o
};
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to fetch repositories for user ${user}.`, error);

if (isHttpError(error, 404)) {
logger.error(`User ${user} not found or no access`);
Expand Down Expand Up @@ -302,6 +303,7 @@ const getReposForOrgs = async (orgs: string[], octokit: Octokit, signal: AbortSi
};
} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to fetch repositories for org ${org}.`, error);

if (isHttpError(error, 404)) {
logger.error(`Organization ${org} not found or no access`);
Expand Down Expand Up @@ -349,6 +351,7 @@ const getRepos = async (repoList: string[], octokit: Octokit, signal: AbortSigna

} catch (error) {
Sentry.captureException(error);
logger.error(`Failed to fetch repository ${repo}.`, error);

if (isHttpError(error, 404)) {
logger.error(`Repository ${repo} not found or no access`);
Expand Down
3 changes: 3 additions & 0 deletions packages/backend/src/gitlab.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
};
} catch (e: any) {
Sentry.captureException(e);
logger.error(`Failed to fetch projects for group ${group}.`, e);

const status = e?.cause?.response?.status;
if (status === 404) {
Expand Down Expand Up @@ -118,6 +119,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
};
} catch (e: any) {
Sentry.captureException(e);
logger.error(`Failed to fetch projects for user ${user}.`, e);

const status = e?.cause?.response?.status;
if (status === 404) {
Expand Down Expand Up @@ -152,6 +154,7 @@ export const getGitLabReposFromConfig = async (config: GitlabConnectionConfig, o
};
} catch (e: any) {
Sentry.captureException(e);
logger.error(`Failed to fetch project ${project}.`, e);

const status = e?.cause?.response?.status;

Expand Down
9 changes: 3 additions & 6 deletions packages/backend/src/repoManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import { getRepoPath, getTokenFromConfig, measure, getShardPrefix } from "./util
import { cloneRepository, fetchRepository } from "./git.js";
import { existsSync, readdirSync, promises } from 'fs';
import { indexGitRepository } from "./zoekt.js";
import os from 'os';
import { PromClient } from './promClient.js';
import * as Sentry from "@sentry/node";

Expand Down Expand Up @@ -43,15 +42,13 @@ export class RepoManager implements IRepoManager {
private promClient: PromClient,
private ctx: AppContext,
) {
const numCores = os.cpus().length;

// Repo indexing
this.indexQueue = new Queue<RepoIndexingPayload>(REPO_INDEXING_QUEUE, {
connection: redis,
});
this.indexWorker = new Worker(REPO_INDEXING_QUEUE, this.runIndexJob.bind(this), {
connection: redis,
concurrency: numCores * this.settings.indexConcurrencyMultiple,
concurrency: this.settings.maxRepoIndexingJobConcurrency,
});
this.indexWorker.on('completed', this.onIndexJobCompleted.bind(this));
this.indexWorker.on('failed', this.onIndexJobFailed.bind(this));
Expand All @@ -62,7 +59,7 @@ export class RepoManager implements IRepoManager {
});
this.gcWorker = new Worker(REPO_GC_QUEUE, this.runGarbageCollectionJob.bind(this), {
connection: redis,
concurrency: numCores * this.settings.gcConcurrencyMultiple,
concurrency: this.settings.maxRepoGarbageCollectionJobConcurrency,
});
this.gcWorker.on('completed', this.onGarbageCollectionJobCompleted.bind(this));
this.gcWorker.on('failed', this.onGarbageCollectionJobFailed.bind(this));
Expand Down Expand Up @@ -396,7 +393,7 @@ export class RepoManager implements IRepoManager {
////////////////////////////////////


const thresholdDate = new Date(Date.now() - this.settings.gcGracePeriodMs);
const thresholdDate = new Date(Date.now() - this.settings.repoGarbageCollectionGracePeriodMs);
const reposWithNoConnections = await this.db.repo.findMany({
where: {
repoIndexingStatus: {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "Connection" ADD COLUMN "isDeclarative" BOOLEAN NOT NULL DEFAULT false;
1 change: 1 addition & 0 deletions packages/db/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ model Connection {
id Int @id @default(autoincrement())
name String
config Json
isDeclarative Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
syncedAt DateTime?
Expand Down
32 changes: 20 additions & 12 deletions packages/schemas/src/v3/index.schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,45 @@ const schema = {
"properties": {
"maxFileSize": {
"type": "number",
"description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed."
"description": "The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.",
"minimum": 1
},
"maxTrigramCount": {
"type": "number",
"description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed."
"description": "The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Defaults to 20000.",
"minimum": 1
},
"reindexIntervalMs": {
"type": "number",
"description": "The interval (in milliseconds) at which the indexer should re-index all repositories."
"description": "The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.",
"minimum": 1
},
"resyncConnectionPollingIntervalMs": {
"type": "number",
"description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced."
"description": "The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 1 second.",
"minimum": 1
},
"reindexRepoPollingIntervalMs": {
"type": "number",
"description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed."
"description": "The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 1 second.",
"minimum": 1
},
"indexConcurrencyMultiple": {
"maxConnectionSyncJobConcurrency": {
"type": "number",
"description": "The multiple of the number of CPUs to use for indexing."
"description": "The number of connection sync jobs to run concurrently. Defaults to 8.",
"minimum": 1
},
"configSyncConcurrencyMultiple": {
"maxRepoIndexingJobConcurrency": {
"type": "number",
"description": "The multiple of the number of CPUs to use for syncing the configuration."
"description": "The number of repo indexing jobs to run concurrently. Defaults to 8.",
"minimum": 1
},
"gcConcurrencyMultiple": {
"maxRepoGarbageCollectionJobConcurrency": {
"type": "number",
"description": "The multiple of the number of CPUs to use for garbage collection."
"description": "The number of repo GC jobs to run concurrently. Defaults to 8.",
"minimum": 1
},
"gcGracePeriodMs": {
"repoGarbageCollectionGracePeriodMs": {
"type": "number",
"description": "The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded."
},
Expand Down
24 changes: 12 additions & 12 deletions packages/schemas/src/v3/index.type.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,41 +28,41 @@ export interface SourcebotConfig {
*/
export interface Settings {
/**
* The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed.
* The maximum size of a file (in bytes) to be indexed. Files that exceed this maximum will not be indexed. Defaults to 2MB.
*/
maxFileSize?: number;
/**
* The maximum number of trigrams per document. Files that exceed this maximum will not be indexed.
 * The maximum number of trigrams per document. Files that exceed this maximum will not be indexed. Defaults to 20000.
*/
maxTrigramCount?: number;
/**
* The interval (in milliseconds) at which the indexer should re-index all repositories.
* The interval (in milliseconds) at which the indexer should re-index all repositories. Defaults to 1 hour.
*/
reindexIntervalMs?: number;
/**
* The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced.
 * The polling rate (in milliseconds) at which the db should be checked for connections that need to be re-synced. Defaults to 1 second.
*/
resyncConnectionPollingIntervalMs?: number;
/**
* The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed.
 * The polling rate (in milliseconds) at which the db should be checked for repos that should be re-indexed. Defaults to 1 second.
*/
reindexRepoPollingIntervalMs?: number;
/**
* The multiple of the number of CPUs to use for indexing.
* The number of connection sync jobs to run concurrently. Defaults to 8.
*/
indexConcurrencyMultiple?: number;
maxConnectionSyncJobConcurrency?: number;
/**
* The multiple of the number of CPUs to use for syncing the configuration.
* The number of repo indexing jobs to run concurrently. Defaults to 8.
*/
configSyncConcurrencyMultiple?: number;
maxRepoIndexingJobConcurrency?: number;
/**
* The multiple of the number of CPUs to use for garbage collection.
* The number of repo GC jobs to run concurrently. Defaults to 8.
*/
gcConcurrencyMultiple?: number;
maxRepoGarbageCollectionJobConcurrency?: number;
/**
* The grace period (in milliseconds) for garbage collection. Used to prevent deleting shards while they're being loaded.
*/
gcGracePeriodMs?: number;
repoGarbageCollectionGracePeriodMs?: number;
/**
* The timeout (in milliseconds) for a repo indexing to timeout.
*/
Expand Down
1 change: 1 addition & 0 deletions packages/web/src/app/error.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { SourcebotLogo } from './components/sourcebotLogo';
export default function Error({ error, reset }: { error: Error & { digest?: string }, reset: () => void }) {
useEffect(() => {
Sentry.captureException(error);
console.error(error);
}, [error]);

const { message, errorCode, statusCode } = useMemo(() => {
Expand Down
21 changes: 21 additions & 0 deletions packages/web/src/initialize.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,13 @@ const initSingleTenancy = async () => {
update: {
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
syncStatus: syncNeededOnUpdate ? ConnectionSyncStatus.SYNC_NEEDED : undefined,
isDeclarative: true,
},
create: {
name: key,
connectionType: newConnectionConfig.type,
config: newConnectionConfig as unknown as Prisma.InputJsonValue,
isDeclarative: true,
org: {
connect: {
id: SINGLE_TENANT_ORG_ID,
Expand All @@ -160,6 +162,25 @@ const initSingleTenancy = async () => {
})
}
}

const deletedConnections = await prisma.connection.findMany({
where: {
isDeclarative: true,
name: {
notIn: Object.keys(config.connections),
},
orgId: SINGLE_TENANT_ORG_ID,
}
});

for (const connection of deletedConnections) {
console.log(`Deleting connection with name '${connection.name}'. Connection ID: ${connection.id}`);
await prisma.connection.delete({
where: {
id: connection.id,
}
})
}
}
}
}
Expand Down
Loading