feat: better error handling (#20)
* refactor: improve consumer interface

* refactor: define consumer record mappers

* feat: introduce ConsumerApp and enhance MessageRouter with new combinators

* fix: update import statement

* test: cover consumer with tests

* docs: add changeset
floydspace authored Nov 22, 2024
1 parent d7d0ca5 commit 4c5f113
Showing 19 changed files with 1,005 additions and 274 deletions.
5 changes: 5 additions & 0 deletions .changeset/old-badgers-fail.md
@@ -0,0 +1,5 @@
+---
+"effect-kafka": patch
+---
+
+implement an ability to catch errors raised in MessageRouter handler
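In practice, the new capability means a route handler's failures can be handled where they occur instead of tearing down the consumer loop. A minimal sketch, assuming the router API follows the library's `MessageRouter.empty` / `MessageRouter.subscribe` style; the topic name and error message are illustrative:

```ts
import { Effect } from "effect";
import { ConsumerRecord, MessageRouter } from "effect-kafka";

// Illustrative sketch: a handler that can fail, recovered with
// Effect.catchAll so the consumer keeps processing later records.
const app = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "payments", // hypothetical topic
    Effect.gen(function* () {
      const record = yield* ConsumerRecord.ConsumerRecord;
      if (record.value === null) {
        return yield* Effect.fail(new Error("empty payload"));
      }
      yield* Effect.log(`processing offset ${record.offset}`);
    }).pipe(
      // the error raised in the handler is caught here instead of
      // escaping into the consumer runtime
      Effect.catchAll((error) => Effect.logError(String(error))),
    ),
  ),
);
```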
4 changes: 4 additions & 0 deletions .projen/deps.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion .projenrc.ts
@@ -62,7 +62,7 @@ project.addPackageIgnore("/flake.nix");
 project.addPackageIgnore("/docker-compose.yml");
 
 // Effect dependencies
-project.addDevDeps("@effect/platform-node", "@effect/vitest", "@fluffy-spoon/substitute");
+project.addDevDeps("@effect/platform", "@effect/platform-node", "@effect/vitest", "@fluffy-spoon/substitute");
 project.addPeerDeps("effect");
 
 // Kafka dependencies
1 change: 1 addition & 0 deletions package.json

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default.

76 changes: 29 additions & 47 deletions src/ConfluentKafka/ConfluentKafkaJSInstance.ts
@@ -2,14 +2,31 @@
  * @since 0.2.0
  */
 import { KafkaJS } from "@confluentinc/kafka-javascript";
-import { Chunk, Config, Effect, Fiber, Layer, Queue, Runtime, Stream } from "effect";
+import { Config, Effect, Layer, Queue, Runtime } from "effect";
 import * as Consumer from "../Consumer.js";
 import * as ConsumerRecord from "../ConsumerRecord.js";
 import * as KafkaInstance from "../KafkaInstance.js";
 import type * as MessageRouter from "../MessageRouter.js";
 import * as Producer from "../Producer.js";
 import * as internal from "./internal/confluentKafkaJSInstance.js";
 
+const mapBatchToConsumerRecords = (payload: KafkaJS.EachBatchPayload): ConsumerRecord.ConsumerRecord[] =>
+  payload.batch.messages.map((message) =>
+    ConsumerRecord.make({
+      topic: payload.batch.topic,
+      partition: payload.batch.partition,
+      highWatermark: payload.batch.highWatermark,
+      key: message.key,
+      value: message.value,
+      timestamp: message.timestamp,
+      attributes: message.attributes,
+      offset: message.offset,
+      headers: message.headers,
+      size: message.size,
+      heartbeat: () => Effect.promise(() => payload.heartbeat()),
+      commit: () => Effect.promise(() => payload.commitOffsetsIfNecessary()),
+    }),
+  );
+
 /**
  * @since 0.2.0
  * @category constructors
@@ -33,57 +50,22 @@ export const make = (config: KafkaJS.KafkaConfig): Effect.Effect<KafkaInstance.KafkaInstance> =>
       Effect.gen(function* () {
         const consumer = yield* internal.connectConsumerScoped(kafka, options as KafkaJS.ConsumerConfig);
 
-        const subscribeAndConsume = (topics: MessageRouter.Route.Path[]) =>
-          Effect.gen(function* () {
-            const runtime = yield* Effect.runtime();
-
-            yield* internal.subscribe(consumer, { topics });
-
-            const queue = yield* Queue.bounded<ConsumerRecord.ConsumerRecord>(1);
-
-            const eachBatch: KafkaJS.EachBatchHandler = async (payload) => {
-              await Queue.offerAll(
-                queue,
-                payload.batch.messages.map((message) =>
-                  ConsumerRecord.make({
-                    topic: payload.batch.topic,
-                    partition: payload.batch.partition,
-                    highWatermark: payload.batch.highWatermark,
-                    key: message.key,
-                    value: message.value,
-                    timestamp: message.timestamp,
-                    attributes: message.attributes,
-                    offset: message.offset,
-                    headers: message.headers,
-                    size: message.size,
-                    heartbeat: () => Effect.promise(() => payload.heartbeat()),
-                    commit: () => Effect.promise(() => payload.commitOffsetsIfNecessary()),
-                  }),
-                ),
-              ).pipe(Runtime.runPromise(runtime));
-            };
-
-            yield* internal.consume(consumer, { eachBatch });
-
-            return queue;
-          });
-
         return Consumer.make({
-          run: (app) =>
+          subscribe: (topics) => internal.subscribe(consumer, { topics }),
+          consume: () =>
             Effect.gen(function* () {
-              const topics = Chunk.toArray(app.routes).map((route) => route.topic);
+              const queue = yield* Queue.bounded<ConsumerRecord.ConsumerRecord>(1);
 
+              const runtime = yield* Effect.runtime();
+
-              const queue = yield* subscribeAndConsume(topics);
+              const eachBatch: KafkaJS.EachBatchHandler = async (payload) => {
+                await Queue.offerAll(queue, mapBatchToConsumerRecords(payload)).pipe(Runtime.runPromise(runtime));
+              };
 
-              const fiber = yield* app.pipe(
-                Effect.provideServiceEffect(ConsumerRecord.ConsumerRecord, Queue.take(queue)),
-                Effect.forever,
-                Effect.fork,
-              );
+              yield* internal.consume(consumer, { eachBatch });
 
-              yield* Fiber.join(fiber);
+              return queue;
             }),
-          runStream: (topic) => subscribeAndConsume([topic]).pipe(Effect.map(Stream.fromQueue), Stream.flatten()),
         });
       }),
   });
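Both instance refactors converge on the same shape: `subscribe` registers topics with the driver, while `consume` pumps driver callbacks into a bounded queue of `ConsumerRecord`s and returns the queue, leaving run-loop orchestration to the `Consumer` module. A hedged sketch of what a runner on top of such a queue could look like (the real wiring lives in the library's internals):

```ts
import { Effect, Queue } from "effect";
import type { ConsumerRecord } from "effect-kafka";

// Illustrative runner: drain the queue returned by `consume()` and feed
// each record to a handler, forever; not the library's actual internals.
const drain = <E, R>(
  queue: Queue.Dequeue<ConsumerRecord.ConsumerRecord>,
  handle: (record: ConsumerRecord.ConsumerRecord) => Effect.Effect<void, E, R>,
): Effect.Effect<never, E, R> =>
  Queue.take(queue).pipe(Effect.flatMap(handle), Effect.forever);
```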
83 changes: 32 additions & 51 deletions src/ConfluentKafka/ConfluentRdKafkaInstance.ts
@@ -1,15 +1,34 @@
 /**
  * @since 0.2.0
  */
-import { GlobalConfig } from "@confluentinc/kafka-javascript";
-import { Array, Chunk, Config, Effect, Fiber, Layer, Queue, Runtime, Stream } from "effect";
+import type { GlobalConfig, KafkaConsumer, Message } from "@confluentinc/kafka-javascript";
+import { Array, Config, Effect, Layer, Queue, Runtime } from "effect";
 import * as Consumer from "../Consumer.js";
 import * as ConsumerRecord from "../ConsumerRecord.js";
 import * as KafkaInstance from "../KafkaInstance.js";
 import type * as MessageRouter from "../MessageRouter.js";
 import * as Producer from "../Producer.js";
 import * as internal from "./internal/confluentRdKafkaInstance.js";
 
+const mapToConsumerRecord = (payload: Message, consumer: KafkaConsumer): ConsumerRecord.ConsumerRecord =>
+  ConsumerRecord.make({
+    topic: payload.topic,
+    partition: payload.partition,
+    highWatermark: "-1001", // Not supported
+    key: typeof payload.key === "string" ? Buffer.from(payload.key) : (payload.key ?? null),
+    value: payload.value,
+    headers: payload.headers?.reduce((acc, header) => {
+      const [key] = Object.keys(header);
+      acc[key] = header[key];
+      return acc;
+    }, {}),
+    timestamp: payload.timestamp?.toString() ?? "",
+    offset: payload.offset.toString(),
+    attributes: 0,
+    size: payload.size,
+    heartbeat: () => Effect.void, // Not supported
+    commit: () => Effect.sync(() => consumer.commit()),
+  });
+
 /**
  * @since 0.4.1
  * @category constructors
@@ -141,59 +160,21 @@ export const make = (config: GlobalConfig): Effect.Effect<KafkaInstance.KafkaInstance> =>
 
         const consumer = yield* internal.connectConsumerScoped(consumerConfig);
 
-        const subscribeAndConsume = (topics: MessageRouter.Route.Path[]) =>
-          Effect.gen(function* () {
-            const runtime = yield* Effect.runtime();
-
-            yield* internal.subscribeScoped(consumer, topics);
-
-            const queue = yield* Queue.bounded<ConsumerRecord.ConsumerRecord>(1);
-
-            const eachMessage: internal.ConsumerHandler = (payload) =>
-              Queue.offer(
-                queue,
-                ConsumerRecord.make({
-                  topic: payload.topic,
-                  partition: payload.partition,
-                  highWatermark: "-1001", // Not supported
-                  key: typeof payload.key === "string" ? Buffer.from(payload.key) : (payload.key ?? null),
-                  value: payload.value,
-                  headers: payload.headers?.reduce((acc, header) => {
-                    const [key] = Object.keys(header);
-                    acc[key] = header[key];
-                    return acc;
-                  }, {}),
-                  timestamp: payload.timestamp?.toString() ?? "",
-                  offset: payload.offset.toString(),
-                  attributes: 0,
-                  size: payload.size,
-                  heartbeat: () => Effect.void, // Not supported
-                  commit: () => Effect.sync(() => consumer.commit()),
-                }),
-              ).pipe(Runtime.runFork(runtime));
-
-            yield* internal.consume(consumer, { eachMessage });
-
-            return queue;
-          });
-
         return Consumer.make({
-          run: (app) =>
+          subscribe: (topics) => internal.subscribeScoped(consumer, topics),
+          consume: () =>
             Effect.gen(function* () {
-              const topics = Chunk.toArray(app.routes).map((route) => route.topic);
+              const queue = yield* Queue.bounded<ConsumerRecord.ConsumerRecord>(1);
 
+              const runtime = yield* Effect.runtime();
+
-              const queue = yield* subscribeAndConsume(topics);
+              const eachMessage: internal.ConsumerHandler = (payload) =>
+                Queue.offer(queue, mapToConsumerRecord(payload, consumer)).pipe(Runtime.runFork(runtime));
 
-              const fiber = yield* app.pipe(
-                Effect.provideServiceEffect(ConsumerRecord.ConsumerRecord, Queue.take(queue)),
-                Effect.forever,
-                Effect.fork,
-              );
+              yield* internal.consume(consumer, { eachMessage });
 
-              yield* Fiber.join(fiber);
-            }).pipe(Effect.scoped),
-          runStream: (topic) =>
-            subscribeAndConsume([topic]).pipe(Effect.map(Stream.fromQueue), Stream.scoped, Stream.flatten()),
+              return queue;
+            }),
         });
       }),
   });
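One detail worth noting in `mapToConsumerRecord`: rdkafka-style drivers deliver headers as an array of single-key objects, and the `reduce` flattens them into one record. A self-contained illustration (the values are made up, and the local type alias stands in for the driver's header typing):

```ts
// Stand-in for the driver's header type: one single-key object per header.
type MessageHeader = { [key: string]: string | Buffer };

const raw: MessageHeader[] = [{ traceId: "abc" }, { retry: Buffer.from("2") }];

// The same fold used by mapToConsumerRecord:
const headers = raw.reduce<Record<string, string | Buffer>>((acc, header) => {
  const [key] = Object.keys(header); // each entry has exactly one key
  acc[key] = header[key];
  return acc;
}, {});
// headers => { traceId: "abc", retry: <Buffer 32> }
```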
47 changes: 34 additions & 13 deletions src/Consumer.ts
@@ -24,16 +24,8 @@ export type TypeId = typeof TypeId;
  * @since 0.1.0
  * @category models
  */
-export interface Consumer {
+export interface Consumer extends internal.ConsumerConstructorProps {
   readonly [TypeId]: TypeId;
-  readonly run: {
-    <E, R>(
-      app: MessageRouter.MessageRouter<E, R>,
-    ): Effect.Effect<void, never, Exclude<R, ConsumerRecord.ConsumerRecord>>;
-  };
-  readonly runStream: {
-    (path: MessageRouter.Route.Path): Stream.Stream<ConsumerRecord.ConsumerRecord>;
-  };
 }
 
 /**
@@ -151,10 +143,7 @@ export declare namespace Consumer {
  * @since 0.1.0
  * @category constructors
  */
-export const make: (options: {
-  readonly run: (app: MessageRouter.MessageRouter) => Effect.Effect<void>;
-  readonly runStream: (path: MessageRouter.Route.Path) => Stream.Stream<ConsumerRecord.ConsumerRecord>;
-}) => Consumer = internal.make;
+export const make: (options: internal.ConsumerConstructorProps) => Consumer = internal.make;
 
 /**
  * @since 0.3.1
@@ -197,6 +186,38 @@ export const serve: {
   >;
 } = internal.serve;
 
+/**
+ * @since 0.5.0
+ * @category accessors
+ */
+export const serveOnceEffect: {
+  /**
+   * @since 0.5.0
+   * @category accessors
+   */
+  (
+    options: Consumer.ConsumerOptions,
+  ): <E, R>(
+    app: MessageRouter.MessageRouter<E, R>,
+  ) => Effect.Effect<
+    void,
+    Error.ConnectionException,
+    Scope.Scope | KafkaInstance.KafkaInstance | Exclude<R, ConsumerRecord.ConsumerRecord>
+  >;
+  /**
+   * @since 0.5.0
+   * @category accessors
+   */
+  <E, R>(
+    app: MessageRouter.MessageRouter<E, R>,
+    options: Consumer.ConsumerOptions,
+  ): Effect.Effect<
+    void,
+    Error.ConnectionException,
+    Scope.Scope | KafkaInstance.KafkaInstance | Exclude<R, ConsumerRecord.ConsumerRecord>
+  >;
+} = internal.serveOnceEffect;
+
 /**
  * @since 0.1.0
  * @category accessors
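Judging from its signature, `serveOnceEffect` is the scoped, single-pass counterpart of `serve`: it runs the router against the consumer inside the caller's `Scope` and returns instead of looping forever, which is what the new tests exercise. A hedged usage sketch; the topic, groupId, and the exact once-semantics are assumptions:

```ts
import { Effect } from "effect";
import { Consumer, ConsumerRecord, MessageRouter } from "effect-kafka";

// Illustrative: serve the router once within a Scope, then return.
// A KafkaInstance layer still has to be provided before running this.
const consumeOnce = MessageRouter.empty.pipe(
  MessageRouter.subscribe(
    "test-topic", // hypothetical topic
    Effect.flatMap(ConsumerRecord.ConsumerRecord, (record) =>
      Effect.log(`got ${record.topic}@${record.offset}`),
    ),
  ),
  Consumer.serveOnceEffect({ groupId: "test-group" }), // hypothetical options
  Effect.scoped, // satisfies the Scope requirement from the signature
);
```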
11 changes: 11 additions & 0 deletions src/ConsumerApp.ts
@@ -0,0 +1,11 @@
+/**
+ * @since 0.5.0
+ */
+import { Effect } from "effect";
+import type * as ConsumerRecord from "./ConsumerRecord.js";
+
+/**
+ * @since 0.1.0
+ * @category models
+ */
+export type Default<E = never, R = never> = Effect.Effect<void, E, R | ConsumerRecord.ConsumerRecord>;
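Per this definition, a `ConsumerApp.Default` is just an Effect that may read the current `ConsumerRecord` from its environment, so any handler of that shape qualifies. For example (root re-exports assumed):

```ts
import { Effect } from "effect";
import { ConsumerRecord } from "effect-kafka";
import type { ConsumerApp } from "effect-kafka";

// A value of type ConsumerApp.Default: void-returning, with
// ConsumerRecord in its requirements until a runner provides it.
const app: ConsumerApp.Default = Effect.gen(function* () {
  const record = yield* ConsumerRecord.ConsumerRecord;
  yield* Effect.log(`offset ${record.offset} on topic ${record.topic}`);
});
```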
6 changes: 6 additions & 0 deletions src/ConsumerRecord.ts
@@ -47,3 +47,9 @@ export declare namespace ConsumerRecord {
  * @category constructors
  */
 export const make: (payload: internal.ConsumerRecordConstructorProps) => ConsumerRecord = internal.make;
+
+/**
+ * @since 0.5.0
+ * @category constructors
+ */
+export const empty: ConsumerRecord = internal.empty;
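`ConsumerRecord.empty` reads as a test affordance: a stub record that can be provided directly to a handler without a broker. A hedged sketch, assuming `empty` carries inert defaults:

```ts
import { Effect } from "effect";
import { ConsumerRecord } from "effect-kafka";

// A handler under test, reading the record from the environment.
const handler = Effect.flatMap(ConsumerRecord.ConsumerRecord, (record) =>
  Effect.log(`saw topic ${record.topic}`),
);

// Exercise it against the stub: no Kafka instance involved.
const test = handler.pipe(
  Effect.provideService(ConsumerRecord.ConsumerRecord, ConsumerRecord.empty),
);
```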