Skip to content

Commit dd8ff6e

Browse files
Connection management (#178)
1 parent bae7ca3 commit dd8ff6e

File tree

18 files changed

+626
-352
lines changed

18 files changed

+626
-352
lines changed

packages/backend/src/config.ts

Lines changed: 0 additions & 139 deletions
This file was deleted.
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import { Connection, ConnectionSyncStatus, PrismaClient, Prisma } from "@sourcebot/db";
2+
import { Job, Queue, Worker } from 'bullmq';
3+
import { AppContext, Settings, WithRequired } from "./types.js";
4+
import { ConnectionConfig } from "@sourcebot/schemas/v3/connection.type";
5+
import { createLogger } from "./logger.js";
6+
import os from 'os';
7+
import { Redis } from 'ioredis';
8+
import { getTokenFromConfig, marshalBool } from "./utils.js";
9+
import { getGitHubReposFromConfig } from "./github.js";
10+
11+
interface IConnectionManager {
12+
scheduleConnectionSync: (connection: Connection) => Promise<void>;
13+
dispose: () => void;
14+
}
15+
16+
const QUEUE_NAME = 'connectionSyncQueue';
17+
18+
type JobPayload = {
19+
connectionId: number,
20+
orgId: number,
21+
config: ConnectionConfig,
22+
};
23+
24+
export class ConnectionManager implements IConnectionManager {
25+
private queue = new Queue<JobPayload>(QUEUE_NAME);
26+
private worker: Worker;
27+
private logger = createLogger('ConnectionManager');
28+
29+
constructor(
30+
private db: PrismaClient,
31+
settings: Settings,
32+
redis: Redis,
33+
private context: AppContext,
34+
) {
35+
const numCores = os.cpus().length;
36+
this.worker = new Worker(QUEUE_NAME, this.runSyncJob.bind(this), {
37+
connection: redis,
38+
concurrency: numCores * settings.configSyncConcurrencyMultiple,
39+
});
40+
this.worker.on('completed', this.onSyncJobCompleted.bind(this));
41+
this.worker.on('failed', this.onSyncJobFailed.bind(this));
42+
}
43+
44+
public async scheduleConnectionSync(connection: Connection) {
45+
await this.db.$transaction(async (tx) => {
46+
await tx.connection.update({
47+
where: { id: connection.id },
48+
data: { syncStatus: ConnectionSyncStatus.IN_SYNC_QUEUE },
49+
});
50+
51+
const connectionConfig = connection.config as unknown as ConnectionConfig;
52+
53+
await this.queue.add('connectionSyncJob', {
54+
connectionId: connection.id,
55+
orgId: connection.orgId,
56+
config: connectionConfig,
57+
});
58+
this.logger.info(`Added job to queue for connection ${connection.id}`);
59+
}).catch((err: unknown) => {
60+
this.logger.error(`Failed to add job to queue for connection ${connection.id}: ${err}`);
61+
});
62+
}
63+
64+
private async runSyncJob(job: Job<JobPayload>) {
65+
const { config, orgId } = job.data;
66+
// @note: We aren't actually doing anything with this atm.
67+
const abortController = new AbortController();
68+
69+
type RepoData = WithRequired<Prisma.RepoCreateInput, 'connections'>;
70+
const repoData: RepoData[] = await (async () => {
71+
switch (config.type) {
72+
case 'github': {
73+
const token = config.token ? getTokenFromConfig(config.token, this.context) : undefined;
74+
const gitHubRepos = await getGitHubReposFromConfig(config, abortController.signal, this.context);
75+
const hostUrl = config.url ?? 'https://github.com';
76+
const hostname = config.url ? new URL(config.url).hostname : 'github.com';
77+
78+
return gitHubRepos.map((repo) => {
79+
const repoName = `${hostname}/${repo.full_name}`;
80+
const cloneUrl = new URL(repo.clone_url!);
81+
if (token) {
82+
cloneUrl.username = token;
83+
}
84+
85+
const record: RepoData = {
86+
external_id: repo.id.toString(),
87+
external_codeHostType: 'github',
88+
external_codeHostUrl: hostUrl,
89+
cloneUrl: cloneUrl.toString(),
90+
name: repoName,
91+
isFork: repo.fork,
92+
isArchived: !!repo.archived,
93+
org: {
94+
connect: {
95+
id: orgId,
96+
},
97+
},
98+
connections: {
99+
create: {
100+
connectionId: job.data.connectionId,
101+
}
102+
},
103+
metadata: {
104+
'zoekt.web-url-type': 'github',
105+
'zoekt.web-url': repo.html_url,
106+
'zoekt.name': repoName,
107+
'zoekt.github-stars': (repo.stargazers_count ?? 0).toString(),
108+
'zoekt.github-watchers': (repo.watchers_count ?? 0).toString(),
109+
'zoekt.github-subscribers': (repo.subscribers_count ?? 0).toString(),
110+
'zoekt.github-forks': (repo.forks_count ?? 0).toString(),
111+
'zoekt.archived': marshalBool(repo.archived),
112+
'zoekt.fork': marshalBool(repo.fork),
113+
'zoekt.public': marshalBool(repo.private === false)
114+
},
115+
};
116+
117+
return record;
118+
})
119+
}
120+
}
121+
})();
122+
123+
// @note: to handle orphaned Repos we delete all RepoToConnection records for this connection,
124+
// and then recreate them when we upsert the repos. For example, if a repo is no-longer
125+
// captured by the connection's config (e.g., it was deleted, marked archived, etc.), it won't
126+
// appear in the repoData array above, and so the RepoToConnection record won't be re-created.
127+
// Repos that have no RepoToConnection records are considered orphaned and can be deleted.
128+
await this.db.$transaction(async (tx) => {
129+
await tx.connection.update({
130+
where: {
131+
id: job.data.connectionId,
132+
},
133+
data: {
134+
repos: {
135+
deleteMany: {}
136+
}
137+
}
138+
});
139+
140+
await Promise.all(repoData.map((repo) => {
141+
return tx.repo.upsert({
142+
where: {
143+
external_id_external_codeHostUrl: {
144+
external_id: repo.external_id,
145+
external_codeHostUrl: repo.external_codeHostUrl,
146+
},
147+
},
148+
create: repo,
149+
update: repo as Prisma.RepoUpdateInput,
150+
});
151+
}));
152+
153+
});
154+
}
155+
156+
157+
private async onSyncJobCompleted(job: Job<JobPayload>) {
158+
this.logger.info(`Connection sync job ${job.id} completed`);
159+
const { connectionId } = job.data;
160+
161+
await this.db.connection.update({
162+
where: {
163+
id: connectionId,
164+
},
165+
data: {
166+
syncStatus: ConnectionSyncStatus.SYNCED,
167+
syncedAt: new Date()
168+
}
169+
})
170+
}
171+
172+
private async onSyncJobFailed(job: Job | undefined, err: unknown) {
173+
this.logger.info(`Connection sync job failed with error: ${err}`);
174+
if (job) {
175+
const { connectionId } = job.data;
176+
await this.db.connection.update({
177+
where: {
178+
id: connectionId,
179+
},
180+
data: {
181+
syncStatus: ConnectionSyncStatus.FAILED,
182+
syncedAt: new Date()
183+
}
184+
})
185+
}
186+
}
187+
188+
public dispose() {
189+
this.worker.close();
190+
this.queue.close();
191+
}
192+
}
193+

packages/backend/src/constants.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ export const DEFAULT_SETTINGS: Settings = {
77
maxFileSize: 2 * 1024 * 1024, // 2MB in bytes
88
autoDeleteStaleRepos: true,
99
reindexIntervalMs: 1000 * 60,
10-
resyncIntervalMs: 1000 * 60 * 60 * 24, // 1 day in milliseconds
10+
resyncConnectionPollingIntervalMs: 1000,
1111
indexConcurrencyMultiple: 3,
1212
configSyncConcurrencyMultiple: 3,
1313
}

0 commit comments

Comments
 (0)