Skip to content

Commit 4680cb8

Browse files
committed
Atomic processing of create/remove of keepalive thread
1 parent 91677e9 commit 4680cb8

File tree

1 file changed

+58
-42
lines changed

1 file changed

+58
-42
lines changed

src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

+58-42
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,7 @@ public X509Certificate[] getAcceptedIssuers() {
163163
private volatile ExecutorService keepAliveThreadExecutor;
164164

165165
private final Lock connectLock = new ReentrantLock();
166+
private final Lock keepAliveThreadExecutorLock = new ReentrantLock();
166167

167168
/**
168169
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
@@ -735,46 +736,51 @@ public Thread newThread(Runnable runnable) {
735736
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
736737
}
737738
});
738-
threadExecutor.submit(new Runnable() {
739-
@Override
740-
public void run() {
741-
while (!threadExecutor.isShutdown()) {
742-
try {
743-
Thread.sleep(keepAliveInterval);
744-
} catch (InterruptedException e) {
745-
// expected in case of disconnect
746-
}
747-
if (threadExecutor.isShutdown()) {
748-
return;
749-
}
750-
boolean connectionLost = false;
751-
if (heartbeatInterval > 0) {
752-
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
753-
} else {
739+
try {
740+
keepAliveThreadExecutorLock.lock();
741+
threadExecutor.submit(new Runnable() {
742+
@Override
743+
public void run() {
744+
while (!threadExecutor.isShutdown()) {
754745
try {
755-
channel.write(new PingCommand());
756-
} catch (IOException e) {
757-
connectionLost = true;
746+
Thread.sleep(keepAliveInterval);
747+
} catch (InterruptedException e) {
748+
// expected in case of disconnect
758749
}
759-
}
760-
if (connectionLost) {
761-
if (logger.isLoggable(Level.INFO)) {
762-
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
750+
if (threadExecutor.isShutdown()) {
751+
return;
763752
}
764-
try {
765-
terminateConnect();
766-
connect(connectTimeout);
767-
} catch (Exception ce) {
768-
if (logger.isLoggable(Level.WARNING)) {
769-
logger.warning("Failed to restore connection to " + hostname + ":" + port +
770-
". Next attempt in " + keepAliveInterval + "ms");
753+
boolean connectionLost = false;
754+
if (heartbeatInterval > 0) {
755+
connectionLost = System.currentTimeMillis() - eventLastSeen > keepAliveInterval;
756+
} else {
757+
try {
758+
channel.write(new PingCommand());
759+
} catch (IOException e) {
760+
connectionLost = true;
761+
}
762+
}
763+
if (connectionLost) {
764+
if (logger.isLoggable(Level.INFO)) {
765+
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
766+
}
767+
try {
768+
terminateConnect();
769+
connect(connectTimeout);
770+
} catch (Exception ce) {
771+
if (logger.isLoggable(Level.WARNING)) {
772+
logger.warning("Failed to restore connection to " + hostname + ":" + port +
773+
". Next attempt in " + keepAliveInterval + "ms");
774+
}
771775
}
772776
}
773777
}
774778
}
775-
}
776-
});
777-
keepAliveThreadExecutor = threadExecutor;
779+
});
780+
keepAliveThreadExecutor = threadExecutor;
781+
} finally {
782+
keepAliveThreadExecutorLock.unlock();
783+
}
778784
}
779785

780786
private Thread newNamedThread(Runnable runnable, String threadName) {
@@ -784,7 +790,12 @@ private Thread newNamedThread(Runnable runnable, String threadName) {
784790
}
785791

786792
boolean isKeepAliveThreadRunning() {
787-
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
793+
try {
794+
keepAliveThreadExecutorLock.lock();
795+
return keepAliveThreadExecutor != null && !keepAliveThreadExecutor.isShutdown();
796+
} finally {
797+
keepAliveThreadExecutorLock.unlock();
798+
}
788799
}
789800

790801
/**
@@ -1134,14 +1145,19 @@ public void disconnect() throws IOException {
11341145
}
11351146

11361147
private void terminateKeepAliveThread() {
1137-
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1138-
if (keepAliveThreadExecutor == null) {
1139-
return;
1140-
}
1141-
keepAliveThreadExecutor.shutdownNow();
1142-
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1143-
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1144-
// ignore
1148+
try {
1149+
keepAliveThreadExecutorLock.lock();
1150+
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
1151+
if (keepAliveThreadExecutor == null) {
1152+
return;
1153+
}
1154+
keepAliveThreadExecutor.shutdownNow();
1155+
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
1156+
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
1157+
// ignore
1158+
}
1159+
} finally {
1160+
keepAliveThreadExecutorLock.unlock();
11451161
}
11461162
}
11471163

0 commit comments

Comments
 (0)