Skip to content

Commit 80607e2

Browse files
committed
RFC: FIXUP: Race condition in TarantoolClientImpl
Here I propose changes that make reconnection awaiting look more 'native' alongside the surrounding code by using CountDownLatches instead of ReentrantLock + Condition. I am not sure whether the change is good from a performance point of view; maybe it is not. So, please, consider this patch an RFC. I have stripped the comments about synchronization tricks because I was thinking about rewriting them to be clearer (at least for me), but that attempt failed. I'll start a discussion about them in PR #145.
1 parent 1bb1f73 commit 80607e2

File tree

1 file changed

+32
-77
lines changed

1 file changed

+32
-77
lines changed

src/main/java/org/tarantool/TarantoolClientImpl.java

Lines changed: 32 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,10 @@ public void run() {
7171
while (!Thread.currentThread().isInterrupted()) {
7272
reconnect(0, thumbstone);
7373
try {
74-
state.awaitReconnection();
75-
} catch (InterruptedException e) {
74+
state.awaitState(StateHelper.RECONNECT);
75+
} catch (IllegalStateException ignored) {
76+
/* No-op. */
77+
} catch (InterruptedException ignored) {
7678
Thread.currentThread().interrupt();
7779
}
7880
}
@@ -162,21 +164,22 @@ protected void connect(final SocketChannel channel) throws Exception {
162164
}
163165

164166
protected void startThreads(String threadName) throws InterruptedException {
165-
final CountDownLatch init = new CountDownLatch(2);
166-
final AtomicInteger generationSync = new AtomicInteger(2);
167+
final CountDownLatch startedThreads = new CountDownLatch(2);
168+
169+
/* Set RECONNECT state again once both reader and writer threads exit. */
170+
final AtomicInteger exitedThreads = new AtomicInteger(2);
171+
167172
reader = new Thread(new Runnable() {
168173
@Override
169174
public void run() {
170-
init.countDown();
175+
startedThreads.countDown();
171176
if (state.acquire(StateHelper.READING)) {
172177
try {
173178
readThread();
174179
} finally {
175180
state.release(StateHelper.READING);
176-
// avoid a case when this thread falls asleep here
177-
// after READING flag released and then can pollute the state
178-
if (generationSync.decrementAndGet() == 0) {
179-
state.trySignalForReconnection();
181+
if (exitedThreads.decrementAndGet() == 0) {
182+
state.compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT);
180183
}
181184
}
182185
}
@@ -185,33 +188,26 @@ public void run() {
185188
writer = new Thread(new Runnable() {
186189
@Override
187190
public void run() {
188-
init.countDown();
191+
startedThreads.countDown();
189192
if (state.acquire(StateHelper.WRITING)) {
190193
try {
191194
writeThread();
192195
} finally {
193196
state.release(StateHelper.WRITING);
194-
// avoid a case when this thread falls asleep here
195-
// after WRITING flag released and then can pollute the state
196-
if (generationSync.decrementAndGet() == 0) {
197-
state.trySignalForReconnection();
197+
if (exitedThreads.decrementAndGet() == 0) {
198+
state.compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT);
198199
}
199200
}
200201
}
201202
}
202203
});
203204

204-
// reconnection preparation is done
205-
// before reconnection the state will be released
206-
// reader/writer threads have been replaced by new ones
207-
// it's required to be sure that old r/w threads see correct
208-
// client's r/w references
209205
state.release(StateHelper.RECONNECT);
210206

211207
configureThreads(threadName);
212208
reader.start();
213209
writer.start();
214-
init.await();
210+
startedThreads.await();
215211
}
216212

217213
protected void configureThreads(String threadName) {
@@ -635,23 +631,12 @@ protected final class StateHelper {
635631

636632
private final AtomicInteger state;
637633

634+
private final AtomicReference<CountDownLatch> nextReconnectLatch =
635+
new AtomicReference<>(new CountDownLatch(1));
638636
private final AtomicReference<CountDownLatch> nextAliveLatch =
639637
new AtomicReference<>(new CountDownLatch(1));
640-
641638
private final CountDownLatch closedLatch = new CountDownLatch(1);
642639

643-
/**
644-
* The condition variable to signal a reconnection is needed from reader /
645-
* writer threads and waiting for that signal from the reconnection thread.
646-
*
647-
* The lock variable to access this condition.
648-
*
649-
* @see #awaitReconnection()
650-
* @see #trySignalForReconnection()
651-
*/
652-
protected final ReentrantLock connectorLock = new ReentrantLock();
653-
protected final Condition reconnectRequired = connectorLock.newCondition();
654-
655640
protected StateHelper(int state) {
656641
this.state = new AtomicInteger(state);
657642
}
@@ -722,7 +707,10 @@ protected boolean compareAndSet(int expect, int update) {
722707
return false;
723708
}
724709

725-
if (update == ALIVE) {
710+
if (update == RECONNECT) {
711+
CountDownLatch latch = nextReconnectLatch.getAndSet(new CountDownLatch(1));
712+
latch.countDown();
713+
} else if (update == ALIVE) {
726714
CountDownLatch latch = nextAliveLatch.getAndSet(new CountDownLatch(1));
727715
latch.countDown();
728716
onReconnect();
@@ -732,18 +720,10 @@ protected boolean compareAndSet(int expect, int update) {
732720
return true;
733721
}
734722

735-
/**
736-
* Reconnection uses another way to await state via receiving a signal
737-
* instead of latches.
738-
*/
739723
protected void awaitState(int state) throws InterruptedException {
740-
if (state == RECONNECT) {
741-
awaitReconnection();
742-
} else {
743-
CountDownLatch latch = getStateLatch(state);
744-
if (latch != null) {
745-
latch.await();
746-
}
724+
CountDownLatch latch = getStateLatch(state);
725+
if (latch != null) {
726+
latch.await();
747727
}
748728
}
749729

@@ -756,6 +736,13 @@ private CountDownLatch getStateLatch(int state) {
756736
if (state == CLOSED) {
757737
return closedLatch;
758738
}
739+
if (state == RECONNECT) {
740+
if (getState() == CLOSED) {
741+
throw new IllegalStateException("State is CLOSED.");
742+
}
743+
CountDownLatch latch = nextReconnectLatch.get();
744+
return (getState() == RECONNECT) ? null : latch;
745+
}
759746
if (state == ALIVE) {
760747
if (getState() == CLOSED) {
761748
throw new IllegalStateException("State is CLOSED.");
@@ -767,38 +754,6 @@ private CountDownLatch getStateLatch(int state) {
767754
}
768755
return null;
769756
}
770-
771-
/**
772-
* Blocks until a reconnection signal will be received.
773-
*
774-
* @see #trySignalForReconnection()
775-
*/
776-
private void awaitReconnection() throws InterruptedException {
777-
connectorLock.lock();
778-
try {
779-
while (getState() != StateHelper.RECONNECT) {
780-
reconnectRequired.await();
781-
}
782-
} finally {
783-
connectorLock.unlock();
784-
}
785-
}
786-
787-
/**
788-
* Signals to the connector that reconnection process can be performed.
789-
*
790-
* @see #awaitReconnection()
791-
*/
792-
private void trySignalForReconnection() {
793-
if (compareAndSet(StateHelper.UNINITIALIZED, StateHelper.RECONNECT)) {
794-
connectorLock.lock();
795-
try {
796-
reconnectRequired.signal();
797-
} finally {
798-
connectorLock.unlock();
799-
}
800-
}
801-
}
802757
}
803758

804759
protected static class TarantoolOp<V> extends CompletableFuture<V> {

0 commit comments

Comments
 (0)