From 2ab7ce0065058913908660b576d93758a0b59594 Mon Sep 17 00:00:00 2001 From: Jhora Zakaryan Date: Wed, 14 Dec 2022 11:22:04 -0800 Subject: [PATCH] BMM Restart Improvements Part 1. Leader Coordinator Issuing Assignment Tokens (#919) Leader Coordinator is issuing assignment tokens for stopping streams --- .../datastream/server/AssignmentToken.java | 91 +++++++++++++++++++ .../datastream/server/Coordinator.java | 6 +- .../datastream/server/CoordinatorConfig.java | 7 ++ .../datastream/server/zk/KeyBuilder.java | 21 +++++ .../datastream/server/zk/ZkAdapter.java | 90 ++++++++++++++++++ .../datastream/server/zk/TestZkAdapter.java | 55 +++++++++++ 6 files changed, 269 insertions(+), 1 deletion(-) create mode 100644 datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java b/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java new file mode 100644 index 000000000..20c77db61 --- /dev/null +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/AssignmentToken.java @@ -0,0 +1,91 @@ +/** + * Copyright 2022 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. + * See the NOTICE file in the project root for additional information regarding copyright ownership. + */ +package com.linkedin.datastream.server; + +import com.linkedin.datastream.common.JsonUtils; + + +/** + * Data structure to store assignment tokens. 
These are used as a mechanism for followers to signal the leader that + * they handled assignment change + */ +public final class AssignmentToken { + private String _issuedBy; + private String _issuedFor; + private long _timestamp; + + /** + * Constructor for {@link AssignmentToken} + */ + public AssignmentToken(String issuedBy, String issuedFor) { + _issuedBy = issuedBy; + _issuedFor = issuedFor; + _timestamp = System.currentTimeMillis(); + } + + /** + * Default constructor for {@link AssignmentToken}, required for json ser/de + */ + public AssignmentToken() { + + } + + /** + * Creates {@link AssignmentToken} instance from JSON + */ + public static AssignmentToken fromJson(String json) { + return JsonUtils.fromJson(json, AssignmentToken.class); + } + + /** + * Converts the object to JSON + */ + public String toJson() { + return JsonUtils.toJson(this); + } + + /** + * Gets the name of the leader host that issued the token + */ + public String getIssuedBy() { + return _issuedBy; + } + + /** + * Gets the name of the host for which the token was issued + */ + public String getIssuedFor() { + return _issuedFor; + } + + /** + * Gets the timestamp (in UNIX epoch format) for when the token was issued + */ + public long getTimestamp() { + return _timestamp; + } + + /** + * Sets the name of the leader host that issued the token + */ + public void setIssuedBy(String issuedBy) { + _issuedBy = issuedBy; + } + + /** + * Sets the name of the host for which the token was issued + */ + public void setIssuedFor(String issuedFor) { + _issuedFor = issuedFor; + } + + /** + * Sets the timestamp (in UNIX epoch format) for when the token was issued + */ + public void setTimestamp(long timestamp) { + _timestamp = timestamp; + } +} diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java index 3477e0479..8958de117 100644 --- 
a/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/Coordinator.java @@ -1219,7 +1219,11 @@ private void handleLeaderDoAssignment(boolean cleanUpOrphanNodes) { // assignment and do remove and add zNodes accordingly. In the case of ZooKeeper failure (when // it failed to create or delete zNodes), we will do our best to continue the current process // and schedule a retry. The retry should be able to diff the remaining ZooKeeper work - _adapter.updateAllAssignments(newAssignmentsByInstance); + if (_config.getEnableAssignmentTokens()) { + _adapter.updateAllAssignmentsAndIssueTokens(newAssignmentsByInstance, stoppingDatastreamGroups); + } else { + _adapter.updateAllAssignments(newAssignmentsByInstance); + } for (DatastreamGroup datastreamGroup : stoppingDatastreamGroups) { for (Datastream datastream : datastreamGroup.getDatastreams()) { diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java index d1c25b520..c0cc3f3d9 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/CoordinatorConfig.java @@ -31,6 +31,7 @@ public final class CoordinatorConfig { public static final String CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP = PREFIX + "performPreAssignmentCleanup"; public static final String CONFIG_REINIT_ON_NEW_ZK_SESSION = PREFIX + "reinitOnNewZKSession"; public static final String CONFIG_MAX_ASSIGNMENT_RETRY_COUNT = PREFIX + "maxAssignmentRetryCount"; + public static final String CONFIG_ENABLE_ASSIGNMENT_TOKENS = PREFIX + "enableAssignmentTokens"; public static final int DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT = 100; @@ -50,6 +51,7 @@ public final class CoordinatorConfig { private final boolean _performPreAssignmentCleanup; private final 
boolean _reinitOnNewZkSession; private final int _maxAssignmentRetryCount; + private final boolean _enableAssignmentTokens; /** * Construct an instance of CoordinatorConfig @@ -75,6 +77,7 @@ public CoordinatorConfig(Properties config) { _performPreAssignmentCleanup = _properties.getBoolean(CONFIG_PERFORM_PRE_ASSIGNMENT_CLEANUP, false); _reinitOnNewZkSession = _properties.getBoolean(CONFIG_REINIT_ON_NEW_ZK_SESSION, false); _maxAssignmentRetryCount = _properties.getInt(CONFIG_MAX_ASSIGNMENT_RETRY_COUNT, DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT); + _enableAssignmentTokens = _properties.getBoolean(CONFIG_ENABLE_ASSIGNMENT_TOKENS, false); } public Properties getConfigProperties() { @@ -136,4 +139,8 @@ public boolean getReinitOnNewZkSession() { public int getMaxAssignmentRetryCount() { return _maxAssignmentRetryCount; } + + public boolean getEnableAssignmentTokens() { + return _enableAssignmentTokens; + } } diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/KeyBuilder.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/KeyBuilder.java index 328ff9370..8c79c6a7f 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/KeyBuilder.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/KeyBuilder.java @@ -22,6 +22,8 @@ public final class KeyBuilder { private static final String INSTANCE_ASSIGNMENT = "/%s/instances/%s/assignments/%s"; private static final String DATASTREAMS = "/%s/dms"; private static final String DATASTREAM = "/%s/dms/%s"; + private static final String DATASTREAM_ASSIGNMENT_TOKENS = "/%s/dms/%s/assignmentTokens"; + private static final String DATASTREAM_ASSIGNMENT_TOKEN_FOR_INSTANCE = "/%s/dms/%s/assignmentTokens/%s"; private static final String CONNECTORS = "/%s/connectors"; private static final String CONNECTOR = "/%s/connectors/%s"; @@ -200,6 +202,25 @@ public static String datastream(String cluster, String stream) { return String.format(DATASTREAM, cluster, stream); } 
+ /** + * Get the ZooKeeper znode for a datastream's assignment tokens in a Brooklin cluster + * @param cluster Brooklin cluster name + * @param stream Datastream name + */ + public static String datastreamAssignmentTokens(String cluster, String stream) { + return String.format(DATASTREAM_ASSIGNMENT_TOKENS, cluster, stream); + } + + /** + * Get the ZooKeeper znode for a datastream's assignment token for an instance in a Brooklin cluster + * @param cluster Brooklin cluster name + * @param stream Datastream name + * @param instance Instance name + */ + public static String datastreamAssignmentTokenForInstance(String cluster, String stream, String instance) { + return String.format(DATASTREAM_ASSIGNMENT_TOKEN_FOR_INSTANCE, cluster, stream, instance); + } + /** * Get the ZooKeeeper znode for numTasks of a specific datastream in a Brooklin cluster * This node will only be present for datastreams that use elastic task assignment diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java index 13d7a2f4b..455c5e71b 100644 --- a/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java +++ b/datastream-server/src/main/java/com/linkedin/datastream/server/zk/ZkAdapter.java @@ -29,6 +29,7 @@ import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.Validate; import org.apache.helix.zookeeper.zkclient.IZkChildListener; import org.apache.helix.zookeeper.zkclient.IZkDataListener; import org.apache.helix.zookeeper.zkclient.IZkStateListener; @@ -47,6 +48,8 @@ import com.linkedin.datastream.common.DatastreamUtils; import com.linkedin.datastream.common.ErrorLogger; import com.linkedin.datastream.common.zk.ZkClient; +import com.linkedin.datastream.server.AssignmentToken; +import com.linkedin.datastream.server.DatastreamGroup; import com.linkedin.datastream.server.DatastreamTask; import 
com.linkedin.datastream.server.DatastreamTaskImpl; import com.linkedin.datastream.server.HostTargetAssignment; @@ -728,6 +731,93 @@ private void removeTaskNode(String instance, String name) { _zkclient.deleteRecursive(instancePath); } + /** + * Update the task assignment for the given instances and issue assignment tokens for stopping tasks. + * Assignment tokens are used by the follower hosts to signal the leader that they finished handling their assignments. + * + * In addition to the work done by updateAllAssignments, this method writes to + *
<ol>
+ *   <li>{@code /<cluster>/dms/<datastream>/assignmentTokens/<instance1>,<instance2>...}</li>
+ * </ol>
+ */ + public void updateAllAssignmentsAndIssueTokens(Map> assignmentsByInstance, + List stoppingDatastreamGroups) { + Validate.notNull(assignmentsByInstance, "null assignmentsByInstance"); + Validate.notNull(stoppingDatastreamGroups, "null stoppingDatastreamGroups"); + + Map> stoppingDgInstances = + getStoppingDatastreamGroupInstances(stoppingDatastreamGroups); + issueAssignmentTokensForStoppingDatastreams(stoppingDatastreamGroups, stoppingDgInstances); + + updateAllAssignments(assignmentsByInstance); + } + + /** + * For each stopping datastream group, finds all the instances that currently have tasks assigned + */ + private Map> getStoppingDatastreamGroupInstances( + List stoppingDatastreamGroups) { + Map> currentAssignment = getAllAssignedDatastreamTasks(); + Set stoppingDatastreamTaskPrefixes = stoppingDatastreamGroups.stream(). + map(DatastreamGroup::getTaskPrefix).collect(toSet()); + Map taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream(). + collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity())); + + Map> stoppingDgInstances = new HashMap<>(); + currentAssignment.keySet() + .forEach(i -> currentAssignment.get(i).stream() + .filter(t -> stoppingDatastreamTaskPrefixes.contains(t.getTaskPrefix())) + .forEach(st -> { + DatastreamGroup datastreamGroup = taskPrefixDatastreamGroups.get(st.getTaskPrefix()); + stoppingDgInstances.computeIfAbsent(datastreamGroup, k -> new HashSet<>()).add(i); + })); + + return stoppingDgInstances; + } + + /** + * Issues assignment tokens for each instance and each stopping datastream + */ + private void issueAssignmentTokensForStoppingDatastreams(List stoppingDatastreamGroups, + Map> stoppingDgInstances) { + String hostname = getLocalHostName(); + for (DatastreamGroup stoppingGroup : stoppingDatastreamGroups) { + for (Datastream stoppingStream : stoppingGroup.getDatastreams()) { + String path = KeyBuilder.datastream(_cluster, stoppingStream.getName()); + + if (!_zkclient.exists(path)) { + 
LOG.warn("Trying to issue assignment tokens for non-existing stream: " + stoppingStream.getName()); + continue; + } + + String assignmentTokensPath = KeyBuilder.datastreamAssignmentTokens(_cluster, stoppingStream.getName()); + _zkclient.ensurePath(assignmentTokensPath); + + Set instances = stoppingDgInstances.get(stoppingGroup); + LOG.info("Issuing assignment tokens for stream {} and {} instance(s)", stoppingStream.getName(), instances.size()); + for (String instance : instances) { + String assignmentTokenPath = KeyBuilder.datastreamAssignmentTokenForInstance(_cluster, + stoppingStream.getName(), instance); + AssignmentToken token = new AssignmentToken(hostname, instance); + _zkclient.create(assignmentTokenPath, token.toJson(), CreateMode.PERSISTENT); + } + } + } + } + + /** + * Gets the name of the local host + */ + private String getLocalHostName() { + String hostname = "localhost"; + try { + hostname = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException ex) { + LOG.warn("Unable to obtain hostname for leader"); + } + return hostname; + } + /** * Update the task assignment of a given instance * diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java index 165d14c97..4bf203abf 100644 --- a/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java +++ b/datastream-server/src/test/java/com/linkedin/datastream/server/zk/TestZkAdapter.java @@ -6,6 +6,7 @@ package com.linkedin.datastream.server.zk; import java.io.IOException; +import java.net.InetAddress; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -30,6 +31,7 @@ import com.linkedin.datastream.common.Datastream; import com.linkedin.datastream.common.PollUtils; import com.linkedin.datastream.common.zk.ZkClient; +import com.linkedin.datastream.server.AssignmentToken; import 
com.linkedin.datastream.server.DatastreamGroup; import com.linkedin.datastream.server.DatastreamTask; import com.linkedin.datastream.server.DatastreamTaskImpl; @@ -1059,6 +1061,59 @@ public void testUpdateAssignmentDeleteUnusedTasks() { adapter.disconnect(); } + @Test + public void testUpdateAllAssignmentAndIssueTokens() throws Exception { + String testCluster = "testUpdateAllAssignmentAndIssueTokens"; + String connectorType = "connectorType"; + ZkClient zkClient = new ZkClient(_zkConnectionString); + ZkAdapter adapter = createZkAdapter(testCluster); + adapter.connect(); + + // assigning 2 tasks to the cluster + Datastream[] datastreams1 = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, + "datastream1"); + DatastreamGroup datastreamGroup1 = new DatastreamGroup(Arrays.asList(datastreams1)); + + Datastream[] datastreams2 = DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, + "datastream2"); + DatastreamGroup datastreamGroup2 = new DatastreamGroup(Arrays.asList(datastreams2)); + + DatastreamTaskImpl task1 = new DatastreamTaskImpl(); + task1.setTaskPrefix(datastreamGroup1.getTaskPrefix()); + task1.setConnectorType(connectorType); + + DatastreamTaskImpl task2 = new DatastreamTaskImpl(); + task2.setTaskPrefix(datastreamGroup2.getTaskPrefix()); + task2.setConnectorType(connectorType); + + Map> oldAassignment = new HashMap<>(); + oldAassignment.put("instance1", Collections.singletonList(task1)); + oldAassignment.put("instance2", Collections.singletonList(task2)); + + adapter.updateAllAssignments(oldAassignment); + + // simulating a stopping datastream which has a task assigned to instance2 + List stoppingDatastreamGroups = Collections.singletonList(datastreamGroup2); + Map> newAssignment = new HashMap<>(); + newAssignment.put("instance1", Collections.singletonList(task1)); + adapter.updateAllAssignmentsAndIssueTokens(newAssignment, stoppingDatastreamGroups); + + // asserting that: + // (1) the assignment 
tokens path was created for stopping stream + // (2) a token has been issued for instance2, and only for instance2 + // (3) the token data is correct + String assignmentTokensPath = KeyBuilder.datastreamAssignmentTokens(testCluster, datastreamGroup2.getTaskPrefix()); + Assert.assertTrue(zkClient.exists(assignmentTokensPath)); + List tokenNodes = zkClient.getChildren(assignmentTokensPath); + Assert.assertEquals(tokenNodes.size(), 1); + String tokenData = zkClient.readData( + KeyBuilder.datastreamAssignmentTokenForInstance(testCluster, datastreamGroup2.getTaskPrefix(), tokenNodes.get(0))); + AssignmentToken token = AssignmentToken.fromJson(tokenData); + Assert.assertEquals(token.getIssuedFor(), "instance2"); + String hostname = InetAddress.getLocalHost().getHostName(); + Assert.assertEquals(token.getIssuedBy(), hostname); + } + @Test public void testSaveStateOnlyWhenTaskExists() { String testCluster = "testSaveStateOnlyWhenTaskExists";