| 1 | +--- |
| 2 | +layout: post |
| 3 | +toc: true |
| 4 | +title: Bookmarking |
| 5 | +description: Reader Bookmarks on an EventStream |
| 6 | +date: 2025-11-29 06:00:00 |
| 7 | +categories: [Eventstore Documentation] |
| 8 | +tags: [bookmark] |
| 9 | +--- |
| 10 | +# Stream Readers and Bookmarking |
| 11 | + |
| 12 | +This guide covers how to track event processing progress using bookmarks, enabling resumable and eventually consistent event processing. |
| 13 | + |
| 14 | +## Why Readers Need Bookmarks |
| 15 | + |
| 16 | +In event sourcing, it's crucial to persist the point up to which a stream has been processed by a reader. This enables: |
| 17 | + |
| 18 | +- **Resumable processing**: Restart processing from the last checkpoint after crashes or deployments |
| 19 | +- **Eventually consistent read models**: Build derived views that catch up independently |
| 20 | +- **Processing guarantees**: Ensure events are processed at least once, provided the bookmark is placed only after an event has been handled |
| 21 | +- **Progress monitoring**: Track how far behind real-time each reader is |
| 22 | + |
| 23 | +Eventual consistency allows readers and processors to move independently from the write side. A new read model that still has to catch up with history can process events at its own pace: |
| 24 | + |
| 25 | +```java |
| 26 | +// New projection starting from scratch |
| 27 | +CustomerSummary projection = new CustomerSummary("123"); |
| 28 | + |
| 29 | +// Process all historical events |
| 30 | +Projector.from(stream).towards(projection).build().run(); |
| 31 | + |
| 32 | +// Now caught up with history |
| 33 | +``` |
| 34 | + |
| 35 | +Bookmarks enable this independence by marking the last successfully processed event. |
| 36 | + |
| 37 | +## Placing a Bookmark |
| 38 | + |
| 39 | +The `placeBookmark()` method registers the `EventReference` of the last processed event for a named reader: |
| 40 | + |
| 41 | +```java |
| 42 | +stream.placeBookmark( |
| 43 | + "customer-summary-builder", // Reader identifier |
| 44 | + event.reference(), // Last processed event |
| 45 | + Tags.of("status", "active") // Metadata tags |
| 46 | +); |
| 47 | +``` |
| 48 | + |
| 49 | +**Reader Identifier**: A unique string identifying the processor (e.g., "order-projection", "email-sender-v2"). This is the key for the bookmark. |
| 50 | + |
| 51 | +**Event Reference**: The reference of the last event successfully processed by this reader. |
| 52 | + |
| 53 | +**Tags**: Optional metadata stored with the bookmark for observability. Common uses: |
| 54 | +- Processing status: `Tags.of("status", "active")` |
| 55 | +- Timestamp: `Tags.of("updated-at", Instant.now().toString())` |
| 56 | +- Version: `Tags.of("version", "2.0")` |
| 57 | +- Instance ID: `Tags.of("instance", hostname)` |
| 58 | + |
| 59 | +Tags are stored only for monitoring and debugging—they don't affect bookmark retrieval. |
| 60 | + |
| 61 | +**Creating vs. Updating**: There's no distinction. Calling `placeBookmark()` with the same reader ID creates the bookmark on first call and updates it on subsequent calls. The reader ID is the key. |
| 62 | + |
| 63 | +```java |
| 64 | +// First call - creates bookmark |
| 65 | +stream.placeBookmark("my-reader", ref1, Tags.none()); |
| 66 | + |
| 67 | +// Second call - updates bookmark for same reader |
| 68 | +stream.placeBookmark("my-reader", ref2, Tags.none()); |
| 69 | +``` |
| 70 | + |
| 71 | +## Retrieving a Bookmark |
| 72 | + |
| 73 | +The `getBookmark()` method retrieves the last bookmarked position for a reader: |
| 74 | + |
| 75 | +```java |
| 76 | +Optional<EventReference> bookmark = stream.getBookmark("customer-summary-builder"); |
| 77 | + |
| 78 | +bookmark.ifPresentOrElse( |
| 79 | + ref -> System.out.println("Resume from position: " + ref.position()), |
| 80 | + () -> System.out.println("No bookmark - start from beginning") |
| 81 | +); |
| 82 | +``` |
| 83 | + |
| 84 | +`getBookmark()` returns `Optional.empty()` if no bookmark exists for that reader, as on a first run. |
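
The create-or-update behaviour of `placeBookmark()` and the empty result of `getBookmark()` on a first run can be modelled with a plain map. The following is a minimal in-memory sketch for illustration only: `BookmarkRegistrySketch` is a hypothetical class, positions are assumed to be plain `long` values, and tags are left out. It is not the eventstore implementation.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for bookmark storage; not part of the eventstore API.
class BookmarkRegistrySketch {
    // One entry per reader: the reader identifier is the key.
    private final Map<String, Long> positions = new ConcurrentHashMap<>();

    // Creates the bookmark on the first call, overwrites it on later calls.
    void placeBookmark(String reader, long position) {
        positions.put(reader, position);
    }

    // Empty when the reader has never placed a bookmark (first run).
    Optional<Long> getBookmark(String reader) {
        return Optional.ofNullable(positions.get(reader));
    }
}
```

Because the reader identifier is the only key, two readers with different names never overwrite each other's bookmark in this model.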
| 85 | + |
| 86 | +## Typical Usage Scenario |
| 87 | + |
| 88 | +A complete example showing bookmark retrieval, event processing with append listeners, and periodic bookmark updates: |
| 89 | + |
| 90 | +```java |
| 91 | +EventStream<CustomerEvent> stream = eventstore.getEventStream( |
| 92 | + EventStreamId.forContext("customer").anyPurpose(), |
| 93 | + CustomerEvent.class |
| 94 | +); |
| 95 | + |
| 96 | +String readerName = "customer-analytics"; |
| 97 | + |
| 98 | +// Retrieve last bookmark |
| 99 | +Optional<EventReference> bookmark = stream.getBookmark(readerName); |
| 100 | +EventReference startAfter = bookmark.orElse(null); |
| 101 | + |
| 102 | +if (startAfter == null) { |
| 103 | + System.out.println("No bookmark found - processing from beginning"); |
| 104 | +} else { |
| 105 | + System.out.println("Resuming from position: " + startAfter.position()); |
| 106 | +} |
| 107 | + |
| 108 | +// Register listener for new events appended to stream |
| 109 | +stream.subscribe((EventReference atLeastUntil) -> { |
| 110 | + // Re-read the bookmark on every notification; the captured startAfter never advances |
| 111 | + stream.query(EventQuery.matchAll(), stream.getBookmark(readerName).orElse(null), Limit.to(100)) |
| 112 | + .forEach(event -> { |
| 113 | + processEvent(event); |
| 114 | + |
| 115 | + // Update bookmark after processing |
| 116 | + stream.placeBookmark( |
| 117 | + readerName, |
| 118 | + event.reference(), |
| 119 | + Tags.of("processed-at", Instant.now().toString()) |
| 120 | + ); |
| 121 | + }); |
| 122 | +}); |
| 123 | + |
| 124 | +// Initial catch-up: process all existing events |
| 125 | +stream.query(EventQuery.matchAll(), startAfter) |
| 126 | + .forEach(event -> { |
| 127 | + processEvent(event); |
| 128 | + stream.placeBookmark(readerName, event.reference(), Tags.none()); |
| 129 | + }); |
| 130 | +``` |
| 131 | + |
| 132 | +When the application starts: |
| 133 | +- **No bookmark exists**: Process from the beginning of the stream |
| 134 | +- **Bookmark exists**: Resume from the bookmarked position |
| 135 | + |
| 136 | +The append listener ensures new events are processed without delay. |
| 137 | + |
| 138 | +> This naive implementation performs one write (`placeBookmark()`) for every processed event, which is far from ideal. |
| 139 | +> It only shows the basic setup; the batching section below reduces the number of bookmark writes. |
| 140 | +{: .prompt-warning } |
| 141 | + |
| 142 | + |
| 143 | +## Typical Usage Scenario with Batching |
| 144 | + |
| 145 | +A single reader instance works through an event stream, placing bookmarks as savepoints: |
| 146 | + |
| 147 | +```java |
| 148 | +String readerName = "order-fulfillment"; |
| 149 | +EventStream<OrderEvent> stream = eventstore.getEventStream( |
| 150 | + EventStreamId.forContext("orders").anyPurpose(), |
| 151 | + OrderEvent.class |
| 152 | +); |
| 153 | + |
| 154 | +// Retrieve bookmark |
| 155 | +Optional<EventReference> lastProcessed = stream.getBookmark(readerName); |
| 156 | + |
| 157 | +// Query events after bookmark |
| 158 | +EventQuery query = EventQuery.matchAll(); |
| 159 | +EventReference after = lastProcessed.orElse(null); |
| 160 | +int batchSize = 500; |
| 161 | + |
| 162 | +while (true) { |
| 163 | + List<Event<OrderEvent>> batch = stream.query( |
| 164 | + query, |
| 165 | + after, |
| 166 | + Limit.to(batchSize) |
| 167 | + ).toList(); |
| 168 | + |
| 169 | + if (batch.isEmpty()) { |
| 170 | + break; // Caught up |
| 171 | + } |
| 172 | + |
| 173 | + // Process batch |
| 174 | + batch.forEach(event -> processEvent(event)); |
| 175 | + |
| 176 | + // Update bookmark after batch |
| 177 | + EventReference lastInBatch = batch.getLast().reference(); |
| 178 | + stream.placeBookmark( |
| 179 | + readerName, |
| 180 | + lastInBatch, |
| 181 | + Tags.of("batch-size", String.valueOf(batch.size())) |
| 182 | + ); |
| 183 | + |
| 184 | + after = lastInBatch; |
| 185 | +} |
| 186 | +``` |
| 187 | + |
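| 188 | +**Restart Behavior**: If the process crashes and restarts, the bookmark is a persisted pointer indicating where to resume. The reader queries events after the bookmark and continues processing. Events that were handled after the last bookmark was placed are handled again, so processing is at-least-once and handlers should tolerate duplicates. |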
| 189 | + |
| 190 | +**Integration with Query Parameters**: The bookmark works naturally with the `after` parameter of event queries. Simply pass the bookmarked reference as the `after` parameter. |
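
To make the restart behaviour concrete, here is a small in-memory simulation, assuming events are represented only by their `long` positions. `ResumableReaderSketch`, its `query` helper, and the `crashAfter` parameter are hypothetical names invented for this sketch; they only mimic the "query after the bookmark" pattern and are not the eventstore API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory simulation; not part of the eventstore API.
class ResumableReaderSketch {
    // Persisted bookmarks: reader name -> position of the last bookmarked event.
    static final Map<String, Long> bookmarks = new HashMap<>();

    // Returns up to `limit` positions strictly after `after` (null means from the start).
    static List<Long> query(List<Long> log, Long after, int limit) {
        List<Long> batch = new ArrayList<>();
        for (long position : log) {
            if ((after == null || position > after) && batch.size() < limit) {
                batch.add(position);
            }
        }
        return batch;
    }

    // Processes batches until caught up, or "crashes" once `crashAfter` events were handled.
    // Returns the positions handled during this run.
    static List<Long> run(String reader, List<Long> log, int batchSize, int crashAfter) {
        List<Long> handled = new ArrayList<>();
        Long after = bookmarks.get(reader); // null on first run
        while (true) {
            List<Long> batch = query(log, after, batchSize);
            if (batch.isEmpty()) {
                return handled; // caught up
            }
            for (long position : batch) {
                if (handled.size() == crashAfter) {
                    return handled; // simulated crash before the batch was bookmarked
                }
                handled.add(position);
            }
            after = batch.get(batch.size() - 1);
            bookmarks.put(reader, after); // savepoint after the whole batch
        }
    }
}
```

With ten events, a batch size of 4, and a simulated crash after 6 handled events, the bookmark stays at position 4. The next run therefore handles positions 5 and 6 a second time before continuing, which is the at-least-once behaviour that bookmarking after each batch implies.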
| 191 | + |
| 192 | +**Multi-Server Failover**: In a multi-server setup with failover, another logical instance of the same reader can take over by retrieving the same bookmark: |
| 193 | + |
| 194 | +```java |
| 195 | +// Server 1 goes down while processing |
| 196 | +// Bookmark persisted at position 1000 |
| 197 | + |
| 198 | +// Server 2 takes over |
| 199 | +Optional<EventReference> bookmark = stream.getBookmark("order-fulfillment"); |
| 200 | +// Resumes from position 1000 |
| 201 | +``` |
| 202 | + |
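| 203 | +Instances of the same reader must not process the stream concurrently, because a reader handles events sequentially. Failover still provides high availability: only one instance is active at a time, and a standby resumes from the shared bookmark. |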
| 204 | + |
| 205 | +**Idempotent Processing with Larger Batches**: When combined with idempotent event handling, bookmarks can be updated less frequently for better performance: |
| 206 | + |
| 207 | +```java |
| 208 | +int batchSize = 1000; |
| 209 | +int bookmarkEvery = 100; // Update bookmark every 100 events |
| 210 | +int processedCount = 0; |
| 211 | +EventReference lastEventRef = null; // Reference of the last event handled so far |
| 212 | +// A plain loop is used because a lambda cannot modify local variables |
| 213 | +for (Event<OrderEvent> event : stream.query(query, after, Limit.to(batchSize)).toList()) { |
| 214 | +    processEventIdempotently(event); // Idempotent handler |
| 215 | +    processedCount++; |
| 216 | +    lastEventRef = event.reference(); |
| 217 | +    if (processedCount % bookmarkEvery == 0) { |
| 218 | +        stream.placeBookmark(readerName, lastEventRef, Tags.none()); |
| 219 | +    } |
| 220 | +} |
| 221 | +// Place final bookmark unless the last event was already bookmarked |
| 222 | +if (lastEventRef != null && processedCount % bookmarkEvery != 0) { |
| 223 | +    stream.placeBookmark(readerName, lastEventRef, Tags.none()); |
| 224 | +} |
| 225 | +``` |
| 226 | + |
| 227 | +If processing fails mid-batch, the events after the last bookmark are processed again on restart; idempotent handlers make this re-processing free of duplicate side effects. |
| 228 | + |
| 229 | + |
| 230 | +> Idempotency can be as simple as storing the event position (available in the `EventReference`) along with the read model, and ignoring any event whose position is not higher than the one already stored. |
| 231 | +{: .prompt-info } |
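
The position check described in the note above could look like the following sketch. `IdempotentCounterSketch` is a hypothetical read model that just counts events, and it assumes positions are strictly increasing `long` values starting above `-1`; a real read model would persist the position together with its data in the same transaction.

```java
// Hypothetical read model; names are illustrative and not part of the eventstore API.
class IdempotentCounterSketch {
    private long lastAppliedPosition = -1; // no event applied yet (assumed sentinel)
    private int eventCount = 0;

    // Applies the event only if its position is higher than the stored one.
    // Returns true if the event was applied, false if it was skipped as a duplicate.
    boolean apply(long position) {
        if (position <= lastAppliedPosition) {
            return false; // already reflected in the read model
        }
        eventCount++;
        lastAppliedPosition = position;
        return true;
    }

    int eventCount() {
        return eventCount;
    }

    long lastAppliedPosition() {
        return lastAppliedPosition;
    }
}
```

Replaying positions 2 and 3 after they were already applied leaves the count unchanged, so a bookmark that lags behind the read model does no harm.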
| 232 | + |
| 233 | +## Multiple Reader Instances |
| 234 | + |
| 235 | +Event streams are typically processed **sequentially** because events in a stream have causal relationships and ordering requirements. Parallelizing their processing toward a useful result is not trivial. |
| 236 | + |
| 237 | +**Exceptions**: Scenarios that can be parallelized include: |
| 238 | +- **MapReduce algorithms**: Independent aggregations that can be combined |
| 239 | +- **CRDTs (Conflict-free Replicated Data Types)**: Structures designed for commutative operations |
| 240 | + |
| 241 | +For most use cases, a **single reader instance** works its way sequentially through all events in a stream. |
| 242 | + |
| 243 | +However, **multiple independent read models** can be built in parallel: |
| 244 | + |
| 245 | +```java |
| 246 | +// Three different readers processing the same stream independently |
| 247 | + |
| 248 | +// Reader 1: Customer analytics |
| 249 | +stream.placeBookmark("customer-analytics", ref1, Tags.none()); |
| 250 | + |
| 251 | +// Reader 2: Email notification sender |
| 252 | +stream.placeBookmark("email-sender", ref2, Tags.none()); |
| 253 | + |
| 254 | +// Reader 3: Reporting dashboard |
| 255 | +stream.placeBookmark("dashboard-updater", ref3, Tags.none()); |
| 256 | +``` |
| 257 | + |
| 258 | +Each reader: |
| 259 | +- Processes events at its own pace |
| 260 | +- Maintains its own bookmark |
| 261 | +- Can lag behind or catch up independently |
| 262 | +- Builds a different view or performs different actions |
| 263 | + |
| 264 | +Each of these processors is a "reader" with its own reader ID and bookmark, operating independently on the same event stream. |
| 265 | + |
| 266 | +**Key Insight**: While a single reader must process events sequentially, many different readers can process the same stream concurrently, each maintaining their own bookmark. |
| 267 | + |
| 268 | +## Listening for Bookmark Updates - Monitoring Reader Progress |
| 269 | + |
| 270 | +Register a listener to receive real-time notifications when bookmarks are updated: |
| 271 | + |
| 272 | +```java |
| 273 | +stream.subscribe((String reader, EventReference processedUntil) -> { |
| 274 | + System.out.println( |
| 275 | + reader + " processed up to position " + processedUntil.position() |
| 276 | + ); |
| 277 | + |
| 278 | + // Record metrics |
| 279 | + metricsService.recordReaderPosition(reader, processedUntil.position()); |
| 280 | +}); |
| 281 | +``` |
| 282 | + |
| 283 | +The listener is invoked asynchronously (eventually consistent) whenever any reader places or updates a bookmark. |
| 284 | + |
| 285 | +Some use cases for bookmark listeners: |
| 286 | + |
| 287 | +- **Observability**: verify that processors are still running and that eventually consistent read models keep up with the events being appended |
| 288 | +- **Monitoring processing delays** |
| 289 | +- **Waiting for a reader to process up to a certain point** |
| 290 | +- **Coordinating multiple readers** |
| 292 | + |
| 294 | +Bookmark listeners enable powerful monitoring and coordination patterns for eventually consistent event processing systems. |
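
As one illustration of the monitoring use case, the sketch below keeps the latest known position per reader and derives how far each reader is behind the head of the stream. `ReaderLagSketch` and its methods are hypothetical, positions are assumed to be `long` values where `0` means "nothing processed yet", and keeping the maximum is a defensive assumption in case asynchronous notifications arrive out of order.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical lag tracker fed by bookmark notifications; not part of the eventstore API.
class ReaderLagSketch {
    // Highest bookmarked position seen so far, per reader.
    private final Map<String, Long> processedUntil = new ConcurrentHashMap<>();

    // Intended to be called from a bookmark listener.
    void onBookmark(String reader, long position) {
        // Keep the highest position, so a late notification cannot move progress backwards.
        processedUntil.merge(reader, position, Long::max);
    }

    // Number of positions the reader is behind the given head position.
    // A reader that has not placed a bookmark yet is treated as being at position 0.
    long lag(String reader, long headPosition) {
        return headPosition - processedUntil.getOrDefault(reader, 0L);
    }
}
```

Such a tracker could be fed from the bookmark listener shown earlier, for example by passing `reader` and `processedUntil.position()` to `onBookmark`, assuming `position()` yields a numeric value.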