Commit f7b76fd

remove partitioner in rdd package

remove partitioner in query

jackylk committed Nov 28, 2016
1 parent 23bae4c commit f7b76fd
Showing 18 changed files with 93 additions and 427 deletions.
@@ -224,16 +224,13 @@ private static void cleanDeletedFactFile(String loadFolderPath) {
   /**
    * @param loadModel
    * @param storeLocation
-   * @param partitionCount
    * @param isForceDelete
    * @param details
    * @return
    *
    */
   public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
-      String storeLocation, int partitionCount, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
-    String path = null;
+      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List<LoadMetadataDetails> deletedLoads =
         new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);

@@ -242,12 +239,8 @@ public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          boolean deletionStatus = false;
-
-          for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
-            path = getSegmentPath(loadModel, storeLocation, partitionId, oneLoad);
-            deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          }
+          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
           if (deletionStatus) {
             isDeleted = true;
             oneLoad.setVisibility("false");
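Read together, the two hunks above replace the per-partition deletion loop with a single lookup at partition id 0. Below is a hedged caller-side sketch of the new contract; the wrapper method and its name are hypothetical, and the class enclosing deleteLoadFoldersFromFileSystem is not shown in this diff.

  // Hypothetical caller sketch, not part of this commit.
  static boolean cleanupExpiredLoads(CarbonLoadModel loadModel, String storeLocation,
      LoadMetadataDetails[] details) {
    // partitionCount no longer appears in the signature; the helper now builds one
    // segment path via getSegmentPath(loadModel, storeLocation, 0, oneLoad) instead
    // of looping partitionId = 0..partitionCount-1.
    return deleteLoadFoldersFromFileSystem(loadModel, storeLocation, false, details);
  }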
@@ -219,12 +219,11 @@ public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails
    *
    * @param storeLocation
    * @param carbonLoadModel
-   * @param partitionCount
    * @param compactionSize
    * @return
    */
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
-      CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
+      CarbonLoadModel carbonLoadModel, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {

     List sortedSegments = new ArrayList(segments);
@@ -245,7 +244,7 @@ public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeL
     if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {

       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
     } else {

       listOfSegmentsToBeMerged =
@@ -399,13 +398,12 @@ private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segD
    * @param compactionSize
    * @param listOfSegmentsAfterPreserve
    * @param carbonLoadModel
-   * @param partitionCount
    * @param storeLocation
    * @return
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
+      CarbonLoadModel carbonLoadModel, String storeLocation) {

     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -423,7 +421,7 @@ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       String segId = segment.getLoadName();
       // variable to store one segment size across partition.
       long sizeOfOneSegmentAcrossPartition =
-          getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId);
+          getSizeOfSegment(storeLocation, tableIdentifier, segId);

       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -460,30 +458,19 @@ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
   }

   /**
-   * For calculating the size of a segment across all partition.
-   * @param partitionCount
+   * For calculating the size of the specified segment
    * @param storeLocation
    * @param tableIdentifier
    * @param segId
    * @return
    */
-  private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation,
+  private static long getSizeOfSegment(String storeLocation,
       CarbonTableIdentifier tableIdentifier, String segId) {
-    long sizeOfOneSegmentAcrossPartition = 0;
-    // calculate size across partitions
-    for (int partition = 0; partition < partitionCount; partition++) {
-
-      String loadPath = CarbonLoaderUtil
-          .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
-
-      CarbonFile segmentFolder =
-          FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-
-      long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
-
-      sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
-    }
-    return sizeOfOneSegmentAcrossPartition;
+    String loadPath = CarbonLoaderUtil
+        .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+    CarbonFile segmentFolder =
+        FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+    return getSizeOfFactFileInLoad(segmentFolder);
   }

   /**
@@ -691,27 +678,18 @@ public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(

   /**
    * Removing the already merged segments from list.
-   * @param segments
-   * @param loadsToMerge
-   * @return
    */
   public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
       List<LoadMetadataDetails> segments,
       LoadMetadataDetails lastSeg) {

     // take complete list of segments.
     List<LoadMetadataDetails> list = new ArrayList<>(segments);

-    List<LoadMetadataDetails> trimmedList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
     // sort list
     CarbonDataMergerUtil.sortSegments(list);

     // first filter out newly added segments.
-    trimmedList = list.subList(0, list.indexOf(lastSeg) + 1);
-
-    return trimmedList;
+    return list.subList(0, list.indexOf(lastSeg) + 1);

   }
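The CarbonDataMergerUtil hunks above follow the same pattern: partitionCount drops out of identifySegmentsToBeMerged and identifySegmentsToBeMergedBasedOnSize, and the renamed getSizeOfSegment reads only the partition "0" folder. A hedged sketch of a compaction-side call under the new signatures follows; the driver method is hypothetical, while CarbonDataMergerUtil is the class the diff itself references via sortSegments(list).

  // Hypothetical driver sketch, not part of this commit.
  static List<LoadMetadataDetails> pickSegmentsForMajorCompaction(String storeLocation,
      CarbonLoadModel carbonLoadModel, long compactionSize,
      List<LoadMetadataDetails> segments) {
    // Size checks now read a single segment folder; segments whose fact files
    // already exceed compactionSize MB are skipped (the 1024 * 1024 check above).
    return CarbonDataMergerUtil.identifySegmentsToBeMerged(
        storeLocation, carbonLoadModel, compactionSize, segments,
        CompactionType.MAJOR_COMPACTION);
  }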
@@ -19,30 +19,18 @@

 package org.apache.carbondata.spark.partition.api.impl;

-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;

-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.scan.model.CarbonQueryPlan;
 import org.apache.carbondata.spark.partition.api.DataPartitioner;
 import org.apache.carbondata.spark.partition.api.Partition;

-import org.apache.spark.sql.execution.command.Partitioner;

 public final class QueryPartitionHelper {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(QueryPartitionHelper.class.getName());
   private static QueryPartitionHelper instance = new QueryPartitionHelper();
-  private Properties properties;
-  private String defaultPartitionerClass;
   private Map<String, DataPartitioner> partitionerMap =
       new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
   private Map<String, DefaultLoadBalancer> loadBalancerMap =
@@ -56,125 +44,34 @@ public static QueryPartitionHelper getInstance() {
     return instance;
   }

-  /**
-   * Read the properties from CSVFilePartitioner.properties
-   */
-  private static Properties loadProperties() {
-    Properties properties = new Properties();
-
-    File file = new File("DataPartitioner.properties");
-    FileInputStream fis = null;
-    try {
-      if (file.exists()) {
-        fis = new FileInputStream(file);
-
-        properties.load(fis);
-      }
-    } catch (Exception e) {
-      LOGGER
-          .error(e, e.getMessage());
-    } finally {
-      if (null != fis) {
-        try {
-          fis.close();
-        } catch (IOException e) {
-          LOGGER.error(e,
-              e.getMessage());
-        }
-      }
-    }
-
-    return properties;
-
-  }
-
-  private void checkInitialization(String tableUniqueName, Partitioner partitioner) {
-    //Initialise if not done earlier
-
-    //String nodeListString = null;
-    if (properties == null) {
-      properties = loadProperties();
-
-      // nodeListString = properties.getProperty("nodeList", "master,slave1,slave2,slave3");
-
-      defaultPartitionerClass = properties.getProperty("partitionerClass",
-          "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl");
-
-      LOGGER.info(this.getClass().getSimpleName() + " is using following configurations.");
-      LOGGER.info("partitionerClass : " + defaultPartitionerClass);
-      LOGGER.info("nodeList : " + Arrays.toString(partitioner.nodeList()));
-    }
-
-    if (partitionerMap.get(tableUniqueName) == null) {
-      DataPartitioner dataPartitioner;
-      try {
-        dataPartitioner =
-            (DataPartitioner) Class.forName(partitioner.partitionClass()).newInstance();
-        dataPartitioner.initialize("", new String[0], partitioner);
-
-        List<Partition> partitions = dataPartitioner.getAllPartitions();
-        DefaultLoadBalancer loadBalancer =
-            new DefaultLoadBalancer(Arrays.asList(partitioner.nodeList()), partitions);
-        partitionerMap.put(tableUniqueName, dataPartitioner);
-        loadBalancerMap.put(tableUniqueName, loadBalancer);
-      } catch (ClassNotFoundException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      } catch (InstantiationException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      } catch (IllegalAccessException e) {
-        LOGGER.error(e,
-            e.getMessage());
-      }
-    }
-  }
-
   /**
    * Get partitions applicable for query based on filters applied in query
    */
-  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan, Partitioner partitioner) {
+  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
     String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
-    checkInitialization(tableUniqueName, partitioner);

     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);

     List<Partition> queryPartitions = dataPartitioner.getPartitions(queryPlan);
     return queryPartitions;
   }

-  public List<Partition> getAllPartitions(String databaseName, String tableName,
-      Partitioner partitioner) {
+  public List<Partition> getAllPartitions(String databaseName, String tableName) {
     String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);

     DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);

     return dataPartitioner.getAllPartitions();
   }

   public void removePartition(String databaseName, String tableName) {
     String tableUniqueName = databaseName + '_' + tableName;
     partitionerMap.remove(tableUniqueName);
   }

   /**
    * Get the node name where the partition is assigned to.
    */
-  public String getLocation(Partition partition, String databaseName, String tableName,
-      Partitioner partitioner) {
+  public String getLocation(Partition partition, String databaseName, String tableName) {
     String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);

     DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
     return loadBalancer.getNodeForPartitions(partition);
   }
-
-  public String[] getPartitionedColumns(String databaseName, String tableName,
-      Partitioner partitioner) {
-    String tableUniqueName = databaseName + '_' + tableName;
-    checkInitialization(tableUniqueName, partitioner);
-    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
-    return dataPartitioner.getPartitionedColumns();
-  }
 }
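With loadProperties and checkInitialization gone, QueryPartitionHelper is reduced to a lookup facade over its two maps, and callers no longer pass a Partitioner. A hedged usage sketch of the surviving public API follows; queryPlan and the database and table names are placeholders, and since this class no longer instantiates DataPartitioner itself, partitionerMap and loadBalancerMap must be populated elsewhere before these calls.

  // Illustrative usage, not part of this commit.
  QueryPartitionHelper helper = QueryPartitionHelper.getInstance();

  // Prune partitions for a query; entries are keyed by databaseName_tableName.
  List<Partition> pruned = helper.getPartitionsForQuery(queryPlan);

  // Enumerate a table's partitions and find the node one is assigned to.
  List<Partition> all = helper.getAllPartitions("default", "sales");
  String node = helper.getLocation(all.get(0), "default", "sales");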
