LoadBasedPartitionAssignmentStrategy - base framework (#823)
jzakaryan authored May 14, 2021
1 parent f1111e5 commit 591e62b
Showing 17 changed files with 1,044 additions and 59 deletions.
@@ -0,0 +1,46 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

import java.util.Map;


/**
* A structure that holds per-partition throughput information for a Kafka cluster
*/
public class ClusterThroughputInfo {

private final String _clusterName;
private final Map<String, PartitionThroughputInfo> _partitionInfoMap;

/**
* Creates an instance of {@link ClusterThroughputInfo}
* @param clusterName Cluster name
* @param partitionInfoMap A map, where the key is the partition name, and the value is the
* {@link PartitionThroughputInfo} for the partition.
*/
public ClusterThroughputInfo(String clusterName, Map<String, PartitionThroughputInfo> partitionInfoMap) {
_clusterName = clusterName;
_partitionInfoMap = partitionInfoMap;
}

/**
* Gets the cluster name
* @return Name of the cluster
*/
public String getClusterName() {
return _clusterName;
}

/**
* Gets the partition information map for partitions in the cluster
* @return A map, where the key is the partition name, and the value is a {@link PartitionThroughputInfo} for the
* partition
*/
public Map<String, PartitionThroughputInfo> getPartitionInfoMap() {
return _partitionInfoMap;
}
}
@@ -0,0 +1,19 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

/**
* An interface that resolves the source Kafka cluster from the given {@link DatastreamGroup} instance
*/
public interface DatastreamSourceClusterResolver {

/**
* Given a datastream group, gets the name of the source cluster
* @param datastreamGroup Datastream group
* @return The name of the source cluster
*/
String getSourceCluster(DatastreamGroup datastreamGroup);
}
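
This commit only ships a dummy resolver (shown later in this diff). As a hedged illustration of what a non-trivial implementation might look like, the sketch below maps a configured prefix of the datastream group name to a cluster name. The class name, the prefix map, and the default-cluster fallback are made-up assumptions; the only DatastreamGroup method relied on is getName(), which this commit's strategy also calls.

import java.util.Map;

/**
 * Illustrative sketch only, not part of this commit: resolves the source cluster from a
 * configured mapping of datastream group name prefixes to cluster names.
 */
class PrefixBasedSourceClusterResolver implements DatastreamSourceClusterResolver {
  private final Map<String, String> _prefixToCluster; // hypothetical configuration
  private final String _defaultCluster;

  PrefixBasedSourceClusterResolver(Map<String, String> prefixToCluster, String defaultCluster) {
    _prefixToCluster = prefixToCluster;
    _defaultCluster = defaultCluster;
  }

  @Override
  public String getSourceCluster(DatastreamGroup datastreamGroup) {
    String name = datastreamGroup.getName();
    return _prefixToCluster.entrySet().stream()
        .filter(e -> name.startsWith(e.getKey()))
        .map(Map.Entry::getValue)
        .findFirst()
        .orElse(_defaultCluster);
  }
}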
@@ -0,0 +1,51 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;


/**
* A structure for partition throughput information.
*/
public class PartitionThroughputInfo {
private final int _bytesInKBRate;
private final int _messagesInRate;
private final String _partitionName;

/**
* Creates an instance of {@link PartitionThroughputInfo}
* @param bytesInKBRate Bytes-in rate for the partition (in KB)
* @param messagesInRate Messages-in rate for the partition
* @param partitionName Partition name
*/
public PartitionThroughputInfo(int bytesInKBRate, int messagesInRate, String partitionName) {
_bytesInKBRate = bytesInKBRate;
_messagesInRate = messagesInRate;
_partitionName = partitionName;
}

/**
* Gets the bytes in rate (in KB)
* @return Bytes in rate (in KB)
*/
public int getBytesInKBRate() {
return _bytesInKBRate;
}

/**
* Gets the messages in rate
* @return Messages in rate
*/
public int getMessagesInRate() {
return _messagesInRate;
}

/**
* Gets the partition name
* @return Partition name
*/
public String getPartitionName() {
return _partitionName;
}
}
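
Purely for illustration, the two value classes above could be assembled as follows; the topic, partition names, cluster name, and rates are made-up example values.

// bytesInKBRate, messagesInRate, partitionName
Map<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>();
partitionInfoMap.put("ExampleTopic-0", new PartitionThroughputInfo(1200, 350, "ExampleTopic-0"));
partitionInfoMap.put("ExampleTopic-1", new PartitionThroughputInfo(800, 210, "ExampleTopic-1"));
ClusterThroughputInfo exampleCluster = new ClusterThroughputInfo("example-cluster", partitionInfoMap);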
@@ -0,0 +1,31 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server.providers;

import java.util.Map;

import com.linkedin.datastream.server.ClusterThroughputInfo;


/**
* Abstraction that provides topic partition throughput information. Used by load-based assignment strategies to do
* partition assignment based on throughput
*/
public interface PartitionThroughputProvider {

/**
* Retrieves per-partition throughput information for the given cluster
* @param clusterName Name of the cluster
* @return Throughput information for the requested cluster
*/
ClusterThroughputInfo getThroughputInfo(String clusterName);

/**
* Retrieves per-partition throughput information for all clusters
* @return A map, where keys are cluster names and values are throughput information for the cluster
*/
Map<String, ClusterThroughputInfo> getThroughputInfo();
}
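
The commit itself wires in a NoOpPartitionThroughputProvider (see the factory later in this diff). As a hedged sketch of a concrete provider, an in-memory implementation backed by a pre-built map could look like the following; the class name is hypothetical and nothing here is part of the commit.

import java.util.Collections;
import java.util.Map;

import com.linkedin.datastream.server.ClusterThroughputInfo;

/**
 * Illustrative sketch only: serves throughput info from a map supplied at construction time,
 * e.g. a periodic snapshot of cluster metrics or fixture data in tests.
 */
class InMemoryPartitionThroughputProvider implements PartitionThroughputProvider {
  private final Map<String, ClusterThroughputInfo> _infoByCluster;

  InMemoryPartitionThroughputProvider(Map<String, ClusterThroughputInfo> infoByCluster) {
    _infoByCluster = infoByCluster;
  }

  @Override
  public ClusterThroughputInfo getThroughputInfo(String clusterName) {
    return _infoByCluster.get(clusterName);
  }

  @Override
  public Map<String, ClusterThroughputInfo> getThroughputInfo() {
    return Collections.unmodifiableMap(_infoByCluster);
  }
}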
@@ -0,0 +1,18 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server;

/**
* A dummy implementation for {@link DatastreamSourceClusterResolver}
*/
public class DummyDatastreamSourceClusterResolver implements DatastreamSourceClusterResolver {
private static final String DUMMY_CLUSTER_NAME = "dummy";

@Override
public String getSourceCluster(DatastreamGroup datastreamGroup) {
return DUMMY_CLUSTER_NAME;
}
}
@@ -0,0 +1,34 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server.assignment;

import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.NotImplementedException;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamTask;


/**
* Performs partition assignment based on partition throughput information
*/
public class LoadBasedPartitionAssigner {
/**
* Get partition assignment
* @param throughputInfo Per-partition throughput information
* @param assignedPartitions List of assigned partitions
* @param unassignedPartitions List of unassigned partitions
* @param taskCount Task count
* @return Partition assignment
*/
public Map<String, Set<DatastreamTask>> assignPartitions(ClusterThroughputInfo throughputInfo,
List<String> assignedPartitions, List<String> unassignedPartitions, int taskCount) {
throw new NotImplementedException();
}
}
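
LoadBasedPartitionAssigner is only a stub in this commit. As a simplified, hedged sketch of the general technique (greedy least-loaded assignment), the snippet below spreads unassigned partitions across task buckets by accumulated bytes-in rate. It deliberately uses plain integer-indexed partition-name sets rather than DatastreamTask objects, and it is not the algorithm that will eventually back assignPartitions.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.PartitionThroughputInfo;

/**
 * Illustrative sketch only: greedy load-based assignment over plain partition-name buckets.
 * Assumes taskCount > 0.
 */
final class LoadBasedAssignmentSketch {
  static Map<Integer, Set<String>> assign(ClusterThroughputInfo throughputInfo,
      List<String> unassignedPartitions, int taskCount) {
    Map<String, PartitionThroughputInfo> infoMap = throughputInfo.getPartitionInfoMap();
    // Min-heap of {accumulated KB/s, task index}, so the lightest task is always picked next
    PriorityQueue<int[]> tasksByLoad = new PriorityQueue<>(Comparator.comparingInt((int[] t) -> t[0]));
    Map<Integer, Set<String>> assignment = new HashMap<>();
    for (int i = 0; i < taskCount; i++) {
      tasksByLoad.add(new int[] {0, i});
      assignment.put(i, new HashSet<>());
    }
    // Assign the heaviest partitions first so large partitions get spread across tasks
    List<String> byThroughputDesc = new ArrayList<>(unassignedPartitions);
    byThroughputDesc.sort(Comparator.comparingInt((String p) ->
        infoMap.containsKey(p) ? infoMap.get(p).getBytesInKBRate() : 0).reversed());
    for (String partition : byThroughputDesc) {
      int[] lightest = tasksByLoad.poll();
      assignment.get(lightest[1]).add(partition);
      lightest[0] += infoMap.containsKey(partition) ? infoMap.get(partition).getBytesInKBRate() : 0;
      tasksByLoad.add(lightest);
    }
    return assignment;
  }
}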
@@ -0,0 +1,118 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server.assignment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.datastream.common.PollUtils;
import com.linkedin.datastream.common.RetriesExhaustedException;
import com.linkedin.datastream.common.zk.ZkClient;
import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;
import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
import com.linkedin.datastream.server.DatastreamTask;
import com.linkedin.datastream.server.Pair;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;


/**
* Partition assignment strategy that does assignment based on throughput information supplied by a
* {@link PartitionThroughputProvider}
*/
public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategy.class.getName());

// TODO Make these constants configurable
private static final long THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = Duration.ofSeconds(10).toMillis();
private static final long THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = Duration.ofSeconds(1).toMillis();

private final PartitionThroughputProvider _throughputProvider;
private final DatastreamSourceClusterResolver _sourceClusterResolver;


/**
* Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
*/
public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
DatastreamSourceClusterResolver sourceClusterResolver, Optional<Integer> maxTasks,
Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask, boolean enableElasticTaskAssignment,
Optional<Integer> partitionsPerTask, Optional<Integer> partitionFullnessFactorPct, Optional<ZkClient> zkClient,
String clusterName) {
super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
partitionFullnessFactorPct, zkClient, clusterName);
_throughputProvider = throughputProvider;
_sourceClusterResolver = sourceClusterResolver;
}

@Override
public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<DatastreamTask>> currentAssignment,
DatastreamGroupPartitionsMetadata datastreamPartitions) {
DatastreamGroup datastreamGroup = datastreamPartitions.getDatastreamGroup();
String datastreamGroupName = datastreamGroup.getName();
Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
currentAssignment, datastreamGroupName);
List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();

// Do throughput based assignment only initially, when no partitions have been assigned yet
if (!assignedPartitions.isEmpty()) {
return super.assignPartitions(currentAssignment, datastreamPartitions);
}

Map<String, ClusterThroughputInfo> partitionThroughputInfo;
// Attempt to retrieve partition throughput info; fall back to StickyPartitionAssignmentStrategy if retries are exhausted
try {
partitionThroughputInfo = fetchPartitionThroughputInfo();
} catch (RetriesExhaustedException ex) {
LOG.warn("Attempts to fetch partition throughput info timed out. Falling back to regular partition assignment strategy", ex);
return super.assignPartitions(currentAssignment, datastreamPartitions);
}

LOG.info("Old partition assignment info, assignment: {}", currentAssignment);
Validate.isTrue(currentAssignment.size() > 0,
"Zero tasks assigned. Retry leader partition assignment.");

// TODO Get task count estimate and perform elastic task count validation
// TODO Get task count estimate based on throughput and pick a winner
int maxTaskCount = 0;

// TODO Compute the list of unassigned partitions
List<String> unassignedPartitions = new ArrayList<>();

// Resolving cluster name from datastream group
String clusterName = _sourceClusterResolver.getSourceCluster(datastreamGroup);
ClusterThroughputInfo clusterThroughputInfo = partitionThroughputInfo.get(clusterName);

// Doing assignment
LoadBasedPartitionAssigner partitionAssigner = new LoadBasedPartitionAssigner();
return partitionAssigner.assignPartitions(clusterThroughputInfo, assignedPartitions, unassignedPartitions,
maxTaskCount);
}

private Map<String, ClusterThroughputInfo> fetchPartitionThroughputInfo() {
return PollUtils.poll(() -> {
try {
return _throughputProvider.getThroughputInfo();
} catch (Exception ex) {
// TODO include the retry count in the log message
LOG.warn("Failed to fetch partition throughput info, will retry.", ex);
return null;
}
}, Objects::nonNull, THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT, THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT)
.orElseThrow(RetriesExhaustedException::new);
}
}
@@ -0,0 +1,57 @@
/**
* Copyright 2021 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.server.assignment;

import java.util.Optional;
import java.util.Properties;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.linkedin.datastream.common.zk.ZkClient;
import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
import com.linkedin.datastream.server.DummyDatastreamSourceClusterResolver;
import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
import com.linkedin.datastream.server.providers.NoOpPartitionThroughputProvider;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;


/**
* A factory for creating {@link LoadBasedPartitionAssignmentStrategy} instances
*/
public class LoadBasedPartitionAssignmentStrategyFactory extends StickyPartitionAssignmentStrategyFactory {
private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategyFactory.class.getName());

@Override
public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
_config = new PartitionAssignmentStrategyConfig(assignmentStrategyProperties);

boolean enableElasticTaskAssignment = _config.isElasticTaskAssignmentEnabled();
// Create the zookeeper client
Optional<ZkClient> zkClient = Optional.empty();
try {
zkClient = constructZooKeeperClient();
} catch (IllegalStateException ex) {
LOG.warn("Disabling elastic task assignment as zkClient initialization failed", ex);
enableElasticTaskAssignment = false;
}

PartitionThroughputProvider provider = constructPartitionThroughputProvider();
DatastreamSourceClusterResolver clusterResolver = constructDatastreamSourceClusterResolver();

return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
_config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
_config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), zkClient, _config.getCluster());
}

protected PartitionThroughputProvider constructPartitionThroughputProvider() {
return new NoOpPartitionThroughputProvider();
}

protected DatastreamSourceClusterResolver constructDatastreamSourceClusterResolver() {
return new DummyDatastreamSourceClusterResolver();
}
}
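
The factory above defaults to the no-op provider and the dummy resolver. A deployment wanting real throughput data could, hypothetically, subclass it and override the two protected hooks; the sketch below wires in the illustrative classes from earlier on this page, and the class name, prefix, and cluster values are made up.

import java.util.Collections;

import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

/**
 * Hypothetical wiring sketch, not part of this commit.
 */
public class ExampleLoadBasedPartitionAssignmentStrategyFactory extends LoadBasedPartitionAssignmentStrategyFactory {
  @Override
  protected PartitionThroughputProvider constructPartitionThroughputProvider() {
    // Sketched in-memory provider; a real deployment would supply cluster metrics instead
    return new InMemoryPartitionThroughputProvider(Collections.emptyMap());
  }

  @Override
  protected DatastreamSourceClusterResolver constructDatastreamSourceClusterResolver() {
    // Sketched prefix-based resolver with made-up mapping values
    return new PrefixBasedSourceClusterResolver(Collections.singletonMap("example", "example-cluster"), "default-cluster");
  }
}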