Skip to content

Commit 812e3de

Browse files
committed
update format
1 parent 79adc72 commit 812e3de

File tree

10 files changed

+41
-60
lines changed

10 files changed

+41
-60
lines changed

apps/workspace-engine/pkg/events/handler/handler.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,11 @@ const (
5757
UserApprovalRecordDelete EventType = "user-approval-record.deleted"
5858
)
5959

60-
// BaseEvent represents the common structure of all events
61-
type BaseEvent struct {
62-
EventType EventType `json:"eventType"`
63-
WorkspaceID string `json:"workspaceId"`
64-
}
65-
6660
// RawEvent represents the raw event data received from Kafka messages
6761
type RawEvent struct {
68-
BaseEvent
69-
Data json.RawMessage `json:"payload,omitempty"`
62+
EventType EventType `json:"eventType"`
63+
WorkspaceID string `json:"workspaceId"`
64+
Data json.RawMessage `json:"data,omitempty"`
7065
}
7166

7267
// Handler defines the interface for processing events

apps/workspace-engine/pkg/events/handler/jobs/jobs.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ func HandleJobUpdated(
2020
}
2121

2222
ws.Jobs().Upsert(ctx, job)
23-
23+
2424
rt := &pb.ReleaseTarget{
2525
EnvironmentId: job.EnvironmentId,
26-
DeploymentId: job.DeploymentId,
27-
ResourceId: job.ResourceId,
26+
DeploymentId: job.DeploymentId,
27+
ResourceId: job.ResourceId,
2828
}
2929
ws.ReleaseManager().TaintReleaseTargets(rt)
3030

apps/workspace-engine/pkg/events/handler/policies/policies.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func HandlePolicyUpdated(
3333
}
3434

3535
ws.Policies().Upsert(ctx, policy)
36-
36+
3737
return nil
3838
}
3939

apps/workspace-engine/pkg/pb/selector.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ func NewJsonSelector(selector *structpb.Struct) *Selector {
88
Json: selector,
99
},
1010
}
11-
}
11+
}

apps/workspace-engine/test/e2e/engine_deployment_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,11 +257,11 @@ func TestEngine_DeploymentJobAgentConfigMerging(t *testing.T) {
257257
// Verify deployment has job agent config
258258
d, _ := engine.Workspace().Deployments().Get(deploymentID)
259259
config := d.GetJobAgentConfig().AsMap()
260-
260+
261261
if config["namespace"] != "custom-namespace" {
262262
t.Fatalf("deployment job agent config namespace mismatch: got %v, want custom-namespace", config["namespace"])
263263
}
264-
264+
265265
if timeout, ok := config["timeout"].(float64); !ok || timeout != 300 {
266266
t.Fatalf("deployment job agent config timeout mismatch: got %v, want 300", config["timeout"])
267267
}
@@ -278,12 +278,12 @@ func TestEngine_DeploymentJobAgentConfigMerging(t *testing.T) {
278278
break
279279
}
280280
jobConfig := job.GetJobAgentConfig().AsMap()
281-
281+
282282
// Verify merged config includes deployment-specific settings
283283
if jobConfig["namespace"] != "custom-namespace" {
284284
t.Fatalf("job config namespace mismatch: got %v, want custom-namespace", jobConfig["namespace"])
285285
}
286-
286+
287287
if timeout, ok := jobConfig["timeout"].(float64); !ok || timeout != 300 {
288288
t.Fatalf("job config timeout mismatch: got %v, want 300", jobConfig["timeout"])
289289
}
@@ -389,7 +389,7 @@ func TestEngine_DeploymentMultipleJobAgents(t *testing.T) {
389389
// Verify each job has the correct job agent
390390
k8sJobFound := false
391391
dockerJobFound := false
392-
392+
393393
for _, job := range pendingJobs {
394394
if job.DeploymentId == deploymentK8s {
395395
k8sJobFound = true

apps/workspace-engine/test/e2e/engine_job_agent_test.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,11 +187,11 @@ func TestEngine_JobAgentWithConfig(t *testing.T) {
187187
ja.Name = "Config Test Agent"
188188
ja.WorkspaceId = engine.Workspace().ID
189189
ja.Config = c.MustNewStructFromMap(map[string]any{
190-
"namespace": "default",
191-
"timeout": 300,
192-
"retries": 3,
193-
"cleanupPolicy": "always",
194-
"imageRegistry": "docker.io",
190+
"namespace": "default",
191+
"timeout": 300,
192+
"retries": 3,
193+
"cleanupPolicy": "always",
194+
"imageRegistry": "docker.io",
195195
"resources": map[string]any{
196196
"cpu": "1000m",
197197
"memory": "2Gi",
@@ -259,8 +259,8 @@ func TestEngine_JobAgentConfigUpdate(t *testing.T) {
259259

260260
// Update config
261261
ja.Config = c.MustNewStructFromMap(map[string]any{
262-
"timeout": 600,
263-
"retries": 5,
262+
"timeout": 600,
263+
"retries": 5,
264264
"newField": "newValue",
265265
})
266266

@@ -563,4 +563,3 @@ func TestEngine_JobAgentNameUniqueness(t *testing.T) {
563563
t.Fatal("agents should have different IDs")
564564
}
565565
}
566-

apps/workspace-engine/test/e2e/engine_policy_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,9 @@ func TestEngine_PolicyResourceSelector(t *testing.T) {
257257
selector.ResourceSelector = pb.NewJsonSelector(c.MustNewStructFromMap(map[string]any{
258258
"type": "metadata",
259259
"operator": "equals",
260-
"key": "priority",
261-
"value": "critical",
262-
}),
260+
"key": "priority",
261+
"value": "critical",
262+
}),
263263
)
264264
policy.Selectors = []*pb.PolicyTargetSelector{selector}
265265
engine.PushEvent(ctx, handler.PolicyCreate, policy)
@@ -459,7 +459,7 @@ func TestEngine_PolicyMultipleSelectors(t *testing.T) {
459459
}),
460460
),
461461
integration.WithPolicyTargetSelector(
462-
integration.PolicyTargetJsonDeploymentSelector(map[string]any{
462+
integration.PolicyTargetJsonDeploymentSelector(map[string]any{
463463
"type": "name",
464464
"operator": "contains",
465465
"value": "staging",

apps/workspace-engine/test/integration/workspace.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,23 @@ func (tw *TestWorkspace) PushEvent(ctx context.Context, eventType handler.EventT
6464
// Marshal the data payload
6565
var dataBytes []byte
6666
var err error
67-
67+
6868
// Check if data is a protobuf message
6969
if protoMsg, ok := data.(proto.Message); ok {
7070
dataBytes, err = protojson.Marshal(protoMsg)
7171
} else {
7272
dataBytes, err = json.Marshal(data)
7373
}
74-
74+
7575
if err != nil {
7676
tw.t.Fatalf("failed to marshal event data: %v", err)
7777
return tw
7878
}
7979

8080
// Create raw event
8181
rawEvent := handler.RawEvent{
82-
BaseEvent: handler.BaseEvent{
83-
EventType: eventType,
84-
WorkspaceID: tw.workspace.ID,
85-
},
82+
EventType: eventType,
83+
WorkspaceID: tw.workspace.ID,
8684
Data: dataBytes,
8785
}
8886

packages/events/src/kafka/client.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ import { Kafka } from "kafkajs";
33

44
import { logger } from "@ctrlplane/logger";
55

6-
import type { EventPayload, GoEventPayload, Message } from "./events.js";
6+
import type {
7+
EventPayload,
8+
GoEventPayload,
9+
GoMessage,
10+
Message,
11+
} from "./events.js";
712
import { env } from "../config.js";
813

914
const log = logger.child({ component: "kafka-client" });
@@ -26,7 +31,7 @@ const getProducer = async () => {
2631
};
2732

2833
export const sendGoEvent = async <T extends keyof GoEventPayload>(
29-
message: Message<T> | Message<T>[],
34+
message: GoMessage<T> | GoMessage<T>[],
3035
) => {
3136
const messages = Array.isArray(message) ? message : [message];
3237
const topic = "workspace-events";
@@ -36,7 +41,6 @@ export const sendGoEvent = async <T extends keyof GoEventPayload>(
3641
messages: messages.map((message) => ({
3742
key: message.workspaceId,
3843
value: JSON.stringify(message),
39-
timestamp: message.timestamp.toString(),
4044
})),
4145
});
4246
};

packages/events/src/kafka/events.ts

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -163,24 +163,9 @@ export type Message<T extends keyof EventPayload> = {
163163
payload: EventPayload[T];
164164
};
165165

166-
// Helper function to wrap a selector in the protobuf format
167-
function wrapSelector<T extends Record<string, any> | null | undefined>(
168-
selector: T,
169-
): T extends null | undefined ? null : { json_selector: T } {
170-
if (selector == null) return null as any;
171-
return { json_selector: selector } as any;
172-
}
173-
174-
// Helper to transform an object's selector fields to protobuf format
175-
export function wrapSelectorsInObject<T extends Record<string, any>>(
176-
obj: T,
177-
selectorFields: (keyof T)[],
178-
): T {
179-
const result = { ...obj };
180-
for (const field of selectorFields) {
181-
if (field in result) {
182-
(result as any)[field] = wrapSelector(result[field]);
183-
}
184-
}
185-
return result;
186-
}
166+
export type GoMessage<T extends keyof GoEventPayload> = {
167+
workspaceId: string;
168+
eventType: T;
169+
source: "api" | "scheduler" | "user-action";
170+
data: GoEventPayload[T];
171+
};

0 commit comments

Comments
 (0)