|
| 1 | +--- |
| 2 | +layout: post |
| 3 | +toc: true |
| 4 | +title: Appending Events |
| 5 | +description: Appending Domain Events to the Eventstore |
| 6 | +categories: [Eventstore Documentation] |
| 7 | +tags: [events,append,tags,optimistic locking,dcb,idempotency] |
| 8 | +--- |
| 9 | + |
| 10 | +This guide covers how to append events to an event stream, including adding tags, understanding event metadata, optimistic locking, and implementing idempotency. |
| 11 | + |
| 12 | +## Appending an Event to a Stream |
| 13 | + |
| 14 | +Events are appended to an `EventStream` using the `append()` method. Before appending, events are created as `EphemeralEvent` instances—lightweight representations without stream association, reference, or timestamp. |
| 15 | + |
| 16 | +```java |
| 17 | +EventStream<CustomerEvent> stream = eventstore.getEventStream( |
| 18 | + EventStreamId.forContext("customer").withPurpose("123"), |
| 19 | + CustomerEvent.class |
| 20 | +); |
| 21 | + |
| 22 | +stream.append( |
| 23 | + AppendCriteria.none(), |
| 24 | + Event.of(new CustomerRegistered("John"), Tags.none()) |
| 25 | +); |
| 26 | +``` |
| 27 | + |
| 28 | +The `append()` method returns a list of fully-formed `Event` objects with assigned metadata (reference, position, timestamp). |
| 29 | + |
| 30 | +## Adding Tags |
| 31 | + |
| 32 | +Tags are key-value pairs that enable dynamic querying and correlation of events across different event types. They are central to the Dynamic Consistency Boundary (DCB) pattern. |
| 33 | + |
| 34 | +```java |
| 35 | +// Single tag |
| 36 | +stream.append( |
| 37 | + AppendCriteria.none(), |
| 38 | + Event.of(new CustomerRegistered("John"), Tags.of("customer", "123")) |
| 39 | +); |
| 40 | + |
| 41 | +// Multiple tags |
| 42 | +stream.append( |
| 43 | + AppendCriteria.none(), |
| 44 | + Event.of( |
| 45 | + new CustomerRegistered("John"), |
| 46 | + Tags.of("customer", "123", "region", "EU", "priority", "high") |
| 47 | + ) |
| 48 | +); |
| 49 | +``` |
| 50 | + |
| 51 | +Tags enable querying events across different event types based on shared business identifiers: |
| 52 | + |
| 53 | +```java |
| 54 | +// Find all events for a specific customer, regardless of event type |
| 55 | +Stream<Event<CustomerEvent>> customerEvents = stream.query( |
| 56 | + EventQuery.forEvents(EventTypesFilter.any(), Tags.of("customer", "123")) |
| 57 | +); |
| 58 | +``` |
| 59 | + |
| 60 | +You can also add tags to annotate events with any application-level metadata you need, but that you don't like to put in your event definition payloads. |
| 61 | + |
| 62 | + |
| 63 | +## Appended Event Metadata |
| 64 | + |
| 65 | +When an event is appended, the EventStore enriches it with metadata: |
| 66 | + |
| 67 | +- **EventReference**: A unique reference containing both a global `EventId` (UUID) and a `position` (sequential number starting at 1) |
| 68 | +- **Timestamp**: When the event was persisted |
| 69 | +- **Stream**: The `EventStreamId` the event belongs to |
| 70 | + |
| 71 | +```java |
| 72 | +List<Event<CustomerEvent>> appended = stream.append( |
| 73 | + AppendCriteria.none(), |
| 74 | + Event.of(new CustomerRegistered("John"), Tags.none()) |
| 75 | +); |
| 76 | + |
| 77 | +Event<CustomerEvent> event = appended.get(0); |
| 78 | +EventReference ref = event.reference(); |
| 79 | + |
| 80 | +System.out.println("Event ID: " + ref.id()); |
| 81 | +System.out.println("Position: " + ref.position()); // Sequential: 1, 2, 3, ... |
| 82 | +System.out.println("Timestamp: " + event.timestamp()); |
| 83 | +``` |
| 84 | + |
| 85 | +### EventReference: A unique reference to your event |
| 86 | + |
| 87 | +The `EventReference` combines identity and ordering: |
| 88 | +- **EventId**: Globally unique identifier (UUID-based) |
| 89 | +- **Position**: Sequential position within the stream (starts at 1, unique over all streams stored in the same storage) |
| 90 | + |
| 91 | +This way, the `EventReference` provides a great reference to: |
| 92 | +- determine where you were in processing events in your stream (querying a next batch of Events after that reference next time) |
| 93 | +- version a projection built from events, to determine up until which event |
| 94 | +- passing a reference to a client to demand a view that has been updated to at least the information that was submitted by that specific client (consistency) |
| 95 | +- compare the sequence in which two events have happened, based on their `position` in the stream |
| 96 | +- etc... |
| 97 | + |
| 98 | +EventReference is also crucial for implementing optimistic locking in the DCB pattern. It allows you to note the last relevant event when making a decision, then verify no new relevant facts have emerged when appending the result. |
| 99 | + |
| 100 | +```java |
| 101 | +List<Event<CustomerEvent>> events = stream.query( |
| 102 | + EventQuery.forEvents(EventTypesFilter.any(), Tags.of("customer", "123")) |
| 103 | +).toList(); |
| 104 | + |
| 105 | +EventReference lastRef = events.getLast().reference(); |
| 106 | +// Use this reference for optimistic locking |
| 107 | +``` |
| 108 | + |
| 109 | +## Appending Multiple Events to a Stream |
| 110 | + |
| 111 | +Multiple events can be appended in a single atomic operation by passing a list: |
| 112 | + |
| 113 | +```java |
| 114 | +stream.append( |
| 115 | + AppendCriteria.none(), |
| 116 | + List.of( |
| 117 | + Event.of(new CustomerRegistered("John"), Tags.of("customer", "123")), |
| 118 | + Event.of(new CustomerAddressChanged("Main St"), Tags.of("customer", "123")), |
| 119 | + Event.of(new CustomerEmailChanged("john@example.com"), Tags.of("customer", "123")) |
| 120 | + ) |
| 121 | +); |
| 122 | +``` |
| 123 | + |
| 124 | +All events in the list are appended atomically: either all succeed or all fail. Each event receives a consecutive position number within the stream. |
| 125 | + |
| 126 | +**Important**: When using optimistic locking with batch appends, the `AppendCriteria` check is performed once before appending any events. If the check passes, all events are appended together. |
| 127 | + |
| 128 | +## Optimistic Locking |
| 129 | + |
| 130 | +The EventStore implements optimistic locking through the DCB pattern using `AppendCriteria`. This ensures that business decisions based on historical facts remain valid at the moment of appending new events. |
| 131 | + |
| 132 | +### How It Works |
| 133 | + |
| 134 | +1. Query relevant events using an `EventQuery` |
| 135 | +2. Make a business decision based on those facts |
| 136 | +3. Note the reference of the last relevant event |
| 137 | +4. Append new events with `AppendCriteria` containing the same query and last reference |
| 138 | +5. If new events matching the query exist after the reference, `OptimisticLockingException` is thrown |
| 139 | + |
| 140 | +```java |
| 141 | +// Step 1: Query relevant facts |
| 142 | +EventQuery relevantQuery = EventQuery.forEvents( |
| 143 | + EventTypesFilter.any(), |
| 144 | + Tags.of("customer", "123") |
| 145 | +); |
| 146 | +List<Event<CustomerEvent>> relevantEvents = stream.query(relevantQuery).toList(); |
| 147 | + |
| 148 | +// Step 2 & 3: Make decision and note last reference |
| 149 | +EventReference lastRef = relevantEvents.getLast().reference(); |
| 150 | + |
| 151 | +// Step 4 & 5: Conditional append |
| 152 | +try { |
| 153 | + stream.append( |
| 154 | + AppendCriteria.of(relevantQuery, Optional.of(lastRef)), |
| 155 | + Event.of(new CustomerNameChanged("Jane"), Tags.of("customer", "123")) |
| 156 | + ); |
| 157 | +} catch (OptimisticLockingException e) { |
| 158 | + // New relevant facts emerged - retry with updated information |
| 159 | +} |
| 160 | +``` |
| 161 | + |
| 162 | +### First Append (Empty Stream) |
| 163 | + |
| 164 | +When appending to an empty stream or when no previous relevant events exist, not EventReference can be obtained and passed: |
| 165 | + |
| 166 | +```java |
| 167 | +stream.append( |
| 168 | + AppendCriteria.of( |
| 169 | + EventQuery.forEvents(EventTypesFilter.any(), Tags.of("customer", "123")), |
| 170 | + Optional.empty() // No last reference expected |
| 171 | + ), |
| 172 | + Event.of(new CustomerRegistered("John"), Tags.of("customer", "123")) |
| 173 | +); |
| 174 | +``` |
| 175 | + |
| 176 | +## Idempotency |
| 177 | + |
| 178 | +Idempotency ensures that duplicate command submissions don't create duplicate events. |
| 179 | +When handling for example incoming REST calls or asynchronous messages (JMS, Kafka, ...), it could happen that your correctly process and append the information, but that you're not able to acknowledge proper processing to the client or messaging system due to a system or connection failure. |
| 180 | +In that case, it is to be expected that the client assumes processing hasn't happened yet, and that it resubmits the same information. Idempotency in your system then allows to detect and silently ignore the duplicate processing, while confirming (again) to the client that reception and processing has happened correctly. |
| 181 | + |
| 182 | +An easy way to implement this is by tagging events with an idempotency key and using optimistic locking to prevent duplicate appends. |
| 183 | + |
| 184 | +### Implementation Pattern |
| 185 | + |
| 186 | +```java |
| 187 | +public void appendIdempotently( |
| 188 | + EventStream<CustomerEvent> stream, |
| 189 | + CustomerEvent eventData, |
| 190 | + String idempotencyKey) { |
| 191 | + |
| 192 | + // Add idempotency tag to the event |
| 193 | + EphemeralEvent<CustomerEvent> event = Event.of( |
| 194 | + eventData, |
| 195 | + Tags.of("idempotency", idempotencyKey) |
| 196 | + ); |
| 197 | + |
| 198 | + // Define criteria: no event with this idempotency key should exist |
| 199 | + AppendCriteria criteria = AppendCriteria.of( |
| 200 | + EventQuery.forEvents( |
| 201 | + EventTypesFilter.any(), |
| 202 | + Tags.of("idempotency", idempotencyKey) |
| 203 | + ), |
| 204 | + Optional.empty() // Expect no prior event with this key |
| 205 | + ); |
| 206 | + |
| 207 | + try { |
| 208 | + stream.append(criteria, event); |
| 209 | + } catch (OptimisticLockingException e) { |
| 210 | + // Event with this idempotency key already exists |
| 211 | + // Safe to ignore - this is a duplicate submission |
| 212 | + System.out.println("Event already processed: " + idempotencyKey); |
| 213 | + } |
| 214 | +} |
| 215 | +``` |
| 216 | + |
| 217 | +### Usage Example |
| 218 | + |
| 219 | +```java |
| 220 | +String requestId = "req-2024-01-15-abc123"; |
| 221 | + |
| 222 | +// First submission - succeeds |
| 223 | +appendIdempotently( |
| 224 | + stream, |
| 225 | + new CustomerRegistered("John"), |
| 226 | + requestId |
| 227 | +); |
| 228 | + |
| 229 | +// Duplicate submission - OptimisticLockingException thrown and ignored |
| 230 | +appendIdempotently( |
| 231 | + stream, |
| 232 | + new CustomerRegistered("John"), |
| 233 | + requestId |
| 234 | +); |
| 235 | +``` |
| 236 | + |
| 237 | +The `OptimisticLockingException` indicates the event was already appended previously, making the operation idempotent. The exception can be safely caught and ignored, as it signals successful deduplication rather than an error condition. |
0 commit comments