Skip to content

Commit

Permalink
Handle the new session after session expiry (#770)
Browse files Browse the repository at this point in the history
This is the final change to handle new session after session expiry. In this change, we have re-initialized all the local states, listeners, event threads and made the node re-join the cluster.
  • Loading branch information
vmaheshw authored Aug 3, 2021
1 parent 7f7c7c5 commit 457ac60
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2882,11 +2882,29 @@ public void testCoordinatorLeaderCleanupTasksPostElection() throws Exception {

@Test
public void testOnSessionExpired() throws Exception {
testOnSessionExpired(false);
}

@Test
public void testOnSessionExpiredHandleNewSession() throws Exception {
testOnSessionExpired(true);
}

void testOnSessionExpired(boolean handleNewSession) throws DatastreamException, InterruptedException {
String testCluster = "testCoordinationSmoke3";
String testConnectorType = "testConnectorType";
String datastreamName = "datastreamNameSessionExpired";

Coordinator instance1 = createCoordinator(_zkConnectionString, testCluster);
Properties props = new Properties();
props.put(CoordinatorConfig.CONFIG_CLUSTER, testCluster);
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, _zkConnectionString);
props.put(CoordinatorConfig.CONFIG_ZK_SESSION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_SESSION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_ZK_CONNECTION_TIMEOUT, String.valueOf(ZkClient.DEFAULT_CONNECTION_TIMEOUT));
props.put(CoordinatorConfig.CONFIG_REINIT_ON_NEW_ZK_SESSION, String.valueOf(handleNewSession));

ZkClient zkClient = new ZkClient(_zkConnectionString);
_cachedDatastreamReader = new CachedDatastreamReader(zkClient, testCluster);
Coordinator instance1 = new TestCoordinatorWithSpyZkAdapter(_cachedDatastreamReader, props);
instance1.addTransportProvider(DummyTransportProviderAdminFactory.PROVIDER_NAME,
new DummyTransportProviderAdminFactory().createTransportProviderAdmin(
DummyTransportProviderAdminFactory.PROVIDER_NAME, new Properties()));
Expand All @@ -2897,20 +2915,29 @@ public void testOnSessionExpired() throws Exception {
new SourceBasedDeduper(), null);
instance1.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, testConnectorType, datastreamName);
ZkClient zkClient1 = new ZkClient(_zkConnectionString);
DatastreamTestUtils.createAndStoreDatastreams(zkClient1, testCluster, testConnectorType, datastreamName);
// verify the assignment
assertConnectorAssignment(connector1, WAIT_TIMEOUT_MS, datastreamName);

zkClient.delete(KeyBuilder.liveInstance(testCluster, "0000000000"));
instance1.onSessionExpired();
PollUtils.poll(() -> {
return connector1._tasks.size() == 0;
}, 1000, WAIT_TIMEOUT_MS);
PollUtils.poll(() -> connector1._tasks.size() == 0, 1000, WAIT_TIMEOUT_MS);
Assert.assertEquals(instance1.getDatastreamTasks().size(), 0);
Thread t = instance1.getEventThread();
Assert.assertFalse(t != null && t.isAlive());
Assert.assertTrue(PollUtils.poll(instance1::isZkSessionExpired, 100, 30000));
verify(mockStrategy, times(1)).cleanupStrategy();

if (handleNewSession) {
instance1.onNewSession();
PollUtils.poll(() -> connector1._tasks.size() == 1, 1000, WAIT_TIMEOUT_MS);
Assert.assertEquals(instance1.getDatastreamTasks().size(), 1);
t = instance1.getEventThread();
Assert.assertTrue(t != null && t.isAlive());
Assert.assertTrue(PollUtils.poll(() -> instance1.getIsLeader().getAsBoolean(), 100, 30000));
}

instance1.stop();
instance1.getDatastreamCache().getZkclient().close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@
* Coordinator Connector
*
* ┌──────────────┐ ┌─────────────────────────────────────────┐ ┌─────────────────┐
* │ │ │ │ │ │
* │ │ │ │ │ │
* │ │ │ ┌──────────┐ ┌────────────────┐ │ │ │
* │ │ │ | |──▶ onNewSession │ │ | │
* │ │ │ │ │ └────────────────┘ │ │ │
* │ │ │ | | ┌────────────────┐ │ │ │
* │ │ │ │ZkAdapter ├──▶ onBecomeLeader │ │ │ │
* │ │ │ │ │ └────────────────┘ │ │ │
* │ ├───────┼─▶ │ ┌──────────────────┐ │ │ │
Expand Down Expand Up @@ -248,7 +249,7 @@ ZkAdapter createZkAdapter() {
public void start() {
_log.info("Starting coordinator");
startEventThread();
_adapter.connect();
_adapter.connect(_config.getReinitOnNewZkSession());

for (String connectorType : _connectors.keySet()) {
ConnectorInfo connectorInfo = _connectors.get(connectorType);
Expand Down Expand Up @@ -530,6 +531,21 @@ boolean isZkSessionExpired() {
return _zkSessionExpired;
}

@Override
public void onNewSession() {
createEventThread();
startEventThread();
_adapter.connect(true);
// ensure it doesn't miss any assignment created
_eventQueue.put(CoordinatorEvent.createHandleAssignmentChangeEvent());

// Queue up one heartbeat per period with a initial delay of 3 periods
_executor.scheduleAtFixedRate(() -> _eventQueue.put(CoordinatorEvent.HEARTBEAT_EVENT),
_heartbeatPeriod.toMillis() * 3, _heartbeatPeriod.toMillis(), TimeUnit.MILLISECONDS);

_zkSessionExpired = false;
}

private void getAssignmentsFuture(List<Future<Boolean>> assignmentChangeFutures, Instant start)
throws TimeoutException, InterruptedException {
for (Future<Boolean> assignmentChangeFuture : assignmentChangeFutures) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class CoordinatorConfig {
public static final String CONFIG_ZK_CLEANUP_ORPHAN_CONNECTOR_TASK_LOCK = PREFIX + "zkCleanUpOrphanConnectorTaskLock";
public static final String CONFIG_MAX_DATASTREAM_TASKS_PER_INSTANCE = PREFIX + "maxDatastreamTasksPerInstance";
public static final String CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP = PREFIX + "performPreAssignmentCleanup";
public static final String CONFIG_REINIT_ON_NEW_ZK_SESSION = PREFIX + "reinitOnNewZKSession";

private final String _cluster;
private final String _zkAddress;
Expand All @@ -44,6 +45,7 @@ public final class CoordinatorConfig {
private final boolean _zkCleanUpOrphanConnectorTaskLock;
private final int _maxDatastreamTasksPerInstance;
private final boolean _performPreAssignmentCleanup;
private final boolean _reinitOnNewZkSession;

/**
* Construct an instance of CoordinatorConfig
Expand All @@ -65,6 +67,7 @@ public CoordinatorConfig(Properties config) {
_zkCleanUpOrphanConnectorTaskLock = _properties.getBoolean(CONFIG_ZK_CLEANUP_ORPHAN_CONNECTOR_TASK_LOCK, false);
_maxDatastreamTasksPerInstance = _properties.getInt(CONFIG_MAX_DATASTREAM_TASKS_PER_INSTANCE, 0);
_performPreAssignmentCleanup = _properties.getBoolean(CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP, false);
_reinitOnNewZkSession = _properties.getBoolean(CONFIG_REINIT_ON_NEW_ZK_SESSION, false);
}

public Properties getConfigProperties() {
Expand Down Expand Up @@ -118,4 +121,8 @@ public boolean getPerformPreAssignmentCleanup() {
public long getDebounceTimerMs() {
return _debounceTimerMs;
}

public boolean getReinitOnNewZkSession() {
return _reinitOnNewZkSession;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public class ZkAdapter {
private String _currentSubscription = null;

private ZkLeaderElectionListener _leaderElectionListener = null;
private ZkBackedTaskListProvider _assignmentList = null;
private ZkBackedTaskListProvider _assignmentListProvider = null;
private ZkStateChangeListener _stateChangeListener = null;

// only the leader should maintain this list and listen to the changes of live instances
Expand All @@ -159,6 +159,8 @@ public class ZkAdapter {
// object to synchronize zk session handling states
private final Object _zkSessionLock = new Object();

private boolean _reinitOnNewSession = false;

/**
* Constructor
* @param zkServers ZooKeeper server address to connect to
Expand Down Expand Up @@ -229,7 +231,7 @@ public void disconnect() {
} catch (ZkException zke) {
// do nothing, best effort clean up
} finally {
closeZkListener(true);
closeZkListeners(true);
_zkclient.close();
_zkclient = null;
_leaderElectionListener = null;
Expand All @@ -248,10 +250,32 @@ ZkClient createZkClient() {
* the actions that need to be taken with them, which are implemented in the Coordinator class
*/
public void connect() {
disconnect(); // Guard against leaking an existing zookeeper session
_zkclient = createZkClient();
_stateChangeListener = new ZkStateChangeListener();
_leaderElectionListener = new ZkLeaderElectionListener();
connect(false);
}

/**
* Connect the adapter so that it can connect and bridge events between ZooKeeper changes and
* the actions that need to be taken with them, which are implemented in the Coordinator class
*
* @param reinitOnNewSession re-initialize the object on new session after session expiry
*/
public void connect(boolean reinitOnNewSession) {
_reinitOnNewSession = reinitOnNewSession;
if (_zkclient == null) {
_zkclient = createZkClient();
}

if (_stateChangeListener == null) {
_stateChangeListener = new ZkStateChangeListener();
}

if (_leaderElectionListener == null) {
_leaderElectionListener = new ZkLeaderElectionListener();
}

if (_liveInstancesProvider == null) {
_liveInstancesProvider = new ZkBackedLiveInstanceListProvider();
}

// create a globally unique instance name and create a live instance node in ZooKeeper
_instanceName = createLiveInstanceNode();
Expand All @@ -260,7 +284,9 @@ public void connect() {

// both leader and follower needs to listen to its own instance change
// under /{cluster}/instances/{instance}
_assignmentList = new ZkBackedTaskListProvider(_cluster, _instanceName);
if (_assignmentListProvider == null) {
_assignmentListProvider = new ZkBackedTaskListProvider(_cluster, _instanceName);
}

// start with follower state, then join leader election
onBecomeFollower();
Expand Down Expand Up @@ -295,11 +321,15 @@ private void onBecomeFollower() {

LOG.info("Instance " + _instanceName + " becomes follower");

closeZkListener(false);
closeZkListeners(false);
_isLeader = false;
}

private void closeZkListener(boolean isDisconnect) {
private void closeZkListeners(boolean isDisconnect) {
closeZkListeners(isDisconnect, false);
}

private void closeZkListeners(boolean isDisconnect, boolean isSessionExpired) {

// Clean the following listeners only during zookeeper disconnect
if (isDisconnect) {
Expand All @@ -308,30 +338,32 @@ private void closeZkListener(boolean isDisconnect) {
_stateChangeListener = null;
}

if (_assignmentList != null) {
_assignmentList.close();
_assignmentList = null;
// unsubscribe any other left subscription.
_zkclient.unsubscribeAll();
}

if (isDisconnect || isSessionExpired) {
if (_assignmentListProvider != null) {
_assignmentListProvider.close();
_assignmentListProvider = null;
}

if (_currentSubscription != null) {
_zkclient.unsubscribeDataChanges(KeyBuilder.liveInstance(_cluster, _currentSubscription), _leaderElectionListener);
_currentSubscription = null;
}

// unsubscribe any other left subscription.
_zkclient.unsubscribeAll();
if (_liveInstancesProvider != null) {
_liveInstancesProvider.close();
_liveInstancesProvider = null;
}
}

if (_datastreamList != null) {
_datastreamList.close();
_datastreamList = null;
}

if (_liveInstancesProvider != null) {
_liveInstancesProvider.close();
_liveInstancesProvider = null;
}

if (_targetAssignmentProvider != null) {
_targetAssignmentProvider.close();
_targetAssignmentProvider = null;
Expand Down Expand Up @@ -377,8 +409,7 @@ private void joinLeaderElection() {
if (index < 0) {
// only when the ZooKeeper session already expired by the time this adapter joins for leader election.
// mostly because the zkclient session expiration timeout.
LOG.error("Failed to join leader election. Try reconnect the zookeeper");
connect();
LOG.error("Failed to join leader election. wait for the new session to be established");
return;
}

Expand Down Expand Up @@ -1415,7 +1446,6 @@ public void releaseTask(DatastreamTaskImpl task) {
task.getDatastreamTaskName());
return;
}

_zkclient.delete(lockPath);
LOG.info("{} successfully released the lock on {}-{}/{}", _instanceName, task.getConnectorType(), task.getTaskPrefix(),
task.getDatastreamTaskName());
Expand Down Expand Up @@ -1498,6 +1528,11 @@ public interface ZkAdapterListener {
* onSessionExpired is called when the zookeeper session expires.
*/
void onSessionExpired();

/**
* onNewSession is called when the zookeeper session is established after expiry.
*/
void onNewSession();
}

/**
Expand Down Expand Up @@ -1797,6 +1832,9 @@ public void handleStateChanged(Watcher.Event.KeeperState state) {
public void handleNewSession() {
synchronized (_zkSessionLock) {
LOG.info("ZkStateChangeListener::A new session has been established.");
if (_reinitOnNewSession) {
onNewSession();
}
}
}

Expand All @@ -1816,25 +1854,63 @@ private void scheduleExpiryTimerAfterSessionTimeout() {
}
}

@VisibleForTesting
void onNewSession() {
if (_listener != null) {
_listener.onNewSession();
}
}

@VisibleForTesting
void onSessionExpired() {
synchronized (_zkSessionLock) {
LOG.error("Zookeeper session expired.");
// cancel the lock clean up
_orphanLockCleanupFuture.cancel(true);
_orphanLockCleanupFuture = CompletableFuture.completedFuture("completed");
closeZkListeners(false, true);
onBecomeFollower();
if (_listener != null) {
_listener.onSessionExpired();
}
// currently it will try to disconnect and fail. TODO: fix the connect and listen to handleNewSession.
// Temporary hack to kill the zkEventThread at this point, to ensure that the connection to zookeeper
// is not re-initialized till reconnect path is fixed.
disconnect();
if (!_reinitOnNewSession) {
disconnect();
}
}
}

@VisibleForTesting
long getSessionId() {
return _zkclient.getSessionId();
}

@VisibleForTesting
ZkLeaderElectionListener getLeaderElectionListener() {
return _leaderElectionListener;
}

@VisibleForTesting
ZkBackedTaskListProvider getAssignmentListProvider() {
return _assignmentListProvider;
}

@VisibleForTesting
ZkStateChangeListener getStateChangeListener() {
return _stateChangeListener;
}

@VisibleForTesting
ZkBackedLiveInstanceListProvider getLiveInstancesProvider() {
return _liveInstancesProvider;
}

@VisibleForTesting
ZkBackedDMSDatastreamList getDatastreamList() {
return _datastreamList;
}

@VisibleForTesting
ZkTargetAssignmentProvider getTargetAssignmentProvider() {
return _targetAssignmentProvider;
}
}
Loading

0 comments on commit 457ac60

Please sign in to comment.