Skip to content

adds garbage collection for repos #182

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions packages/backend/src/connectionManager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
import { Job, Queue, Worker } from 'bullmq';
import { AppContext, Settings, WithRequired } from "./types.js";
import { Settings, WithRequired } from "./types.js";
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
import { createLogger } from "./logger.js";
import os from 'os';
Expand All @@ -10,6 +10,7 @@ import { getGitHubReposFromConfig } from "./github.js";

/**
 * Contract implemented by the connection manager.
 */
interface IConnectionManager {
/** Schedules a sync for the given connection. */
scheduleConnectionSync: (connection: Connection) => Promise<void>;
/** Registers the recurring poll that schedules a sync for every connection whose `syncStatus` is `SYNC_NEEDED`. */
registerPollingCallback: () => void;
/** Tears the manager down. NOTE(review): the implementation is not visible here — confirm exactly what is released. */
dispose: () => void;
}

Expand All @@ -28,14 +29,13 @@ export class ConnectionManager implements IConnectionManager {

constructor(
private db: PrismaClient,
settings: Settings,
private settings: Settings,
redis: Redis,
private context: AppContext,
) {
const numCores = os.cpus().length;
this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
connection: redis,
concurrency: numCores * settings.configSyncConcurrencyMultiple,
concurrency: numCores * this.settings.configSyncConcurrencyMultiple,
});
this.worker.on('completed', this.onSyncJobCompleted.bind(this));
this.worker.on('failed', this.onSyncJobFailed.bind(this));
Expand All @@ -61,6 +61,19 @@ export class ConnectionManager implements IConnectionManager {
});
}

/**
 * Starts a recurring poll that looks up every connection whose `syncStatus`
 * is `SYNC_NEEDED` and schedules a sync for each one, in order.
 *
 * The poll runs every `settings.resyncConnectionPollingIntervalMs` milliseconds.
 * A tick is skipped while the previous one is still running, so a slow database
 * query or scheduling call cannot cause the same connection to be picked up by
 * two overlapping ticks. Failures inside a tick are logged and the poll keeps
 * running; they never surface as unhandled promise rejections.
 *
 * NOTE(review): the interval handle is not retained, so nothing in this method
 * allows the poll to be stopped later — confirm whether `dispose` should clear it.
 */
public async registerPollingCallback() {
    const pollLogger = createLogger('ConnectionManager');

    // True while a tick is in flight; used to drop overlapping ticks.
    let isPolling = false;

    setInterval(async () => {
        if (isPolling) {
            return;
        }
        isPolling = true;

        try {
            const connections = await this.db.connection.findMany({
                where: {
                    syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
                }
            });
            for (const connection of connections) {
                await this.scheduleConnectionSync(connection);
            }
        } catch (err: unknown) {
            // setInterval ignores the promise returned by this callback, so a
            // rejection here would otherwise be unhandled.
            pollLogger.error(`Failed to poll for connections that need syncing: ${err}`);
        } finally {
            isPolling = false;
        }
    }, this.settings.resyncConnectionPollingIntervalMs);
}

private async runSyncJob(job: Job<JobPayload>) {
const { config, orgId } = job.data;
// @note: We aren't actually doing anything with this atm.
Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ export const DEFAULT_SETTINGS: Settings = {
autoDeleteStaleRepos: true,
reindexIntervalMs: 1000 * 60,
resyncConnectionPollingIntervalMs: 1000,
reindexRepoPollingInternvalMs: 1000,
indexConcurrencyMultiple: 3,
configSyncConcurrencyMultiple: 3,
}
208 changes: 6 additions & 202 deletions packages/backend/src/main.ts
Original file line number Diff line number Diff line change
@@ -1,120 +1,14 @@
import { ConnectionSyncStatus, PrismaClient, Repo, RepoIndexingStatus, RepoToConnection, Connection } from '@sourcebot/db';
import { existsSync } from 'fs';
import { cloneRepository, fetchRepository } from "./git.js";
import { PrismaClient } from '@sourcebot/db';
import { createLogger } from "./logger.js";
import { captureEvent } from "./posthog.js";
import { AppContext } from "./types.js";
import { getRepoPath, getTokenFromConfig, measure } from "./utils.js";
import { indexGitRepository } from "./zoekt.js";
import { DEFAULT_SETTINGS } from './constants.js';
import { Queue, Worker, Job } from 'bullmq';
import { Redis } from 'ioredis';
import * as os from 'os';
import { ConnectionManager } from './connectionManager.js';
import { ConnectionConfig } from '@sourcebot/schemas/v3/connection.type';
import { RepoManager } from './repoManager.js';

const logger = createLogger('main');

// A Repo together with its RepoToConnection join rows, each carrying the Connection it points to.
type RepoWithConnections = Repo & { connections: (RepoToConnection & { connection: Connection})[] };

// TODO: do this better? ex: try using the tokens from all the connections
// We can no longer use repo.cloneUrl directly since it doesn't contain the token for security reasons. As a result, we need to
// fetch the token here using the connections from the repo. Multiple connections could be referencing this repo, and each
// may have their own token. This method will just pick the first connection that has a token (if one exists) and use that. This
// may technically cause syncing to fail if that connection's token just so happens to not have access to the repo it's referencing.
/**
 * Resolves an access token for `repo` by walking its connections in order.
 *
 * Connections whose config carries no token are skipped. The search stops at
 * the first connection for which `getTokenFromConfig` yields a truthy value.
 *
 * @returns The last value obtained from `getTokenFromConfig`, or `undefined`
 * when the repo has no connections (an error is logged in that case) or no
 * connection config carries a token.
 */
const getTokenForRepo = async (repo: RepoWithConnections, db: PrismaClient) => {
    const links = repo.connections;
    if (links.length === 0) {
        logger.error(`Repo ${repo.id} has no connections`);
        return;
    }

    let resolved: string | undefined;
    for (const link of links) {
        const owner = link.connection;
        const ownerConfig = owner.config as unknown as ConnectionConfig;

        // Nothing to resolve for connections that are configured without a token.
        if (!ownerConfig.token) {
            continue;
        }

        resolved = await getTokenFromConfig(ownerConfig.token, owner.orgId, db);
        if (resolved) {
            break;
        }
    }

    return resolved;
}

/**
 * Brings the local checkout of `repo` up to date and then indexes it.
 *
 * If nothing exists at the repo's path yet, the repo is cloned, with the token
 * from `getTokenForRepo` (when one is found) set as the username of the clone
 * URL. Otherwise the existing checkout is fetched. Indexing runs afterwards in
 * both cases.
 *
 * @returns Durations in seconds. Only one of `fetchDuration_s` and
 * `cloneDuration_s` is set, depending on which path was taken.
 */
const syncGitRepository = async (repo: RepoWithConnections, ctx: AppContext, db: PrismaClient) => {
    const repoPath = getRepoPath(repo, ctx);
    const metadata = repo.metadata as Record<string, string>;

    let fetchDuration_s: number | undefined;
    let cloneDuration_s: number | undefined;

    if (!existsSync(repoPath)) {
        logger.info(`Cloning ${repo.id}...`);

        // The stored clone URL carries no credentials, so add the token as the username.
        const authToken = await getTokenForRepo(repo, db);
        let remoteUrl = repo.cloneUrl;
        if (authToken) {
            const parsed = new URL(remoteUrl);
            parsed.username = authToken;
            remoteUrl = parsed.toString();
        }

        const cloneTiming = await measure(() => cloneRepository(remoteUrl, repoPath, metadata, ({ method, stage, progress }) => {
            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
        }));
        cloneDuration_s = cloneTiming.durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Cloned ${repo.id} in ${cloneDuration_s}s`);
    } else {
        logger.info(`Fetching ${repo.id}...`);

        const fetchTiming = await measure(() => fetchRepository(repoPath, ({ method, stage, progress }) => {
            logger.info(`git.${method} ${stage} stage ${progress}% complete for ${repo.id}`)
        }));
        fetchDuration_s = fetchTiming.durationMs / 1000;

        process.stdout.write('\n');
        logger.info(`Fetched ${repo.name} in ${fetchDuration_s}s`);
    }

    logger.info(`Indexing ${repo.id}...`);
    const indexTiming = await measure(() => indexGitRepository(repo, ctx));
    const indexDuration_s = indexTiming.durationMs / 1000;
    logger.info(`Indexed ${repo.id} in ${indexDuration_s}s`);

    return { fetchDuration_s, cloneDuration_s, indexDuration_s };
}

/**
 * Marks each repo as `IN_INDEX_QUEUE` and adds an `indexJob` for it to `queue`.
 *
 * Repos are processed one at a time. The status update and the enqueue both
 * run inside a single `db.$transaction` callback. A failure for one repo is
 * logged and does not stop the remaining repos from being processed.
 */
async function addReposToQueue(db: PrismaClient, queue: Queue, repos: Repo[]) {
    for (const repo of repos) {
        try {
            await db.$transaction(async (tx) => {
                await tx.repo.update({
                    where: { id: repo.id },
                    data: { repoIndexingStatus: RepoIndexingStatus.IN_INDEX_QUEUE },
                });

                // Enqueue inside the transaction callback so a failed enqueue rejects it.
                await queue.add('indexJob', repo);
                logger.info(`Added job to queue for repo ${repo.id}`);
            });
        } catch (err: unknown) {
            logger.error(`Failed to add job to queue for repo ${repo.id}: ${err}`);
        }
    }
}

export const main = async (db: PrismaClient, context: AppContext) => {
/////////////////////////////
// Init Redis
/////////////////////////////
const redis = new Redis({
host: 'localhost',
port: 6379,
Expand All @@ -128,99 +22,9 @@ export const main = async (db: PrismaClient, context: AppContext) => {
process.exit(1);
});

const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis, context);
setInterval(async () => {
const connections = await db.connection.findMany({
where: {
syncStatus: ConnectionSyncStatus.SYNC_NEEDED,
}
});
for (const connection of connections) {
await connectionManager.scheduleConnectionSync(connection);
}
}, DEFAULT_SETTINGS.resyncConnectionPollingIntervalMs);

/////////////////////////
// Setup repo indexing
/////////////////////////
const indexQueue = new Queue('indexQueue');

const numCores = os.cpus().length;
const numWorkers = numCores * DEFAULT_SETTINGS.indexConcurrencyMultiple;
logger.info(`Detected ${numCores} cores. Setting repo index max concurrency to ${numWorkers}`);
const worker = new Worker('indexQueue', async (job: Job) => {
const repo = job.data as RepoWithConnections;

let indexDuration_s: number | undefined;
let fetchDuration_s: number | undefined;
let cloneDuration_s: number | undefined;

const stats = await syncGitRepository(repo, context, db);
indexDuration_s = stats.indexDuration_s;
fetchDuration_s = stats.fetchDuration_s;
cloneDuration_s = stats.cloneDuration_s;

captureEvent('repo_synced', {
vcs: 'git',
codeHost: repo.external_codeHostType,
indexDuration_s,
fetchDuration_s,
cloneDuration_s,
});

await db.repo.update({
where: {
id: repo.id,
},
data: {
indexedAt: new Date(),
repoIndexingStatus: RepoIndexingStatus.INDEXED,
}
});
}, { connection: redis, concurrency: numWorkers });

worker.on('completed', (job: Job) => {
logger.info(`Job ${job.id} completed`);
});
worker.on('failed', async (job: Job | undefined, err: unknown) => {
logger.info(`Job failed with error: ${err}`);
if (job) {
await db.repo.update({
where: {
id: job.data.id,
},
data: {
repoIndexingStatus: RepoIndexingStatus.FAILED,
}
})
}
});

// Repo indexing loop
while (true) {
const thresholdDate = new Date(Date.now() - DEFAULT_SETTINGS.reindexIntervalMs);
const repos = await db.repo.findMany({
where: {
repoIndexingStatus: {
notIn: [RepoIndexingStatus.IN_INDEX_QUEUE, RepoIndexingStatus.FAILED]
},
OR: [
{ indexedAt: null },
{ indexedAt: { lt: thresholdDate } },
{ repoIndexingStatus: RepoIndexingStatus.NEW }
]
},
include: {
connections: {
include: {
connection: true
}
}
}
});
addReposToQueue(db, indexQueue, repos);

const connectionManager = new ConnectionManager(db, DEFAULT_SETTINGS, redis);
connectionManager.registerPollingCallback();

await new Promise(resolve => setTimeout(resolve, 1000));
}
const repoManager = new RepoManager(db, DEFAULT_SETTINGS, redis, context);
repoManager.blockingPollLoop();
}
Loading