Skip to content

Commit

Permalink
BMM Restart Improvements Part 1. Leader Coordinator Issuing Assignmen…
Browse files Browse the repository at this point in the history
…t Tokens (#919)

Leader Coordinator is issuing assignment tokens for stopping streams
  • Loading branch information
jzakaryan authored Dec 14, 2022
1 parent 2fd5f0f commit 2ab7ce0
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/**
* Copyright 2022 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

import com.linkedin.datastream.common.JsonUtils;


/**
* Data structure to store assignment tokens. These are used as a mechanism for followers to signal the leader that
* they handled an assignment change.
*
* <p>A token records which host issued it, which host it was issued for, and when it was issued. Instances are
* converted to and from JSON with {@link JsonUtils} (see {@link #toJson()} and {@link #fromJson(String)}).
*/
public final class AssignmentToken {
// Name of the leader host that issued the token
private String _issuedBy;
// Name of the host the token was issued for
private String _issuedFor;
// Issue time; the two-argument constructor sets it to System.currentTimeMillis()
private long _timestamp;

/**
* Constructor for {@link AssignmentToken}. The timestamp is set to the current system time in milliseconds.
* @param issuedBy name of the leader host issuing the token
* @param issuedFor name of the host the token is issued for
*/
public AssignmentToken(String issuedBy, String issuedFor) {
_issuedBy = issuedBy;
_issuedFor = issuedFor;
_timestamp = System.currentTimeMillis();
}

/**
* Default constructor for {@link AssignmentToken}, required for json ser/de.
* Leaves all fields at their Java default values.
*/
public AssignmentToken() {

}

/**
* Creates {@link AssignmentToken} instance from JSON
* @param json JSON representation of a token, as produced by {@link #toJson()}
* @return the token obtained from {@link JsonUtils#fromJson}
*/
public static AssignmentToken fromJson(String json) {
return JsonUtils.fromJson(json, AssignmentToken.class);
}

/**
* Converts the object to JSON
* @return the JSON string produced by {@link JsonUtils#toJson}
*/
public String toJson() {
return JsonUtils.toJson(this);
}

/**
* Gets the name of the leader host that issued the token
* @return the issuing host name
*/
public String getIssuedBy() {
return _issuedBy;
}

/**
* Gets the name of the host for which the token was issued
* @return the target host name
*/
public String getIssuedFor() {
return _issuedFor;
}

/**
* Gets the timestamp (in UNIX epoch format) for when the token was issued
* @return the issue timestamp
*/
public long getTimestamp() {
return _timestamp;
}

/**
* Sets the name of the leader host that issued the token
* @param issuedBy the issuing host name
*/
public void setIssuedBy(String issuedBy) {
_issuedBy = issuedBy;
}

/**
* Sets the name of the host for which the token was issued
* @param issuedFor the target host name
*/
public void setIssuedFor(String issuedFor) {
_issuedFor = issuedFor;
}

/**
* Sets the timestamp (in UNIX epoch format) for when the token was issued
* @param timestamp the issue timestamp
*/
public void setTimestamp(long timestamp) {
_timestamp = timestamp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,11 @@ private void handleLeaderDoAssignment(boolean cleanUpOrphanNodes) {
// assignment and do remove and add zNodes accordingly. In the case of ZooKeeper failure (when
// it failed to create or delete zNodes), we will do our best to continue the current process
// and schedule a retry. The retry should be able to diff the remaining ZooKeeper work
_adapter.updateAllAssignments(newAssignmentsByInstance);
if (_config.getEnableAssignmentTokens()) {
_adapter.updateAllAssignmentsAndIssueTokens(newAssignmentsByInstance, stoppingDatastreamGroups);
} else {
_adapter.updateAllAssignments(newAssignmentsByInstance);
}

for (DatastreamGroup datastreamGroup : stoppingDatastreamGroups) {
for (Datastream datastream : datastreamGroup.getDatastreams()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public final class CoordinatorConfig {
public static final String CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP = PREFIX + "performPreAssignmentCleanup";
public static final String CONFIG_REINIT_ON_NEW_ZK_SESSION = PREFIX + "reinitOnNewZKSession";
public static final String CONFIG_MAX_ASSIGNMENT_RETRY_COUNT = PREFIX + "maxAssignmentRetryCount";
// Property key of the boolean flag returned by getEnableAssignmentTokens(); the constructor defaults it to false
public static final String CONFIG_ENABLE_ASSIGNMENT_TOKENS = PREFIX + "enableAssignmentTokens";

public static final int DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT = 100;

Expand All @@ -50,6 +51,7 @@ public final class CoordinatorConfig {
private final boolean _performPreAssignmentCleanup;
private final boolean _reinitOnNewZkSession;
private final int _maxAssignmentRetryCount;
// Value of CONFIG_ENABLE_ASSIGNMENT_TOKENS, read once in the constructor
private final boolean _enableAssignmentTokens;

/**
* Construct an instance of CoordinatorConfig
Expand All @@ -75,6 +77,7 @@ public CoordinatorConfig(Properties config) {
_performPreAssignmentCleanup = _properties.getBoolean(CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP, false);
_reinitOnNewZkSession = _properties.getBoolean(CONFIG_REINIT_ON_NEW_ZK_SESSION, false);
_maxAssignmentRetryCount = _properties.getInt(CONFIG_MAX_ASSIGNMENT_RETRY_COUNT, DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT);
_enableAssignmentTokens = _properties.getBoolean(CONFIG_ENABLE_ASSIGNMENT_TOKENS, false);
}

public Properties getConfigProperties() {
Expand Down Expand Up @@ -136,4 +139,8 @@ public boolean getReinitOnNewZkSession() {
// Returns the value configured under CONFIG_MAX_ASSIGNMENT_RETRY_COUNT
// (DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT when the property is not set)
public int getMaxAssignmentRetryCount() {
return _maxAssignmentRetryCount;
}

// Returns the value configured under CONFIG_ENABLE_ASSIGNMENT_TOKENS (false when the property is not set)
public boolean getEnableAssignmentTokens() {
return _enableAssignmentTokens;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ public final class KeyBuilder {
private static final String INSTANCE_ASSIGNMENT = "/%s/instances/%s/assignments/%s";
private static final String DATASTREAMS = "/%s/dms";
private static final String DATASTREAM = "/%s/dms/%s";
// Format: cluster name, datastream name -> parent znode of the datastream's assignment tokens
private static final String DATASTREAM_ASSIGNMENT_TOKENS = "/%s/dms/%s/assignmentTokens";
// Format: cluster name, datastream name, instance name -> znode of the token for that instance
private static final String DATASTREAM_ASSIGNMENT_TOKEN_FOR_INSTANCE = "/%s/dms/%s/assignmentTokens/%s";
private static final String CONNECTORS = "/%s/connectors";
private static final String CONNECTOR = "/%s/connectors/%s";

Expand Down Expand Up @@ -200,6 +202,25 @@ public static String datastream(String cluster, String stream) {
return String.format(DATASTREAM, cluster, stream);
}

/**
 * Build the path of the ZooKeeper znode under which the assignment tokens of a datastream are kept
 * @param cluster Brooklin cluster name
 * @param stream Datastream name
 * @return the znode path, {@code /<cluster>/dms/<stream>/assignmentTokens}
 */
public static String datastreamAssignmentTokens(String cluster, String stream) {
  final String tokensPath = String.format(DATASTREAM_ASSIGNMENT_TOKENS, cluster, stream);
  return tokensPath;
}

/**
 * Build the path of the ZooKeeper znode holding the assignment token of a datastream for one instance
 * @param cluster Brooklin cluster name
 * @param stream Datastream name
 * @param instance Instance name
 * @return the znode path, {@code /<cluster>/dms/<stream>/assignmentTokens/<instance>}
 */
public static String datastreamAssignmentTokenForInstance(String cluster, String stream, String instance) {
  final String tokenPath = String.format(DATASTREAM_ASSIGNMENT_TOKEN_FOR_INSTANCE, cluster, stream, instance);
  return tokenPath;
}

/**
* Get the ZooKeeper znode for numTasks of a specific datastream in a Brooklin cluster
* This node will only be present for datastreams that use elastic task assignment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
Expand All @@ -47,6 +48,8 @@
import com.linkedin.datastream.common.DatastreamUtils;
import com.linkedin.datastream.common.ErrorLogger;
import com.linkedin.datastream.common.zk.ZkClient;
import com.linkedin.datastream.server.AssignmentToken;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.DatastreamTaskImpl;
import com.linkedin.datastream.server.HostTargetAssignment;
Expand Down Expand Up @@ -728,6 +731,93 @@ private void removeTaskNode(String instance, String name) {
_zkclient.deleteRecursive(instancePath);
}

/**
 * Issue assignment tokens for the stopping datastream groups, then update the task assignment for the
 * given instances. Assignment tokens are used by the follower hosts to signal the leader that they finished
 * handling their assignments.
 *
 * In addition to the work done by updateAllAssignments, this method writes to
 * <ol>
 *   <li>{@code /<cluster>/dms/<datastream-name>/assignmentTokens/<token1>,<token2>...}</li>
 * </ol>
 *
 * @param assignmentsByInstance new task assignment, keyed by instance name; must not be null
 * @param stoppingDatastreamGroups datastream groups that are being stopped; must not be null
 */
public void updateAllAssignmentsAndIssueTokens(Map<String, List<DatastreamTask>> assignmentsByInstance,
    List<DatastreamGroup> stoppingDatastreamGroups) {
  Validate.notNull(assignmentsByInstance, "null assignmentsByInstance");
  Validate.notNull(stoppingDatastreamGroups, "null stoppingDatastreamGroups");

  // Tokens are derived from the assignment currently stored, so they are issued before it gets replaced
  issueAssignmentTokensForStoppingDatastreams(stoppingDatastreamGroups,
      getStoppingDatastreamGroupInstances(stoppingDatastreamGroups));
  updateAllAssignments(assignmentsByInstance);
}

/**
 * Maps each stopping datastream group to the set of instances that currently have at least one of the
 * group's tasks assigned. Tasks are matched to groups by task prefix; groups without any assigned task
 * do not appear in the result.
 */
private Map<DatastreamGroup, Set<String>> getStoppingDatastreamGroupInstances(
    List<DatastreamGroup> stoppingDatastreamGroups) {
  Map<String, Set<DatastreamTask>> assignedTasksByInstance = getAllAssignedDatastreamTasks();
  Map<String, DatastreamGroup> groupsByTaskPrefix = stoppingDatastreamGroups.stream()
      .collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity()));

  Map<DatastreamGroup, Set<String>> instancesByGroup = new HashMap<>();
  for (Map.Entry<String, Set<DatastreamTask>> assignment : assignedTasksByInstance.entrySet()) {
    String instance = assignment.getKey();
    for (DatastreamTask task : assignment.getValue()) {
      // Only tasks whose prefix belongs to a stopping group are of interest
      DatastreamGroup owningGroup = groupsByTaskPrefix.get(task.getTaskPrefix());
      if (owningGroup != null) {
        instancesByGroup.computeIfAbsent(owningGroup, unused -> new HashSet<>()).add(instance);
      }
    }
  }

  return instancesByGroup;
}

/**
 * Issues assignment tokens for each stopping datastream and each instance that currently hosts tasks of
 * the datastream's group. Every token is written as a persistent znode under the datastream's
 * assignmentTokens path and records the local host as issuer.
 *
 * Datastreams whose znode does not exist are skipped with a warning. A stopping group that has no entry in
 * {@code stoppingDgInstances} (no task currently assigned) gets the tokens path but no tokens.
 *
 * @param stoppingDatastreamGroups datastream groups that are being stopped
 * @param stoppingDgInstances instances that currently have tasks assigned, per stopping datastream group
 */
private void issueAssignmentTokensForStoppingDatastreams(List<DatastreamGroup> stoppingDatastreamGroups,
    Map<DatastreamGroup, Set<String>> stoppingDgInstances) {
  String hostname = getLocalHostName();
  for (DatastreamGroup stoppingGroup : stoppingDatastreamGroups) {
    // The map only has entries for groups with assigned tasks, so a missing entry means "no instances"
    Set<String> assignedInstances = stoppingDgInstances.get(stoppingGroup);
    Set<String> instances = assignedInstances != null ? assignedInstances : new HashSet<>();

    for (Datastream stoppingStream : stoppingGroup.getDatastreams()) {
      String path = KeyBuilder.datastream(_cluster, stoppingStream.getName());

      if (!_zkclient.exists(path)) {
        LOG.warn("Trying to issue assignment tokens for non-existing stream: " + stoppingStream.getName());
        continue;
      }

      String assignmentTokensPath = KeyBuilder.datastreamAssignmentTokens(_cluster, stoppingStream.getName());
      _zkclient.ensurePath(assignmentTokensPath);

      LOG.info("Issuing assignment tokens for stream {} and {} instance(s)", stoppingStream.getName(), instances.size());
      for (String instance : instances) {
        String assignmentTokenPath = KeyBuilder.datastreamAssignmentTokenForInstance(_cluster,
            stoppingStream.getName(), instance);

        // A token left over from an earlier attempt must not fail this one: creating a znode that
        // already exists is rejected by ZooKeeper, so keep the existing token instead.
        if (_zkclient.exists(assignmentTokenPath)) {
          LOG.warn("Assignment token for stream {} and instance {} already exists, skipping",
              stoppingStream.getName(), instance);
          continue;
        }

        AssignmentToken token = new AssignmentToken(hostname, instance);
        _zkclient.create(assignmentTokenPath, token.toJson(), CreateMode.PERSISTENT);
      }
    }
  }
}

/**
 * Gets the name of the local host
 * @return the host name reported by {@link InetAddress#getLocalHost()}, or {@code "localhost"} when it
 *         cannot be resolved
 */
private String getLocalHostName() {
  try {
    return InetAddress.getLocalHost().getHostName();
  } catch (UnknownHostException ex) {
    // Best effort: the host name is only recorded as the token issuer, so fall back instead of failing.
    // The exception is passed along so the log shows why resolution failed.
    LOG.warn("Unable to obtain hostname for leader", ex);
    return "localhost";
  }
}

/**
* Update the task assignment of a given instance
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package com.linkedin.datastream.server.zk;

import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -30,6 +31,7 @@
import com.linkedin.datastream.common.Datastream;
import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.common.zk.ZkClient;
import com.linkedin.datastream.server.AssignmentToken;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.DatastreamTaskImpl;
Expand Down Expand Up @@ -1059,6 +1061,59 @@ public void testUpdateAssignmentDeleteUnusedTasks() {
adapter.disconnect();
}

/**
 * Verifies that updateAllAssignmentsAndIssueTokens issues exactly one assignment token for a stopping
 * datastream: for the instance that had the stream's task assigned, and issued by the local host.
 */
@Test
public void testUpdateAllAssignmentAndIssueTokens() throws Exception {
  String testCluster = "testUpdateAllAssignmentAndIssueTokens";
  String connectorType = "connectorType";
  ZkClient zkClient = new ZkClient(_zkConnectionString);
  ZkAdapter adapter = createZkAdapter(testCluster);
  adapter.connect();

  // assigning 2 tasks to the cluster
  Datastream[] datastreams1 = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType,
      "datastream1");
  DatastreamGroup datastreamGroup1 = new DatastreamGroup(Arrays.asList(datastreams1));

  Datastream[] datastreams2 = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType,
      "datastream2");
  DatastreamGroup datastreamGroup2 = new DatastreamGroup(Arrays.asList(datastreams2));

  DatastreamTaskImpl task1 = new DatastreamTaskImpl();
  task1.setTaskPrefix(datastreamGroup1.getTaskPrefix());
  task1.setConnectorType(connectorType);

  DatastreamTaskImpl task2 = new DatastreamTaskImpl();
  task2.setTaskPrefix(datastreamGroup2.getTaskPrefix());
  task2.setConnectorType(connectorType);

  Map<String, List<DatastreamTask>> oldAssignment = new HashMap<>();
  oldAssignment.put("instance1", Collections.singletonList(task1));
  oldAssignment.put("instance2", Collections.singletonList(task2));

  adapter.updateAllAssignments(oldAssignment);

  // simulating a stopping datastream which has a task assigned to instance2
  List<DatastreamGroup> stoppingDatastreamGroups = Collections.singletonList(datastreamGroup2);
  Map<String, List<DatastreamTask>> newAssignment = new HashMap<>();
  newAssignment.put("instance1", Collections.singletonList(task1));
  adapter.updateAllAssignmentsAndIssueTokens(newAssignment, stoppingDatastreamGroups);

  // asserting that:
  // (1) the assignment tokens path was created for stopping stream
  // (2) a token has been issued for instance2, and only for instance2
  // (3) the token data is correct
  String assignmentTokensPath = KeyBuilder.datastreamAssignmentTokens(testCluster, datastreamGroup2.getTaskPrefix());
  Assert.assertTrue(zkClient.exists(assignmentTokensPath));
  List<String> tokenNodes = zkClient.getChildren(assignmentTokensPath);
  Assert.assertEquals(tokenNodes.size(), 1);
  // token znodes are named after the instance they were issued for
  Assert.assertEquals(tokenNodes.get(0), "instance2");
  String tokenData = zkClient.readData(
      KeyBuilder.datastreamAssignmentTokenForInstance(testCluster, datastreamGroup2.getTaskPrefix(), tokenNodes.get(0)));
  AssignmentToken token = AssignmentToken.fromJson(tokenData);
  Assert.assertEquals(token.getIssuedFor(), "instance2");
  String hostname = InetAddress.getLocalHost().getHostName();
  Assert.assertEquals(token.getIssuedBy(), hostname);

  // release the ZooKeeper connections opened by this test
  adapter.disconnect();
  zkClient.close();
}

@Test
public void testSaveStateOnlyWhenTaskExists() {
String testCluster = "testSaveStateOnlyWhenTaskExists";
Expand Down

0 comments on commit 2ab7ce0

Please sign in to comment.