Skip to content

Commit fe8882e

Browse files
committed
Allow event options to be configurable
1 parent ec181ab commit fe8882e

File tree

4 files changed

+204
-16
lines changed

4 files changed

+204
-16
lines changed

src/component/events.ts

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@ import {
77
MutationCtx,
88
} from "./_generated/server";
99
import { internal } from "./_generated/api";
10-
import { sendEvents } from "../sdk/EventProcessor";
10+
import {
11+
EventProcessorOptions,
12+
sendEvents,
13+
validateEventProcessorOptions,
14+
} from "../sdk/EventProcessor";
1115
import isEqual from "lodash.isequal";
1216

13-
// TODO: Make these configurable.
1417
export const EVENT_CAPACITY = 1000;
1518
export const EVENT_BATCH_SIZE = 100;
1619
export const EVENT_PROCESSING_INTERVAL_SECONDS = 5;
@@ -20,6 +23,9 @@ const eventsOptions = v.optional(
2023
allAttributesPrivate: v.optional(v.boolean()),
2124
privateAttributes: v.optional(v.array(v.string())),
2225
eventsUri: v.optional(v.string()),
26+
eventCapacity: v.optional(v.number()),
27+
eventBatchSize: v.optional(v.number()),
28+
eventProcessingIntervalSeconds: v.optional(v.number()),
2329
})
2430
);
2531

@@ -31,20 +37,23 @@ export const storeEvents = mutation({
3137
},
3238
returns: v.null(),
3339
handler: async (ctx, { sdkKey, payloads, options }) => {
40+
validateEventProcessorOptions(options);
3441
await handleScheduleProcessing(ctx, {
3542
sdkKey,
3643
options,
3744
});
3845

39-
const numEvents = (await ctx.db.query("events").take(EVENT_CAPACITY + 1))
46+
const eventCapacity = options?.eventCapacity ?? EVENT_CAPACITY;
47+
48+
const numEvents = (await ctx.db.query("events").take(eventCapacity + 1))
4049
.length;
4150

42-
if (numEvents >= EVENT_CAPACITY) {
51+
if (numEvents >= eventCapacity) {
4352
console.warn("Event store is full, dropping events.");
4453
return;
4554
}
4655

47-
const payloadsToStore = payloads.slice(0, EVENT_CAPACITY - numEvents);
56+
const payloadsToStore = payloads.slice(0, eventCapacity - numEvents);
4857
if (payloadsToStore.length !== payloads.length) {
4958
console.warn(
5059
`${payloads.length - payloadsToStore.length} events were dropped due to capacity limits.`
@@ -68,13 +77,10 @@ const handleScheduleProcessing = async (
6877
}: {
6978
sdkKey: string;
7079
runImmediately?: boolean;
71-
options?: {
72-
allAttributesPrivate?: boolean;
73-
privateAttributes?: string[];
74-
eventsUri?: string;
75-
};
80+
options?: EventProcessorOptions;
7681
}
7782
) => {
83+
validateEventProcessorOptions(options);
7884
const existingScheduledJob = await ctx.db.query("eventSchedule").first();
7985
if (existingScheduledJob !== null) {
8086
const existingSystemJob = await ctx.db.system.get(
@@ -132,8 +138,10 @@ export const processEvents = internalAction({
132138
options: eventsOptions,
133139
},
134140
handler: async (ctx, { sdkKey, options }) => {
141+
validateEventProcessorOptions(options);
142+
const eventBatchSize = options?.eventBatchSize ?? EVENT_BATCH_SIZE;
135143
const events = await ctx.runQuery(internal.events.getOldestEvents, {
136-
count: EVENT_BATCH_SIZE,
144+
count: eventBatchSize,
137145
});
138146

139147
if (events.length === 0) {

src/sdk/EventProcessor.test.ts

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import { describe, expect, test, vi } from "vitest";
2-
import { EventProcessor } from "./EventProcessor";
2+
import {
3+
EventProcessor,
4+
validateEventProcessorOptions,
5+
} from "./EventProcessor";
36
import { convexTest } from "convex-test";
47
import schema from "../component/schema";
58
import { modules } from "../component/setup.test";
@@ -35,3 +38,82 @@ describe("EventProcessor", () => {
3538
expect(sendEvents).toHaveBeenCalledOnce();
3639
});
3740
});
41+
42+
describe("validateEventProcessorOptions", () => {
43+
test("should throw an error if allAttributesPrivate is not a boolean", () => {
44+
const options = { allAttributesPrivate: "true" };
45+
expect(() => validateEventProcessorOptions(options)).toThrow(
46+
new Error("allAttributesPrivate must be a boolean")
47+
);
48+
});
49+
50+
test("should throw an error if privateAttributes is not an array", () => {
51+
const options = { privateAttributes: "not-an-array" };
52+
// @ts-expect-error testing invalid input
53+
expect(() => validateEventProcessorOptions(options)).toThrow(
54+
new Error("privateAttributes must be an array of strings")
55+
);
56+
});
57+
58+
test("should throw an error if eventsUri is not a string", () => {
59+
const options = { eventsUri: 12345 };
60+
// @ts-expect-error testing invalid input
61+
expect(() => validateEventProcessorOptions(options)).toThrow(
62+
new Error("eventsUri must be a string")
63+
);
64+
});
65+
66+
test("should throw an error if eventsUri is not a valid URL", () => {
67+
const options = { eventsUri: "invalid-url" };
68+
expect(() => validateEventProcessorOptions(options)).toThrow(
69+
new Error("eventsUri must be a valid URL")
70+
);
71+
});
72+
73+
test("should throw an error if eventProcessingIntervalSeconds is not a positive number", () => {
74+
const options = { eventProcessingIntervalSeconds: -1 };
75+
expect(() => validateEventProcessorOptions(options)).toThrow(
76+
new Error("eventProcessingIntervalSeconds must be a positive number")
77+
);
78+
});
79+
80+
test("should throw an error if eventCapacity is not a positive number", () => {
81+
const options = { eventCapacity: 0 };
82+
expect(() => validateEventProcessorOptions(options)).toThrow(
83+
new Error("eventCapacity must be a positive number")
84+
);
85+
});
86+
87+
test("should throw an error if eventCapacity is less than eventBatchSize", () => {
88+
const options = { eventCapacity: 10, eventBatchSize: 20 };
89+
expect(() => validateEventProcessorOptions(options)).toThrow(
90+
new Error("eventCapacity must be greater than or equal to eventBatchSize")
91+
);
92+
});
93+
94+
test("should throw an error if eventBatchSize is not a positive number", () => {
95+
const options = { eventBatchSize: -1 };
96+
expect(() => validateEventProcessorOptions(options)).toThrow(
97+
new Error("eventBatchSize must be a positive number")
98+
);
99+
});
100+
101+
test("should throw an error if eventBatchSize exceeds 4000", () => {
102+
const options = { eventBatchSize: 5000 };
103+
expect(() => validateEventProcessorOptions(options)).toThrow(
104+
new Error("eventBatchSize must be less than or equal to 4000")
105+
);
106+
});
107+
108+
test("should not throw an error if options are valid", () => {
109+
const validOptions = {
110+
allAttributesPrivate: true,
111+
privateAttributes: ["attr1", "attr2"],
112+
eventsUri: "https://valid.url",
113+
eventProcessingIntervalSeconds: 10,
114+
eventCapacity: 100,
115+
eventBatchSize: 50,
116+
};
117+
expect(() => validateEventProcessorOptions(validOptions)).not.toThrow();
118+
});
119+
});

src/sdk/EventProcessor.ts

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ export class EventProcessor {
1818
constructor(
1919
private readonly eventStore: ComponentApi["events"],
2020
private readonly ctx: RunMutationCtx,
21-
private readonly sdkKey: string
21+
private readonly sdkKey: string,
22+
private readonly options?: EventProcessorOptions
2223
) {}
2324

2425
sendEvent(inputEvent: object) {
2526
return this.ctx.runMutation(this.eventStore.storeEvents, {
2627
payloads: [JSON.stringify(inputEvent)],
2728
sdkKey: this.sdkKey,
29+
options: this.options,
2830
});
2931
}
3032

@@ -94,3 +96,94 @@ export const sendEvents = async (
9496

9597
await eventProcessor.flush();
9698
};
99+
100+
export type EventProcessorOptions = {
101+
allAttributesPrivate?: boolean;
102+
privateAttributes?: string[];
103+
104+
// The URL to send events to. If not provided, the default LaunchDarkly
105+
// events endpoint will be used. Most users will not need to set this.
106+
eventsUri?: string;
107+
// The number of events to store in LaunchDarkly, awaiting processing
108+
// to be sent to LaunchDarkly. If the number of events exceeds this
109+
// value, new events will be dropped.
110+
eventCapacity?: number;
111+
// The number of events to send to LaunchDarkly in a single batch.
112+
eventBatchSize?: number;
113+
// How often to process events, in seconds.
114+
eventProcessingIntervalSeconds?: number;
115+
};
116+
117+
export const validateEventProcessorOptions = (
118+
options?: EventProcessorOptions
119+
) => {
120+
if (!options) {
121+
return;
122+
}
123+
124+
if (
125+
options.allAttributesPrivate !== undefined &&
126+
typeof options.allAttributesPrivate !== "boolean"
127+
) {
128+
throw new Error("allAttributesPrivate must be a boolean");
129+
}
130+
if (
131+
options.privateAttributes !== undefined &&
132+
!Array.isArray(options.privateAttributes)
133+
) {
134+
throw new Error("privateAttributes must be an array of strings");
135+
}
136+
if (options.eventsUri !== undefined) {
137+
if (typeof options.eventsUri !== "string") {
138+
throw new Error("eventsUri must be a string");
139+
}
140+
try {
141+
new URL(options.eventsUri);
142+
} catch {
143+
throw new Error("eventsUri must be a valid URL");
144+
}
145+
}
146+
147+
if (options.eventProcessingIntervalSeconds !== undefined) {
148+
if (
149+
typeof options.eventProcessingIntervalSeconds !== "number" ||
150+
options.eventProcessingIntervalSeconds <= 0
151+
) {
152+
throw new Error(
153+
"eventProcessingIntervalSeconds must be a positive number"
154+
);
155+
}
156+
}
157+
158+
if (options.eventCapacity !== undefined) {
159+
if (
160+
typeof options.eventCapacity !== "number" ||
161+
options.eventCapacity <= 0
162+
) {
163+
throw new Error("eventCapacity must be a positive number");
164+
}
165+
166+
if (
167+
options.eventBatchSize !== undefined &&
168+
options.eventCapacity <= options.eventBatchSize
169+
) {
170+
throw new Error(
171+
"eventCapacity must be greater than or equal to eventBatchSize"
172+
);
173+
}
174+
}
175+
176+
if (options.eventBatchSize !== undefined) {
177+
if (
178+
typeof options.eventBatchSize !== "number" ||
179+
options.eventBatchSize <= 0
180+
) {
181+
throw new Error("eventBatchSize must be a positive number");
182+
}
183+
184+
// Do not allow eventBatchSize to exceed 4000 to respect Convex limits.
185+
if (options.eventBatchSize > 4000) {
186+
throw new Error("eventBatchSize must be less than or equal to 4000");
187+
}
188+
}
189+
};

src/sdk/LaunchDarkly.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,11 @@ import {
99
import { createPlatformInfo } from "./createPlatformInfo";
1010
import ConvexCrypto from "./crypto";
1111
import { FeatureStore } from "./FeatureStore";
12-
import { EventProcessor } from "./EventProcessor";
12+
import {
13+
EventProcessor,
14+
EventProcessorOptions,
15+
validateEventProcessorOptions,
16+
} from "./EventProcessor";
1317
import { RunMutationCtx, RunQueryCtx } from "../component/types";
1418
import { ComponentApi } from "./useApi";
1519

@@ -21,7 +25,7 @@ export class LaunchDarkly extends LDClientImpl {
2125
application?: LDOptions["application"];
2226
sendEvents?: boolean;
2327
LAUNCHDARKLY_SDK_KEY?: string;
24-
}
28+
} & EventProcessorOptions
2529
) {
2630
const { store, events } = component;
2731
const logger = new BasicLogger({
@@ -56,7 +60,8 @@ export class LaunchDarkly extends LDClientImpl {
5660
// We can only send events if the context has a runMutation function.
5761
// This exists in Convex mutations and actions, but not in queries.
5862
if ("runMutation" in ctx && sendEvents) {
59-
const eventProcessor = new EventProcessor(events, ctx, sdkKey);
63+
validateEventProcessorOptions(options);
64+
const eventProcessor = new EventProcessor(events, ctx, sdkKey, options);
6065
// @ts-expect-error We are setting the eventProcessor directly here.
6166
this.eventProcessor = eventProcessor;
6267
}

0 commit comments

Comments
 (0)