Skip to content

Commit

Permalink
[FLINK-18677][fix] Added handling of suspended or lost connections wi…
Browse files Browse the repository at this point in the history
…thin the ZooKeeperLeaderRetrievalService.

The listener needs to be notified in case of a connection loss so that it is able to initiate necessary actions on its side.

[FLINK-18677][runtime] [style] Replaced spaces by TABs to follow the Apache Flink code styles.

[FLINK-18677][runtime] [fix] Synchronize notifyLeaderLoss through lock and removed redundant code.

The redundant code was moved into notifyIfNewLeaderAddress(String, UUID) which is then used by notifyLeaderLoss() and within nodeChanged(). Additionally, the method call of notifyLeaderLoss() is guarded now by a lock to synchronize the state change (i.e. lastLeaderAddress and lastLeaderSessionID).

[FLINK-18677][runtime] The exception was added to make it more explicit that the method is not expected to be called.

[FLINK-18677][runtime] Decreased wait time the queue to be filled since we're not expecting any objects.

The test does not expect any calls happening. Hence, no CompletableFuture instance will be queued. The longer wait time would just result in a longer running test.

[FLINK-18677][runtime] Added infinite wait time to happy test.

The previous implementation had a fixed timeout. Slower machines might need longer to process the test which might result in test failures. The new implementation removes the timeout so that the test wouldn't fail just because of a poor performance of the machine the test is running on.

[FLINK-18677][runtime] Moved log messages out of synchronization blocks.

This closes apache#13055.
  • Loading branch information
XComp authored and tillrohrmann committed Aug 5, 2020
1 parent 8ece02f commit cf8b882
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.concurrent.GuardedBy;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
Expand Down Expand Up @@ -165,17 +167,7 @@ public void nodeChanged() throws Exception {
}
}

if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
Objects.equals(leaderSessionID, lastLeaderSessionID))) {
LOG.debug(
"New leader information: Leader={}, session ID={}.",
leaderAddress,
leaderSessionID);

lastLeaderAddress = leaderAddress;
lastLeaderSessionID = leaderSessionID;
leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
}
notifyIfNewLeaderAddress(leaderAddress, leaderSessionID);
} catch (Exception e) {
leaderListener.handleError(new Exception("Could not handle node changed event.", e));
throw e;
Expand All @@ -193,14 +185,20 @@ protected void handleStateChange(ConnectionState newState) {
break;
case SUSPENDED:
LOG.warn("Connection to ZooKeeper suspended. Can no longer retrieve the leader from " +
"ZooKeeper.");
"ZooKeeper.");
synchronized (lock) {
notifyLeaderLoss();
}
break;
case RECONNECTED:
LOG.info("Connection to ZooKeeper was reconnected. Leader retrieval can be restarted.");
break;
case LOST:
LOG.warn("Connection to ZooKeeper lost. Can no longer retrieve the leader from " +
"ZooKeeper.");
"ZooKeeper.");
synchronized (lock) {
notifyLeaderLoss();
}
break;
}
}
Expand All @@ -209,4 +207,28 @@ protected void handleStateChange(ConnectionState newState) {
public void unhandledError(String s, Throwable throwable) {
leaderListener.handleError(new FlinkException("Unhandled error in ZooKeeperLeaderRetrievalService:" + s, throwable));
}

@GuardedBy("lock")
private void notifyIfNewLeaderAddress(String newLeaderAddress, UUID newLeaderSessionID) {
if (!(Objects.equals(newLeaderAddress, lastLeaderAddress) &&
Objects.equals(newLeaderSessionID, lastLeaderSessionID))) {
if (newLeaderAddress == null && newLeaderSessionID == null) {
LOG.debug("Leader information was lost: The listener will be notified accordingly.");
} else {
LOG.debug(
"New leader information: Leader={}, session ID={}.",
newLeaderAddress,
newLeaderSessionID);
}

lastLeaderAddress = newLeaderAddress;
lastLeaderSessionID = newLeaderSessionID;
leaderListener.notifyLeaderAddress(newLeaderAddress, newLeaderSessionID);
}
}

@GuardedBy("lock")
private void notifyLeaderLoss() {
notifyIfNewLeaderAddress(null, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.leaderelection;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.util.TestLogger;

import org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;

import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

/**
* Tests for the error handling in case of a suspended connection to the ZooKeeper instance.
*/
public class ZooKeeperLeaderElectionConnectionHandlingTest extends TestLogger {

private TestingServer testingServer;

private Configuration config;

private CuratorFramework zooKeeperClient;

@Before
public void before() throws Exception {
testingServer = new TestingServer();

config = new Configuration();
config.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, testingServer.getConnectString());

zooKeeperClient = ZooKeeperUtils.startCuratorFramework(config);
}

@After
public void after() throws Exception {
stopTestServer();

if (zooKeeperClient != null) {
zooKeeperClient.close();
zooKeeperClient = null;
}
}

@Test
public void testConnectionSuspendedHandlingDuringInitialization() throws Exception {
QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(1, Duration.ofMillis(50));

ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config);
testInstance.start(queueLeaderElectionListener);

// do the testing
CompletableFuture<String> firstAddress = queueLeaderElectionListener.next();
assertThat("No results are expected, yet, since no leader was elected.", firstAddress, is(nullValue()));

stopTestServer();

CompletableFuture<String> secondAddress = queueLeaderElectionListener.next();
assertThat("No result is expected since there was no leader elected before stopping the server, yet.", secondAddress, is(nullValue()));
}

@Test
public void testConnectionSuspendedHandling() throws Exception {
String leaderAddress = "localhost";
LeaderElectionService leaderElectionService = ZooKeeperUtils.createLeaderElectionService(zooKeeperClient, config);
TestingContender contender = new TestingContender(leaderAddress, leaderElectionService);
leaderElectionService.start(contender);

QueueLeaderElectionListener queueLeaderElectionListener = new QueueLeaderElectionListener(2);

ZooKeeperLeaderRetrievalService testInstance = ZooKeeperUtils.createLeaderRetrievalService(zooKeeperClient, config);
testInstance.start(queueLeaderElectionListener);

// do the testing
CompletableFuture<String> firstAddress = queueLeaderElectionListener.next();
assertThat("The first result is expected to be the initially set leader address.", firstAddress.get(), is(leaderAddress));

stopTestServer();

CompletableFuture<String> secondAddress = queueLeaderElectionListener.next();
assertThat("The next result must not be missing.", secondAddress, is(notNullValue()));
assertThat("The next result is expected to be null.", secondAddress.get(), is(nullValue()));
}

private void stopTestServer() throws IOException {
if (testingServer != null) {
testingServer.stop();
testingServer = null;
}
}

private static class QueueLeaderElectionListener implements LeaderRetrievalListener {

private final BlockingQueue<CompletableFuture<String>> queue;
private final Duration timeout;

public QueueLeaderElectionListener(int expectedCalls) {
this(expectedCalls, null);
}

public QueueLeaderElectionListener(int expectedCalls, Duration timeout) {
this.queue = new ArrayBlockingQueue<>(expectedCalls);
this.timeout = timeout;
}

@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
try {
if (timeout == null) {
queue.put(CompletableFuture.completedFuture(leaderAddress));
} else {
queue.offer(CompletableFuture.completedFuture(leaderAddress), timeout.toMillis(), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

public CompletableFuture<String> next() {
try {
if (timeout == null) {
return queue.take();
} else {
return this.queue.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}

@Override
public void handleError(Exception exception) {
throw new UnsupportedOperationException("handleError(Exception) shouldn't have been called, but it was triggered anyway.", exception);
}
}
}

0 comments on commit cf8b882

Please sign in to comment.