Added partitioner
Added bucketing in load

Added headers

Bucketing is handled in load and query flow

Fixed test case

Rebased with master

rebased

Added bucketing in spark layer

Rebased and fixed scala style

Added test cases for bucketing in all scenarios and fixed review comments

rebased and fixed issues

Rebased and fixed comments

Rebased and fixed testcases

Rebased and fixed testcases

Fixed comments

Rebased

Fixed compilation issue
ravipesala authored and jackylk committed Dec 28, 2016
1 parent 65b9221 commit cbf8797
Showing 44 changed files with 1,417 additions and 177 deletions.
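The diffs below thread bucketing through the schema metadata, the segment index store, and the load and query flows. As a rough illustration only (not the partitioner code added in this commit), hash bucketing assigns each row to a bucket by hashing the bucket column's value modulo the configured bucket count:

// Illustrative sketch, not the code in this commit: bucket assignment
// as hash(bucket column value) modulo the number of buckets.
public final class SimpleHashPartitioner {

  private final int numberOfBuckets;

  public SimpleHashPartitioner(int numberOfBuckets) {
    this.numberOfBuckets = numberOfBuckets;
  }

  /** Returns a bucket id in [0, numberOfBuckets) for a bucket column value. */
  public int getBucketId(Object bucketColumnValue) {
    int hash = bucketColumnValue == null ? 0 : bucketColumnValue.hashCode();
    // mask the sign bit so the modulo result is never negative
    return (hash & Integer.MAX_VALUE) % numberOfBuckets;
  }
}

With a scheme like this, all rows that agree on the bucket column land in the same bucket file per task, which is what lets the query side prune by bucket number.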
org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java
@@ -18,6 +18,7 @@
*/
package org.apache.carbondata.core.carbon.datastore;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -57,7 +58,8 @@ public class SegmentTaskIndexStore {
* reason for so many maps: each segment can have multiple data files and
* each file will have its own btree
*/
private Map<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>> tableSegmentMap;
private Map<AbsoluteTableIdentifier,
Map<String, Map<TaskBucketHolder, AbstractIndex>>> tableSegmentMap;

/**
* map of block info to lock object map, while loading the btree this will be filled
@@ -76,7 +78,7 @@ public class SegmentTaskIndexStore {

private SegmentTaskIndexStore() {
tableSegmentMap =
new ConcurrentHashMap<AbsoluteTableIdentifier, Map<String, Map<String, AbstractIndex>>>(
new ConcurrentHashMap<>(
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
tableLockMap = new ConcurrentHashMap<AbsoluteTableIdentifier, Object>(
CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -103,26 +105,26 @@ public static SegmentTaskIndexStore getInstance() {
* @return map of task id to segment mapping
* @throws IndexBuilderException
*/
public Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
public Map<TaskBucketHolder, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException {
// task id to segment map
Map<String, AbstractIndex> taskIdToTableSegmentMap =
new HashMap<String, AbstractIndex>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
Map<TaskBucketHolder, AbstractIndex> taskIdToTableSegmentMap =
new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
addLockObject(absoluteTableIdentifier);
Iterator<Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
segmentToTableBlocksInfos.entrySet().iterator();
Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
addTableSegmentMap(absoluteTableIdentifier);
Map<String, AbstractIndex> taskIdToSegmentIndexMap = null;
Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
String segmentId = null;
String taskId = null;
TaskBucketHolder taskId = null;
try {
while (iteratorOverSegmentBlocksInfos.hasNext()) {
// segment id to table block mapping
Entry<String, List<TableBlockInfo>> next = iteratorOverSegmentBlocksInfos.next();
// group task id to table block info mapping for the segment
Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos);
// get the existing map of task id to table segment map
segmentId = next.getKey();
@@ -142,11 +144,11 @@ public Map<String, AbstractIndex> loadAndGetTaskIdToSegmentsMap(
taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId);
if (null == taskIdToSegmentIndexMap) {
// creating a map of task id to table segment
taskIdToSegmentIndexMap = new ConcurrentHashMap<String, AbstractIndex>();
Iterator<Entry<String, List<TableBlockInfo>>> iterator =
taskIdToSegmentIndexMap = new ConcurrentHashMap<TaskBucketHolder, AbstractIndex>();
Iterator<Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
taskIdToTableBlockInfoMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
Entry<TaskBucketHolder, List<TableBlockInfo>> taskToBlockInfoList = iterator.next();
taskId = taskToBlockInfoList.getKey();
taskIdToSegmentIndexMap.put(taskId,
loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier));
@@ -207,18 +209,18 @@ private synchronized void addLockObject(AbsoluteTableIdentifier absoluteTableIde
* @param absoluteTableIdentifier
* @return table segment map
*/
private Map<String, Map<String, AbstractIndex>> addTableSegmentMap(
private Map<String, Map<TaskBucketHolder, AbstractIndex>> addTableSegmentMap(
AbsoluteTableIdentifier absoluteTableIdentifier) {
// get the instance of lock object
Object lockObject = tableLockMap.get(absoluteTableIdentifier);
Map<String, Map<String, AbstractIndex>> tableSegmentMapTemp =
Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegmentMapTemp =
tableSegmentMap.get(absoluteTableIdentifier);
if (null == tableSegmentMapTemp) {
synchronized (lockObject) {
// segment id to task id to table segment map
tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier);
if (null == tableSegmentMapTemp) {
tableSegmentMapTemp = new ConcurrentHashMap<String, Map<String, AbstractIndex>>();
tableSegmentMapTemp = new ConcurrentHashMap<>();
tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp);
}
}
@@ -233,12 +235,13 @@ private Map<String, Map<String, AbstractIndex>> addTableSegmentMap(
* @return loaded segment
* @throws CarbonUtilException
*/
private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockInfoList,
private AbstractIndex loadBlocks(TaskBucketHolder holder, List<TableBlockInfo> tableBlockInfoList,
AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException {
// all the blocks of one task id will be loaded together
// so creating a list which will have all the data file metadata of one task
List<DataFileFooter> footerList =
CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier);
List<DataFileFooter> footerList = CarbonUtil
.readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList,
tableIdentifier);
AbstractIndex segment = new SegmentTaskIndex();
// file path of only the first block is passed, as all table block info paths of
// the same task id will be the same
@@ -253,21 +256,23 @@ private AbstractIndex loadBlocks(String taskId, List<TableBlockInfo> tableBlockI
* @param segmentToTableBlocksInfos segment id to table blocks info map
* @return task id to table block info mapping
*/
private Map<String, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
private Map<TaskBucketHolder, List<TableBlockInfo>> mappedAndGetTaskIdToTableBlockInfo(
Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos) {
Map<String, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
new ConcurrentHashMap<String, List<TableBlockInfo>>();
Map<TaskBucketHolder, List<TableBlockInfo>> taskIdToTableBlockInfoMap =
new ConcurrentHashMap<>();
Iterator<Entry<String, List<TableBlockInfo>>> iterator =
segmentToTableBlocksInfos.entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, List<TableBlockInfo>> next = iterator.next();
List<TableBlockInfo> value = next.getValue();
for (TableBlockInfo blockInfo : value) {
String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath());
List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(taskNo);
String bucketNo = DataFileUtil.getBucketNo(blockInfo.getFilePath());
TaskBucketHolder bucketHolder = new TaskBucketHolder(taskNo, bucketNo);
List<TableBlockInfo> list = taskIdToTableBlockInfoMap.get(bucketHolder);
if (null == list) {
list = new ArrayList<TableBlockInfo>();
taskIdToTableBlockInfoMap.put(taskNo, list);
taskIdToTableBlockInfoMap.put(bucketHolder, list);
}
list.add(blockInfo);
}
@@ -304,7 +309,8 @@ public void removeTableBlocks(List<String> segmentToBeRemoved,
return;
}
// Acquire the lock and remove only those instance which was loaded
Map<String, Map<String, AbstractIndex>> map = tableSegmentMap.get(absoluteTableIdentifier);
Map<String, Map<TaskBucketHolder, AbstractIndex>> map =
tableSegmentMap.get(absoluteTableIdentifier);
// if there is no loaded blocks then return
if (null == map) {
return;
@@ -322,13 +328,44 @@ public void removeTableBlocks(List<String> segmentToBeRemoved,
* @param segmentId
* @return is loaded then return the loaded blocks otherwise null
*/
public Map<String, AbstractIndex> getSegmentBTreeIfExists(
public Map<TaskBucketHolder, AbstractIndex> getSegmentBTreeIfExists(
AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) {
Map<String, Map<String, AbstractIndex>> tableSegment =
Map<String, Map<TaskBucketHolder, AbstractIndex>> tableSegment =
tableSegmentMap.get(absoluteTableIdentifier);
if (null == tableSegment) {
return null;
}
return tableSegment.get(segmentId);
}

public static class TaskBucketHolder implements Serializable {

public String taskNo;

public String bucketNumber;

public TaskBucketHolder(String taskNo, String bucketNumber) {
this.taskNo = taskNo;
this.bucketNumber = bucketNumber;
}

@Override public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TaskBucketHolder that = (TaskBucketHolder) o;

if (taskNo != null ? !taskNo.equals(that.taskNo) : that.taskNo != null) return false;
return bucketNumber != null ?
bucketNumber.equals(that.bucketNumber) :
that.bucketNumber == null;
}

@Override public int hashCode() {
int result = taskNo != null ? taskNo.hashCode() : 0;
result = 31 * result + (bucketNumber != null ? bucketNumber.hashCode() : 0);
return result;
}
}
}
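The composite key above changes how blocks group while loading the segment index: blocks that share a task number but differ in bucket number now load into separate B-trees. A minimal grouping sketch, using hypothetical task/bucket/path triples (the real extraction is DataFileUtil.getTaskNo and DataFileUtil.getBucketNo, and the real key is TaskBucketHolder):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskBucketGroupingSketch {
  public static void main(String[] args) {
    // hypothetical (taskNo, bucketNo, file path) triples for a segment
    List<String[]> blocks = Arrays.asList(
        new String[] {"100", "0", "part-a.carbondata"},
        new String[] {"100", "1", "part-b.carbondata"},
        new String[] {"101", "0", "part-c.carbondata"});
    // group file paths by the (taskNo, bucketNo) pair, mirroring
    // mappedAndGetTaskIdToTableBlockInfo above
    Map<List<String>, List<String>> grouped = new HashMap<>();
    for (String[] b : blocks) {
      List<String> key = Arrays.asList(b[0], b[1]); // taskNo, bucketNo
      List<String> list = grouped.get(key);
      if (null == list) {
        list = new ArrayList<>();
        grouped.put(key, list);
      }
      list.add(b[2]);
    }
    grouped.forEach((key, paths) -> System.out.println(
        "task=" + key.get(0) + " bucket=" + key.get(1) + " -> " + paths));
  }
}

The null-check-then-put pattern mirrors the store's own grouping loop; the production code keys on TaskBucketHolder, whose equals and hashCode make it safe as a map key.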
org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -23,6 +23,7 @@

import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo;
@@ -190,9 +191,24 @@ private org.apache.carbondata.format.DataType fromWrapperToExternalDataType(Data
new org.apache.carbondata.format.TableSchema(
wrapperTableSchema.getTableId(), thriftColumnSchema, schemaEvolution);
externalTableSchema.setTableProperties(wrapperTableSchema.getTableProperties());
if (wrapperTableSchema.getBucketingInfo() != null) {
externalTableSchema.setBucketingInfo(
fromWrapperToExternalBucketingInfo(wrapperTableSchema.getBucketingInfo()));
}
return externalTableSchema;
}

private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketingInfo(
BucketingInfo bucketingInfo) {
List<org.apache.carbondata.format.ColumnSchema> thriftColumnSchema =
new ArrayList<org.apache.carbondata.format.ColumnSchema>();
for (ColumnSchema wrapperColumnSchema : bucketingInfo.getListOfColumns()) {
thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema));
}
return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema,
bucketingInfo.getNumberOfBuckets());
}

/* (non-Javadoc)
* convert from wrapper to external tableinfo
*/
@@ -365,9 +381,23 @@ private DataType fromExternalToWrapperDataType(org.apache.carbondata.format.Data
wrapperTableSchema.setListOfColumns(listOfColumns);
wrapperTableSchema.setSchemaEvalution(
fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution()));
if (externalTableSchema.isSetBucketingInfo()) {
wrapperTableSchema.setBucketingInfo(
fromExternalToWrapperBucketingInfo(externalTableSchema.bucketingInfo));
}
return wrapperTableSchema;
}

private BucketingInfo fromExternalToWrapperBucketingInfo(
org.apache.carbondata.format.BucketingInfo externalBucketInfo) {
List<ColumnSchema> listOfColumns = new ArrayList<ColumnSchema>();
for (org.apache.carbondata.format.ColumnSchema externalColumnSchema :
externalBucketInfo.table_columns) {
listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema));
}
return new BucketingInfo(listOfColumns, externalBucketInfo.number_of_buckets);
}

/* (non-Javadoc)
* convert from external to wrapper tableinfo
*/
org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.carbondata.core.carbon.metadata.schema;

import java.io.Serializable;
import java.util.List;

import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;

/**
* Bucketing information
*/
public class BucketingInfo implements Serializable {

private List<ColumnSchema> listOfColumns;

private int numberOfBuckets;

public BucketingInfo(List<ColumnSchema> listOfColumns, int numberOfBuckets) {
this.listOfColumns = listOfColumns;
this.numberOfBuckets = numberOfBuckets;
}

public List<ColumnSchema> getListOfColumns() {
return listOfColumns;
}

public int getNumberOfBuckets() {
return numberOfBuckets;
}

}
org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java
@@ -33,6 +33,7 @@
import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
@@ -71,6 +72,11 @@ public class CarbonTable implements Serializable {
*/
private Map<String, List<CarbonMeasure>> tableMeasuresMap;

/**
* table bucket map.
*/
private Map<String, BucketingInfo> tableBucketMap;

/**
* tableUniqueName
*/
@@ -99,6 +105,7 @@ public class CarbonTable implements Serializable {
public CarbonTable() {
this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableMeasuresMap = new HashMap<String, List<CarbonMeasure>>();
this.tableBucketMap = new HashMap<>();
this.aggregateTablesName = new ArrayList<String>();
this.createOrderColumn = new HashMap<String, List<CarbonColumn>>();
}
@@ -124,7 +131,10 @@ public void loadCarbonTable(TableInfo tableInfo) {
for (TableSchema aggTable : aggregateTableList) {
this.aggregateTablesName.add(aggTable.getTableName());
fillDimensionsAndMeasuresForTables(aggTable);
tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo());
}
tableBucketMap.put(tableInfo.getFactTable().getTableName(),
tableInfo.getFactTable().getBucketingInfo());
}

/**
Expand Down Expand Up @@ -474,6 +484,10 @@ public List<CarbonDimension> getChildren(String dimName, List<CarbonDimension> d
return null;
}

public BucketingInfo getBucketingInfo(String tableName) {
return tableBucketMap.get(tableName);
}

/**
* @return absolute table identifier
*/
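On the query side, getBucketingInfo gives the planner what it needs to prune by bucket: for an equality filter on the bucket column, only one bucket's files have to be scanned. A hedged sketch of that decision, assuming bucket ids were assigned at load time with the same hash-and-modulo scheme as the partitioner sketch near the top (this is not the query-flow code in this commit):

// Illustrative pruning sketch. Assumes load-time assignment was
// (hash & Integer.MAX_VALUE) % numberOfBuckets.
public static int bucketToScan(Object equalsFilterValue, BucketingInfo info) {
  int hash = equalsFilterValue == null ? 0 : equalsFilterValue.hashCode();
  return (hash & Integer.MAX_VALUE) % info.getNumberOfBuckets();
}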
org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;

import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -62,6 +63,11 @@ public class TableSchema implements Serializable {
*/
private Map<String, String> tableProperties;

/**
* Information about bucketing of fields and number of buckets
*/
private BucketingInfo bucketingInfo;

public TableSchema() {
this.listOfColumns = new ArrayList<ColumnSchema>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
}
@@ -202,4 +208,12 @@ public Map<String, String> getTableProperties() {
public void setTableProperties(Map<String, String> tableProperties) {
this.tableProperties = tableProperties;
}

public BucketingInfo getBucketingInfo() {
return bucketingInfo;
}

public void setBucketingInfo(BucketingInfo bucketingInfo) {
this.bucketingInfo = bucketingInfo;
}
}
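Putting the schema pieces together: a short sketch of how bucketing info might be attached to a table schema and read back. The wiring is hypothetical; import paths are inferred from the surrounding diffs, and bucketColumn stands in for an already-populated ColumnSchema for the bucketed column.

import java.util.Collections;

import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo;
import org.apache.carbondata.core.carbon.metadata.schema.table.TableSchema;
import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;

public class BucketingSchemaSketch {
  public static void main(String[] args) {
    // hypothetical: assumes ColumnSchema's no-arg constructor plus setters
    // populate the bucket column, as elsewhere in the codebase
    ColumnSchema bucketColumn = new ColumnSchema();
    TableSchema schema = new TableSchema();
    schema.setBucketingInfo(new BucketingInfo(
        Collections.singletonList(bucketColumn), 8)); // 8 buckets, for example
    // CarbonTable.loadCarbonTable later copies this into its tableBucketMap
    System.out.println(schema.getBucketingInfo().getNumberOfBuckets());
  }
}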