Skip to content

Commit 721590e

Browse files
chore: send resources to go correctly
1 parent 812e3de commit 721590e

File tree

2 files changed

+110
-49
lines changed

2 files changed

+110
-49
lines changed

packages/events/src/kafka/event-dispatch/resource.ts

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ import { eq, takeFirst } from "@ctrlplane/db";
22
import { db } from "@ctrlplane/db/client";
33
import * as schema from "@ctrlplane/db/schema";
44

5-
import { sendNodeEvent } from "../client.js";
5+
import type { FullResource, PbResource } from "../events.js";
6+
import { sendGoEvent, sendNodeEvent } from "../client.js";
67
import { Event } from "../events.js";
78

89
const getFullResource = async (resource: schema.Resource) => {
@@ -16,19 +17,46 @@ const getFullResource = async (resource: schema.Resource) => {
1617
return { ...resource, metadata };
1718
};
1819

20+
const getPbResource = (resource: FullResource): PbResource => ({
21+
id: resource.id,
22+
name: resource.name,
23+
version: resource.version,
24+
kind: resource.kind,
25+
identifier: resource.identifier,
26+
workspaceId: resource.workspaceId,
27+
createdAt: resource.createdAt.toISOString(),
28+
providerId: resource.providerId ?? undefined,
29+
lockedAt: resource.lockedAt?.toISOString() ?? undefined,
30+
updatedAt: resource.updatedAt?.toISOString() ?? undefined,
31+
deletedAt: resource.deletedAt?.toISOString() ?? undefined,
32+
metadata: resource.metadata,
33+
});
34+
35+
const convertFullResourceToNodeEvent = (fullResource: FullResource) => ({
36+
workspaceId: fullResource.workspaceId,
37+
eventType: Event.ResourceCreated,
38+
eventId: fullResource.id,
39+
timestamp: Date.now(),
40+
source: "api" as const,
41+
payload: fullResource,
42+
});
43+
44+
const convertFullResourceToGoEvent = (fullResource: FullResource) => ({
45+
workspaceId: fullResource.workspaceId,
46+
eventType: Event.ResourceCreated as const,
47+
payload: getPbResource(fullResource),
48+
timestamp: Date.now(),
49+
});
50+
1951
export const dispatchResourceCreated = (
2052
resource: schema.Resource,
21-
source?: "api" | "scheduler" | "user-action",
53+
_?: "api" | "scheduler" | "user-action",
2254
) =>
2355
getFullResource(resource).then((fullResource) =>
24-
sendNodeEvent({
25-
workspaceId: resource.workspaceId,
26-
eventType: Event.ResourceCreated,
27-
eventId: resource.id,
28-
timestamp: Date.now(),
29-
source: source ?? "api",
30-
payload: fullResource,
31-
}),
56+
Promise.all([
57+
sendNodeEvent(convertFullResourceToNodeEvent(fullResource)),
58+
sendGoEvent(convertFullResourceToGoEvent(fullResource)),
59+
]),
3260
);
3361

3462
export const dispatchResourceUpdated = async (
@@ -41,29 +69,28 @@ export const dispatchResourceUpdated = async (
4169
getFullResource(current),
4270
]);
4371

44-
sendNodeEvent({
45-
workspaceId: current.workspaceId,
46-
eventType: Event.ResourceUpdated,
47-
eventId: current.id,
48-
timestamp: Date.now(),
49-
source: source ?? "api",
50-
payload: { previous: previousFullResource, current: currentFullResource },
51-
});
72+
await Promise.all([
73+
sendNodeEvent({
74+
workspaceId: current.workspaceId,
75+
eventType: Event.ResourceUpdated,
76+
eventId: current.id,
77+
timestamp: Date.now(),
78+
source: source ?? "api",
79+
payload: { previous: previousFullResource, current: currentFullResource },
80+
}),
81+
sendGoEvent(convertFullResourceToGoEvent(currentFullResource)),
82+
]);
5283
};
5384

5485
export const dispatchResourceDeleted = (
5586
resource: schema.Resource,
56-
source?: "api" | "scheduler" | "user-action",
87+
_?: "api" | "scheduler" | "user-action",
5788
) =>
5889
getFullResource(resource).then((fullResource) =>
59-
sendNodeEvent({
60-
workspaceId: resource.workspaceId,
61-
eventType: Event.ResourceDeleted,
62-
eventId: resource.id,
63-
timestamp: Date.now(),
64-
source: source ?? "api",
65-
payload: fullResource,
66-
}),
90+
Promise.all([
91+
sendNodeEvent(convertFullResourceToNodeEvent(fullResource)),
92+
sendGoEvent(convertFullResourceToGoEvent(fullResource)),
93+
]),
6794
);
6895

6996
export const getWorkspaceIdForResource = async (resourceId: string) =>

packages/events/src/kafka/events.ts

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -129,29 +129,42 @@ export type EventPayload = {
129129
// [Event.SystemDeleted]: schema.System;
130130
};
131131

132+
// Remove the typescript-specific $typeName field from the protobuf objects
133+
// go complains about the $typeName field being unknown
134+
type Without$typeName<T> = Omit<T, "$typeName">;
135+
export type PbResource = Without$typeName<pb.Resource>;
136+
export type PbDeployment = Without$typeName<pb.Deployment>;
137+
export type PbDeploymentVariable = Without$typeName<pb.DeploymentVariable>;
138+
export type PbDeploymentVariableValue =
139+
Without$typeName<pb.DeploymentVariableValue>;
140+
export type PbDeploymentVersion = Without$typeName<pb.DeploymentVersion>;
141+
export type PbEnvironment = Without$typeName<pb.Environment>;
142+
export type PbPolicy = Without$typeName<pb.Policy>;
143+
export type PbJob = Without$typeName<pb.Job>;
144+
132145
export type GoEventPayload = {
133-
[Event.ResourceCreated]: pb.Resource;
134-
[Event.ResourceUpdated]: pb.Resource;
135-
[Event.ResourceDeleted]: pb.Resource;
136-
[Event.DeploymentCreated]: pb.Deployment;
137-
[Event.DeploymentUpdated]: pb.Deployment;
138-
[Event.DeploymentDeleted]: pb.Deployment;
139-
[Event.DeploymentVariableCreated]: pb.DeploymentVariable;
140-
[Event.DeploymentVariableUpdated]: pb.DeploymentVariable;
146+
[Event.ResourceCreated]: PbResource;
147+
[Event.ResourceUpdated]: PbResource;
148+
[Event.ResourceDeleted]: PbResource;
149+
[Event.DeploymentCreated]: PbDeployment;
150+
[Event.DeploymentUpdated]: PbDeployment;
151+
[Event.DeploymentDeleted]: PbDeployment;
152+
[Event.DeploymentVariableCreated]: PbDeploymentVariable;
153+
[Event.DeploymentVariableUpdated]: PbDeploymentVariable;
141154
[Event.DeploymentVariableDeleted]: pb.DeploymentVariable;
142-
[Event.DeploymentVariableValueCreated]: pb.DeploymentVariableValue;
155+
[Event.DeploymentVariableValueCreated]: PbDeploymentVariableValue;
143156
[Event.DeploymentVariableValueUpdated]: pb.DeploymentVariableValue;
144-
[Event.DeploymentVariableValueDeleted]: pb.DeploymentVariableValue;
145-
[Event.DeploymentVersionCreated]: pb.DeploymentVersion;
146-
[Event.DeploymentVersionUpdated]: pb.DeploymentVersion;
147-
[Event.DeploymentVersionDeleted]: schema.DeploymentVersion;
148-
[Event.EnvironmentCreated]: pb.Environment;
149-
[Event.EnvironmentUpdated]: pb.Environment;
150-
[Event.EnvironmentDeleted]: pb.Environment;
151-
[Event.PolicyCreated]: pb.Policy;
152-
[Event.PolicyUpdated]: pb.Policy;
153-
[Event.PolicyDeleted]: pb.Policy;
154-
[Event.JobUpdated]: pb.Job;
157+
[Event.DeploymentVariableValueDeleted]: PbDeploymentVariableValue;
158+
[Event.DeploymentVersionCreated]: PbDeploymentVersion;
159+
[Event.DeploymentVersionUpdated]: PbDeploymentVersion;
160+
[Event.DeploymentVersionDeleted]: PbDeploymentVersion;
161+
[Event.EnvironmentCreated]: PbEnvironment;
162+
[Event.EnvironmentUpdated]: PbEnvironment;
163+
[Event.EnvironmentDeleted]: PbEnvironment;
164+
[Event.PolicyCreated]: PbPolicy;
165+
[Event.PolicyUpdated]: PbPolicy;
166+
[Event.PolicyDeleted]: PbPolicy;
167+
[Event.JobUpdated]: PbJob;
155168
};
156169

157170
export type Message<T extends keyof EventPayload> = {
@@ -166,6 +179,27 @@ export type Message<T extends keyof EventPayload> = {
166179
export type GoMessage<T extends keyof GoEventPayload> = {
167180
workspaceId: string;
168181
eventType: T;
169-
source: "api" | "scheduler" | "user-action";
170-
data: GoEventPayload[T];
182+
payload: GoEventPayload[T];
171183
};
184+
185+
// Helper function to wrap a selector in the protobuf format
186+
function wrapSelector<T extends Record<string, any> | null | undefined>(
187+
selector: T,
188+
): T extends null | undefined ? null : { json_selector: T } {
189+
if (selector == null) return null as any;
190+
return { json_selector: selector } as any;
191+
}
192+
193+
// Helper to transform an object's selector fields to protobuf format
194+
export function wrapSelectorsInObject<T extends Record<string, any>>(
195+
obj: T,
196+
selectorFields: (keyof T)[],
197+
): T {
198+
const result = { ...obj };
199+
for (const field of selectorFields) {
200+
if (field in result) {
201+
(result as any)[field] = wrapSelector(result[field]);
202+
}
203+
}
204+
return result;
205+
}

0 commit comments

Comments
 (0)