[CARBONDATA-171] Block distribution not proper when the number of active executors is more than the node size
mohammadshahidkhan authored and gvramana committed Aug 23, 2016
1 parent c11058d commit eac5573
Showing 3 changed files with 215 additions and 22 deletions.
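
For context, a minimal standalone sketch (not part of the commit) of the scenario the fix targets: more active executors than nodes that actually host block replicas. It mirrors initSet2 of the new CarbonLoaderUtilTest below and relies only on the CarbonLoaderUtil.nodeBlockMapping and TableBlockInfo signatures used there; the driver class name and the printed output are illustrative assumptions.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.carbon.datastore.block.Distributable;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.apache.carbondata.spark.load.CarbonLoaderUtil;

// Hypothetical driver class: three active executors, but the six blocks are
// replicated on only two of them (node-7 and node-11).
public class BlockDistributionSketch {
  public static void main(String[] args) throws Exception {
    List<String> activeNodes = new ArrayList<>();
    activeNodes.add("node-7");
    activeNodes.add("node-9");
    activeNodes.add("node-11");
    String[] locations = { "node-7", "node-11" };

    List<Distributable> blocks = new ArrayList<>();
    for (int i = 1; i <= 6; i++) {
      blocks.add(new TableBlockInfo("node", i, "1", locations, 0));
    }

    // After the fix every active executor should receive 6 / 3 = 2 blocks,
    // including node-9, which holds no replica and is only fed leftover blocks.
    Map<String, List<Distributable>> mapping =
        CarbonLoaderUtil.nodeBlockMapping(blocks, -1, activeNodes);
    for (Map.Entry<String, List<Distributable>> entry : mapping.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " blocks");
    }
  }
}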
CarbonLoaderUtil.java
@@ -1225,9 +1225,11 @@ private static void assignLeftOverBlocks(Map<String, List<Distributable>> outputMap
       List<Distributable> blockLst = outputMap.get(activeNode);
       if (null == blockLst) {
         blockLst = new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-        outputMap.put(activeNode, blockLst);
       }
       populateBlocks(uniqueBlocks, noOfBlocksPerNode, blockLst);
+      if (blockLst.size() > 0) {
+        outputMap.put(activeNode, blockLst);
+      }
     }
   } else {
     for (Map.Entry<String, List<Distributable>> entry : outputMap.entrySet()) {
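
The hunk above changes assignLeftOverBlocks so that an active node is registered in the output map only once it has actually received leftover blocks, rather than being inserted up front with a possibly empty list. A minimal standalone rendering of that guarded-put pattern (the class name and the Integer payload are illustrative assumptions):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuardedPutSketch {
  public static void main(String[] args) {
    Map<String, List<Integer>> outputMap = new HashMap<>();
    List<Integer> blockLst = new ArrayList<>();   // stand-in for the list populateBlocks(...) fills
    // register the node only if it actually received at least one block
    if (blockLst.size() > 0) {
      outputMap.put("node-9", blockLst);
    }
    System.out.println(outputMap);                // {} -- no empty entry left behind for node-9
  }
}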
@@ -1293,21 +1295,15 @@ private static void createOutputMap(Map<String, List<Distributable>> outputMap,
     // are assigned first
     Collections.sort(multiBlockRelations);

-    Set<String> validActiveNodes = new HashSet<String>();
-    // find all the valid active nodes
-    for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
-      String nodeName = nodeMultiBlockRelation.getNode();
-      //assign the block to the node only if the node is active
-      if (null != activeNodes && isActiveExecutor(activeNodes, nodeName)) {
-        validActiveNodes.add(nodeName);
-      }
-    }
-
     for (NodeMultiBlockRelation nodeMultiBlockRelation : multiBlockRelations) {
       String nodeName = nodeMultiBlockRelation.getNode();
       //assign the block to the node only if the node is active
-      if (!validActiveNodes.isEmpty() && !validActiveNodes.contains(nodeName)) {
-        continue;
+      String activeExecutor = nodeName;
+      if (null != activeNodes) {
+        activeExecutor = getActiveExecutor(activeNodes, nodeName);
+        if (null == activeExecutor) {
+          continue;
+        }
       }
       // this loop will be for each NODE
       int nodeCapacity = 0;
@@ -1317,14 +1313,14 @@ private static void createOutputMap(Map<String, List<Distributable>> outputMap,
         // check if this is already assigned.
         if (uniqueBlocks.contains(block)) {

-          if (null == outputMap.get(nodeName)) {
+          if (null == outputMap.get(activeExecutor)) {
             List<Distributable> list =
                 new ArrayList<Distributable>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-            outputMap.put(nodeName, list);
+            outputMap.put(activeExecutor, list);
           }
           // assign this block to this node if node has capacity left
           if (nodeCapacity < blocksPerNode) {
-            List<Distributable> infos = outputMap.get(nodeName);
+            List<Distributable> infos = outputMap.get(activeExecutor);
             infos.add(block);
             nodeCapacity++;
             uniqueBlocks.remove(block);
@@ -1344,28 +1340,34 @@ private static void createOutputMap(Map<String, List<Distributable>> outputMap,
    * @param nodeName
    * @return returns true if active else false.
    */
-  private static boolean isActiveExecutor(List activeNode, String nodeName) {
+  private static String getActiveExecutor(List activeNode, String nodeName) {
     boolean isActiveNode = activeNode.contains(nodeName);
     if (isActiveNode) {
-      return isActiveNode;
+      return nodeName;
     }
     //if localhost then retrieve the localhost name then do the check
     else if (nodeName.equals("localhost")) {
       try {
         String hostName = InetAddress.getLocalHost().getHostName();
         isActiveNode = activeNode.contains(hostName);
+        if(isActiveNode){
+          return hostName;
+        }
       } catch (UnknownHostException ue) {
         isActiveNode = false;
       }
     } else {
       try {
         String hostAddress = InetAddress.getLocalHost().getHostAddress();
         isActiveNode = activeNode.contains(hostAddress);
+        if(isActiveNode){
+          return hostAddress;
+        }
       } catch (UnknownHostException ue) {
         isActiveNode = false;
       }
     }
-    return isActiveNode;
+    return null;
   }

   /**
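
Note that the Javadoc above ("returns true if active else false") still describes the old boolean isActiveExecutor; the renamed getActiveExecutor now returns the name under which the node is active, or null when no match is found, and that name becomes the output-map key. A standalone sketch of that lookup order (the class and method names here are illustrative assumptions; the private helper itself is not callable from outside its class):

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;

public class ActiveExecutorLookupSketch {
  // Mirrors the decision order of the new helper: use the block location as-is if it is
  // an active executor, resolve "localhost" to the local host name, otherwise fall back
  // to the local host address; null means "not an active executor".
  static String resolve(List<String> activeNodes, String nodeName) {
    if (activeNodes.contains(nodeName)) {
      return nodeName;
    }
    try {
      if ("localhost".equals(nodeName)) {
        String hostName = InetAddress.getLocalHost().getHostName();
        return activeNodes.contains(hostName) ? hostName : null;
      }
      String hostAddress = InetAddress.getLocalHost().getHostAddress();
      return activeNodes.contains(hostAddress) ? hostAddress : null;
    } catch (UnknownHostException ue) {
      return null;
    }
  }

  public static void main(String[] args) {
    List<String> activeNodes = Arrays.asList("node-7", "node-9", "node-11");
    System.out.println(resolve(activeNodes, "node-9"));    // node-9
    System.out.println(resolve(activeNodes, "node-42"));   // null unless the local address happens to be listed
  }
}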
DistributionUtil.scala
@@ -130,7 +130,7 @@ object DistributionUtil
       confExecutors
     } else {nodeMapping.size()}

-    var startTime = System.currentTimeMillis();
+    val startTime = System.currentTimeMillis();
     CarbonContext.ensureExecutors(sparkContext, requiredExecutors)
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30;
@@ -139,9 +139,9 @@ object DistributionUtil
       nodes = DistributionUtil.getNodeList(sparkContext)
       maxTimes = maxTimes - 1;
     }
-    var timDiff = System.currentTimeMillis() - startTime;
+    val timDiff = System.currentTimeMillis() - startTime;
     LOGGER.info("Total Time taken to ensure the required executors : " + timDiff)
     LOGGER.info("Time elapsed to allocate the required executors : " + (30 - maxTimes) * 500)
-    nodes
+    nodes.distinct
   }
 }
CarbonLoaderUtilTest.java (new file; all 191 lines below are additions)
@@ -0,0 +1,191 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.carbondata.spark.load;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.carbondata.core.carbon.datastore.block.Distributable;
import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
import org.junit.Assert;
import org.junit.Test;

/**
* Test class to test block distribution functionality
*/
public class CarbonLoaderUtilTest {
List<Distributable> blockInfos = null;
int noOfNodesInput = -1;
List<String> activeNode = null;
Map<String, List<Distributable>> expected = null;
Map<String, List<Distributable>> mapOfNodes = null;

@Test public void nodeBlockMapping() throws Exception {

// scenario when the 3 nodes and 3 executors
initSet1();
Map<String, List<Distributable>> mapOfNodes =
CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
// node allocation
Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
// block allocation
boolean isEqual = compareResult(expected, mapOfNodes);
Assert.assertTrue("Block Allocation", isEqual);

// 2 node and 3 executors
initSet2();
mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
// node allocation
Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
// block allocation
isEqual = compareResult(expected, mapOfNodes);
Assert.assertTrue("Block Allocation", isEqual);

// 3 data node and 2 executors
initSet3();
mapOfNodes = CarbonLoaderUtil.nodeBlockMapping(blockInfos, noOfNodesInput, activeNode);
// node allocation
Assert.assertTrue("Node Allocation", expected.size() == mapOfNodes.size());
// block allocation
isEqual = compareResult(expected, mapOfNodes);
Assert.assertTrue("Block Allocation", isEqual);
}

/**
* compares the blocks allocation
*
* @param expectedResult
* @param actualResult
* @return
*/
private boolean compareResult(Map<String, List<Distributable>> expectedResult,
Map<String, List<Distributable>> actualResult) {
expectedResult = sortByListSize(expectedResult);
actualResult = sortByListSize(actualResult);
List<List<Distributable>> expectedList = new LinkedList(expectedResult.entrySet());
List<List<Distributable>> mapOfNodesList = new LinkedList(actualResult.entrySet());
boolean isEqual = expectedList.size() == mapOfNodesList.size();
if (isEqual) {
for (int i = 0; i < expectedList.size(); i++) {
int size1 = ((List) ((Map.Entry) (expectedList.get(i))).getValue()).size();
int size2 = ((List) ((Map.Entry) (mapOfNodesList.get(i))).getValue()).size();
isEqual = size1 == size2;
if (!isEqual) {
break;
}
}
}
return isEqual;
}

/**
* sort by list size
*
* @param map
* @return
*/
private static Map<String, List<Distributable>> sortByListSize(
Map<String, List<Distributable>> map) {
List<List<Distributable>> list = new LinkedList(map.entrySet());
Collections.sort(list, new Comparator() {
public int compare(Object obj1, Object obj2) {
if (obj1 == null && obj2 == null) {
return 0;
} else if (obj1 == null) {
return 1;
} else if (obj2 == null) {
return -1;
}
int size1 = ((List) ((Map.Entry) (obj1)).getValue()).size();
int size2 = ((List) ((Map.Entry) (obj2)).getValue()).size();
return size2 - size1;
}
});

Map res = new LinkedHashMap();
for (Iterator it = list.iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry) it.next();
res.put(entry.getKey(), entry.getValue());
}
return res;
}

void initSet1() {
blockInfos = new ArrayList<>();
activeNode = new ArrayList<>();
activeNode.add("node-7");
activeNode.add("node-9");
activeNode.add("node-11");
String[] location = { "node-7", "node-9", "node-11" };
blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
expected = new HashMap<>();
expected.put("node-7", blockInfos.subList(0, 2));
expected.put("node-9", blockInfos.subList(2, 4));
expected.put("node-11", blockInfos.subList(4, 6));
}

void initSet2() {
blockInfos = new ArrayList<>();
activeNode = new ArrayList<>();
activeNode.add("node-7");
activeNode.add("node-9");
activeNode.add("node-11");
String[] location = { "node-7", "node-11" };
blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
expected = new HashMap<>();
expected.put("node-7", blockInfos.subList(0, 2));
expected.put("node-9", blockInfos.subList(2, 4));
expected.put("node-11", blockInfos.subList(4, 6));
}

void initSet3() {
blockInfos = new ArrayList<>();
activeNode = new ArrayList<>();
activeNode.add("node-7");
activeNode.add("node-11");
String[] location = { "node-7", "node-9", "node-11" };
blockInfos.add(new TableBlockInfo("node", 1, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 2, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 3, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 4, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 5, "1", location, 0));
blockInfos.add(new TableBlockInfo("node", 6, "1", location, 0));
expected = new HashMap<>();
expected.put("node-7", blockInfos.subList(0, 3));
expected.put("node-11", blockInfos.subList(3, 6));
}
}
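
The expected maps in the three init sets above all encode an even split of the six blocks across whatever executors are active. A tiny standalone check of that arithmetic (the class name is an illustrative assumption):

public class ExpectedQuotaCheck {
  public static void main(String[] args) {
    int totalBlocks = 6;
    // initSet1 and initSet2: three active executors
    System.out.println("3 executors -> " + totalBlocks / 3 + " blocks each");  // 2
    // initSet3: only two active executors
    System.out.println("2 executors -> " + totalBlocks / 2 + " blocks each");  // 3
  }
}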
