Skip to content

Commit 3e1e516

Browse files
committed
feat: Implement Transactional Outbox pattern
Resolves #2671
1 parent ede37bd commit 3e1e516

File tree

10 files changed

+584
-0
lines changed

10 files changed

+584
-0
lines changed
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
5+
6+
The MIT License
7+
Copyright © 2014-2022 Ilkka Seppälä
8+
9+
Permission is hereby granted, free of charge, to any person obtaining a copy
10+
of this software and associated documentation files (the "Software"), to deal
11+
in the Software without restriction, including without limitation the rights
12+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13+
copies of the Software, and to permit persons to whom the Software is
14+
furnished to do so, subject to the following conditions:
15+
16+
The above copyright notice and this permission notice shall be included in
17+
all copies or substantial portions of the Software.
18+
19+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25+
THE SOFTWARE.
26+
27+
-->
28+
<project xmlns="http://maven.apache.org/POM/4.0.0"
29+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
30+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
31+
<modelVersion>4.0.0</modelVersion>
32+
<parent>
33+
<groupId>com.iluwatar</groupId>
34+
<artifactId>java-design-patterns</artifactId>
35+
<version>1.26.0-SNAPSHOT</version>
36+
</parent>
37+
38+
<artifactId>microservices-transactional-outbox</artifactId>
39+
40+
<dependencies>
41+
<dependency>
42+
<groupId>org.slf4j</groupId>
43+
<artifactId>slf4j-api</artifactId>
44+
</dependency>
45+
<dependency>
46+
<groupId>jakarta.persistence</groupId>
47+
<artifactId>jakarta.persistence-api</artifactId>
48+
<version>3.1.0</version>
49+
</dependency>
50+
<dependency>
51+
<groupId>ch.qos.logback</groupId>
52+
<artifactId>logback-classic</artifactId>
53+
</dependency>
54+
<dependency>
55+
<groupId>org.hibernate.orm</groupId>
56+
<artifactId>hibernate-core</artifactId>
57+
<version>6.4.4.Final</version>
58+
</dependency>
59+
<dependency>
60+
<groupId>com.h2database</groupId>
61+
<artifactId>h2</artifactId>
62+
<version>2.2.224</version>
63+
</dependency>
64+
<dependency>
65+
<groupId>com.fasterxml.jackson.core</groupId>
66+
<artifactId>jackson-databind</artifactId>
67+
<version>2.17.0</version>
68+
</dependency>
69+
<dependency>
70+
<groupId>org.junit.jupiter</groupId>
71+
<artifactId>junit-jupiter-engine</artifactId>
72+
<scope>test</scope>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.mockito</groupId>
76+
<artifactId>mockito-junit-jupiter</artifactId>
77+
<version>5.16.1</version>
78+
<scope>test</scope>
79+
</dependency>
80+
</dependencies>
81+
<build>
82+
<plugins>
83+
<plugin>
84+
<groupId>org.apache.maven.plugins</groupId>
85+
<artifactId>maven-assembly-plugin</artifactId>
86+
<executions>
87+
<execution>
88+
<configuration>
89+
<archive>
90+
<manifest>
91+
<mainClass>com.iluwatar.transactionaloutbox.App</mainClass>
92+
</manifest>
93+
</archive>
94+
</configuration>
95+
</execution>
96+
</executions>
97+
</plugin>
98+
</plugins>
99+
</build>
100+
</project>
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
25+
package com.iluwatar.transactionaloutbox;
26+
27+
import jakarta.persistence.Persistence;
28+
import lombok.extern.slf4j.Slf4j;
29+
30+
@Slf4j
31+
public class App {
32+
33+
public static void main(String[] args) throws Exception {
34+
var entityManagerFactory = Persistence.createEntityManagerFactory("transactional-outbox-pu");
35+
var entityManager = entityManagerFactory.createEntityManager();
36+
37+
var customerService = new CustomerService(entityManager);
38+
var messageBroker = new MessageBroker();
39+
var eventPoller = new EventPoller(entityManager, messageBroker);
40+
41+
eventPoller.start();
42+
43+
LOGGER.info("Running simulation...");
44+
Thread.sleep(1000);
45+
46+
customerService.createCustomer("john.doe");
47+
Thread.sleep(5000);
48+
49+
customerService.createCustomer("jane.doe");
50+
Thread.sleep(5000);
51+
52+
eventPoller.stop();
53+
entityManager.close();
54+
entityManagerFactory.close();
55+
LOGGER.info("Simulation finished.");
56+
}
57+
}
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
25+
package com.iluwatar.transactionaloutbox;
26+
27+
import jakarta.persistence.Entity;
28+
import jakarta.persistence.GeneratedValue;
29+
import jakarta.persistence.Id;
30+
import lombok.Getter;
31+
import lombok.Setter;
32+
33+
@Getter
34+
@Entity
35+
public class Customer {
36+
37+
private final String username;
38+
@Setter @Id @GeneratedValue private Integer id;
39+
40+
public Customer(String username) {
41+
this.username = username;
42+
}
43+
}
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
25+
package com.iluwatar.transactionaloutbox;
26+
27+
import com.fasterxml.jackson.databind.ObjectMapper;
28+
import jakarta.persistence.EntityManager;
29+
import lombok.extern.slf4j.Slf4j;
30+
31+
@Slf4j
32+
public class CustomerService {
33+
34+
private final EntityManager entityManager;
35+
private final OutboxRepository outboxRepository;
36+
private final ObjectMapper objectMapper = new ObjectMapper();
37+
38+
public CustomerService(EntityManager entityManager) {
39+
this.entityManager = entityManager;
40+
this.outboxRepository = new OutboxRepository(entityManager);
41+
}
42+
43+
public void createCustomer(String username) throws Exception {
44+
entityManager.getTransaction().begin();
45+
try {
46+
var customer = new Customer(username);
47+
entityManager.persist(customer);
48+
49+
String payload = objectMapper.writeValueAsString(customer);
50+
var event = new OutboxEvent("CUSTOMER_CREATED", payload);
51+
outboxRepository.save(event);
52+
53+
entityManager.getTransaction().commit();
54+
LOGGER.info("SUCCESS: Customer and OutboxEvent saved transactionally.");
55+
56+
} catch (Exception e) {
57+
entityManager.getTransaction().rollback();
58+
LOGGER.error("ERROR: Transaction rolled back.");
59+
throw e;
60+
}
61+
}
62+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* This project is licensed under the MIT license. Module model-view-viewmodel is using ZK framework licensed under LGPL (see lgpl-3.0.txt).
3+
*
4+
* The MIT License
5+
* Copyright © 2014-2022 Ilkka Seppälä
6+
*
7+
* Permission is hereby granted, free of charge, to any person obtaining a copy
8+
* of this software and associated documentation files (the "Software"), to deal
9+
* in the Software without restriction, including without limitation the rights
10+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
* copies of the Software, and to permit persons to whom the Software is
12+
* furnished to do so, subject to the following conditions:
13+
*
14+
* The above copyright notice and this permission notice shall be included in
15+
* all copies or substantial portions of the Software.
16+
*
17+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
* THE SOFTWARE.
24+
*/
25+
26+
package com.iluwatar.transactionaloutbox;
27+
28+
import jakarta.persistence.EntityManager;
29+
import java.util.List;
30+
import java.util.concurrent.Executors;
31+
import java.util.concurrent.ScheduledExecutorService;
32+
import java.util.concurrent.TimeUnit;
33+
import lombok.Getter;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
public class EventPoller {
38+
39+
private static final Logger LOGGER = LoggerFactory.getLogger(EventPoller.class);
40+
private static final int MAX_RETRIES = 3;
41+
private static final long RETRY_DELAY_MS = 1000;
42+
private static final java.security.SecureRandom RANDOM = new java.security.SecureRandom();
43+
44+
@Getter
45+
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
46+
47+
private final EntityManager entityManager;
48+
private final OutboxRepository outboxRepository;
49+
private final MessageBroker messageBroker;
50+
@Getter private long processedEventsCount = 0;
51+
@Getter private long failedEventsCount = 0;
52+
53+
public EventPoller(EntityManager entityManager, MessageBroker messageBroker) {
54+
this.entityManager = entityManager;
55+
this.outboxRepository = new OutboxRepository(entityManager);
56+
this.messageBroker = messageBroker;
57+
}
58+
59+
public void start() {
60+
scheduler.scheduleAtFixedRate(this::processOutboxEvents, 0, 2, TimeUnit.SECONDS);
61+
LOGGER.info("EventPoller started.");
62+
}
63+
64+
public void stop() {
65+
scheduler.shutdown();
66+
LOGGER.info(
67+
"EventPoller stopped with {} events processed and {} failures.",
68+
processedEventsCount,
69+
failedEventsCount);
70+
}
71+
72+
void processOutboxEvents() {
73+
LOGGER.info("Polling for unprocessed events...");
74+
entityManager.getTransaction().begin();
75+
try {
76+
List<OutboxEvent> events = outboxRepository.findUnprocessedEvents();
77+
if (events.isEmpty()) {
78+
LOGGER.info("No new events found.");
79+
} else {
80+
LOGGER.info("Found {} events to process.", events.size());
81+
for (var event : events) {
82+
processEventWithRetry(event);
83+
}
84+
}
85+
entityManager.getTransaction().commit();
86+
} catch (Exception e) {
87+
LOGGER.error("Error processing outbox events, rolling back transaction.", e);
88+
entityManager.getTransaction().rollback();
89+
failedEventsCount++;
90+
}
91+
}
92+
93+
private void processEventWithRetry(OutboxEvent event) {
94+
int retryCount = 0;
95+
while (retryCount < MAX_RETRIES) {
96+
try {
97+
messageBroker.sendMessage(event);
98+
outboxRepository.markAsProcessed(event);
99+
processedEventsCount++;
100+
LOGGER.info("Successfully processed event.");
101+
return;
102+
} catch (Exception e) {
103+
retryCount++;
104+
if (retryCount < MAX_RETRIES) {
105+
LOGGER.warn(
106+
"Failed to process event (attempt {}/{}). Retrying...", retryCount, MAX_RETRIES);
107+
try {
108+
long sleepTime =
109+
Math.min(RETRY_DELAY_MS * (1L << (retryCount - 1)) + RANDOM.nextLong(100), 10000L);
110+
Thread.sleep(sleepTime);
111+
} catch (InterruptedException ie) {
112+
Thread.currentThread().interrupt();
113+
break;
114+
}
115+
} else {
116+
LOGGER.error("Failed to process event after {} attempts", MAX_RETRIES);
117+
failedEventsCount++;
118+
}
119+
}
120+
}
121+
}
122+
}

0 commit comments

Comments
 (0)