diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/ClusterThroughputInfo.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/ClusterThroughputInfo.java
new file mode 100644
index 000000000..e3ab2b965
--- /dev/null
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/ClusterThroughputInfo.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server;
+
+import java.util.Map;
+
+
+/**
+ * A structure that holds per-partition throughput information for a Kafka cluster
+ */
+public class ClusterThroughputInfo {
+
+  private final String _clusterName;
+  private final Map<String, PartitionThroughputInfo> _partitionInfoMap;
+
+  /**
+   * Creates an instance of {@link ClusterThroughputInfo}
+   * @param clusterName Cluster name
+   * @param partitionInfoMap A map, where the key is the partition name, and the value is the
+   *                         {@link PartitionThroughputInfo} for the partition.
+   */
+  public ClusterThroughputInfo(String clusterName, Map<String, PartitionThroughputInfo> partitionInfoMap) {
+    _clusterName = clusterName;
+    _partitionInfoMap = partitionInfoMap;
+  }
+
+  /**
+   * Gets the cluster name
+   * @return Name of the cluster
+   */
+  public String getClusterName() {
+    return _clusterName;
+  }
+
+  /**
+   * Gets the partition information map for partitions in the cluster
+   * @return A map, where the key is the partition name, and the value is a {@link PartitionThroughputInfo} for the
+   *         partition
+   */
+  public Map<String, PartitionThroughputInfo> getPartitionInfoMap() {
+    return _partitionInfoMap;
+  }
+}
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamSourceClusterResolver.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamSourceClusterResolver.java
new file mode 100644
index 000000000..580764539
--- /dev/null
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/DatastreamSourceClusterResolver.java
@@ -0,0 +1,19 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server;
+
+/**
+ * An interface that resolves the source Kafka cluster from the given {@link DatastreamGroup} instance
+ */
+public interface DatastreamSourceClusterResolver {
+
+  /**
+   * Given a datastream group, gets the name of the source cluster
+   * @param datastreamGroup Datastream group
+   * @return The name of the source cluster
+   */
+  String getSourceCluster(DatastreamGroup datastreamGroup);
+}
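Editor's note: `DatastreamSourceClusterResolver` is the extension point that maps a datastream group to the Kafka cluster whose throughput stats should be consulted. This PR only ships a dummy implementation (further down); as a rough illustration, a real resolver might derive the cluster from the source connection string. The class name and the host-based convention below are hypothetical and not part of this PR; the sketch assumes every datastream in the group shares one URI-style source.

```java
import java.net.URI;

import com.linkedin.datastream.server.DatastreamGroup;
import com.linkedin.datastream.server.DatastreamSourceClusterResolver;

/**
 * Illustrative only: resolves the cluster name from the host portion of the
 * source connection string, e.g. "kafka://cluster-a.example.com:9092/MyTopic" -> "cluster-a.example.com".
 */
public class SourceUriClusterResolver implements DatastreamSourceClusterResolver {
  @Override
  public String getSourceCluster(DatastreamGroup datastreamGroup) {
    // Datastreams in a group are duplicates of each other, so the first source is representative
    String connectionString = datastreamGroup.getDatastreams().get(0).getSource().getConnectionString();
    return URI.create(connectionString).getHost();
  }
}
```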
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/PartitionThroughputInfo.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/PartitionThroughputInfo.java
new file mode 100644
index 000000000..570e779fb
--- /dev/null
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/PartitionThroughputInfo.java
@@ -0,0 +1,51 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server;
+
+
+/**
+ * A structure for partition throughput information.
+ */
+public class PartitionThroughputInfo {
+  private final int _bytesInKBRate;
+  private final int _messagesInRate;
+  private final String _partitionName;
+
+  /**
+   * Creates an instance of {@link PartitionThroughputInfo}
+   * @param bytesInKBRate Bytes-in rate for the partition (in KB)
+   * @param messagesInRate Messages-in rate for the partition
+   * @param partitionName Name of the partition
+   */
+  public PartitionThroughputInfo(int bytesInKBRate, int messagesInRate, String partitionName) {
+    _bytesInKBRate = bytesInKBRate;
+    _messagesInRate = messagesInRate;
+    _partitionName = partitionName;
+  }
+
+  /**
+   * Gets the bytes-in rate (in KB)
+   * @return Bytes-in rate (in KB)
+   */
+  public int getBytesInKBRate() {
+    return _bytesInKBRate;
+  }
+
+  /**
+   * Gets the messages-in rate
+   * @return Messages-in rate
+   */
+  public int getMessagesInRate() {
+    return _messagesInRate;
+  }
+
+  /**
+   * Gets the partition name
+   * @return Partition name
+   */
+  public String getPartitionName() {
+    return _partitionName;
+  }
+}
diff --git a/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/PartitionThroughputProvider.java b/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/PartitionThroughputProvider.java
new file mode 100644
index 000000000..de0fce8ae
--- /dev/null
+++ b/datastream-server-api/src/main/java/com/linkedin/datastream/server/providers/PartitionThroughputProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.providers;
+
+import java.util.Map;
+
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+
+
+/**
+ * Abstraction that provides topic partition throughput information. Used by load-based assignment strategies to do
+ * partition assignment based on throughput
+ */
+public interface PartitionThroughputProvider {
+
+  /**
+   * Retrieves per-partition throughput information for the given cluster
+   * @param clusterName Name of the cluster
+   * @return Throughput information for the requested cluster
+   */
+  ClusterThroughputInfo getThroughputInfo(String clusterName);
+
+  /**
+   * Retrieves per-partition throughput information for all clusters
+   * @return A map, where keys are cluster names and values are throughput information for the cluster
+   */
+  Map<String, ClusterThroughputInfo> getThroughputInfo();
+}
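For contributors implementing `PartitionThroughputProvider` against a real metrics store, a minimal in-memory sketch shows the expected shape of the data. The class below is hypothetical and exists only to illustrate the contract: `ClusterThroughputInfo` keyed by cluster name, `PartitionThroughputInfo` keyed by partition name.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.PartitionThroughputInfo;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

/** Illustrative provider backed by an in-memory map rather than a metrics store. */
public class InMemoryPartitionThroughputProvider implements PartitionThroughputProvider {
  private final Map<String, ClusterThroughputInfo> _throughputMap = new HashMap<>();

  /** Registers throughput stats for one partition of the given cluster. */
  public void addPartition(String cluster, String partition, int bytesInKBRate, int messagesInRate) {
    _throughputMap.computeIfAbsent(cluster, name -> new ClusterThroughputInfo(name, new HashMap<>()))
        .getPartitionInfoMap()
        .put(partition, new PartitionThroughputInfo(bytesInKBRate, messagesInRate, partition));
  }

  @Override
  public ClusterThroughputInfo getThroughputInfo(String clusterName) {
    return _throughputMap.get(clusterName);
  }

  @Override
  public Map<String, ClusterThroughputInfo> getThroughputInfo() {
    return Collections.unmodifiableMap(_throughputMap);
  }
}
```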
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/DummyDatastreamSourceClusterResolver.java b/datastream-server/src/main/java/com/linkedin/datastream/server/DummyDatastreamSourceClusterResolver.java
new file mode 100644
index 000000000..742d89e8b
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/DummyDatastreamSourceClusterResolver.java
@@ -0,0 +1,18 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server;
+
+/**
+ * A dummy implementation of {@link DatastreamSourceClusterResolver}
+ */
+public class DummyDatastreamSourceClusterResolver implements DatastreamSourceClusterResolver {
+  private static final String DUMMY_CLUSTER_NAME = "dummy";
+
+  @Override
+  public String getSourceCluster(DatastreamGroup datastreamGroup) {
+    return DUMMY_CLUSTER_NAME;
+  }
+}
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
new file mode 100644
index 000000000..d91a4182f
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssigner.java
@@ -0,0 +1,34 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.assignment;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+import com.linkedin.datastream.server.DatastreamTask;
+
+
+/**
+ * Performs partition assignment based on partition throughput information
+ */
+public class LoadBasedPartitionAssigner {
+  /**
+   * Gets the partition assignment based on the provided throughput information
+   * @param throughputInfo Per-partition throughput information
+   * @param assignedPartitions List of assigned partitions
+   * @param unassignedPartitions List of unassigned partitions
+   * @param taskCount Task count
+   * @return Partition assignment
+   */
+  public Map<String, Set<DatastreamTask>> assignPartitions(ClusterThroughputInfo throughputInfo,
+      List<String> assignedPartitions, List<String> unassignedPartitions, int taskCount) {
+    throw new NotImplementedException();
+  }
+}
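`LoadBasedPartitionAssigner.assignPartitions` is still a stub (it throws `NotImplementedException`). For intuition about what a throughput-aware assigner could do, here is a classic greedy balancing sketch: sort partitions by byte rate descending, then repeatedly hand the heaviest remaining partition to the least-loaded bucket. This is purely illustrative and not the algorithm this PR commits to.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.PartitionThroughputInfo;

/** Illustrative greedy balancing: heaviest partitions first, each to the least-loaded bucket. */
public final class GreedyPartitionBalancer {
  private GreedyPartitionBalancer() { }

  /** Returns taskCount buckets of partition names, roughly balanced by bytes-in rate. */
  public static List<List<String>> balance(ClusterThroughputInfo throughputInfo,
      List<String> partitions, int taskCount) {
    Map<String, PartitionThroughputInfo> stats = throughputInfo.getPartitionInfoMap();
    // Sort descending by byte rate; partitions without stats are treated as zero-rate
    List<String> sorted = new ArrayList<>(partitions);
    sorted.sort(Comparator.comparingInt(
        (String p) -> stats.containsKey(p) ? stats.get(p).getBytesInKBRate() : 0).reversed());

    // Min-heap of {load, bucketIndex} keeps the least-loaded bucket on top
    PriorityQueue<int[]> heap = new PriorityQueue<>(Comparator.comparingInt((int[] e) -> e[0]));
    List<List<String>> buckets = new ArrayList<>();
    for (int i = 0; i < taskCount; i++) {
      buckets.add(new ArrayList<>());
      heap.add(new int[] { 0, i });
    }
    for (String partition : sorted) {
      int[] least = heap.poll();
      buckets.get(least[1]).add(partition);
      least[0] += stats.containsKey(partition) ? stats.get(partition).getBytesInKBRate() : 0;
      heap.add(least);
    }
    return buckets;
  }
}
```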
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
new file mode 100644
index 000000000..00b88b372
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategy.java
@@ -0,0 +1,118 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.assignment;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.commons.lang3.Validate;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.datastream.common.PollUtils;
+import com.linkedin.datastream.common.RetriesExhaustedException;
+import com.linkedin.datastream.common.zk.ZkClient;
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+import com.linkedin.datastream.server.DatastreamGroup;
+import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;
+import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
+import com.linkedin.datastream.server.DatastreamTask;
+import com.linkedin.datastream.server.Pair;
+import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
+
+
+/**
+ * Partition assignment strategy that does assignment based on throughput information supplied by a
+ * {@link PartitionThroughputProvider}
+ */
+public class LoadBasedPartitionAssignmentStrategy extends StickyPartitionAssignmentStrategy {
+  private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategy.class.getName());
+
+  // TODO Make these constants configurable
+  private static final long THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT = Duration.ofSeconds(10).toMillis();
+  private static final long THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT = Duration.ofSeconds(1).toMillis();
+
+  private final PartitionThroughputProvider _throughputProvider;
+  private final DatastreamSourceClusterResolver _sourceClusterResolver;
+
+
+  /**
+   * Creates an instance of {@link LoadBasedPartitionAssignmentStrategy}
+   */
+  public LoadBasedPartitionAssignmentStrategy(PartitionThroughputProvider throughputProvider,
+      DatastreamSourceClusterResolver sourceClusterResolver, Optional<Integer> maxTasks,
+      Optional<Integer> imbalanceThreshold, Optional<Integer> maxPartitionPerTask, boolean enableElasticTaskAssignment,
+      Optional<Integer> partitionsPerTask, Optional<Integer> partitionFullnessFactorPct, Optional<ZkClient> zkClient,
+      String clusterName) {
+    super(maxTasks, imbalanceThreshold, maxPartitionPerTask, enableElasticTaskAssignment, partitionsPerTask,
+        partitionFullnessFactorPct, zkClient, clusterName);
+    _throughputProvider = throughputProvider;
+    _sourceClusterResolver = sourceClusterResolver;
+  }
+
+  @Override
+  public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<DatastreamTask>> currentAssignment,
+      DatastreamGroupPartitionsMetadata datastreamPartitions) {
+    DatastreamGroup datastreamGroup = datastreamPartitions.getDatastreamGroup();
+    String datastreamGroupName = datastreamGroup.getName();
+    Pair<List<String>, Integer> assignedPartitionsAndTaskCount = getAssignedPartitionsAndTaskCountForDatastreamGroup(
+        currentAssignment, datastreamGroupName);
+    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
+
+    // Do throughput-based assignment only initially, when no partitions have been assigned yet
+    if (!assignedPartitions.isEmpty()) {
+      return super.assignPartitions(currentAssignment, datastreamPartitions);
+    }
+
+    Map<String, ClusterThroughputInfo> partitionThroughputInfo;
+    // Attempt to retrieve partition throughput info, with a fallback to StickyPartitionAssignmentStrategy behavior
+    try {
+      partitionThroughputInfo = fetchPartitionThroughputInfo();
+    } catch (RetriesExhaustedException ex) {
+      LOG.warn("Attempts to fetch partition throughput timed out. "
+          + "Falling back to regular partition assignment strategy", ex);
+      return super.assignPartitions(currentAssignment, datastreamPartitions);
+    }
+
+    LOG.info("Old partition assignment info, assignment: {}", currentAssignment);
+    Validate.isTrue(currentAssignment.size() > 0,
+        "Zero tasks assigned. Retry leader partition assignment.");
+
+    // TODO Get task count estimate and perform elastic task count validation
+    // TODO Get task count estimate based on throughput and pick a winner
+    int maxTaskCount = 0;
+
+    // TODO Calculate the unassigned partitions
+    List<String> unassignedPartitions = new ArrayList<>();
+
+    // Resolve the source cluster name from the datastream group
+    String clusterName = _sourceClusterResolver.getSourceCluster(datastreamPartitions.getDatastreamGroup());
+    ClusterThroughputInfo clusterThroughputInfo = partitionThroughputInfo.get(clusterName);
+
+    // Do the assignment
+    LoadBasedPartitionAssigner partitionAssigner = new LoadBasedPartitionAssigner();
+    return partitionAssigner.assignPartitions(clusterThroughputInfo, assignedPartitions, unassignedPartitions,
+        maxTaskCount);
+  }
+
+  private Map<String, ClusterThroughputInfo> fetchPartitionThroughputInfo() {
+    return PollUtils.poll(() -> {
+      try {
+        return _throughputProvider.getThroughputInfo();
+      } catch (Exception ex) {
+        // TODO log the retry count as well
+        LOG.warn("Failed to fetch partition throughput info.", ex);
+        return null;
+      }
+    }, Objects::nonNull, THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT, THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT)
+        .orElseThrow(RetriesExhaustedException::new);
+  }
+}
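The strategy's fallback behavior hinges on `PollUtils.poll`: the provider is polled every `THROUGHPUT_INFO_FETCH_RETRY_PERIOD_MS_DEFAULT` (1 s) until it returns a non-null map or `THROUGHPUT_INFO_FETCH_TIMEOUT_MS_DEFAULT` (10 s) elapses, at which point `RetriesExhaustedException` triggers the fall-back to plain sticky assignment. Below is a self-contained sketch of that poll contract; it is not Brooklin's actual `PollUtils` source, just an illustration of the semantics the code above relies on.

```java
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Sketch of the poll-until-valid contract used by fetchPartitionThroughputInfo(). */
public final class RetrySketch {
  private RetrySketch() { }

  /** Polls the supplier every periodMs until the predicate passes or timeoutMs elapses. */
  static <T> Optional<T> poll(Supplier<T> supplier, Predicate<T> predicate, long periodMs, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      T value = supplier.get();
      if (predicate.test(value)) {
        return Optional.of(value);
      }
      if (System.currentTimeMillis() + periodMs > deadline) {
        return Optional.empty(); // retries exhausted -> caller falls back to sticky assignment
      }
      try {
        Thread.sleep(periodMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return Optional.empty();
      }
    }
  }
}
```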
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java
new file mode 100644
index 000000000..9c51eb6a6
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedPartitionAssignmentStrategyFactory.java
@@ -0,0 +1,57 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.assignment;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.linkedin.datastream.common.zk.ZkClient;
+import com.linkedin.datastream.server.DatastreamSourceClusterResolver;
+import com.linkedin.datastream.server.DummyDatastreamSourceClusterResolver;
+import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
+import com.linkedin.datastream.server.providers.NoOpPartitionThroughputProvider;
+import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
+
+
+/**
+ * A factory for creating {@link LoadBasedPartitionAssignmentStrategy} instances
+ */
+public class LoadBasedPartitionAssignmentStrategyFactory extends StickyPartitionAssignmentStrategyFactory {
+  private static final Logger LOG = LoggerFactory.getLogger(LoadBasedPartitionAssignmentStrategyFactory.class.getName());
+
+  @Override
+  public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
+    _config = new PartitionAssignmentStrategyConfig(assignmentStrategyProperties);
+
+    boolean enableElasticTaskAssignment = _config.isElasticTaskAssignmentEnabled();
+    // Create the ZooKeeper client
+    Optional<ZkClient> zkClient = Optional.empty();
+    try {
+      zkClient = constructZooKeeperClient();
+    } catch (IllegalStateException ex) {
+      LOG.warn("Disabling elastic task assignment as zkClient initialization failed", ex);
+      enableElasticTaskAssignment = false;
+    }
+
+    PartitionThroughputProvider provider = constructPartitionThroughputProvider();
+    DatastreamSourceClusterResolver clusterResolver = constructDatastreamSourceClusterResolver();
+
+    return new LoadBasedPartitionAssignmentStrategy(provider, clusterResolver, _config.getMaxTasks(),
+        _config.getImbalanceThreshold(), _config.getMaxPartitions(), enableElasticTaskAssignment,
+        _config.getPartitionsPerTask(), _config.getPartitionFullnessThresholdPct(), zkClient, _config.getCluster());
+  }
+
+  protected PartitionThroughputProvider constructPartitionThroughputProvider() {
+    return new NoOpPartitionThroughputProvider();
+  }
+
+  protected DatastreamSourceClusterResolver constructDatastreamSourceClusterResolver() {
+    return new DummyDatastreamSourceClusterResolver();
+  }
+}
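Wiring it together: the factory reads everything from a single `Properties` object. The snippet below shows plausible values; the `maxTasks` and `imbalanceThreshold` key literals are assumed from `BroadcastStrategyFactory.CFG_MAX_TASKS` and `StickyMulticastStrategyFactory.CFG_IMBALANCE_THRESHOLD`, which this diff imports but does not show, and the cluster and ZooKeeper values are hypothetical.

```java
import java.util.Properties;

import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
import com.linkedin.datastream.server.assignment.LoadBasedPartitionAssignmentStrategyFactory;

public class StrategyWiringExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("maxTasks", "20");                      // assumed key of CFG_MAX_TASKS
    props.setProperty("imbalanceThreshold", "5");             // assumed key of CFG_IMBALANCE_THRESHOLD
    props.setProperty("maxPartitionsPerTask", "50");
    props.setProperty("partitionsPerTask", "10");
    props.setProperty("partitionFullnessThresholdPct", "75");
    props.setProperty("enableElasticTaskAssignment", "true");
    props.setProperty("cluster", "MyBrooklinCluster");        // hypothetical cluster name
    props.setProperty("zkAddress", "localhost:2181");         // hypothetical ZooKeeper address

    AssignmentStrategy strategy = new LoadBasedPartitionAssignmentStrategyFactory().createStrategy(props);
    System.out.println(strategy.getClass().getSimpleName());  // LoadBasedPartitionAssignmentStrategy
  }
}
```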
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
new file mode 100644
index 000000000..4c17baa0a
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/LoadBasedTaskCountEstimator.java
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.assignment;
+
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+
+
+/**
+ * Estimates the minimum number of tasks for a datastream based on per-partition throughput information
+ */
+public class LoadBasedTaskCountEstimator {
+
+  /**
+   * Gets the estimated number of tasks based on per-partition throughput information.
+   * NOTE: This does not take into account the partitionsPerTask configuration
+   * @param throughputInfo Per-partition throughput information
+   * @param assignedPartitions The list of assigned partitions
+   * @param unassignedPartitions The list of unassigned partitions
+   * @return The estimated number of tasks
+   */
+  public int getTaskCount(ClusterThroughputInfo throughputInfo, List<String> assignedPartitions,
+      List<String> unassignedPartitions) {
+    throw new NotImplementedException();
+  }
+}
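`getTaskCount` is also a stub. One plausible estimate, shown only as a sketch, is the ceiling of the total byte rate over an assumed per-task capacity; the `taskCapacityKBps` parameter is hypothetical and not a knob this PR defines.

```java
import java.util.List;

import com.linkedin.datastream.server.ClusterThroughputInfo;
import com.linkedin.datastream.server.PartitionThroughputInfo;

/** Illustrative estimate: ceil(total KB rate across partitions / per-task KB capacity). */
public final class TaskCountSketch {
  private TaskCountSketch() { }

  public static int estimate(ClusterThroughputInfo throughputInfo, List<String> allPartitions,
      int taskCapacityKBps) {
    int totalKBps = 0;
    for (String partition : allPartitions) {
      PartitionThroughputInfo info = throughputInfo.getPartitionInfoMap().get(partition);
      totalKBps += (info == null) ? 0 : info.getBytesInKBRate();
    }
    // At least one task; integer ceiling division
    return Math.max(1, (totalKBps + taskCapacityKBps - 1) / taskCapacityKBps);
  }
}
```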
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java
new file mode 100644
index 000000000..054b42896
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/PartitionAssignmentStrategyConfig.java
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.assignment;
+
+import java.util.Optional;
+import java.util.Properties;
+
+import com.linkedin.datastream.common.VerifiableProperties;
+import com.linkedin.datastream.common.zk.ZkClient;
+
+import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;
+import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.CFG_IMBALANCE_THRESHOLD;
+
+
+/**
+ * Configuration properties for {@link StickyPartitionAssignmentStrategy} and its extensions
+ */
+public final class PartitionAssignmentStrategyConfig {
+  public static final String CFG_MAX_PARTITION_PER_TASK = "maxPartitionsPerTask";
+  public static final String CFG_PARTITIONS_PER_TASK = "partitionsPerTask";
+  public static final String CFG_PARTITION_FULLNESS_THRESHOLD_PCT = "partitionFullnessThresholdPct";
+  public static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT = "enableElasticTaskAssignment";
+  public static final String CFG_CLUSTER_NAME = "cluster";
+  public static final String CFG_ZK_ADDRESS = "zkAddress";
+  public static final String CFG_ZK_SESSION_TIMEOUT = "zkSessionTimeout";
+  public static final String CFG_ZK_CONNECTION_TIMEOUT = "zkConnectionTimeout";
+
+  public static final boolean DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT = false;
+
+  private final Properties _config;
+  private final Optional<Integer> _maxTasks;
+  private final Optional<Integer> _imbalanceThreshold;
+  private final Optional<Integer> _maxPartitions;
+  private final Optional<Integer> _partitionsPerTask;
+  private final Optional<Integer> _partitionFullnessThresholdPct;
+  private final String _cluster;
+  private final String _zkAddress;
+  private final int _zkSessionTimeout;
+  private final int _zkConnectionTimeout;
+  private final boolean _enableElasticTaskAssignment;
+
+  /**
+   * Creates an instance of {@link PartitionAssignmentStrategyConfig}
+   * @param config Config properties
+   */
+  public PartitionAssignmentStrategyConfig(Properties config) {
+    _config = config;
+    VerifiableProperties props = new VerifiableProperties(config);
+    int cfgMaxTasks = props.getInt(CFG_MAX_TASKS, 0);
+    int cfgImbalanceThreshold = props.getInt(CFG_IMBALANCE_THRESHOLD, 0);
+    int cfgMaxPartitionsPerTask = props.getInt(CFG_MAX_PARTITION_PER_TASK, 0);
+    int cfgPartitionsPerTask = props.getInt(CFG_PARTITIONS_PER_TASK, 0);
+    int cfgPartitionFullnessThresholdPct = props.getIntInRange(CFG_PARTITION_FULLNESS_THRESHOLD_PCT, 0, 0, 100);
+
+    // Set to Optional.empty() if the value is 0
+    _maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty();
+    _imbalanceThreshold = cfgImbalanceThreshold > 0 ? Optional.of(cfgImbalanceThreshold) : Optional.empty();
+    _maxPartitions = cfgMaxPartitionsPerTask > 0 ? Optional.of(cfgMaxPartitionsPerTask) : Optional.empty();
+    _enableElasticTaskAssignment = props.getBoolean(CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT,
+        DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT);
+    _partitionsPerTask = cfgPartitionsPerTask > 0 ? Optional.of(cfgPartitionsPerTask) : Optional.empty();
+    _partitionFullnessThresholdPct = cfgPartitionFullnessThresholdPct > 0 ?
+        Optional.of(cfgPartitionFullnessThresholdPct) : Optional.empty();
+    _cluster = props.getString(CFG_CLUSTER_NAME, null);
+    _zkAddress = props.getString(CFG_ZK_ADDRESS, null);
+    _zkSessionTimeout = props.getInt(CFG_ZK_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT);
+    _zkConnectionTimeout = props.getInt(CFG_ZK_CONNECTION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
+  }
+
+  /**
+   * Gets max tasks
+   * @return Max tasks config value
+   */
+  public Optional<Integer> getMaxTasks() {
+    return _maxTasks;
+  }
+
+  /**
+   * Gets imbalance threshold
+   * @return Imbalance threshold
+   */
+  public Optional<Integer> getImbalanceThreshold() {
+    return _imbalanceThreshold;
+  }
+
+  /**
+   * Gets max partitions
+   * @return Max partitions
+   */
+  public Optional<Integer> getMaxPartitions() {
+    return _maxPartitions;
+  }
+
+  /**
+   * Gets partitions per task
+   * @return Partitions per task
+   */
+  public Optional<Integer> getPartitionsPerTask() {
+    return _partitionsPerTask;
+  }
+
+  /**
+   * Gets partition fullness threshold percentage
+   * @return Partition fullness threshold percentage
+   */
+  public Optional<Integer> getPartitionFullnessThresholdPct() {
+    return _partitionFullnessThresholdPct;
+  }
+
+  /**
+   * Gets cluster
+   * @return Cluster
+   */
+  public String getCluster() {
+    return _cluster;
+  }
+
+  /**
+   * Indicates whether elastic task assignment is enabled
+   * @return A boolean value that, if true, indicates that elastic task assignment is enabled
+   */
+  public boolean isElasticTaskAssignmentEnabled() {
+    return _enableElasticTaskAssignment;
+  }
+
+  /**
+   * Returns configuration properties
+   * @return Configuration properties
+   */
+  public Properties getConfigProperties() {
+    return _config;
+  }
+
+  /**
+   * Returns ZooKeeper address
+   * @return ZooKeeper address
+   */
+  public String getZkAddress() {
+    return _zkAddress;
+  }
+
+  /**
+   * Returns ZooKeeper session timeout
+   * @return ZooKeeper session timeout
+   */
+  public int getZkSessionTimeout() {
+    return _zkSessionTimeout;
+  }
+
+  /**
+   * Returns ZooKeeper connection timeout
+   * @return ZooKeeper connection timeout
+   */
+  public int getZkConnectionTimeout() {
+    return _zkConnectionTimeout;
+  }
+}
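Note the convention encoded in the constructor: a value of 0 (the default) maps to `Optional.empty()`, i.e. "not configured". A quick demonstration:

```java
import java.util.Properties;

import com.linkedin.datastream.server.assignment.PartitionAssignmentStrategyConfig;

public class ZeroMeansUnsetExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("partitionsPerTask", "0"); // 0 (or absent) -> Optional.empty()
    PartitionAssignmentStrategyConfig config = new PartitionAssignmentStrategyConfig(props);
    System.out.println(config.getPartitionsPerTask().isPresent()); // false
  }
}
```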
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
index 39d59061b..5ac6380ae 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategy.java
@@ -37,11 +37,12 @@
 import com.linkedin.datastream.server.DatastreamGroupPartitionsMetadata;
 import com.linkedin.datastream.server.DatastreamTask;
 import com.linkedin.datastream.server.DatastreamTaskImpl;
+import com.linkedin.datastream.server.Pair;
 import com.linkedin.datastream.server.zk.KeyBuilder;
 
 import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;
-import static com.linkedin.datastream.server.assignment.StickyPartitionAssignmentStrategyFactory.CFG_PARTITIONS_PER_TASK;
-import static com.linkedin.datastream.server.assignment.StickyPartitionAssignmentStrategyFactory.CFG_PARTITION_FULLNESS_THRESHOLD_PCT;
+import static com.linkedin.datastream.server.assignment.PartitionAssignmentStrategyConfig.CFG_PARTITIONS_PER_TASK;
+import static com.linkedin.datastream.server.assignment.PartitionAssignmentStrategyConfig.CFG_PARTITION_FULLNESS_THRESHOLD_PCT;
 
 
 /**
@@ -171,6 +172,19 @@ public Map<String, Set<DatastreamTask>> assign(List<DatastreamGroup> datastreams
     return super.assign(datastreams, instances, currentAssignment);
   }
 
+  protected Pair<List<String>, Integer> getAssignedPartitionsAndTaskCountForDatastreamGroup(
+      Map<String, Set<DatastreamTask>> currentAssignment, String datastreamGroupName) {
+    List<String> assignedPartitions = new ArrayList<>();
+    int taskCount = 0;
+    for (Set<DatastreamTask> tasks : currentAssignment.values()) {
+      Set<DatastreamTask> dgTask = tasks.stream().filter(t -> datastreamGroupName.equals(t.getTaskPrefix()))
+          .collect(Collectors.toSet());
+      taskCount += dgTask.size();
+      dgTask.forEach(t -> assignedPartitions.addAll(t.getPartitionsV2()));
+    }
+    return new Pair<>(assignedPartitions, taskCount);
+  }
+
   /**
    * Assign partitions to a particular datastream group
    *
@@ -187,20 +201,17 @@ public Map<String, Set<DatastreamTask>> assignPartitions(Map<String, Set<Datastr
     Validate.isTrue(currentAssignment.size() > 0,
         "Zero tasks assigned. Retry leader partition assignment.");
 
-    String dgName = datastreamPartitions.getDatastreamGroup().getName();
+    DatastreamGroup datastreamGroup = datastreamPartitions.getDatastreamGroup();
+    String dgName = datastreamGroup.getName();
 
     // Step 1: collect the # of tasks and figured out the unassigned partitions
-    List<String> assignedPartitions = new ArrayList<>();
-    int totalTaskCount = 0;
-    for (Set<DatastreamTask> tasks : currentAssignment.values()) {
-      Set<DatastreamTask> dgTask = tasks.stream().filter(t -> dgName.equals(t.getTaskPrefix())).collect(Collectors.toSet());
-      dgTask.forEach(t -> assignedPartitions.addAll(t.getPartitionsV2()));
-      totalTaskCount += dgTask.size();
-    }
-
+    Pair<List<String>, Integer> assignedPartitionsAndTaskCount =
+        getAssignedPartitionsAndTaskCountForDatastreamGroup(currentAssignment, dgName);
+    List<String> assignedPartitions = assignedPartitionsAndTaskCount.getKey();
+    int totalTaskCount = assignedPartitionsAndTaskCount.getValue();
     Validate.isTrue(totalTaskCount > 0, String.format("No tasks found for datastream group %s", dgName));
 
-    if (getEnableElasticTaskAssignment(datastreamPartitions.getDatastreamGroup())) {
+    if (getEnableElasticTaskAssignment(datastreamGroup)) {
       if (assignedPartitions.isEmpty()) {
         performElasticTaskCountValidation(datastreamPartitions, totalTaskCount);
       }
@@ -536,7 +547,7 @@ protected int constructExpectedNumberOfTasks(DatastreamGroup dg, List<String> in
     return expectedNumberOfTasks;
   }
 
-  private void updateOrRegisterElasticTaskAssignmentMetrics(DatastreamGroupPartitionsMetadata datastreamPartitions,
+  protected void updateOrRegisterElasticTaskAssignmentMetrics(DatastreamGroupPartitionsMetadata datastreamPartitions,
       int totalTaskCount) {
     int totalPartitions = datastreamPartitions.getPartitions().size();
     int actualPartitionsPerTask = (totalPartitions / totalTaskCount)
@@ -554,7 +565,7 @@ private void updateOrRegisterElasticTaskAssignmentMetrics(DatastreamGroupPartiti
     _elasticTaskAssignmentInfoHashMap.put(taskPrefix, elasticTaskAssignmentInfo);
   }
 
-  private void performElasticTaskCountValidation(DatastreamGroupPartitionsMetadata datastreamPartitions,
+  protected void performElasticTaskCountValidation(DatastreamGroupPartitionsMetadata datastreamPartitions,
      int totalTaskCount) {
     // The partitions have not been assigned to any tasks yet and elastic task assignment has been enabled for this
     // datastream. Assess the number of tasks needed based on partitionsPerTask and the fullness threshold. If
@@ -590,7 +601,7 @@ private void performElasticTaskCountValidation(DatastreamGroupPartitionsMetadata
     LOG.info("Number of tasks needed: {}, total task count: {}", numTasksNeeded, totalTaskCount);
   }
 
-  private boolean getEnableElasticTaskAssignment(DatastreamGroup datastreamGroup) {
+  protected boolean getEnableElasticTaskAssignment(DatastreamGroup datastreamGroup) {
     // Enable elastic assignment only if the config enables it and the datastream metadata for minTasks is present
     // and is greater than 0
     int minTasks = resolveConfigWithMetadata(datastreamGroup, CFG_MIN_TASKS, 0);
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategyFactory.java b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategyFactory.java
index 6af9902d4..4933bcc0d 100644
--- a/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategyFactory.java
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/assignment/StickyPartitionAssignmentStrategyFactory.java
@@ -12,69 +12,48 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.linkedin.datastream.common.VerifiableProperties;
 import com.linkedin.datastream.common.zk.ZkClient;
 import com.linkedin.datastream.server.api.strategy.AssignmentStrategy;
 import com.linkedin.datastream.server.api.strategy.AssignmentStrategyFactory;
 
-import static com.linkedin.datastream.server.assignment.BroadcastStrategyFactory.CFG_MAX_TASKS;
-import static com.linkedin.datastream.server.assignment.StickyMulticastStrategyFactory.CFG_IMBALANCE_THRESHOLD;
-
 
 /**
  * A factory for creating {@link StickyPartitionAssignmentStrategy} instances
  */
 public class StickyPartitionAssignmentStrategyFactory implements AssignmentStrategyFactory {
   private static final Logger LOG = LoggerFactory.getLogger(StickyPartitionAssignmentStrategyFactory.class.getName());
-
-  public static final String CFG_MAX_PARTITION_PER_TASK = "maxPartitionsPerTask";
-  public static final String CFG_PARTITIONS_PER_TASK = "partitionsPerTask";
-  public static final String CFG_PARTITION_FULLNESS_THRESHOLD_PCT = "partitionFullnessThresholdPct";
-  public static final String CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT = "enableElasticTaskAssignment";
-  public static final String CFG_ZK_ADDRESS = "zkAddress";
-  public static final String CFG_ZK_SESSION_TIMEOUT = "zkSessionTimeout";
-  public static final String CFG_ZK_CONNECTION_TIMEOUT = "zkConnectionTimeout";
-  public static final String CFG_CLUSTER_NAME = "cluster";
-
-  public static final boolean DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT = false;
+  protected PartitionAssignmentStrategyConfig _config;
 
   @Override
   public AssignmentStrategy createStrategy(Properties assignmentStrategyProperties) {
-    VerifiableProperties props = new VerifiableProperties(assignmentStrategyProperties);
-    int cfgMaxTasks = props.getInt(CFG_MAX_TASKS, 0);
-    int cfgImbalanceThreshold = props.getInt(CFG_IMBALANCE_THRESHOLD, 0);
-    int cfgMaxParitionsPerTask = props.getInt(CFG_MAX_PARTITION_PER_TASK, 0);
-    // Set to Optional.empty() if the value is 0
-    Optional<Integer> maxTasks = cfgMaxTasks > 0 ? Optional.of(cfgMaxTasks) : Optional.empty();
-    Optional<Integer> imbalanceThreshold = cfgImbalanceThreshold > 0 ? Optional.of(cfgImbalanceThreshold)
-        : Optional.empty();
-    Optional<Integer> maxPartitions = cfgMaxParitionsPerTask > 0 ? Optional.of(cfgMaxParitionsPerTask) :
-        Optional.empty();
-    boolean enableElasticTaskAssignment = props.getBoolean(CFG_ENABLE_ELASTIC_TASK_ASSIGNMENT,
-        DEFAULT_ENABLE_ELASTIC_TASK_ASSIGNMENT);
-    int cfgPartitionsPerTask = props.getInt(CFG_PARTITIONS_PER_TASK, 0);
-    int cfgPartitionFullnessThresholdPct = props.getIntInRange(CFG_PARTITION_FULLNESS_THRESHOLD_PCT, 0, 0, 100);
-    Optional<Integer> partitionsPerTask = cfgPartitionsPerTask > 0 ? Optional.of(cfgMaxParitionsPerTask) :
-        Optional.empty();
-    Optional<Integer> partitionFullnessThresholdPct = cfgPartitionFullnessThresholdPct > 0 ?
-        Optional.of(cfgPartitionFullnessThresholdPct) : Optional.empty();
-    String cluster = props.getString(CFG_CLUSTER_NAME, null);
+    _config = new PartitionAssignmentStrategyConfig(assignmentStrategyProperties);
 
-    // Create the ZooKeeper Client
+    boolean enableElasticTaskAssignment = _config.isElasticTaskAssignmentEnabled();
+    // Create the ZooKeeper client
     Optional<ZkClient> zkClient = Optional.empty();
-    String zkAddress = props.getString(CFG_ZK_ADDRESS, null);
-    if (enableElasticTaskAssignment && StringUtils.isBlank(zkAddress)) {
-      LOG.warn("Disabling elastic task assignment as zkAddress is not present or empty");
+    try {
+      zkClient = constructZooKeeperClient();
+    } catch (IllegalStateException ex) {
+      LOG.warn("Disabling elastic task assignment as zkClient initialization failed", ex);
       enableElasticTaskAssignment = false;
     }
 
-    if (enableElasticTaskAssignment) {
-      int zkSessionTimeout = props.getInt(CFG_ZK_SESSION_TIMEOUT, ZkClient.DEFAULT_SESSION_TIMEOUT);
-      int zkConnectionTimeout = props.getInt(CFG_ZK_CONNECTION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
-      zkClient = Optional.of(new ZkClient(zkAddress, zkSessionTimeout, zkConnectionTimeout));
+    return new StickyPartitionAssignmentStrategy(_config.getMaxTasks(), _config.getImbalanceThreshold(),
+        _config.getMaxPartitions(), enableElasticTaskAssignment, _config.getPartitionsPerTask(),
+        _config.getPartitionFullnessThresholdPct(), zkClient, _config.getCluster());
+  }
+
+  protected Optional<ZkClient> constructZooKeeperClient() {
+    if (!_config.isElasticTaskAssignmentEnabled()) {
+      return Optional.empty();
+    }
+
+    if (StringUtils.isBlank(_config.getZkAddress())) {
+      LOG.warn("ZkAddress is not present or empty");
+      throw new IllegalStateException("ZkAddress is empty or not provided");
    }
 
-    return new StickyPartitionAssignmentStrategy(maxTasks, imbalanceThreshold, maxPartitions,
-        enableElasticTaskAssignment, partitionsPerTask, partitionFullnessThresholdPct, zkClient, cluster);
+    return Optional.of(new ZkClient(_config.getZkAddress(), _config.getZkSessionTimeout(),
+        _config.getZkConnectionTimeout()));
   }
 }
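Because `constructZooKeeperClient`, `constructPartitionThroughputProvider`, and `constructDatastreamSourceClusterResolver` are protected hooks, deployments can swap implementations by subclassing. A hypothetical subclass using the file-based provider introduced below:

```java
import com.linkedin.datastream.server.assignment.LoadBasedPartitionAssignmentStrategyFactory;
import com.linkedin.datastream.server.providers.FileBasedPartitionThroughputProvider;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

/** Hypothetical subclass that swaps in the file-based provider, e.g. for integration tests. */
public class FileBackedStrategyFactory extends LoadBasedPartitionAssignmentStrategyFactory {
  @Override
  protected PartitionThroughputProvider constructPartitionThroughputProvider() {
    return new FileBasedPartitionThroughputProvider("partitionThroughput.json");
  }
}
```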
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/FileBasedPartitionThroughputProvider.java b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/FileBasedPartitionThroughputProvider.java
new file mode 100644
index 000000000..3a82f4e02
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/FileBasedPartitionThroughputProvider.java
@@ -0,0 +1,137 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.providers;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import org.apache.commons.lang3.StringUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+import com.linkedin.datastream.server.PartitionThroughputInfo;
+
+
+/**
+ * An implementation of {@link PartitionThroughputProvider} which reads throughput data from a resource file.
+ * Used for testing purposes.
+ */
+public class FileBasedPartitionThroughputProvider implements PartitionThroughputProvider {
+  private static final String ROOT_NODE_NAME = "stats";
+  private static final String MESSAGES_IN_RATE_TOKEN = "msgIn:";
+  private static final String BYTES_IN_KB_RATE_TOKEN = "bytesInKB:";
+
+  private final String _fileName;
+
+  /**
+   * Creates an instance of {@link FileBasedPartitionThroughputProvider}
+   * @param fileName Resource file name
+   */
+  public FileBasedPartitionThroughputProvider(String fileName) {
+    _fileName = fileName;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ClusterThroughputInfo getThroughputInfo(String clusterName) {
+    File partitionThroughputFile = getThroughputFileFromResources();
+    return readThroughputInfoFromFile(partitionThroughputFile, clusterName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public HashMap<String, ClusterThroughputInfo> getThroughputInfo() {
+    File partitionThroughputFile = getThroughputFileFromResources();
+    return readThroughputInfoFromFile(partitionThroughputFile);
+  }
+
+  private File getThroughputFileFromResources() {
+    URL resource = getClass().getClassLoader().getResource(_fileName);
+    File partitionThroughputFile = null;
+    if (resource == null) {
+      throw new IllegalArgumentException("File " + _fileName + " not found.");
+    }
+
+    try {
+      partitionThroughputFile = new File(resource.toURI());
+    } catch (URISyntaxException ex) {
+      throw new IllegalArgumentException("Failed to construct URI for the input file", ex);
+    }
+    return partitionThroughputFile;
+  }
+
+  private HashMap<String, ClusterThroughputInfo> readThroughputInfoFromFile(File file) {
+    ObjectMapper mapper = new ObjectMapper();
+    HashMap<String, ClusterThroughputInfo> clusterInfoMap = new HashMap<>();
+
+    try {
+      JsonNode root = mapper.readTree(file);
+      JsonNode allStats = root.get(ROOT_NODE_NAME);
+      Iterator<String> clusterNames = allStats.getFieldNames();
+
+      while (clusterNames.hasNext()) {
+        String key = clusterNames.next();
+        clusterInfoMap.put(key, getClusterThroughputInfoFromNode(mapper, key, allStats.get(key)));
+      }
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+
+    return clusterInfoMap;
+  }
+
+  private ClusterThroughputInfo readThroughputInfoFromFile(File file, String clusterName) {
+    ObjectMapper mapper = new ObjectMapper();
+    ClusterThroughputInfo clusterInfo = null;
+
+    try {
+      JsonNode root = mapper.readTree(file);
+      JsonNode allStats = root.get(ROOT_NODE_NAME);
+      JsonNode clusterStats = allStats.get(clusterName);
+
+      if (clusterStats == null) {
+        throw new IllegalArgumentException("Throughput info for cluster " + clusterName + " not found.");
+      }
+
+      clusterInfo = getClusterThroughputInfoFromNode(mapper, clusterName, clusterStats);
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return clusterInfo;
+  }
+
+  private ClusterThroughputInfo getClusterThroughputInfoFromNode(ObjectMapper mapper, String clusterName,
+      JsonNode clusterStats) {
+    TypeReference<HashMap<String, String>> mapTypeRef = new TypeReference<HashMap<String, String>>() { };
+    HashMap<String, PartitionThroughputInfo> partitionInfoMap = new HashMap<>();
+
+    try {
+      HashMap<String, String> partitionStats = mapper.readValue(clusterStats, mapTypeRef);
+      for (String partition : partitionStats.keySet()) {
+        String value = partitionStats.get(partition);
+        String[] tokens = StringUtils.split(value, ",");
+        int bytesInKBRate = Integer.parseInt(StringUtils.substring(tokens[0], BYTES_IN_KB_RATE_TOKEN.length() + 1));
+        int messagesInRate = Integer.parseInt(StringUtils.substring(tokens[1], MESSAGES_IN_RATE_TOKEN.length() + 1));
+        partitionInfoMap.put(partition, new PartitionThroughputInfo(bytesInKBRate, messagesInRate, partition));
+      }
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+
+    return new ClusterThroughputInfo(clusterName, partitionInfoMap);
+  }
+}
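The substring arithmetic in `getClusterThroughputInfoFromNode` is easy to misread, so here is the same parse applied to one value from the test resource. The `+ 1` skips the space after the colon in the first token and the leading space left by the comma split in the second.

```java
import org.apache.commons.lang3.StringUtils;

public class ThroughputValueParseExample {
  public static void main(String[] args) {
    String value = "bytesInKB: 15, msgIn:39";         // format used in partitionThroughput.json
    String[] tokens = StringUtils.split(value, ","); // ["bytesInKB: 15", " msgIn:39"]
    int bytesInKBRate = Integer.parseInt(StringUtils.substring(tokens[0], "bytesInKB:".length() + 1));
    int messagesInRate = Integer.parseInt(StringUtils.substring(tokens[1], "msgIn:".length() + 1));
    System.out.println(bytesInKBRate + " KB in, " + messagesInRate + " messages in"); // 15 KB in, 39 messages in
  }
}
```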
diff --git a/datastream-server/src/main/java/com/linkedin/datastream/server/providers/NoOpPartitionThroughputProvider.java b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/NoOpPartitionThroughputProvider.java
new file mode 100644
index 000000000..dc3464632
--- /dev/null
+++ b/datastream-server/src/main/java/com/linkedin/datastream/server/providers/NoOpPartitionThroughputProvider.java
@@ -0,0 +1,26 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server.providers;
+
+import java.util.HashMap;
+
+import com.linkedin.datastream.server.ClusterThroughputInfo;
+
+
+/**
+ * Dummy implementation of {@link PartitionThroughputProvider}
+ */
+public class NoOpPartitionThroughputProvider implements PartitionThroughputProvider {
+  @Override
+  public ClusterThroughputInfo getThroughputInfo(String clusterName) {
+    return null;
+  }
+
+  @Override
+  public HashMap<String, ClusterThroughputInfo> getThroughputInfo() {
+    return null;
+  }
+}
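Worth noting: with this default `NoOpPartitionThroughputProvider`, `fetchPartitionThroughputInfo()` polls a supplier that always yields null, so the `Objects::nonNull` predicate never passes and the load-based strategy always degrades to plain sticky assignment after the fetch timeout. A small demonstration of the null contract:

```java
import com.linkedin.datastream.server.providers.NoOpPartitionThroughputProvider;
import com.linkedin.datastream.server.providers.PartitionThroughputProvider;

public class NoOpProviderExample {
  public static void main(String[] args) {
    PartitionThroughputProvider provider = new NoOpPartitionThroughputProvider();
    // Always null: the poll predicate (Objects::nonNull) never passes, so assignment
    // falls back to StickyPartitionAssignmentStrategy behavior
    System.out.println(provider.getThroughputInfo() == null); // true
  }
}
```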
diff --git a/datastream-server/src/test/java/com/linkedin/datastream/server/TestFileBasedPartitionThroughputProvider.java b/datastream-server/src/test/java/com/linkedin/datastream/server/TestFileBasedPartitionThroughputProvider.java
new file mode 100644
index 000000000..30e56c94a
--- /dev/null
+++ b/datastream-server/src/test/java/com/linkedin/datastream/server/TestFileBasedPartitionThroughputProvider.java
@@ -0,0 +1,46 @@
+/**
+ * Copyright 2021 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.server;
+
+import java.util.Map;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.linkedin.datastream.server.providers.FileBasedPartitionThroughputProvider;
+import com.linkedin.datastream.server.providers.PartitionThroughputProvider;
+
+
+/**
+ * Tests for {@link FileBasedPartitionThroughputProvider}
+ */
+public class TestFileBasedPartitionThroughputProvider {
+
+  private static final String THROUGHPUT_FILE_NAME = "partitionThroughput.json";
+
+  @Test
+  public void getPartitionThroughputForMetricsTest() {
+    String clusterName = "cookie";
+    FileBasedPartitionThroughputProvider provider = new FileBasedPartitionThroughputProvider(THROUGHPUT_FILE_NAME);
+    ClusterThroughputInfo stats = provider.getThroughputInfo(clusterName);
+    Assert.assertNotNull(stats);
+    Assert.assertEquals(stats.getClusterName(), clusterName);
+  }
+
+  @Test
+  public void getPartitionThroughputForAllClustersTest() {
+    FileBasedPartitionThroughputProvider provider = new FileBasedPartitionThroughputProvider(THROUGHPUT_FILE_NAME);
+    Map<String, ClusterThroughputInfo> clusterInfoMap = provider.getThroughputInfo();
+    Assert.assertNotNull(clusterInfoMap);
+  }
+
+  @Test
+  public void getPartitionThroughputThrowsWhenFileNotFound() {
+    String dummyFileName = "dummy.json";
+    PartitionThroughputProvider provider = new FileBasedPartitionThroughputProvider(dummyFileName);
+    Assert.assertThrows(IllegalArgumentException.class, provider::getThroughputInfo);
+  }
+}
diff --git a/datastream-server/src/test/resources/partitionThroughput.json b/datastream-server/src/test/resources/partitionThroughput.json
new file mode 100644
index 000000000..e81a293a1
--- /dev/null
+++ b/datastream-server/src/test/resources/partitionThroughput.json
@@ -0,0 +1,218 @@
+{
+  "stats" : {
+    "pizza" : {
+      "Pepperoni-0" : "bytesInKB: 15, msgIn:39",
+      "Pepperoni-1" : "bytesInKB: 11, msgIn:29",
+      "Pepperoni-10" : "bytesInKB: 13, msgIn:32",
+      "Pepperoni-11" : "bytesInKB: 12, msgIn:30",
+      "Pepperoni-12" : "bytesInKB: 13, msgIn:34",
+      "Pepperoni-13" : "bytesInKB: 11, msgIn:28",
+      "Pepperoni-14" : "bytesInKB: 10, msgIn:27",
+      "Pepperoni-15" : "bytesInKB: 15, msgIn:39",
+      "Pepperoni-2" : "bytesInKB: 12, msgIn:30",
+      "Pepperoni-3" : "bytesInKB: 10, msgIn:27",
+      "Pepperoni-4" : "bytesInKB: 11, msgIn:29",
+      "Pepperoni-5" : "bytesInKB: 13, msgIn:34",
+      "Pepperoni-6" : "bytesInKB: 15, msgIn:39",
+      "Pepperoni-7" : "bytesInKB: 16, msgIn:42",
+      "Pepperoni-8" : "bytesInKB: 10, msgIn:27",
+      "Pepperoni-9" : "bytesInKB: 15, msgIn:39",
+      "Margherita-0" : "bytesInKB: 3, msgIn:9",
+      "Margherita-1" : "bytesInKB: 3, msgIn:10",
+      "Margherita-10" : "bytesInKB: 2, msgIn:7",
+      "Margherita-11" : "bytesInKB: 4, msgIn:11",
+      "Margherita-12" : "bytesInKB: 4, msgIn:12",
+      "Margherita-13" : "bytesInKB: 3, msgIn:9",
+      "Margherita-14" : "bytesInKB: 3, msgIn:10",
+      "Margherita-15" : "bytesInKB: 3, msgIn:9",
+      "Margherita-16" : "bytesInKB: 2, msgIn:7",
+      "Margherita-17" : "bytesInKB: 4, msgIn:11",
+      "Margherita-18" : "bytesInKB: 2, msgIn:6",
+      "Margherita-19" : "bytesInKB: 3, msgIn:10",
+      "Margherita-2" : "bytesInKB: 3, msgIn:6",
+      "Margherita-20" : "bytesInKB: 3, msgIn:10",
+      "Margherita-21" : "bytesInKB: 4, msgIn:12",
+      "Margherita-22" : "bytesInKB: 3, msgIn:10",
+      "Margherita-23" : "bytesInKB: 2, msgIn:6",
+      "Margherita-24" : "bytesInKB: 3, msgIn:9",
+      "Margherita-25" : "bytesInKB: 3, msgIn:9",
+      "Margherita-26" : "bytesInKB: 4, msgIn:12",
+      "Margherita-27" : "bytesInKB: 3, msgIn:9",
+      "Margherita-28" : "bytesInKB: 3, msgIn:9",
+      "Margherita-29" : "bytesInKB: 4, msgIn:11",
+      "Margherita-3" : "bytesInKB: 3, msgIn:9",
+      "Margherita-30" : "bytesInKB: 3, msgIn:10",
+      "Margherita-31" : "bytesInKB: 2, msgIn:7",
+      "Margherita-4" : "bytesInKB: 3, msgIn:10",
+      "Margherita-5" : "bytesInKB: 2, msgIn:6",
+      "Margherita-6" : "bytesInKB: 3, msgIn:9",
+      "Margherita-7" : "bytesInKB: 3, msgIn:9",
+      "Margherita-8" : "bytesInKB: 3, msgIn:9",
+      "Margherita-9" : "bytesInKB: 4, msgIn:11",
+      "QuattroFormaggi-0" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-1" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-2" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-3" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-4" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-5" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-6" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-7" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-8" : "bytesInKB: 2, msgIn:3",
+      "QuattroFormaggi-9" : "bytesInKB: 2, msgIn:3",
+      "Calzone-0" : "bytesInKB: 8, msgIn:12",
+      "Calzone-1" : "bytesInKB: 4, msgIn:6",
+      "Calzone-2" : "bytesInKB: 4, msgIn:6",
+      "Calzone-3" : "bytesInKB: 4, msgIn:6",
+      "Calzone-4" : "bytesInKB: 4, msgIn:5",
+      "Calzone-5" : "bytesInKB: 4, msgIn:6",
+      "Calzone-6" : "bytesInKB: 4, msgIn:6",
+      "Calzone-7" : "bytesInKB: 7, msgIn:10",
+      "Calzone-8" : "bytesInKB: 6, msgIn:9",
+      "Calzone-9" : "bytesInKB: 9, msgIn:13",
+      "Hawaiian-0" : "bytesInKB: 1, msgIn:3",
+      "Hawaiian-1" : "bytesInKB: 1, msgIn:2",
+      "Hawaiian-2" : "bytesInKB: 1, msgIn:1",
+      "Hawaiian-3" : "bytesInKB: 1, msgIn:2",
+      "Hawaiian-4" : "bytesInKB: 1, msgIn:3",
+      "Hawaiian-5" : "bytesInKB: 1, msgIn:2",
+      "Hawaiian-6" : "bytesInKB: 0, msgIn:1",
+      "Hawaiian-7" : "bytesInKB: 1, msgIn:2",
+      "Hawaiian-8" : "bytesInKB: 1, msgIn:3",
+      "Hawaiian-9" : "bytesInKB: 1, msgIn:2",
+      "Capriccioza-0" : "bytesInKB: 4, msgIn:4",
+      "Capriccioza-1" : "bytesInKB: 4, msgIn:3",
+      "Capriccioza-10" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-11" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-12" : "bytesInKB: 4, msgIn:4",
+      "Capriccioza-13" : "bytesInKB: 4, msgIn:4",
+      "Capriccioza-14" : "bytesInKB: 4, msgIn:3",
+      "Capriccioza-15" : "bytesInKB: 4, msgIn:4",
+      "Capriccioza-2" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-3" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-4" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-5" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-6" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-7" : "bytesInKB: 5, msgIn:4",
+      "Capriccioza-8" : "bytesInKB: 6, msgIn:5",
+      "Capriccioza-9" : "bytesInKB: 4, msgIn:4"
+    },
+    "cookie" : {
+      "ChocolateChip-0" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-1" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-2" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-3" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-4" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-5" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-6" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-7" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-8" : "bytesInKB: 0, msgIn:1",
+      "ChocolateChip-9" : "bytesInKB: 0, msgIn:1",
+      "MacadamiaWhiteChocolate-0" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-1" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-10" : "bytesInKB: 1, msgIn:5",
+      "MacadamiaWhiteChocolate-11" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-12" : "bytesInKB: 1, msgIn:1",
+      "MacadamiaWhiteChocolate-13" : "bytesInKB: 1, msgIn:3",
+      "MacadamiaWhiteChocolate-14" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-15" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-16" : "bytesInKB: 2, msgIn:3",
+      "MacadamiaWhiteChocolate-17" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-18" : "bytesInKB: 2, msgIn:3",
+      "MacadamiaWhiteChocolate-19" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-2" : "bytesInKB: 2, msgIn:3",
+      "MacadamiaWhiteChocolate-20" : "bytesInKB: 1, msgIn:3",
+      "MacadamiaWhiteChocolate-21" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-22" : "bytesInKB: 1, msgIn:3",
+      "MacadamiaWhiteChocolate-23" : "bytesInKB: 1, msgIn:1",
+      "MacadamiaWhiteChocolate-24" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-25" : "bytesInKB: 1, msgIn:1",
+      "MacadamiaWhiteChocolate-26" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-27" : "bytesInKB: 2, msgIn:2",
+      "MacadamiaWhiteChocolate-28" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-29" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-3" : "bytesInKB: 1, msgIn:1",
+      "MacadamiaWhiteChocolate-30" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-31" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-4" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-5" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-6" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-7" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-8" : "bytesInKB: 1, msgIn:2",
+      "MacadamiaWhiteChocolate-9" : "bytesInKB: 1, msgIn:2",
+      "Coconut-0" : "bytesInKB: 2, msgIn:4",
+      "Coconut-1" : "bytesInKB: 2, msgIn:4",
+      "Coconut-10" : "bytesInKB: 2, msgIn:4",
+      "Coconut-11" : "bytesInKB: 2, msgIn:3",
+      "Coconut-12" : "bytesInKB: 2, msgIn:4",
+      "Coconut-13" : "bytesInKB: 2, msgIn:4",
+      "Coconut-14" : "bytesInKB: 2, msgIn:4",
+      "Coconut-15" : "bytesInKB: 2, msgIn:4",
+      "Coconut-16" : "bytesInKB: 2, msgIn:3",
+      "Coconut-17" : "bytesInKB: 2, msgIn:3",
+      "Coconut-18" : "bytesInKB: 2, msgIn:4",
+      "Coconut-19" : "bytesInKB: 2, msgIn:4",
+      "Coconut-2" : "bytesInKB: 2, msgIn:4",
+      "Coconut-20" : "bytesInKB: 2, msgIn:3",
+      "Coconut-21" : "bytesInKB: 2, msgIn:3",
+      "Coconut-22" : "bytesInKB: 2, msgIn:4",
+      "Coconut-23" : "bytesInKB: 2, msgIn:3",
+      "Coconut-24" : "bytesInKB: 2, msgIn:3",
+      "Coconut-25" : "bytesInKB: 2, msgIn:3",
+      "Coconut-26" : "bytesInKB: 2, msgIn:3",
+      "Coconut-27" : "bytesInKB: 2, msgIn:3",
+      "Coconut-28" : "bytesInKB: 2, msgIn:4",
+      "Coconut-29" : "bytesInKB: 2, msgIn:4",
+      "Coconut-3" : "bytesInKB: 2, msgIn:3",
+      "Coconut-30" : "bytesInKB: 2, msgIn:4",
+      "Coconut-31" : "bytesInKB: 2, msgIn:4",
+      "Coconut-4" : "bytesInKB: 2, msgIn:3",
+      "Coconut-5" : "bytesInKB: 2, msgIn:4",
+      "Coconut-6" : "bytesInKB: 2, msgIn:4",
+      "Coconut-7" : "bytesInKB: 2, msgIn:4",
+      "Coconut-8" : "bytesInKB: 2, msgIn:4",
+      "Coconut-9" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-0" : "bytesInKB: 1, msgIn:3",
+      "OatmealRaisin-1" : "bytesInKB: 2, msgIn:5",
+      "OatmealRaisin-10" : "bytesInKB: 2, msgIn:4",
+      "OatmealRaisin-11" : "bytesInKB: 2, msgIn:2",
+      "OatmealRaisin-12" : "bytesInKB: 3, msgIn:4",
+      "OatmealRaisin-13" : "bytesInKB: 2, msgIn:4",
+      "OatmealRaisin-14" : "bytesInKB: 3, msgIn:5",
+      "OatmealRaisin-15" : "bytesInKB: 2, msgIn:4",
+      "OatmealRaisin-16" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-17" : "bytesInKB: 3, msgIn:3",
+      "OatmealRaisin-18" : "bytesInKB: 3, msgIn:2",
+      "OatmealRaisin-19" : "bytesInKB: 3, msgIn:5",
+      "OatmealRaisin-2" : "bytesInKB: 3, msgIn:3",
+      "OatmealRaisin-20" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-21" : "bytesInKB: 3, msgIn:4",
+      "OatmealRaisin-22" : "bytesInKB: 3, msgIn:3",
+      "OatmealRaisin-23" : "bytesInKB: 3, msgIn:4",
+      "OatmealRaisin-24" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-25" : "bytesInKB: 1, msgIn:3",
+      "OatmealRaisin-26" : "bytesInKB: 3, msgIn:6",
+      "OatmealRaisin-27" : "bytesInKB: 3, msgIn:5",
+      "OatmealRaisin-28" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-29" : "bytesInKB: 2, msgIn:2",
+      "OatmealRaisin-3" : "bytesInKB: 2, msgIn:3",
+      "OatmealRaisin-30" : "bytesInKB: 2, msgIn:6",
+      "OatmealRaisin-31" : "bytesInKB: 2, msgIn:4",
+      "OatmealRaisin-4" : "bytesInKB: 2, msgIn:2",
+      "OatmealRaisin-5" : "bytesInKB: 3, msgIn:6",
+      "OatmealRaisin-6" : "bytesInKB: 2, msgIn:4",
+      "OatmealRaisin-7" : "bytesInKB: 3, msgIn:6",
+      "OatmealRaisin-8" : "bytesInKB: 2, msgIn:2",
+      "OatmealRaisin-9" : "bytesInKB: 3, msgIn:4",
+      "PeanutButter-0" : "bytesInKB: 10, msgIn:39",
+      "PeanutButter-1" : "bytesInKB: 9, msgIn:27",
+      "PeanutButter-2" : "bytesInKB: 10, msgIn:39",
+      "PeanutButter-3" : "bytesInKB: 8, msgIn:25",
+      "PeanutButter-4" : "bytesInKB: 11, msgIn:39",
+      "PeanutButter-5" : "bytesInKB: 10, msgIn:39",
+      "PeanutButter-6" : "bytesInKB: 8, msgIn:25",
+      "PeanutButter-7" : "bytesInKB: 8, msgIn:24",
+      "PeanutButter-8" : "bytesInKB: 8, msgIn:26",
+      "PeanutButter-9" : "bytesInKB: 8, msgIn:26"
+    }
+  }
+}
diff --git a/gradle/license.gradle b/gradle/license.gradle
index f2ff3d801..7d821baab 100644
--- a/gradle/license.gradle
+++ b/gradle/license.gradle
@@ -7,6 +7,7 @@ subprojects {
     // Skip Twitter bootstrap JS and CSS.
     skipExistingHeaders = true
     ext.year = Calendar.getInstance().get(Calendar.YEAR)
+    exclude "*.json"
   }
 
   // Check all source files in the main and