Events RI: add example with JSON serialization (#9639)
adutra authored Sep 26, 2024
1 parent c7cb7b5 commit 6cb207e
Showing 15 changed files with 1,961 additions and 789 deletions.
67 changes: 39 additions & 28 deletions events/ri/README.md
@@ -8,26 +8,32 @@ Two implementations of the SPI module can be found in this module:

1. `org.projectnessie.events.ri.console.PrintingEventSubscriber` - a (very) simple implementation
that prints events to the console.
2. `org.projectnessie.events.ri.kafka.KafkaEventSubscriber` - an implementation that publishes
events to a Kafka topic.
2. `org.projectnessie.events.ri.kafka.KafkaAvroEventSubscriber` - an implementation that publishes
events to a Kafka topic using Avro.
3. `org.projectnessie.events.ri.kafka.KafkaJsonEventSubscriber` - an implementation that publishes
events to a Kafka topic using JSON.

## PrintingEventSubscriber
## Printing Subscriber

The `PrintingEventSubscriber` implementation is a very simple implementation that prints events to the console. It is
intended to illustrate the basic concepts of an SPI implementation. Do NOT use in production.

## KafkaEventSubscriber
## Kafka Subscribers

`KafkaEventSubscriber` illustrates how to handle events in a more realistic scenario. It is intended to be used as a
starting point for implementing a real subscriber based on [Apache Kafka].
`KafkaAvroEventSubscriber` illustrates how to handle events using Avro to serialize and deserialize messages,
and using a Schema Registry.

[Apache Kafka]:https://kafka.apache.org/
`KafkaJsonEventSubscriber` illustrates how to handle events using JSON to serialize and deserialize events.

Both subscribers are intended to be used as a starting point for implementing a real-world subscriber based on
[Apache Kafka](https://kafka.apache.org/).

### Configuration

The `KafkaEventSubscriber` implementation reads its configuration from a properties file. The default location of this
file is `./nessie-kafka.properties`, but this can be changed by setting the `nessie.events.config.file` system property
or the `NESSIE_EVENTS_CONFIG_FILE` environment variable to a different location.
The `KafkaAvroEventSubscriber` and `KafkaJsonEventSubscriber` implementations read their
configuration from a properties file. The default location of this file is
`./nessie-kafka.properties`, but this can be changed by setting the `nessie.events.config.file`
system property or the `NESSIE_EVENTS_CONFIG_FILE` environment variable to a different location.

The properties file should contain all the desired Kafka producer configuration options, as well as the following
subscriber-specific options:
@@ -36,13 +42,14 @@ subscriber-specific options:
- `nessie.events.repository-ids` - a comma-separated list of repository ids to watch. If not set, all repositories will
be watched. See below for more details.

A template configuration file can be found in `src/main/resources/nessie-kafka.template.properties`.
A template configuration file can be found in `src/main/resources/nessie-kafka-avro.template.properties`
and `src/main/resources/nessie-kafka-json.template.properties`.
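The configuration lookup described above can be sketched with plain JDK classes. This is a minimal illustration, not the reference implementation's code: the class name, the method names and the precedence (system property first, then environment variable, then the default) are assumptions, and an empty list of repository ids is taken to mean "watch all repositories".

```java
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical helper; names and precedence are assumptions for illustration.
final class SubscriberConfigSketch {

  static final String DEFAULT_LOCATION = "./nessie-kafka.properties";

  // Picks the config file location. In real code the arguments would come from
  // System.getProperty("nessie.events.config.file") and
  // System.getenv("NESSIE_EVENTS_CONFIG_FILE").
  static String resolveLocation(String systemProperty, String envVariable) {
    if (systemProperty != null && !systemProperty.trim().isEmpty()) {
      return systemProperty;
    }
    if (envVariable != null && !envVariable.trim().isEmpty()) {
      return envVariable;
    }
    return DEFAULT_LOCATION;
  }

  // Parses the comma-separated `nessie.events.repository-ids` option.
  // An empty result is interpreted here as "no restriction".
  static List<String> repositoryIds(Properties props) {
    String raw = props.getProperty("nessie.events.repository-ids", "");
    return Arrays.stream(raw.split(","))
        .map(String::trim)
        .filter(id -> !id.isEmpty())
        .collect(Collectors.toList());
  }
}
```

Passing the two lookups in as arguments, rather than reading them inside the method, keeps the resolution logic easy to unit test.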

### Serialization and deserialization

`KafkaEventSubscriber` uses the [Avro] library to define a domain model that is specific to the subscriber, and is not
tied in any way to Nessie's. This is indeed the recommended approach, as it allows the subscriber to define its own
domain model and schema, and to evolve them independently of Nessie.
`KafkaAvroEventSubscriber` uses the [Avro] library to define a domain model that is specific to the subscriber, and is not
tied in any way to Nessie's. This is the recommended approach when you want to decouple Nessie's domain model
from the subscriber's domain model, at the cost of having to maintain the Avro schema files.

[Avro]:https://avro.apache.org/

@@ -53,11 +60,15 @@ used during build to generate Java classes from the Avro schema files. The gener
Note: for the consumer to be able to deserialize the messages into the same generated classes, it must be configured
with `specific.avro.reader=true`.
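As a rough sketch of the consumer side, the properties below show where `specific.avro.reader=true` fits. It assumes the consumer uses Confluent's `KafkaAvroDeserializer` for values and string keys; the class name, the group id and the parameter values are placeholders, not taken from this module.

```java
import java.util.Properties;

// Hypothetical consumer configuration; only `specific.avro.reader` comes from the text above.
final class AvroConsumerConfigSketch {

  static Properties consumerProperties(String bootstrapServers, String schemaRegistryUrl) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", bootstrapServers);
    // Placeholder group id, chosen for this example only.
    props.setProperty("group.id", "nessie-events-consumer");
    // Assumption: message keys are plain strings.
    props.setProperty(
        "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    // Assumption: values are read with Confluent's Avro deserializer.
    props.setProperty(
        "value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.setProperty("schema.registry.url", schemaRegistryUrl);
    // Deserialize into the generated (specific) classes instead of GenericRecord.
    props.setProperty("specific.avro.reader", "true");
    return props;
  }
}
```

Without `specific.avro.reader=true`, this deserializer returns generic records, so casts to the generated classes would fail at runtime.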

### Schema and polymorphism
`KafkaJsonEventSubscriber` uses Jackson to serialize and deserialize events to and from JSON. This is a simpler approach
than using Avro, but it requires the subscriber to have a good understanding of Nessie's domain model, as the JSON
representation of the events is directly tied to it.

### Schema and polymorphism with Avro messages

Nessie's Events API is polymorphic, in the sense that it supports different types of events. The `KafkaEventSubscriber`
Nessie's Events API is polymorphic, in the sense that it supports different types of events. The `KafkaAvroEventSubscriber`
implementation illustrates how to handle different types of events in the same subscriber, pushing to a single Kafka
topic.
topic.

In this case, the subscriber aggregates Nessie's event types into three kinds of Avro messages: `CommitEvent`,
`ReferenceEvent`, and `OperationEvent`.
@@ -79,9 +90,9 @@ Yokota's follow-up article] for a good overview.

### Topic partitioning and message ordering

The `KafkaEventSubscriber` implementation illustrates how to partition the destination Kafka topic _by repository id and
reference name_: this should allow downstream consumers to process events for a given reference in the order they were
produced.
`KafkaAvroEventSubscriber` and `KafkaJsonEventSubscriber` both illustrate how to partition the destination Kafka
topic _by repository id and reference name_: this should allow downstream consumers to process events for a given
reference in the order they were produced.
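One common way to get this kind of partitioning is to derive the record key from the two values and let Kafka's default partitioner hash the key, so records with equal keys land on the same partition. The sketch below only builds such a key; the class name and the `:` separator are assumptions, and the actual key format used by the subscribers is not shown in this README.

```java
// Hypothetical key builder; the separator and null handling are illustrative choices.
final class PartitionKeySketch {

  // Combines repository id and reference name into a single record key.
  // Events without a repository id (assumed possible here) fall back to an empty prefix.
  static String messageKey(String repositoryId, String referenceName) {
    String repo = repositoryId == null ? "" : repositoryId;
    return repo + ":" + referenceName;
  }
}
```

The resulting string would be passed as the key of the produced record. Note that per-key ordering only holds while the number of partitions of the topic stays the same.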

Note that the order in which events are produced to a topic does not necessarily reflect the order in which Nessie's
version store recorded them. As stated in Nessie's [Events API design document], strict ordering of events is not
@@ -101,7 +112,7 @@ commit timestamp.
Producer-side filtering is possible by changing the implementation of either the `getEventFilter()` or
`getEventTypeFilter()` methods.

To illustrate this, `KafkaEventSubscriber` uses its `EventTypeFilter` to completely ignore merge and transplant events.
To illustrate this, `KafkaAvroEventSubscriber` uses its `EventTypeFilter` to completely ignore merge and transplant events.
(Also note: merge and transplant events also trigger commit events, so it's still possible to capture what happened in a
branch by listening to commit events only.)
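A type-based filter of this kind can be sketched as a simple predicate. The enum below is a local stand-in so that the example compiles on its own; it is not Nessie's actual event type enum, and the class and method names are hypothetical.

```java
import java.util.EnumSet;
import java.util.Set;
import java.util.function.Predicate;

// Self-contained sketch; EventType here is a stand-in, not the Nessie API type.
final class EventTypeFilterSketch {

  enum EventType {
    COMMIT,
    MERGE,
    TRANSPLANT,
    REFERENCE_CREATED,
    REFERENCE_UPDATED,
    REFERENCE_DELETED
  }

  // Event types that the subscriber chooses not to receive.
  static final Set<EventType> IGNORED = EnumSet.of(EventType.MERGE, EventType.TRANSPLANT);

  // Accepts every event type except the ignored ones.
  static Predicate<EventType> acceptAllButMergeAndTransplant() {
    return type -> !IGNORED.contains(type);
  }
}
```

In a real subscriber, equivalent logic would sit behind `getEventTypeFilter()`, so that unwanted events are dropped before they are delivered.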

@@ -112,26 +123,26 @@ for example one could configure the subscribe to watch only certain branches or

### Using headers efficiently

The `KafkaEventSubscriber` implementation also illustrates how to use Kafka headers to pass additional information along
Both `KafkaAvroEventSubscriber` and `KafkaJsonEventSubscriber` show how to use Kafka headers to pass additional information along
with the message. In this case, the subscriber adds the repository id, the user and the event creation timestamp to the
message headers, allowing consumers to filter or sort messages based on these values.
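Kafka header values are byte arrays, so each value has to be encoded before it is attached to a record. The sketch below prepares the three headers as a plain map; the header names, the UTF-8 encoding and the epoch-millisecond timestamp format are assumptions made for this example.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical header builder; names and encodings are illustrative only.
final class HeadersSketch {

  static Map<String, byte[]> headers(String repositoryId, String user, Instant createdAt) {
    Map<String, byte[]> headers = new LinkedHashMap<>();
    headers.put("repository-id", utf8(repositoryId));
    headers.put("user", utf8(user));
    // Epoch milliseconds as text: easy to parse, and numeric order matches time order.
    headers.put("creation-time", utf8(Long.toString(createdAt.toEpochMilli())));
    return headers;
  }

  private static byte[] utf8(String value) {
    return value.getBytes(StandardCharsets.UTF_8);
  }
}
```

A consumer can then inspect these headers and skip or reorder messages without deserializing the payload.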

### Other considerations

The `KafkaEventSubscriber` implementation also illustrates how to handle events in a non-blocking way, by publishing
`KafkaAvroEventSubscriber` and `KafkaJsonEventSubscriber` also illustrate how to handle events in a non-blocking way, by publishing
messages without waiting for the broker's acknowledgement. This is achieved by calling the `KafkaProducer#send` method
in asynchronous mode, with a completion callback. If your implementation needs to wait for the broker's acknowledgement
synchronously, then you should report to Nessie that this subscriber is going to be blocking by returning `true` from
the `isBlocking()` method.
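The non-blocking pattern can be illustrated without a broker. In the sketch below, a `Function` returning a `CompletableFuture` stands in for the asynchronous producer call; this is an assumption made to keep the example self-contained, whereas the subscribers themselves use `KafkaProducer#send` with a completion callback. All names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Self-contained sketch of fire-and-forget publishing with a completion callback.
final class NonBlockingPublishSketch {

  // Stand-in for an asynchronous producer: returns immediately with a future.
  private final Function<String, CompletableFuture<Void>> sender;

  final AtomicInteger acknowledged = new AtomicInteger();
  final AtomicInteger failures = new AtomicInteger();

  NonBlockingPublishSketch(Function<String, CompletableFuture<Void>> sender) {
    this.sender = sender;
  }

  // Hands the message to the sender and returns without waiting for the result.
  void onEvent(String message) {
    sender
        .apply(message)
        .whenComplete(
            (ignored, error) -> {
              if (error != null) {
                failures.incrementAndGet(); // real code would at least log the error
              } else {
                acknowledged.incrementAndGet();
              }
            });
  }

  // Nothing here waits on the broker, so the subscriber reports itself as non-blocking.
  boolean isBlocking() {
    return false;
  }
}
```

If `onEvent` instead waited on the future before returning, `isBlocking()` should return `true`.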

Finally, the `KafkaEventSubscriber` implementation does not do any initialization in its constructor. This is because
Finally, both implementations avoid doing any initialization in their constructors. This is because
the constructor is called during the SPI module's initialization, which happens before Nessie is fully started. Instead,
the subscriber's initialization should be done in the `onSubscribe` method, which is called after Nessie has started
(but before receiving events).
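The deferred initialization can be sketched as follows. The class is a simplified stand-in: the real `onSubscribe` method takes arguments that are omitted here, and the `Supplier` is a hypothetical factory for whatever resource the subscriber needs, such as a producer.

```java
import java.util.function.Supplier;

// Simplified stand-in for a subscriber; not the actual SPI interface.
final class LazyInitSketch {

  private final Supplier<Object> resourceFactory;
  private Object resource; // stays null until onSubscribe() is called

  // The constructor only stores the factory; it creates nothing.
  LazyInitSketch(Supplier<Object> resourceFactory) {
    this.resourceFactory = resourceFactory;
  }

  // Called once Nessie has started: create the resource here.
  void onSubscribe() {
    resource = resourceFactory.get();
  }

  boolean isInitialized() {
    return resource != null;
  }
}
```

Keeping the constructor free of side effects means that merely loading the subscriber does not open connections or read files.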

### Testing

The `org.projectnessie.events.ri.kafka.ITKafkaEventSubscriber` integration test provides a real-world example of how to
test the subscriber. It uses a real Kafka broker and schema registry. It then plays a sequence of events that is
representative of what Nessie would send to the subscriber when a user creates a branch, commits to it, and then deletes
it. Finally, it checks that the subscriber has received the expected messages.
The integration tests provide a real-world example of how to test the subscribers. They use a real
Kafka broker and, for Avro, also a schema registry. They then play a sequence of events that is
representative of what Nessie would send to the subscriber when a user creates a branch, commits to
it, and then deletes it. Finally, they check that the subscriber has received the expected messages.
4 changes: 4 additions & 0 deletions events/ri/build.gradle.kts
@@ -30,8 +30,11 @@ dependencies {

implementation(libs.slf4j.api)
implementation(libs.kafka.clients)

// Avro serialization examples
implementation(libs.avro)

// Jackson serialization examples
implementation(platform(libs.jackson.bom))
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("com.fasterxml.jackson.core:jackson-annotations")
@@ -44,6 +47,7 @@ dependencies {
testImplementation(libs.kafka.streams.test.utils)
testImplementation(libs.logback.classic)
testImplementation(libs.kafka.avro.serializer)
testImplementation(libs.kafka.json.schema.serializer)

testCompileOnly(libs.microprofile.openapi)
testCompileOnly(libs.immutables.value.annotations)