|
| 1 | +--- |
| 2 | +layout: post |
| 3 | +toc: true |
| 4 | +title: Projecting Events |
| 5 | +description: Projecting Domain Events into a read model |
| 6 | +date: 2025-11-29 04:00:00 |
| 7 | +categories: [Eventstore Documentation] |
| 8 | +tags: [events,projection,projector,read model] |
| 9 | +--- |
| 10 | + |
| 11 | +This guide covers how to build projections from event streams, including event handlers, read models, and using the Projector utility. |
| 12 | + |
| 13 | +## Projections from Your Events |
| 14 | + |
| 15 | +Projections process events from a stream to create current-state views needed by your application. |
| 16 | +They produce views that make sense out of all the business events that happened until a certain moment in time. |
| 17 | + |
| 18 | +These materialized views enable: |
| 19 | + |
| 20 | +- **Business decisions**: Aggregating historical facts to validate new commands |
| 21 | +- **User screens**: Building denormalized data structures for efficient display |
| 22 | +- **REST endpoints**: Generating response payloads from event history |
| 23 | +- **Reports**: Calculating metrics and analytics from domain events |
| 24 | +- etc... |
| 25 | + |
| 26 | +A projection queries relevant events and applies them sequentially to build state: |
| 27 | + |
| 28 | +```java |
| 29 | +// Projection builds current customer state from events |
| 30 | +CustomerProjection projection = new CustomerProjection("123"); |
| 31 | +Projector.from(stream).towards(projection).build().run(); |
| 32 | + |
| 33 | +// Use the projection result for business logic |
| 34 | +if (!projection.isChurned()) { |
| 35 | + // Process order for active customer |
| 36 | +} |
| 37 | +``` |
| 38 | + |
| 39 | +Projections are deterministic: replaying the same events in the same order always produces the same result. |
| 40 | + |
| 41 | +## Writing a Simple Event Handler |
| 42 | + |
| 43 | +The `EventHandler` interface processes domain events without metadata. Implement the `when()` method to handle events: |
| 44 | + |
| 45 | +```java |
| 46 | +public class CustomerCounter implements EventHandler<CustomerEvent> { |
| 47 | + private int registrationCount = 0; |
| 48 | + |
| 49 | + @Override |
| 50 | + public void when(CustomerEvent event) { |
| 51 | + if (event instanceof CustomerRegistered) { |
| 52 | + registrationCount++; |
| 53 | + } |
| 54 | + } |
| 55 | + |
| 56 | + public int getCount() { return registrationCount; } |
| 57 | +} |
| 58 | +``` |
| 59 | + |
| 60 | +Use pattern matching with switch expressions for cleaner code: |
| 61 | + |
| 62 | +```java |
| 63 | +@Override |
| 64 | +public void when(CustomerEvent event) { |
| 65 | + switch(event) { |
| 66 | + case CustomerRegistered r -> registrationCount++; |
| 67 | + case CustomerChurned c -> churnCount++; |
| 68 | + default -> {} // Ignore other events |
| 69 | + } |
| 70 | +} |
| 71 | +``` |
| 72 | + |
| 73 | +`EventHandler` is a functional interface, enabling lambda usage: |
| 74 | + |
| 75 | +```java |
| 76 | +EventHandler<CustomerEvent> logger = event -> |
| 77 | + System.out.println("Event: " + event.getClass().getSimpleName()); |
| 78 | +``` |
| 79 | + |
| 80 | +## Writing an Event Handler with Access to Metadata |
| 81 | + |
| 82 | +The `EventWithMetaDataHandler` interface provides access to the full `Event` wrapper, including timestamp, tags, reference, and stream information: |
| 83 | + |
| 84 | +```java |
| 85 | +public class CustomerTimeline implements EventWithMetaDataHandler<CustomerEvent> { |
| 86 | + private List<TimelineEntry> timeline = new ArrayList<>(); |
| 87 | + |
| 88 | + @Override |
| 89 | + public void when(Event<CustomerEvent> event) { |
| 90 | + timeline.add(new TimelineEntry( |
| 91 | + event.timestamp(), |
| 92 | + event.reference().position(), |
| 93 | + event.data().getClass().getSimpleName() |
| 94 | + )); |
| 95 | + } |
| 96 | + |
| 97 | + public List<TimelineEntry> getTimeline() { return timeline; } |
| 98 | +} |
| 99 | +``` |
| 100 | + |
| 101 | +**Use `EventWithMetaDataHandler` when you need:** |
| 102 | +- Event timestamps for temporal information that is not in your event payload |
| 103 | +- Tags for correlation, filtering or additional information stored therein (audit logging log, ...) |
| 104 | +- References for tracking position |
| 105 | +- Stream information for multi-stream projections |
| 106 | + |
| 107 | +**Use `EventHandler` when:** |
| 108 | +- You only need the business event data |
| 109 | +- Building simple state aggregations |
| 110 | +- Metadata is irrelevant to the projection logic |
| 111 | + |
| 112 | +## Implementing a Readmodel |
| 113 | + |
| 114 | +A readmodel is a projection that implements the `Projection` interface, combining an `EventQuery` with an event handler: |
| 115 | + |
| 116 | +```java |
| 117 | +public class CustomerSummary implements Projection<CustomerEvent> { |
| 118 | + private final String customerId; |
| 119 | + private String name; |
| 120 | + private boolean churned; |
| 121 | + |
| 122 | + public CustomerSummary(String customerId) { |
| 123 | + this.customerId = customerId; |
| 124 | + } |
| 125 | + |
| 126 | + @Override |
| 127 | + public EventQuery eventQuery() { |
| 128 | + // Query all events for this specific customer |
| 129 | + return EventQuery.forEvents( |
| 130 | + EventTypesFilter.any(), |
| 131 | + Tags.of("customer", customerId) |
| 132 | + ); |
| 133 | + } |
| 134 | + |
| 135 | + @Override |
| 136 | + public void when(Event<CustomerEvent> event) { |
| 137 | + switch(event.data()) { |
| 138 | + case CustomerRegistered r -> this.name = r.name(); |
| 139 | + case CustomerNameChanged n -> this.name = n.name(); |
| 140 | + case CustomerChurned c -> this.churned = true; |
| 141 | + } |
| 142 | + } |
| 143 | + |
| 144 | + public String getName() { return name; } |
| 145 | + public boolean isChurned() { return churned; } |
| 146 | +} |
| 147 | +``` |
| 148 | + |
| 149 | +The `eventQuery()` method defines which events are relevant. The Projector will only call `when()` for matching events. |
| 150 | + |
| 151 | +For projections that don't need metadata, use `ProjectionWithoutMetaData`: |
| 152 | + |
| 153 | +```java |
| 154 | +public class OrderTotal implements ProjectionWithoutMetaData<OrderEvent> { |
| 155 | + private BigDecimal total = BigDecimal.ZERO; |
| 156 | + |
| 157 | + @Override |
| 158 | + public EventQuery eventQuery() { |
| 159 | + return EventQuery.forEvents( |
| 160 | + EventTypesFilter.of(OrderPlaced.class), |
| 161 | + Tags.none() |
| 162 | + ); |
| 163 | + } |
| 164 | + |
| 165 | + @Override |
| 166 | + public void when(OrderEvent event) { |
| 167 | + if (event instanceof OrderPlaced placed) { |
| 168 | + total = total.add(placed.amount()); |
| 169 | + } |
| 170 | + } |
| 171 | + |
| 172 | + public BigDecimal getTotal() { return total; } |
| 173 | +} |
| 174 | +``` |
| 175 | + |
| 176 | +## Using the Projector |
| 177 | + |
| 178 | +The `Projector` utility class executes projections by querying events and applying them to the handler: |
| 179 | + |
| 180 | +```java |
| 181 | +EventStream<CustomerEvent> stream = eventstore.getEventStream( |
| 182 | + EventStreamId.forContext("customers"), |
| 183 | + CustomerEvent.class |
| 184 | +); |
| 185 | + |
| 186 | +CustomerSummary projection = new CustomerSummary("123"); |
| 187 | + |
| 188 | +Projector.from(stream) |
| 189 | + .towards(projection) |
| 190 | + .build() |
| 191 | + .run(); |
| 192 | + |
| 193 | +// Projection now contains current state |
| 194 | +System.out.println("Customer: " + projection.getName()); |
| 195 | +``` |
| 196 | + |
| 197 | +The Projector: |
| 198 | +1. Queries events matching the projection's `eventQuery()` |
| 199 | +2. Streams them in batches to avoid memory issues |
| 200 | +3. Calls `when()` for each matching event |
| 201 | +4. Returns metrics about the projection execution |
| 202 | + |
| 203 | +## Configuring the Projector |
| 204 | + |
| 205 | +The Projector handles complexity so your application doesn't need to. By default, it queries events in batches of 500 to prevent memory exhaustion with large streams: |
| 206 | + |
| 207 | +```java |
| 208 | +// Use default batch size (500) |
| 209 | +Projector.from(stream) |
| 210 | + .towards(projection) |
| 211 | + .build() |
| 212 | + .run(); |
| 213 | + |
| 214 | +// Configure smaller batches for memory-constrained environments |
| 215 | +Projector.from(stream) |
| 216 | + .towards(projection) |
| 217 | + .inBatchesOf(100) |
| 218 | + .build() |
| 219 | + .run(); |
| 220 | + |
| 221 | +// Configure larger batches for better throughput |
| 222 | +Projector.from(stream) |
| 223 | + .towards(projection) |
| 224 | + .inBatchesOf(1000) |
| 225 | + .build() |
| 226 | + .run(); |
| 227 | +``` |
| 228 | + |
| 229 | +The Projector automatically handles pagination—your projection code remains simple regardless of stream size. |
| 230 | + |
| 231 | +You can also configure where to start processing: |
| 232 | + |
| 233 | +```java |
| 234 | +// Start from a specific position |
| 235 | +EventReference checkpoint = // ... from somewhere ... |
| 236 | + |
| 237 | +Projector.from(stream) |
| 238 | + .towards(projection) |
| 239 | + .startingAfter(checkpoint) |
| 240 | + .build() |
| 241 | + .run(); |
| 242 | +``` |
| 243 | + |
| 244 | +This could, for example, be useful if you already have a persisted or existing projection that is up-to-date to a certain point you want to update with new events that were appended to the eventstore. |
| 245 | + |
| 246 | +## Reusing the Projector |
| 247 | + |
| 248 | +A Projector instance tracks its position in the event stream and can be reused for incremental updates: |
| 249 | + |
| 250 | +```java |
| 251 | +CustomerSummary projection = new CustomerSummary("123"); |
| 252 | + |
| 253 | +Projector<CustomerEvent> projector = Projector.from(stream) |
| 254 | + .towards(projection) |
| 255 | + .build(); |
| 256 | + |
| 257 | +// Initial run - process all historical events |
| 258 | +ProjectorMetrics metrics1 = projector.run(); |
| 259 | +System.out.println("Initial: " + metrics1.eventsHandled() + " events"); |
| 260 | + |
| 261 | +// ... time passes, new events are appended to stream ... |
| 262 | + |
| 263 | +// Incremental run - process only new events since last run |
| 264 | +ProjectorMetrics metrics2 = projector.run(); |
| 265 | +System.out.println("Incremental: " + metrics2.eventsHandled() + " new events"); |
| 266 | + |
| 267 | +// The projection is now up-to-date |
| 268 | +System.out.println("Current state: " + projection.getName()); |
| 269 | +``` |
| 270 | + |
| 271 | +The Projector remembers the last processed event reference and automatically resumes from that position on subsequent runs. |
| 272 | + |
| 273 | +You can also process up to a specific point in time: |
| 274 | + |
| 275 | +```java |
| 276 | +// Process events up to a historical checkpoint |
| 277 | +ProjectorMetrics metrics = projector.runUntil(historicalReference); |
| 278 | + |
| 279 | +// Projection now reflects state as of that moment |
| 280 | +``` |
| 281 | + |
| 282 | +This enables: |
| 283 | +- **Point-in-time queries**: Reconstruct historical states |
| 284 | +- **Controlled updates**: Process events in stages |
| 285 | +- **Testing**: Verify projection behavior at specific points |
| 286 | + |
| 287 | +## Interpreting Metrics |
| 288 | + |
| 289 | +The Projector returns `ProjectorMetrics` containing detailed statistics about projection execution: |
| 290 | + |
| 291 | +```java |
| 292 | +ProjectorMetrics metrics = projector.run(); |
| 293 | + |
| 294 | +System.out.println("Events streamed: " + metrics.eventsStreamed()); |
| 295 | +System.out.println("Events handled: " + metrics.eventsHandled()); |
| 296 | +System.out.println("Queries done: " + metrics.queriesDone()); |
| 297 | +System.out.println("Last event: " + metrics.lastEventReference()); |
| 298 | +``` |
| 299 | + |
| 300 | +### Metrics from the Last Run |
| 301 | + |
| 302 | +`ProjectorMetrics` returned from `run()` or `runUntil()` describes that specific execution: |
| 303 | + |
| 304 | +- **eventsStreamed**: Total events retrieved from the event source (may include filtered events) |
| 305 | +- **eventsHandled**: Events actually processed by the projection handler |
| 306 | +- **queriesDone**: Number of batch queries executed against the event source |
| 307 | +- **lastEventReference**: Reference to the last processed event |
| 308 | + |
| 309 | +```java |
| 310 | +ProjectorMetrics metrics = projector.run(); |
| 311 | + |
| 312 | +if (metrics.eventsHandled() == 0) { |
| 313 | + System.out.println("No new events to process"); |
| 314 | +} else { |
| 315 | + System.out.println("Processed " + metrics.eventsHandled() + |
| 316 | + " events in " + metrics.queriesDone() + " batches"); |
| 317 | +} |
| 318 | +``` |
| 319 | + |
| 320 | +The difference between `eventsStreamed` and `eventsHandled` indicates filtering efficiency: events matched by the query but filtered out by the projection logic. |
| 321 | + |
| 322 | +### Accumulated Metrics |
| 323 | + |
| 324 | +The Projector tracks total metrics across all runs: |
| 325 | + |
| 326 | +```java |
| 327 | +Projector<CustomerEvent> projector = Projector.from(stream) |
| 328 | + .towards(projection) |
| 329 | + .build(); |
| 330 | + |
| 331 | +// Run 1 |
| 332 | +projector.run(); // Handles 100 events |
| 333 | + |
| 334 | +// Run 2 |
| 335 | +projector.run(); // Handles 10 new events |
| 336 | + |
| 337 | +// View totals |
| 338 | +ProjectorMetrics total = projector.accumulatedMetrics(); |
| 339 | +System.out.println("Total events handled: " + total.eventsHandled()); // 110 |
| 340 | +System.out.println("Total queries: " + total.queriesDone()); |
| 341 | +System.out.println("Current position: " + total.lastEventReference()); |
| 342 | +``` |
| 343 | + |
| 344 | +Accumulated metrics are useful for: |
| 345 | +- **Monitoring**: Track total events processed over time |
| 346 | +- **Debugging**: Identify performance issues across runs |
| 347 | +- **Resumption**: Get the current position for checkpointing |
| 348 | +- **Reporting**: Calculate processing statistics |
0 commit comments