From cf8b882af788d467f43643d22d82a6a221860e24 Mon Sep 17 00:00:00 2001 From: xcomp Date: Thu, 30 Jul 2020 21:04:01 +0200 Subject: [PATCH] [FLINK-18677][fix] Added handling of suspended or lost connections within the ZooKeeperLeaderRetrievalService. The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side. [FLINK-18677][runtime] [style] Replaced spaces by TABs to follow the Apache Flink code style. [FLINK-18677][runtime] [fix] Synchronize notifyLeaderLoss through lock and removed redundant code. The redundant code was moved into notifyIfNewLeaderAddress(String, UUID) which is then used by notifyLeaderLoss() and within nodeChanged(). Additionally, the method call of notifyLeaderLoss() is guarded now by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID). [FLINK-18677][runtime] The exception was added to make it more explicit that the method is not expected to be called. [FLINK-18677][runtime] Decreased wait time for the queue to be filled since we're not expecting any objects. The test does not expect any calls to happen. Hence, no CompletableFuture instance will be queued. The longer wait time would just result in a longer running test. [FLINK-18677][runtime] Added infinite wait time to happy test. The previous implementation had a fixed timeout. Slower machines might need longer to process the test which might result in test failures. The new implementation removes the timeout so that the test wouldn't fail just because of poor performance of the machine the test is running on. [FLINK-18677][runtime] Moved log messages out of synchronization blocks. This closes #13055. 
--- .../ZooKeeperLeaderRetrievalService.java | 48 +++-- ...rLeaderElectionConnectionHandlingTest.java | 171 ++++++++++++++++++ 2 files changed, 206 insertions(+), 13 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java index 4fcd9c535a63b..df4853c2cacfd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/ZooKeeperLeaderRetrievalService.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.concurrent.GuardedBy; + import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.ObjectInputStream; @@ -165,17 +167,7 @@ public void nodeChanged() throws Exception { } } - if (!(Objects.equals(leaderAddress, lastLeaderAddress) && - Objects.equals(leaderSessionID, lastLeaderSessionID))) { - LOG.debug( - "New leader information: Leader={}, session ID={}.", - leaderAddress, - leaderSessionID); - - lastLeaderAddress = leaderAddress; - lastLeaderSessionID = leaderSessionID; - leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID); - } + notifyIfNewLeaderAddress(leaderAddress, leaderSessionID); } catch (Exception e) { leaderListener.handleError(new Exception("Could not handle node changed event.", e)); throw e; @@ -193,14 +185,20 @@ protected void handleStateChange(ConnectionState newState) { break; case SUSPENDED: LOG.warn("Connection to ZooKeeper suspended. 
Can no longer retrieve the leader from " + - "ZooKeeper."); + "ZooKeeper."); + synchronized (lock) { + notifyLeaderLoss(); + } break; case RECONNECTED: LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted."); break; case LOST: LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " + - "ZooKeeper."); + "ZooKeeper."); + synchronized (lock) { + notifyLeaderLoss(); + } break; } } @@ -209,4 +207,28 @@ protected void handleStateChange(ConnectionState newState) { public void unhandledError(String s, Throwable throwable) { leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable)); } + + @GuardedBy("lock") + private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) { + if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) && + Objects.equals(newLeaderSessionID, lastLeaderSessionID))) { + if (newLeaderAddress == null && newLeaderSessionID == null) { + LOG.debug("Leader information was lost: The listener will be notified accordingly."); + } else { + LOG.debug( + "New leader information: Leader={}, session ID={}.", + newLeaderAddress, + newLeaderSessionID); + } + + lastLeaderAddress = newLeaderAddress; + lastLeaderSessionID = newLeaderSessionID; + leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID); + } + } + + @GuardedBy("lock") + private void notifyLeaderLoss() { + notifyIfNewLeaderAddress(null, null); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java new file mode 100644 index 0000000000000..65e25d3ec146d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/ZooKeeperLeaderElectionConnectionHandlingTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.leaderelection; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework; + +import org.apache.curator.test.TestingServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.time.Duration; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; + +/** + * Tests for the error handling in case of a suspended connection to the ZooKeeper instance. 
+ */ +public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger { + + private TestingServer testingServer; + + private Configuration config; + + private CuratorFramework zooKeeperClient; + + @Before + public void before() throws Exception { + testingServer = new TestingServer(); + + config = new Configuration(); + config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString()); + + zooKeeperClient = ZooKeeperUtils.startCuratorFramework(config); + } + + @After + public void after() throws Exception { + stopTestServer(); + + if (zooKeeperClient != null) { + zooKeeperClient.close(); + zooKeeperClient = null; + } + } + + @Test + public void testConnectionSuspendedHandlingDuringInitialization() throws Exception { + QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1, Duration.ofMillis(50)); + + ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config); + testInstance.start(queueLeaderElectionListener); + + // do the testing + CompletableFuture<String> firstAddress = queueLeaderElectionListener.next(); + assertThat("No results are expected, yet, since no leader was elected.", firstAddress, is(nullValue())); + + stopTestServer(); + + CompletableFuture<String> secondAddress = queueLeaderElectionListener.next(); + assertThat("No result is expected since there was no leader elected before stopping the server, yet.", secondAddress, is(nullValue())); + } + + @Test + public void testConnectionSuspendedHandling() throws Exception { + String leaderAddress = "localhost"; + LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(zooKeeperClient, config); + TestingContender contender = new TestingContender(leaderAddress, leaderElectionService); + leaderElectionService.start(contender); + + QueueLeaderElectionListener queueLeaderElectionListener = new 
QueueLeaderElectionListener(2); + + ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config); + testInstance.start(queueLeaderElectionListener); + + // do the testing + CompletableFuture<String> firstAddress = queueLeaderElectionListener.next(); + assertThat("The first result is expected to be the initially set leader address.", firstAddress.get(), is(leaderAddress)); + + stopTestServer(); + + CompletableFuture<String> secondAddress = queueLeaderElectionListener.next(); + assertThat("The next result must not be missing.", secondAddress, is(notNullValue())); + assertThat("The next result is expected to be null.", secondAddress.get(), is(nullValue())); + } + + private void stopTestServer() throws IOException { + if (testingServer != null) { + testingServer.stop(); + testingServer = null; + } + } + + private static class QueueLeaderElectionListener implements LeaderRetrievalListener { + + private final BlockingQueue<CompletableFuture<String>> queue; + private final Duration timeout; + + public QueueLeaderElectionListener(int expectedCalls) { + this(expectedCalls, null); + } + + public QueueLeaderElectionListener(int expectedCalls, Duration timeout) { + this.queue = new ArrayBlockingQueue<>(expectedCalls); + this.timeout = timeout; + } + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + try { + if (timeout == null) { + queue.put(CompletableFuture.completedFuture(leaderAddress)); + } else { + queue.offer(CompletableFuture.completedFuture(leaderAddress), timeout.toMillis(), TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + public CompletableFuture<String> next() { + try { + if (timeout == null) { + return queue.take(); + } else { + return this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void handleError(Exception exception) { + throw 
new UnsupportedOperationException("handleError(Exception) shouldn't have been called, but it was triggered anyway.", exception); + } + } +}