Skip to content

Commit 31eeeaf

Browse files
christophstroblodrotbohm
authored andcommitted
#517 - Add example for declarative reactive transactions in MongoDB.
Original pull request: #518.
1 parent 6bf0258 commit 31eeeaf

File tree

3 files changed

+243
-8
lines changed

3 files changed

+243
-8
lines changed

mongodb/transactions/README.md

+42-8
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ be patient.
1515
## Sync Transactions
1616

1717
`MongoTransactionManager` is the gateway to the well known Spring transaction support. It lets applications use
18-
[the managed transaction features of Spring](http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/html/transaction.html).
18+
[the managed transaction features of Spring](http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/data-access.html#transaction).
1919
The `MongoTransactionManager` binds a `ClientSession` to the thread. `MongoTemplate` detects the session and operates
2020
on these resources which are associated with the transaction accordingly. `MongoTemplate` can also participate in
2121
other, ongoing transactions.
@@ -51,17 +51,16 @@ public class TransitionService {
5151
}
5252
```
5353

54-
## Reactive transactions
54+
## Programmatic Reactive transactions
5555

56-
`ReactiveMongoTemplate` offers dedicated methods for operating within a transaction without having to worry about the
57-
commit/abort actions depending on the operations outcome. There's currently no session or transaction integration
58-
with reactive repositories - we apologize for that!
56+
`ReactiveMongoTemplate` offers dedicated methods (like `inTransaction()`) for operating within a transaction without having to worry about the
57+
commit/abort actions depending on the operations outcome.
5958

6059
**NOTE:** Please note that you cannot preform meta operations, like collection creation within a transaction.
6160

6261
```java
63-
@Component
64-
public class RactiveTransitionService {
62+
@Service
63+
public class ReactiveTransitionService {
6564

6665
public Mono<Integer> run(Integer id) {
6766

@@ -76,4 +75,39 @@ public class RactiveTransitionService {
7675
}).next().map(Process::getId);
7776
}
7877
}
79-
```
78+
```
79+
80+
## Declarative Reactive transactions
81+
82+
`ReactiveMongoTransactionManager` is the gateway to the reactive Spring transaction support. It lets applications use
83+
[the managed transaction features of Spring](http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/data-access.html#transaction).
84+
The `ReactiveMongoTransactionManager` adds the `ClientSession` to the `reactor.util.context.Context`. `ReactiveMongoTemplate` detects the session and operates
85+
on these resources which are associated with the transaction accordingly.
86+
87+
```java
88+
@EnableTransactionManagement
89+
class Config extends AbstractReactiveMongoConfiguration {
90+
91+
@Bean
92+
ReactiveTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
93+
return new ReactiveMongoTransactionManager(factory);
94+
}
95+
96+
// ...
97+
}
98+
99+
100+
@Service
101+
class ReactiveManagedTransitionService {
102+
103+
@Transactional
104+
public Mono<Integer> run(Integer id) {
105+
106+
return lookup(id)
107+
.flatMap(process -> start(template, process))
108+
.flatMap(it -> verify(it)) //
109+
.flatMap(process -> finish(template, process))
110+
.map(Process::getId);
111+
}
112+
}
113+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package example.springdata.mongodb.reactive;
17+
18+
import example.springdata.mongodb.Process;
19+
import example.springdata.mongodb.State;
20+
import lombok.RequiredArgsConstructor;
21+
import reactor.core.publisher.Mono;
22+
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.springframework.data.mongodb.core.ReactiveMongoOperations;
26+
import org.springframework.data.mongodb.core.ReactiveMongoTemplate;
27+
import org.springframework.data.mongodb.core.query.Criteria;
28+
import org.springframework.data.mongodb.core.query.Query;
29+
import org.springframework.data.mongodb.core.query.Update;
30+
import org.springframework.stereotype.Service;
31+
import org.springframework.transaction.annotation.Transactional;
32+
import org.springframework.util.Assert;
33+
34+
/**
35+
* @author Christoph Strobl
36+
*/
37+
@Service
38+
@RequiredArgsConstructor
39+
public class ReactiveManagedTransitionService {
40+
41+
final ReactiveProcessRepository repository;
42+
final ReactiveMongoTemplate template;
43+
44+
final AtomicInteger counter = new AtomicInteger(0);
45+
46+
public Mono<Process> newProcess() {
47+
return repository.save(new Process(counter.incrementAndGet(), State.CREATED, 0));
48+
}
49+
50+
@Transactional
51+
public Mono<Integer> run(Integer id) {
52+
53+
return lookup(id) //
54+
.flatMap(process -> start(template, process)) //
55+
.flatMap(it -> verify(it)) //
56+
.flatMap(process -> finish(template, process)) //
57+
.map(Process::getId);
58+
}
59+
60+
private Mono<Process> finish(ReactiveMongoOperations operations, Process process) {
61+
62+
return operations.update(Process.class).matching(Query.query(Criteria.where("id").is(process.getId())))
63+
.apply(Update.update("state", State.DONE).inc("transitionCount", 1)).first() //
64+
.then(Mono.just(process));
65+
}
66+
67+
Mono<Process> start(ReactiveMongoOperations operations, Process process) {
68+
69+
return operations.update(Process.class).matching(Query.query(Criteria.where("id").is(process.getId())))
70+
.apply(Update.update("state", State.ACTIVE).inc("transitionCount", 1)).first() //
71+
.then(Mono.just(process));
72+
}
73+
74+
Mono<Process> lookup(Integer id) {
75+
return repository.findById(id);
76+
}
77+
78+
Mono<Process> verify(Process process) {
79+
80+
Assert.state(process.getId() % 3 != 0, "We're sorry but we needed to drop that one");
81+
return Mono.just(process);
82+
}
83+
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package example.springdata.mongodb.reactive;
17+
18+
import static org.assertj.core.api.Assertions.*;
19+
20+
import example.springdata.mongodb.Process;
21+
import example.springdata.mongodb.State;
22+
import example.springdata.mongodb.util.EmbeddedMongo;
23+
import reactor.core.publisher.Flux;
24+
import reactor.test.StepVerifier;
25+
26+
import org.bson.Document;
27+
import org.junit.ClassRule;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.springframework.beans.factory.annotation.Autowired;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.context.annotation.ComponentScan;
33+
import org.springframework.context.annotation.Configuration;
34+
import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
35+
import org.springframework.data.mongodb.ReactiveMongoTransactionManager;
36+
import org.springframework.data.mongodb.config.AbstractReactiveMongoConfiguration;
37+
import org.springframework.data.mongodb.repository.config.EnableReactiveMongoRepositories;
38+
import org.springframework.test.context.ContextConfiguration;
39+
import org.springframework.test.context.junit4.SpringRunner;
40+
import org.springframework.transaction.ReactiveTransactionManager;
41+
import org.springframework.transaction.annotation.EnableTransactionManagement;
42+
43+
import com.mongodb.reactivestreams.client.MongoClient;
44+
import com.mongodb.reactivestreams.client.MongoClients;
45+
46+
/**
47+
* Test showing MongoDB Transaction usage through a reactive API.
48+
*
49+
* @author Christoph Strobl
50+
* @currentRead The Core - Peter V. Brett
51+
*/
52+
@RunWith(SpringRunner.class)
53+
@ContextConfiguration
54+
public class ReactiveManagedTransitionServiceTests {
55+
56+
public static @ClassRule EmbeddedMongo replSet = EmbeddedMongo.replSet().configure();
57+
58+
@Autowired ReactiveManagedTransitionService managedTransitionService;
59+
@Autowired MongoClient client;
60+
61+
static final String DB_NAME = "spring-data-reactive-tx-examples";
62+
63+
@Configuration
64+
@ComponentScan
65+
@EnableReactiveMongoRepositories
66+
@EnableTransactionManagement
67+
static class Config extends AbstractReactiveMongoConfiguration {
68+
69+
@Bean
70+
ReactiveTransactionManager transactionManager(ReactiveMongoDatabaseFactory dbFactory) {
71+
return new ReactiveMongoTransactionManager(dbFactory);
72+
}
73+
74+
@Bean
75+
@Override
76+
public MongoClient reactiveMongoClient() {
77+
return MongoClients.create(replSet.getConnectionString());
78+
}
79+
80+
@Override
81+
protected String getDatabaseName() {
82+
return DB_NAME;
83+
}
84+
}
85+
86+
@Test
87+
public void reactiveTxCommitRollback() {
88+
89+
for (int i = 0; i < 10; i++) {
90+
managedTransitionService.newProcess() //
91+
.map(Process::getId) //
92+
.flatMap(managedTransitionService::run) //
93+
.onErrorReturn(-1).as(StepVerifier::create) //
94+
.consumeNextWith(val -> {}) //
95+
.verifyComplete();
96+
}
97+
98+
Flux.from(client.getDatabase(DB_NAME).getCollection("processes").find(new Document())) //
99+
.buffer(10) //
100+
.as(StepVerifier::create) //
101+
.consumeNextWith(list -> {
102+
103+
for (Document document : list) {
104+
105+
System.out.println("document: " + document);
106+
107+
if (document.getInteger("_id") % 3 == 0) {
108+
assertThat(document.getString("state")).isEqualTo(State.CREATED.toString());
109+
} else {
110+
assertThat(document.getString("state")).isEqualTo(State.DONE.toString());
111+
}
112+
}
113+
114+
}) //
115+
.verifyComplete();
116+
}
117+
}

0 commit comments

Comments
 (0)