Skip to content

Commit ca1e007

Browse files
committed
Update CancelSignalTest
I don't know if this test actually makes sense: the cancel opertion happens in a different thread that causes some error when calling session.close because the thread doesn't match the one the session was open. I'm going to keep it because it was introduced to solve #1436 but we might need to review it again in the future.
1 parent 403e12e commit ca1e007

File tree

1 file changed

+69
-50
lines changed

1 file changed

+69
-50
lines changed

hibernate-reactive-core/src/test/java/org/hibernate/reactive/CancelSignalTest.java

Lines changed: 69 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,20 @@
55
*/
66
package org.hibernate.reactive;
77

8+
import java.util.ArrayList;
89
import java.util.Collection;
910
import java.util.List;
1011
import java.util.Objects;
1112
import java.util.Queue;
1213
import java.util.concurrent.CompletableFuture;
14+
import java.util.concurrent.CompletionStage;
1315
import java.util.concurrent.ConcurrentLinkedQueue;
1416
import java.util.concurrent.CountDownLatch;
1517
import java.util.concurrent.ExecutorService;
1618
import java.util.concurrent.Executors;
17-
import java.util.stream.IntStream;
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
1822

1923
import org.junit.jupiter.api.Test;
2024

@@ -27,73 +31,88 @@
2731
import jakarta.persistence.Id;
2832
import jakarta.persistence.Table;
2933

30-
import static java.util.Arrays.stream;
3134
import static java.util.concurrent.CompletableFuture.allOf;
3235
import static java.util.concurrent.CompletableFuture.runAsync;
33-
import static java.util.stream.Stream.concat;
3436
import static org.assertj.core.api.Assertions.assertThat;
37+
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;
3538

3639
public class CancelSignalTest extends BaseReactiveTest {
3740
private static final Logger LOG = Logger.getLogger( CancelSignalTest.class );
3841

42+
private static final int EXECUTION_SIZE = 10;
43+
3944
@Override
4045
protected Collection<Class<?>> annotatedEntities() {
4146
return List.of( GuineaPig.class );
4247
}
4348

49+
@Override
50+
public CompletionStage<Void> deleteEntities(Class<?>... entities) {
51+
// We don't need to delete anything
52+
return voidFuture();
53+
}
54+
4455
@Test
4556
public void cleanupConnectionWhenCancelSignal(VertxTestContext context) {
4657
// larger than 'sql pool size' to check entering the 'pool waiting queue'
47-
int executeSize = 10;
4858
CountDownLatch firstSessionWaiter = new CountDownLatch( 1 );
4959
Queue<Cancellable> cancellableQueue = new ConcurrentLinkedQueue<>();
5060

51-
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( executeSize );
52-
// Create some jobs that are going to be cancelled asynchronously
53-
CompletableFuture[] withSessionFutures = IntStream
54-
.range( 0, executeSize )
55-
.mapToObj( i -> runAsync(
56-
() -> {
57-
CountDownLatch countDownLatch = new CountDownLatch( 1 );
58-
Cancellable cancellable = getMutinySessionFactory()
59-
.withSession( s -> {
60-
LOG.debug( "start withSession: " + i );
61-
sleep( 100 );
62-
firstSessionWaiter.countDown();
63-
return s.find( GuineaPig.class, 1 );
64-
} )
65-
.onTermination().invoke( () -> {
66-
countDownLatch.countDown();
67-
LOG.debug( "future " + i + " terminated" );
68-
} )
69-
.subscribe().with( item -> LOG.debug( "end withSession: " + i ) );
70-
cancellableQueue.add( cancellable );
71-
await( countDownLatch );
72-
},
73-
withSessionExecutor
74-
) )
75-
.toArray( CompletableFuture[]::new );
76-
77-
// Create jobs that are going to cancel the previous ones
78-
ExecutorService cancelExecutor = Executors.newFixedThreadPool( executeSize );
79-
CompletableFuture[] cancelFutures = IntStream
80-
.range( 0, executeSize )
81-
.mapToObj( i -> runAsync(
82-
() -> {
83-
await( firstSessionWaiter );
84-
cancellableQueue.poll().cancel();
85-
sleep( 500 );
86-
},
87-
cancelExecutor
88-
) )
89-
.toArray( CompletableFuture[]::new );
90-
91-
CompletableFuture<Void> allFutures = allOf( concat( stream( withSessionFutures ), stream( cancelFutures ) )
92-
.toArray( CompletableFuture[]::new )
93-
);
94-
95-
// Test that there shouldn't be any pending process
96-
test( context, allFutures.thenAccept( x -> assertThat( sqlPendingMetric() ).isEqualTo( 0.0 ) ) );
61+
final AtomicInteger cancelledCount = new AtomicInteger( 0 );
62+
final AtomicInteger terminatedCount = new AtomicInteger( 0 );
63+
64+
final List<CompletableFuture<?>> allFutures = new ArrayList<>();
65+
66+
ExecutorService withSessionExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );
67+
for ( int j = 0; j < EXECUTION_SIZE; j++ ) {
68+
final int i = j;
69+
allFutures.add( runAsync( () -> {
70+
CountDownLatch countDownLatch = new CountDownLatch( 1 );
71+
Cancellable cancellable = getMutinySessionFactory()
72+
.withSession( s -> {
73+
LOG.info( "start withSession: " + i );
74+
sleep( 100 );
75+
firstSessionWaiter.countDown();
76+
return s.find( GuineaPig.class, 1 );
77+
} )
78+
.onCancellation().invoke( () -> {
79+
cancelledCount.incrementAndGet();
80+
LOG.info( "future " + i + " cancelled" );
81+
} )
82+
.onTermination().invoke( () -> {
83+
terminatedCount.incrementAndGet();
84+
countDownLatch.countDown();
85+
} )
86+
.subscribe()
87+
// We cancelled the job, it shouldn't really finish
88+
.with( item -> LOG.info( "end withSession: " + i ) );
89+
cancellableQueue.add( cancellable );
90+
await( countDownLatch );
91+
},
92+
withSessionExecutor
93+
) );
94+
}
95+
96+
ExecutorService cancelExecutor = Executors.newFixedThreadPool( EXECUTION_SIZE );
97+
for ( int i = 0; i < EXECUTION_SIZE; i++ ) {
98+
allFutures.add( runAsync( () -> {
99+
await( firstSessionWaiter );
100+
cancellableQueue.poll().cancel();
101+
sleep( 500 );
102+
},
103+
cancelExecutor
104+
) );
105+
}
106+
107+
CompletableFuture<Void> allOf = allOf( allFutures.toArray( new CompletableFuture<?>[0] ) );
108+
test( context, allOf.thenAccept( x -> {
109+
LOG.info( "Asserting test results" );
110+
// Test that there shouldn't be any pending process
111+
assertThat( sqlPendingMetric() ).isEqualTo( 0.0 );
112+
// All jobs should have been cancelled
113+
assertThat( cancelledCount.get() ).isEqualTo( EXECUTION_SIZE );
114+
assertThat( terminatedCount.get() ).isEqualTo( EXECUTION_SIZE );
115+
} ) );
97116
}
98117

99118
private static double sqlPendingMetric() {

0 commit comments

Comments
 (0)