Commit eb015b9: added docs on listeners

---
layout: post
toc: true
title: Eventstore Listeners
description: Notifying your application about Eventstore activity
date: 2025-11-29 06:00:00
categories: [Eventstore Documentation,Eventstore API]
tags: [listener]
---

## EventStore Listeners

Listeners enable your application to react to activity in the EventStore without constantly polling. They provide notifications when events are appended or when readers update their processing bookmarks. This reactive approach is essential for building responsive event-driven systems.

In single-instance deployments, listeners help decouple components—one part appends events while another reacts asynchronously. But listeners become truly powerful in multi-node deployments where several application instances share the same storage backend. When one node appends events, all subscribed nodes across the cluster receive notifications, enabling distributed coordination without additional messaging infrastructure.

EventStore provides two categories of listeners:

**Append Listeners** notify you when new events are written to a stream. Choose between:

- **Eventually Consistent Append Listeners**: Asynchronous, non-blocking notifications (recommended for most scenarios)
- **Consistent Append Listeners**: Synchronous, transactional notifications for immediate processing

**Bookmark Listeners** notify you when readers update their processing position, useful for monitoring progress in distributed event processing pipelines. Only eventually consistent bookmark listeners are supported.

### Consistency Trade-offs

**Eventually consistent listeners** execute asynchronously after the operation completes. They don't block the append or bookmark operation, making them ideal for high-throughput scenarios. There's a small delay between the operation and the notification, and exceptions in listener code won't affect the original operation. PostgreSQL's LISTEN/NOTIFY mechanism efficiently propagates notifications across all connected nodes with minimal latency.

**Consistent listeners** execute synchronously within the append transaction itself. They receive full typed domain events immediately and can update in-memory projections transactionally. However, they block the append operation, reduce throughput, and any exceptions can cause the append to fail. They're only suitable when immediate consistency within the same process is absolutely required.

**Recommendation**: Use eventually consistent listeners whenever possible. They scale better, isolate failures, and work seamlessly across distributed deployments. Reserve consistent listeners for rare cases where transactional guarantees within a single process are essential.

## Eventually Consistent Append Listeners

Eventually consistent append listeners receive asynchronous notifications after events are appended. They receive an `EventReference` pointing to at least the last appended event rather than the full event data, and not every appended event triggers a separate notification. This lightweight notification model allows you to query only the events you care about.

These listeners are ideal for:

- Building eventually consistent read models and projections
- Triggering background jobs or workflows
- Notifying external systems of changes
- Coordinating distributed event processors

```java
// Create event stream
EventStore eventStore = PostgresEventStorage.newBuilder().buildStore();
EventStreamId streamId = EventStreamId.forContext("order");
EventStream<OrderEvent> stream = eventStore.getEventStream(streamId, OrderEvent.class);

// Track last processed position
AtomicReference<EventReference> lastProcessed = new AtomicReference<>();

// Subscribe to eventually consistent notifications
stream.subscribe((EventReference atLeastUntil) -> {
    // Query new events since last processed
    stream.query(EventQuery.matchAll(), lastProcessed.get())
        .forEach(event -> {
            System.out.println("Processing: " + event.type());
            updateReadModel(event);
        });

    // Update bookmark for resumability
    lastProcessed.set(atLeastUntil);
    stream.placeBookmark("order-processor", atLeastUntil, Tags.none());
});
```

> Multiple nodes running the same listener code will each receive notifications independently! Put a coordination mechanism in place so that not every node tries to update your (shared) read model.
{: .prompt-warning }
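
One way to implement such coordination, given that all nodes already share the PostgreSQL backend, is a session-level advisory lock: only the node that acquires it registers the read-model-updating listener. The sketch below is illustrative, not part of the EventStore API; the `dataSource` variable, the `registerReadModelListener` helper, and the lock key `42` are assumptions.

```java
// Sketch: only the node that wins the advisory lock updates the shared read model.
// The connection must stay open for as long as the lock should be held.
Connection lockConnection = dataSource.getConnection();
try (Statement statement = lockConnection.createStatement();
     ResultSet resultSet = statement.executeQuery("SELECT pg_try_advisory_lock(42)")) {
    resultSet.next();
    if (resultSet.getBoolean(1)) {
        // This node is the active processor: subscribe as in the example above
        registerReadModelListener(stream);
    } else {
        lockConnection.close(); // another node already holds the lock
    }
}
```

Losing nodes can retry acquisition periodically so that a replacement takes over if the active processor goes down (the lock is released when its session ends).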

## Consistent Append Listeners

Consistent append listeners receive synchronous notifications during the append transaction. Unlike eventually consistent listeners, they receive the full typed domain events immediately, as well as all appended events. No querying of the EventStore is therefore required to process the newly appended events.

These listeners are appropriate for very specific scenarios, such as updating an in-memory cache transactionally.

Use them sparingly—they block the append operation and reduce system throughput.

```java
EventStore eventStore = InMemoryEventStorage.newBuilder().buildStore();
EventStreamId streamId = EventStreamId.forContext("customer").withPurpose("123");
EventStream<CustomerEvent> stream = eventStore.getEventStream(streamId, CustomerEvent.class);

// In-memory cache updated transactionally
Map<String, String> cache = new ConcurrentHashMap<>();

stream.subscribe((List<Event<CustomerEvent>> events) -> {
    events.forEach(event -> {
        // Process typed events immediately
        switch (event.data()) {
            case CustomerRegistered(String id, String name) ->
                cache.put(id, name);
            case CustomerNameChanged(String id, String newName) ->
                cache.put(id, newName);
            case CustomerChurned(String id) ->
                cache.remove(id);
        }
    });
});

// Cache is updated synchronously before append completes
stream.append(AppendCriteria.none(),
    Event.of(new CustomerRegistered("123", "Alice"), Tags.none()));
```

Keep processing logic fast and simple.
## Eventually Consistent Bookmark Listeners
108+
109+
Bookmark listeners notify you when readers update their processing position by placing a bookmark. They're useful for monitoring distributed event processing systems, detecting lag, and coordinating multiple processors.
110+
111+
Use cases include:
112+
- Monitoring progress across multiple readers
113+
- Detecting stuck or slow processors
114+
- Implementing health checks for processing pipelines
115+
- Coordinating distributed event processors
116+
117+

```java
EventStore eventStore = PostgresEventStorage.newBuilder().buildStore();
EventStreamId streamId = EventStreamId.forContext("order");
EventStream<OrderEvent> stream = eventStore.getEventStream(streamId, OrderEvent.class);

// Monitor all readers
Map<String, EventReference> readerPositions = new ConcurrentHashMap<>();

stream.subscribe((String reader, EventReference processedUntil) -> {
    System.out.println("Reader '" + reader + "' processed up to: " +
        processedUntil.position());

    readerPositions.put(reader, processedUntil);

    // Detect processing lag
    long latestPosition = getLatestEventPosition(stream);
    long readerPosition = processedUntil.position();
    long lag = latestPosition - readerPosition;

    if (lag > 1000) {
        alertOnProcessingLag(reader, lag);
    }
});
```

**Why no consistent bookmark listeners?**

Consistent (synchronous) bookmark listeners would be problematic. Bookmarks are typically placed after processing each event or batch, potentially hundreds or thousands of times per second. Blocking these operations with synchronous notifications would severely degrade throughput and tightly couple readers to their listeners.

Additionally, bookmark placement is often performed by autonomous background processors that shouldn't be coupled to the rest of the system, and in many deployment scenarios these processors don't even run in the same process as the other components, making synchronous notification impossible. The asynchronous, eventually consistent model allows readers to work independently while still providing visibility into their progress for monitoring purposes. If you need immediate awareness of bookmark updates, query the bookmark directly using `getBookmark(reader)`.
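
For example, a health-check endpoint can read the bookmark on demand instead of subscribing. The sketch below reuses the stream and helpers from the example above; the `Optional` return type of `getBookmark` and the `getLatestEventPosition`/`alertOnProcessingLag` helpers are assumptions, not documented API.

```java
// On-demand lag check without a subscription. getBookmark(reader) is assumed
// here to return an Optional<EventReference>.
Optional<EventReference> bookmark = stream.getBookmark("order-processor");
long latestPosition = getLatestEventPosition(stream); // helper from the example above
bookmark.ifPresentOrElse(
    processedUntil -> {
        long lag = latestPosition - processedUntil.position();
        if (lag > 1000) {
            alertOnProcessingLag("order-processor", lag);
        }
    },
    // No bookmark yet: treat the whole stream as unprocessed
    () -> alertOnProcessingLag("order-processor", latestPosition)
);
```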
