Replacing Coordinator Queue With Deque & Fixing Usage Of toMap Util #950

Merged
@@ -0,0 +1,20 @@
/**
* Copyright 2023 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

import java.util.Properties;

/**
* CallableCoordinatorForTest is used to override Coordinator behaviors in tests.
*/
public interface CallableCoordinatorForTest {
Collaborator:

Why do we need this interface? Seems like TestCoordinator.java has all you need.

Collaborator (Author):

I didn't want to add another constructor to TestCoordinator.java just to override another method of the Coordinator. With this interface, we can minimize code duplication and pass the Coordinator overrides as an argument.

For the test "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath", I overrode the performPreAssignmentCleanup method to exercise a failure path; that is where this interface is used.
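To make the pattern concrete, here is a small, self-contained sketch of the idea. The FakeCoordinator and FakeCoordinatorFactory names are invented for illustration and are not part of this PR; the real interface is CallableCoordinatorForTest and the real usage appears in the test diff below.

import java.util.Properties;

public class FactoryOverrideSketch {

  // Stand-in for Coordinator: a class whose behavior a test wants to override selectively.
  static class FakeCoordinator {
    FakeCoordinator(Properties props) { }
    protected void performCleanup() {
      System.out.println("default cleanup");
    }
  }

  // Stand-in for CallableCoordinatorForTest: a functional interface wrapping the constructor call.
  @FunctionalInterface
  interface FakeCoordinatorFactory {
    FakeCoordinator invoke(Properties props);
  }

  // Stand-in for the createCoordinator test helper, which now accepts the factory as a parameter.
  static FakeCoordinator create(FakeCoordinatorFactory factory) {
    return factory.invoke(new Properties());
  }

  public static void main(String[] args) {
    // Default path: forward straight to the constructor, no behavior change.
    FakeCoordinator plain = create(FakeCoordinator::new);
    plain.performCleanup(); // prints "default cleanup"

    // Test path: override a single method via an anonymous subclass, without adding a constructor.
    FakeCoordinator failing = create(props -> new FakeCoordinator(props) {
      @Override
      protected void performCleanup() {
        throw new RuntimeException("simulated cleanup failure");
      }
    });
    try {
      failing.performCleanup();
    } catch (RuntimeException e) {
      System.out.println("caught: " + e.getMessage()); // prints "caught: simulated cleanup failure"
    }
  }
}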

/**
 * Invokes a Coordinator constructor with the given parameters:
 * - cachedDatastreamReader to maintain all the datastreams in the cluster.
 * - properties to use while creating the coordinator.
 */
Coordinator invoke(CachedDatastreamReader cachedDatastreamReader, Properties properties);
}
@@ -9,6 +9,8 @@
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -21,6 +23,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
@@ -33,6 +36,7 @@
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.commons.lang3.Validate;
import org.apache.zookeeper.CreateMode;
@@ -136,7 +140,12 @@ private Coordinator createCoordinator(String zkAddr, String cluster) throws Exce
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override) throws Exception {
return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory());
return createCoordinator(zkAddr, cluster, override, new DummyTransportProviderAdminFactory(), Coordinator::new);
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
TransportProviderAdminFactory transportProviderAdminFactory) throws Exception {
return createCoordinator(zkAddr, cluster, override, transportProviderAdminFactory, Coordinator::new);
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
@@ -163,7 +172,7 @@ protected synchronized void handleEvent(CoordinatorEvent event) {
}

private Coordinator createCoordinator(String zkAddr, String cluster, Properties override,
TransportProviderAdminFactory transportProviderAdminFactory) throws Exception {
TransportProviderAdminFactory transportProviderAdminFactory, CallableCoordinatorForTest callableCoordinatorForTest) throws Exception {
Properties props = new Properties();
props.put(CoordinatorConfig.CONFIG_CLUSTER, cluster);
props.put(CoordinatorConfig.CONFIG_ZK_ADDRESS, zkAddr);
@@ -172,7 +181,7 @@ private Coordinator createCoordinator(String zkAddr, String cluster, Properties
props.putAll(override);
ZkClient client = new ZkClient(zkAddr);
_cachedDatastreamReader = new CachedDatastreamReader(client, cluster);
Coordinator coordinator = new Coordinator(_cachedDatastreamReader, props);
Coordinator coordinator = callableCoordinatorForTest.invoke(_cachedDatastreamReader, props);
coordinator.addTransportProvider(DummyTransportProviderAdminFactory.PROVIDER_NAME,
transportProviderAdminFactory.createTransportProviderAdmin(DummyTransportProviderAdminFactory.PROVIDER_NAME,
new Properties()));
@@ -3945,6 +3954,127 @@ public void testThroughputViolatingTopicsHandlingForSingleDatastreamOnCreateWith
coordinator.getDatastreamCache().getZkclient().close();
}

@Test
public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePath() throws Exception {
String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath";
String connectorType = "connectorType";
String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePath";

Queue<CoordinatorEvent> shadowCoordinatorQueue = new ArrayDeque<>();
Properties properties = new Properties();
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {

// This override throws an exception while the newly elected leader performs pre-assignment cleanup.
// The exception causes the handleLeaderDoAssignment handler to exit and re-insert the same event at the
// front of the queue for a retry.
@Override
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
throw new RuntimeException("testing exception path in assignment cleanup routine");
}

// This override collects the coordinator queue events in a shadow queue for test purposes.
@Override
protected synchronized void handleEvent(CoordinatorEvent event) {
shadowCoordinatorQueue.add(event);
super.handleEvent(event);
}
});
TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType);
coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);

Datastream testDatastream =
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0];

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();

// This is the event which should be added to the front of the queue once the handler exits on an exception.
CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader =
new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true);

// Poll until the newly elected leader handles the handleLeaderDoAssignment event for the first time.
while (!Objects.equals(shadowCoordinatorQueue.peek(), leaderDoAssignmentForNewlyElectedLeader)) {
shadowCoordinatorQueue.poll();
}

// Take out the initial leaderDoAssignmentForNewlyElectedLeader
shadowCoordinatorQueue.poll();

// Since the retry event is expected to be added to the front, the head of the queue should now be that same event.
Assert.assertEquals(shadowCoordinatorQueue.poll(), leaderDoAssignmentForNewlyElectedLeader);
}

@Test
public void testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation() throws Exception {
String testCluster = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation";
String connectorType = "connectorType";
String streamName = "testLeaderDoAssignmentForNewlyElectedLeaderFailurePathVariation";

// This is the event which should be added to the front of the queue once the handler exits on an exception.
CoordinatorEvent leaderDoAssignmentForNewlyElectedLeader =
new CoordinatorEvent(CoordinatorEvent.EventType.LEADER_DO_ASSIGNMENT, true);

List<Map.Entry<CoordinatorEvent, CoordinatorEvent>>
shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent = new ArrayList<>();

Properties properties = new Properties();
Coordinator coordinator =
createCoordinator(_zkConnectionString, testCluster, properties, new DummyTransportProviderAdminFactory(),
(cachedDatastreamReader, props) -> new Coordinator(cachedDatastreamReader, props) {

// This override throws an exception while the newly elected leader performs pre-assignment cleanup.
// The exception causes the handleLeaderDoAssignment handler to exit and re-insert the same event at the
// front of the queue for a retry.
@Override
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
throw new RuntimeException("testing exception path in assignment cleanup routine");
}

// This override records the head of the coordinator queue before and after each event is handled, for test purposes.
@Override
protected synchronized void handleEvent(CoordinatorEvent event) {
CoordinatorEvent previousHead = peekCoordinatorEventBlockingQueue();
super.handleEvent(event);
PollUtils.poll(() -> peekCoordinatorEventBlockingQueue() != null, 50, 1000);
CoordinatorEvent nextHead = peekCoordinatorEventBlockingQueue();

// recording previous and new heads of the CoordinatorEventBlockingQueue
if (event.equals(leaderDoAssignmentForNewlyElectedLeader)) {
shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.add(
new AbstractMap.SimpleEntry<>(previousHead, nextHead));
}
}
});
TestHookConnector dummyConnector = new TestHookConnector("dummyConnector", connectorType);
coordinator.addConnector(connectorType, dummyConnector, new BroadcastStrategy(Optional.empty()), false,
new SourceBasedDeduper(), null);
coordinator.start();

ZkClient zkClient = new ZkClient(_zkConnectionString);

Datastream testDatastream =
DatastreamTestUtils.createAndStoreDatastreams(zkClient, testCluster, connectorType, streamName)[0];

coordinator.stop();
zkClient.close();
coordinator.getDatastreamCache().getZkclient().close();

// Comparing the previous and new head values when the NewLeaderDoAssignmentEvent fails.
IntStream.range(0, 3).forEach(index -> {
Assert.assertNotEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getKey(),
leaderDoAssignmentForNewlyElectedLeader);
Assert.assertEquals(shadowListWithPreviousAndNewHeadPairsAtNewLeaderDoAssignmentEvent.get(index).getValue(),
leaderDoAssignmentForNewlyElectedLeader);
});
}

// This helper function helps compare the requesting topics with the topics reflected in the server.
private BooleanSupplier validateIfViolatingTopicsAreReflectedInServer(Datastream testStream, Coordinator coordinator,
Set<String> requestedThroughputViolatingTopics) {
@@ -834,7 +834,8 @@ private void handleAssignmentChange(boolean isDatastreamUpdate) throws TimeoutEx
_assignedDatastreamTasks.putAll(currentAssignment.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity())));
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(),
(existingTask, duplicateTask) -> existingTask)));
List<DatastreamTask> newAssignment = new ArrayList<>(_assignedDatastreamTasks.values());

if ((totalTasks - submittedTasks) > 0) {
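For context on the toMap part of this PR (here and in the similar call sites further down): Collectors.toMap with only a key mapper and a value mapper throws IllegalStateException as soon as two elements produce the same key, so the added third argument is a merge function that keeps the existing value. A minimal, self-contained sketch with made-up task names:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ToMapMergeSketch {
  public static void main(String[] args) {
    // Two entries share the key "task-a", standing in for duplicate DatastreamTask names.
    List<String> taskNames = Arrays.asList("task-a", "task-b", "task-a");

    // Without the merge function, this collect call would throw
    // java.lang.IllegalStateException: Duplicate key task-a ...
    Map<String, String> byName = taskNames.stream()
        .collect(Collectors.toMap(Function.identity(), Function.identity(),
            (existing, duplicate) -> existing)); // keep the first value, mirroring the PR

    System.out.println(byName.size()); // 2 -- the duplicate is collapsed instead of failing
  }
}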
@@ -1524,10 +1525,11 @@ private void scheduleLeaderDoAssignmentRetry(boolean isNewlyElectedLeader) {
_log.info("Schedule retry for leader assigning tasks");
_metrics.updateKeyedMeter(CoordinatorMetrics.KeyedMeter.HANDLE_LEADER_DO_ASSIGNMENT_NUM_RETRIES, 1);
_leaderDoAssignmentScheduled.set(true);
// Schedule the LEADER_DO_ASSIGNMENT event immediately so that no other event is handled before the retry.
_leaderDoAssignmentScheduledFuture = _scheduledExecutor.schedule(() -> {
_eventQueue.put(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader));
_eventQueue.putFirst(CoordinatorEvent.createLeaderDoAssignmentEvent(isNewlyElectedLeader));
_leaderDoAssignmentScheduled.set(false);
}, _config.getRetryIntervalMs(), TimeUnit.MILLISECONDS);
}, 0, TimeUnit.MILLISECONDS);
}

@VisibleForTesting
@@ -1614,7 +1616,7 @@ private void revokeUnclaimedAssignmentTokens(Map<String, List<AssignmentToken>>
}
}

private void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {
protected void performPreAssignmentCleanup(List<DatastreamGroup> datastreamGroups) {

// Map between instance to tasks assigned to the instance.
Map<String, Set<DatastreamTask>> previousAssignmentByInstance = _adapter.getAllAssignedDatastreamTasks();
@@ -2325,6 +2327,11 @@ CoordinatorConfig getConfig() {
return _config;
}

@VisibleForTesting
CoordinatorEvent peekCoordinatorEventBlockingQueue() {
return _eventQueue.peek();
}

@VisibleForTesting
static String getNumThroughputViolatingTopicsMetricName() {
return CoordinatorMetrics.NUM_THROUGHPUT_VIOLATING_TOPICS_PER_DATASTREAM;
@@ -52,7 +52,7 @@ private CoordinatorEvent(EventType eventType) {
_eventMetadata = null;
}

private CoordinatorEvent(EventType eventType, Object eventMetadata) {
protected CoordinatorEvent(EventType eventType, Object eventMetadata) {
_eventType = eventType;
_eventMetadata = eventMetadata;
}
@@ -7,12 +7,12 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +44,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware {
static final String GAUGE_KEY = "queuedEvents";

private final Set<CoordinatorEvent> _eventSet;
private final Queue<CoordinatorEvent> _eventQueue;
private final Deque<CoordinatorEvent> _eventQueue;
private final DynamicMetricsManager _dynamicMetricsManager;
private final Gauge<Integer> _gauge;
private final Counter _counter;
@@ -59,7 +59,7 @@ class CoordinatorEventBlockingQueue implements MetricsAware {
*/
CoordinatorEventBlockingQueue(String key) {
_eventSet = new HashSet<>();
_eventQueue = new LinkedBlockingQueue<>();
_eventQueue = new LinkedBlockingDeque<>();
_dynamicMetricsManager = DynamicMetricsManager.getInstance();

String prefix = buildMetricName(key);
@@ -73,16 +73,47 @@


/**
* Add a single event to the queue, overwriting events with the same name and same metadata.
* Add a single event to the queue. Defaults to adding the event at the end of the queue.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void put(CoordinatorEvent event) {
LOG.info("Queuing event {} to event queue", event.getType());
put(event, true);
Collaborator:

When do we need to support inserting at the rear?

Collaborator (Author):

All events in this coordinator queue are inserted at the rear. The only case where we need to insert at the front is the one this PR introduces (see the deque sketch after this file's diff).

}

/**
* Add a single event to the queue. Adds the event to the front of the queue.
* @param event CoordinatorEvent event to add to the queue
*/
public synchronized void putFirst(CoordinatorEvent event) {
// If the requested event is already present in the CoordinatorEventBlockingQueue, remove it first so that the
// event being putFirst takes priority.
if (_eventSet.contains(event)) {
LOG.info("Prioritizing the event to be putFirst by removing the existing CoordinatorEvent " + event);
// Since the number of distinct events in the CoordinatorEventBlockingQueue is not expected to be large, the
// linear-time removal is considered acceptable.
_eventQueue.remove(event);
_eventSet.remove(event);
}
put(event, false);
}

/**
* Add a single event to the queue, de-duping events with the same name and same metadata.
* @param event CoordinatorEvent event to add to the queue
* @param insertInTheEnd if true, add the event to the end of the queue; otherwise, add it to the front.
*/
private synchronized void put(CoordinatorEvent event, boolean insertInTheEnd) {
LOG.info("Queuing event {} at the " + (insertInTheEnd ? "end" : "front") + " of the event queue", event.getType());
if (_eventSet.contains(event)) {
_counter.inc(); // count duplicate event
} else {
// only insert if there isn't an event present in the queue with the same name and same metadata.
boolean result = _eventQueue.offer(event);
boolean result;
if (insertInTheEnd) {
result = _eventQueue.offer(event);
} else {
result = _eventQueue.offerFirst(event);
}
if (!result) {
return;
}
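As background for the LinkedBlockingQueue to LinkedBlockingDeque swap above: offerFirst places an element at the head of the deque, so an event enqueued through putFirst is handled before anything already waiting, which is how the LEADER_DO_ASSIGNMENT retry jumps ahead of other events. A minimal standalone sketch (the event strings are placeholders, not real CoordinatorEvent instances):

import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

public class DequeOrderingSketch {
  public static void main(String[] args) {
    BlockingDeque<String> events = new LinkedBlockingDeque<>();

    events.offer("EVENT_A");      // rear insertion, the default put(event) path
    events.offer("EVENT_B");      // rear insertion
    events.offerFirst("RETRY");   // front insertion, the new putFirst(event) path

    // The prioritized element is dequeued before everything that was already queued.
    System.out.println(events.poll()); // RETRY
    System.out.println(events.poll()); // EVENT_A
    System.out.println(events.poll()); // EVENT_B
  }
}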
@@ -487,7 +487,8 @@ public Map<String, List<DatastreamTask>> getTasksToCleanUp(List<DatastreamGroup>
Map<String, DatastreamTask> assignmentsMap = currentAssignment.values()
.stream()
.flatMap(Collection::stream)
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity()));
.collect(Collectors.toMap(DatastreamTask::getDatastreamTaskName, Function.identity(),
(existingTask, duplicateTask) -> existingTask));

for (String instance : currentAssignment.keySet()) {
// find the dependency tasks which also exist in the assignmentsMap.
@@ -759,10 +759,11 @@ public void updateAllAssignmentsAndIssueTokens(Map<String, List<DatastreamTask>>
private Map<DatastreamGroup, Set<String>> getStoppingDatastreamGroupInstances(
List<DatastreamGroup> stoppingDatastreamGroups) {
Map<String, Set<DatastreamTask>> currentAssignment = getAllAssignedDatastreamTasks();
Set<String> stoppingDatastreamTaskPrefixes = stoppingDatastreamGroups.stream().
map(DatastreamGroup::getTaskPrefix).collect(toSet());
Map<String, DatastreamGroup> taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream().
collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity()));
Set<String> stoppingDatastreamTaskPrefixes =
stoppingDatastreamGroups.stream().map(DatastreamGroup::getTaskPrefix).collect(toSet());
Map<String, DatastreamGroup> taskPrefixDatastreamGroups = stoppingDatastreamGroups.stream()
.collect(Collectors.toMap(DatastreamGroup::getTaskPrefix, Function.identity(),
(existingDatastreamGroup, duplicateDatastreamGroup) -> existingDatastreamGroup));

Map<DatastreamGroup, Set<String>> stoppingDgInstances = new HashMap<>();
currentAssignment.keySet()