A framework around kafka.js to transparently use Schema Registry and create an application that consumes, produces, and reacts to different kafka topics. Supports consumption in batches or in parallel. Statically define and verify the schemas / message types in TypeScript
Packages:
- @ovotech/avro-kafkajs - wrapper around kafka.js to use Schema Registry
- @ovotech/avro-ts - Generate typescript from avro schemas. Without cli dependencies.
- @ovotech/avro-ts-cli - A cli interface for
@ovotech/avro-ts
- @ovotech/blaise | - @ovotech/castle combined with @ovotech/avro-mock-generator
- @ovotech/castle - core service server
- @ovotech/castle-cli - cli for interacting with kafka and Schema Registry
- @ovotech/castle-stream - A castle implementation using streams as input instead of kafka consumers. Useful backfillers and the like.
yarn add @ovotech/castle
import { createCastle, produce, consumeEachMessage } from '@ovotech/castle';
import { Event, EventSchema } from './avro';
// Define producers as pure functions
// With statically setting the typescript types and avro schemas
const mySender = produce<Event>({ topic: 'my-topic-1', schema: EventSchema });
// Define consumers as pure functions
// With statically setting which types it will accept
const eachEvent = consumeEachMessage<Event>(async ({ message }) => {
console.log(message.value);
});
const main = async () => {
const castle = createCastle({
schemaRegistry: { uri: 'http://localhost:8081' },
kafka: { brokers: ['localhost:29092'] },
consumers: [{ topic: 'my-topic-1', groupId: 'my-group-1', eachMessage: eachEvent }],
});
// Start all consumers and producers
await castle.start();
await mySender(castle.producer, [{ value: { field1: 'my-string' } }]);
};
main();
You can connect to multiple topics, each of which is will have its own independent consumer group. More about castle package in packages/castle/README.md
yarn global add @ovotech/castle-cli
You can read about the various commands available with
castle --help
There are 4 main subcommand groups:
- castle topic: Subcommands for searching and manipulating kafka topics, as well as producing and consuming events from them.
- castle schema: Subcommands for getting schema versions of kafka topics.
- castle config: Subcommands to create / edit connection configs to kafka brokers and schema registers, that can be used by other commands.
- castle group: Subcommands to manipulate consumer group offsets.
You can configure access to the kafka to your server named uat
, if you have the tls key, cert and certificate authority as text files. The schema registry is set as a url, any username and password can be set with a url provided auth like: http://user:pass@localhost:8081
. The config file is saved to $HOME/.castle-cli/
folder.
castle config set uat --kafka-broker localhost:3203 --key private.pem --ca ca.pem --cert cert.pem --schema-registry http://localhost:8081
After that is set you can use it in any command by stating --config uat
(or -C uat
):
castle schema search my-topic --config uat
castle schema show my-topic-full-name --config uat
castle topic search my-topic --config uat
castle topic consume my-topic-full-name --config uat
Using it without a specified config would connect to the default local kafka server.
- @ovotech/avro-timestamp-millis - Date stored as the number of milliseconds since epoch
- @ovotech/avro-epoch-days - Date stored as the number of days since epoch
- @ovotech/avro-decimal - Decimal object value as raw bytes
You can run the tests with:
yarn test
Style is maintained with prettier and eslint
yarn lint
Deployment is preferment by lerna automatically on merge / push to master, but you'll need to bump the package version numbers yourself. Only updated packages with newer versions will be pushed to the npm registry.
Have a bug? File an issue with a simple example that reproduces this so we can take a look & confirm.
Want to make a change? Submit a PR, explain why it's useful, and make sure you've updated the docs (this file) and the tests (see test folder).
This project is licensed under Apache 2 - see the LICENSE file for details
It's Kafka's greatest work :)