Skip to content

Commit

Permalink
Merge branch '4.5'
Browse files Browse the repository at this point in the history
  • Loading branch information
MGathier committed Jul 20, 2022
2 parents ff4f296 + c75a6e6 commit 46ef98d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 9 deletions.
14 changes: 5 additions & 9 deletions extensions/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

Apache Kafka is a very popular system for publishing and consuming events. Its architecture is fundamentally different from most messaging systems and combines speed with reliability.

Axon provides an extension dedicated to _publishing_ and _receiving_ event messages from Kafka. The Kafka Extension should be regarded as an alternative approach to distributing events, besides \(the default\) Axon Server.
Axon provides an extension dedicated to _publishing_ and _receiving_ event messages from Kafka. The Kafka Extension should be regarded as an alternative approach to distributing events, besides \(the default\) Axon Server. It's also possible to use the extension to stream events from Kafka to Axon server, or the other way around.

The implementation of the extension can be found [here](https://github.com/AxonFramework/extension-kafka). The shared repository also contains a [sample project](https://github.com/AxonFramework/extension-kafka/tree/master/kafka-axon-example) using the extension.

To use the Kafka Extension components from Axon, make sure the `axon-kafka` module is available on the classpath. Using the extension requires setting up and configuring Kafka following your project's requirements. How this is achieved is outside of the scope of this reference guide and should be found in Kafka's [documentation](https://kafka.apache.org/).

{% hint style="info" %}
Note that Kafka is a perfectly fine event distribution mechanism, but it is not a good fit for an event store. Along those lines this extension **only** provides the means to distributed Axon's events through Kafka. Due to this the extension cannot be used to event source aggregates, as this requires an event store implementation. Therefore we recommend using a built-for-purpose event store like [Axon Server](../axon-server-introduction.md), or alternatively an RDBMS based \(the JPA or JDBC implementations for example\).
Note that Kafka is a perfectly fine event distribution mechanism, but it is not an event store. Along those lines this extension **only** provides the means to distributed Axon's events through Kafka. Due to this the extension cannot be used to event source aggregates, as this requires an event store implementation. We recommend using a built-for-purpose event store like [Axon Server](../axon-server-introduction.md), or alternatively an RDBMS based \(the JPA or JDBC implementations for example\).
{% endhint %}

## Publishing Events to Kafka
Expand Down Expand Up @@ -81,7 +81,7 @@ public class KafkaEventPublicationConfiguration {
}
```

Lastly, we need to provide Axon's event messages to the `KafkaPublisher`. To that end a `KafkaEventPublisher` should be instantiate through the builder pattern. Remember to add the `KafkaEventPublisher` to an event processor implementation of your choice. It is recommended to use the `KafkaEventPublisher#DEFAULT_PROCESSING_GROUP` as the processing group name of the event processor to distinguish it from other event processors.
Lastly, we need to provide Axon's event messages to the `KafkaPublisher`. To that end a `KafkaEventPublisher` should be instantiated through the builder pattern. Remember to add the `KafkaEventPublisher` to an event processor implementation of your choice. It is recommended to use the `KafkaEventPublisher#DEFAULT_PROCESSING_GROUP` as the processing group name of the event processor to distinguish it from other event processors.

```java
public class KafkaEventPublicationConfiguration {
Expand Down Expand Up @@ -149,7 +149,7 @@ public class KafkaEventConsumptionConfiguration {

Using the `SubscribableKafkaMessageSource` means you are inclined to use a `SubscribingEventProcessor` to consume the events in your event handlers.

When using this source, Kafka's idea of pairing `Consumer` instances into "Consumer Groups" is used. This is strengthened by making the `groupId` upon source construction a _hard requirement_. To use a common `groupId` essentially means that the event-stream-workload can be shared on Kafka's terms, whereas a `SubscribingEventProcessor` typically works on it's own accord regardless of the number of instances. The workload sharing can be achieved by having several application instances with the same `groupId` or by adjusting the consumer count through the `SubscribableKafkaMessageSource`'s builder. The same benefit holds for [resetting](../axon-framework/events/event-processors/streaming.md#replaying-events) an event stream, which in Axon is reserved to the `TrackingEventProcessor`, but is now opened up through Kafka's own API's.
When using this source, Kafka's idea of pairing `Consumer` instances into "Consumer Groups" is used. This is strengthened by making the `groupId` upon source construction a _hard requirement_. To use a common `groupId` essentially means that the event-stream-workload can be shared on Kafka's terms, whereas a `SubscribingEventProcessor` typically works on its own accord regardless of the number of instances. The workload sharing can be achieved by having several application instances with the same `groupId` or by adjusting the consumer count through the `SubscribableKafkaMessageSource`'s builder. The same benefit holds for [resetting](../axon-framework/events/event-processors/streaming.md#replaying-events) an event stream, which in Axon is reserved to the `TrackingEventProcessor`, but is now opened up through Kafka's own API's.

Although the `SubscribableKafkaMessageSource` thus provides the niceties the tracking event processor normally provides, it does come with two catches:

Expand Down Expand Up @@ -218,22 +218,18 @@ If only a single subscribing event processor will be subscribed to the kafka mes

Using the `StreamableKafkaMessageSource` means you are inclined to use a `TrackingEventProcessor` to consume the events in your event handlers.

Where as the [subscribable kafka message source](kafka.md#consuming-events-with-a-subscribable-message-source) uses Kafka's idea of sharing the workload through multiple `Consumer` instances in the same "Consumer Group", the streamable approach enforces a **unique** consumer group per `Consumer` instance. Axon requires uniquely identified consumer group/`Consumer` pairs to \(1\) ensure event ordering and \(2\) to guarantee that each instance/thread receives the correct portion of the event stream during [parallel processing](../axon-framework/events/event-processors/streaming.md#parallel-processing). The distinct group id is derived by the `StreamableKafkaMessageSource` through a `groupIdPrefix` and a `groupdIdSuffixFactory`, which are adjustable through the source's builder.
Whereas the [subscribable kafka message source](kafka.md#consuming-events-with-a-subscribable-message-source) uses Kafka's idea of sharing the workload through multiple `Consumer` instances in the same "Consumer Group", the streamable approach doesn't use a consumer group, and assigns all available partitions.

```java
public class KafkaEventConsumptionConfiguration {
// ...
public StreamableKafkaMessageSource<String, byte[]> streamableKafkaMessageSource(List<String> topics,
String groupIdPrefix,
Supplier<String> groupIdSuffixFactory,
ConsumerFactory<String, byte[]> consumerFactory,
Fetcher<String, byte[], KafkaEventMessage> fetcher,
KafkaMessageConverter<String, byte[]> messageConverter,
int bufferCapacity) {
return StreamableKafkaMessageSource.<String, byte[]>builder()
.topics(topics) // Defaults to a collection of "Axon.Events"
.groupIdPrefix(groupIdPrefix) // Defaults to "Axon.Streamable.Consumer-"
.groupIdSuffixFactory(groupIdSuffixFactory) // Defaults to a random UUID
.consumerFactory(consumerFactory) // Hard requirement
.fetcher(fetcher) // Hard requirement
.messageConverter(messageConverter) // Defaults to a "DefaultKafkaMessageConverter"
Expand Down
4 changes: 4 additions & 0 deletions release-notes/rn-axon-server/rn-as-minor-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ This page provides a dedicated overview of patch releases for the Axon Server (E

## Release 4.5

### Release 4.5.19

* Security update: updated control database settings

### Release 4.5.18

* Performance improvements in replication process
Expand Down
4 changes: 4 additions & 0 deletions release-notes/rn-axon-server/rn-asse-minor-releases.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ This page provides a dedicated overview of patch releases for the Axon Server (S

### Release 4.5.13

* Security update: updated control database settings

### Release 4.5.13

* Reduced memory consumption during transactions
* Improved handling of out-of-memory exceptions
* Resolved a race condition in storing events that lead to delays in completing transactions
Expand Down

0 comments on commit 46ef98d

Please sign in to comment.