From cbf8797776c2f3be48efe029d858b37a37d29848 Mon Sep 17 00:00:00 2001 From: ravipesala Date: Sun, 27 Nov 2016 16:58:55 +0530 Subject: [PATCH] Added partitioner Added bucketing in load Added headers Bucketing is handled in load and query flow Fixed test case Rebased with master rebased Added bucketing in spark layer Rebased and fixed scala style Added test cases for bucketing in all scenerios. And fixed review comments rebased and fixed issues Rebased and fixed comments Rebased and fixed testcases Rebased and fixed testcases Fixed comments Rebased Fixed compilation issue --- .../datastore/SegmentTaskIndexStore.java | 89 ++++-- .../ThriftWrapperSchemaConverterImpl.java | 30 ++ .../carbon/metadata/schema/BucketingInfo.java | 49 ++++ .../metadata/schema/table/CarbonTable.java | 14 + .../metadata/schema/table/TableSchema.java | 14 + .../core/carbon/path/CarbonTablePath.java | 37 ++- .../core/partition/Partitioner.java | 26 ++ .../partition/impl/HashPartitionerImpl.java | 105 +++++++ .../core/util/CarbonMetadataUtil.java | 4 +- .../carbondata/core/util/CarbonUtil.java | 5 +- .../datastore/SegmentTaskIndexStoreTest.java | 8 +- .../CarbonFormatDirectoryStructureTest.java | 4 +- .../core/util/CarbonMetadataUtilTest.java | 3 +- .../src/main/thrift/carbondata_index.thrift | 1 + format/src/main/thrift/schema.thrift | 9 + .../carbondata/hadoop/CarbonInputFormat.java | 22 +- .../carbondata/hadoop/CarbonInputSplit.java | 18 ++ .../hadoop/CarbonMultiBlockSplit.java | 23 +- .../index/impl/InMemoryBTreeIndex.java | 9 +- .../carbondata/spark/CarbonOption.scala | 7 + .../spark/rdd/CarbonMergerRDD.scala | 2 +- .../carbondata/spark/rdd/CarbonScanRDD.scala | 146 ++++++---- .../sql/catalyst/CarbonDDLSqlParser.scala | 7 +- .../execution/command/carbonTableSchema.scala | 29 +- .../apache/spark/sql/CarbonSqlParser.scala | 11 +- .../carbondata/spark/CarbonOption.scala | 7 + .../org/apache/spark/sql/CarbonSource.scala | 14 +- .../org/apache/spark/sql/TableCreator.scala | 6 +- .../execution/CarbonLateDecodeStrategy.scala | 42 ++- .../sql/parser/CarbonSparkSqlParser.scala | 14 +- .../bucketing/TableBucketingTestCase.scala | 193 +++++++++++++ .../newflow/CarbonDataLoadConfiguration.java | 11 + .../newflow/DataLoadProcessBuilder.java | 29 ++ .../processing/newflow/row/CarbonRow.java | 2 + ...allelReadMergeSorterWithBucketingImpl.java | 265 ++++++++++++++++++ ...nverterProcessorWithBucketingStepImpl.java | 189 +++++++++++++ .../steps/DataWriterProcessorStepImpl.java | 79 +++--- .../newflow/steps/SortProcessorStepImpl.java | 11 +- .../sortdata/SortParameters.java | 28 ++ .../store/CarbonFactDataHandlerColumnar.java | 4 + .../store/CarbonFactDataHandlerModel.java | 9 +- .../SingleThreadFinalSortFilesMerger.java | 4 + .../store/writer/AbstractFactDataWriter.java | 6 +- .../store/writer/CarbonDataWriterVo.java | 9 + 44 files changed, 1417 insertions(+), 177 deletions(-) create mode 100644 core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java create mode 100644 core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java create mode 100644 core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java create mode 100644 integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala create mode 100644 processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java create mode 100644 
processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java index e2218a82383..6ab18bb0cf5 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStore.java @@ -18,6 +18,7 @@ */ package org.apache.carbondata.core.carbon.datastore; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; @@ -57,7 +58,8 @@ public class SegmentTaskIndexStore { * reason of so many map as each segment can have multiple data file and * each file will have its own btree */ - private Map>> tableSegmentMap; + private Map>> tableSegmentMap; /** * map of block info to lock object map, while loading the btree this will be filled @@ -76,7 +78,7 @@ public class SegmentTaskIndexStore { private SegmentTaskIndexStore() { tableSegmentMap = - new ConcurrentHashMap>>( + new ConcurrentHashMap<>( CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); tableLockMap = new ConcurrentHashMap( CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); @@ -103,26 +105,26 @@ public static SegmentTaskIndexStore getInstance() { * @return map of taks id to segment mapping * @throws IndexBuilderException */ - public Map loadAndGetTaskIdToSegmentsMap( + public Map loadAndGetTaskIdToSegmentsMap( Map> segmentToTableBlocksInfos, AbsoluteTableIdentifier absoluteTableIdentifier) throws IndexBuilderException { // task id to segment map - Map taskIdToTableSegmentMap = - new HashMap(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + Map taskIdToTableSegmentMap = + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); addLockObject(absoluteTableIdentifier); Iterator>> iteratorOverSegmentBlocksInfos = segmentToTableBlocksInfos.entrySet().iterator(); - Map> tableSegmentMapTemp = + Map> tableSegmentMapTemp = addTableSegmentMap(absoluteTableIdentifier); - Map taskIdToSegmentIndexMap = null; + Map taskIdToSegmentIndexMap = null; String segmentId = null; - String taskId = null; + TaskBucketHolder taskId = null; try { while (iteratorOverSegmentBlocksInfos.hasNext()) { // segment id to table block mapping Entry> next = iteratorOverSegmentBlocksInfos.next(); // group task id to table block info mapping for the segment - Map> taskIdToTableBlockInfoMap = + Map> taskIdToTableBlockInfoMap = mappedAndGetTaskIdToTableBlockInfo(segmentToTableBlocksInfos); // get the existing map of task id to table segment map segmentId = next.getKey(); @@ -142,11 +144,11 @@ public Map loadAndGetTaskIdToSegmentsMap( taskIdToSegmentIndexMap = tableSegmentMapTemp.get(segmentId); if (null == taskIdToSegmentIndexMap) { // creating a map of task id to table segment - taskIdToSegmentIndexMap = new ConcurrentHashMap(); - Iterator>> iterator = + taskIdToSegmentIndexMap = new ConcurrentHashMap(); + Iterator>> iterator = taskIdToTableBlockInfoMap.entrySet().iterator(); while (iterator.hasNext()) { - Entry> taskToBlockInfoList = iterator.next(); + Entry> taskToBlockInfoList = iterator.next(); taskId = taskToBlockInfoList.getKey(); taskIdToSegmentIndexMap.put(taskId, loadBlocks(taskId, taskToBlockInfoList.getValue(), absoluteTableIdentifier)); @@ -207,18 +209,18 @@ private synchronized void addLockObject(AbsoluteTableIdentifier absoluteTableIde * @param 
absoluteTableIdentifier * @return table segment map */ - private Map> addTableSegmentMap( + private Map> addTableSegmentMap( AbsoluteTableIdentifier absoluteTableIdentifier) { // get the instance of lock object Object lockObject = tableLockMap.get(absoluteTableIdentifier); - Map> tableSegmentMapTemp = + Map> tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier); if (null == tableSegmentMapTemp) { synchronized (lockObject) { // segment id to task id to table segment map tableSegmentMapTemp = tableSegmentMap.get(absoluteTableIdentifier); if (null == tableSegmentMapTemp) { - tableSegmentMapTemp = new ConcurrentHashMap>(); + tableSegmentMapTemp = new ConcurrentHashMap<>(); tableSegmentMap.put(absoluteTableIdentifier, tableSegmentMapTemp); } } @@ -233,12 +235,13 @@ private Map> addTableSegmentMap( * @return loaded segment * @throws CarbonUtilException */ - private AbstractIndex loadBlocks(String taskId, List tableBlockInfoList, + private AbstractIndex loadBlocks(TaskBucketHolder holder, List tableBlockInfoList, AbsoluteTableIdentifier tableIdentifier) throws CarbonUtilException { // all the block of one task id will be loaded together // so creating a list which will have all the data file meta data to of one task - List footerList = - CarbonUtil.readCarbonIndexFile(taskId, tableBlockInfoList, tableIdentifier); + List footerList = CarbonUtil + .readCarbonIndexFile(holder.taskNo, holder.bucketNumber, tableBlockInfoList, + tableIdentifier); AbstractIndex segment = new SegmentTaskIndex(); // file path of only first block is passed as it all table block info path of // same task id will be same @@ -253,10 +256,10 @@ private AbstractIndex loadBlocks(String taskId, List tableBlockI * @param segmentToTableBlocksInfos segment if to table blocks info map * @return task id to table block info mapping */ - private Map> mappedAndGetTaskIdToTableBlockInfo( + private Map> mappedAndGetTaskIdToTableBlockInfo( Map> segmentToTableBlocksInfos) { - Map> taskIdToTableBlockInfoMap = - new ConcurrentHashMap>(); + Map> taskIdToTableBlockInfoMap = + new ConcurrentHashMap<>(); Iterator>> iterator = segmentToTableBlocksInfos.entrySet().iterator(); while (iterator.hasNext()) { @@ -264,10 +267,12 @@ private Map> mappedAndGetTaskIdToTableBlockInfo( List value = next.getValue(); for (TableBlockInfo blockInfo : value) { String taskNo = DataFileUtil.getTaskNo(blockInfo.getFilePath()); - List list = taskIdToTableBlockInfoMap.get(taskNo); + String bucketNo = DataFileUtil.getBucketNo(blockInfo.getFilePath()); + TaskBucketHolder bucketHolder = new TaskBucketHolder(taskNo, bucketNo); + List list = taskIdToTableBlockInfoMap.get(bucketHolder); if (null == list) { list = new ArrayList(); - taskIdToTableBlockInfoMap.put(taskNo, list); + taskIdToTableBlockInfoMap.put(bucketHolder, list); } list.add(blockInfo); } @@ -304,7 +309,8 @@ public void removeTableBlocks(List segmentToBeRemoved, return; } // Acquire the lock and remove only those instance which was loaded - Map> map = tableSegmentMap.get(absoluteTableIdentifier); + Map> map = + tableSegmentMap.get(absoluteTableIdentifier); // if there is no loaded blocks then return if (null == map) { return; @@ -322,13 +328,44 @@ public void removeTableBlocks(List segmentToBeRemoved, * @param segmentId * @return is loaded then return the loaded blocks otherwise null */ - public Map getSegmentBTreeIfExists( + public Map getSegmentBTreeIfExists( AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) { - Map> tableSegment = + Map> tableSegment = 
tableSegmentMap.get(absoluteTableIdentifier); if (null == tableSegment) { return null; } return tableSegment.get(segmentId); } + + public static class TaskBucketHolder implements Serializable { + + public String taskNo; + + public String bucketNumber; + + public TaskBucketHolder(String taskNo, String bucketNumber) { + this.taskNo = taskNo; + this.bucketNumber = bucketNumber; + } + + @Override public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TaskBucketHolder that = (TaskBucketHolder) o; + + if (taskNo != null ? !taskNo.equals(that.taskNo) : that.taskNo != null) return false; + return bucketNumber != null ? + bucketNumber.equals(that.bucketNumber) : + that.bucketNumber == null; + + } + + @Override public int hashCode() { + int result = taskNo != null ? taskNo.hashCode() : 0; + result = 31 * result + (bucketNumber != null ? bucketNumber.hashCode() : 0); + return result; + } + } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java index 7d5386e0551..4b2be5f7558 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/converter/ThriftWrapperSchemaConverterImpl.java @@ -23,6 +23,7 @@ import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution; import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolutionEntry; import org.apache.carbondata.core.carbon.metadata.schema.table.TableInfo; @@ -190,9 +191,24 @@ private org.apache.carbondata.format.DataType fromWrapperToExternalDataType(Data new org.apache.carbondata.format.TableSchema( wrapperTableSchema.getTableId(), thriftColumnSchema, schemaEvolution); externalTableSchema.setTableProperties(wrapperTableSchema.getTableProperties()); + if (wrapperTableSchema.getBucketingInfo() != null) { + externalTableSchema.setBucketingInfo( + fromWrapperToExternalBucketingInfo(wrapperTableSchema.getBucketingInfo())); + } return externalTableSchema; } + private org.apache.carbondata.format.BucketingInfo fromWrapperToExternalBucketingInfo( + BucketingInfo bucketingInfo) { + List thriftColumnSchema = + new ArrayList(); + for (ColumnSchema wrapperColumnSchema : bucketingInfo.getListOfColumns()) { + thriftColumnSchema.add(fromWrapperToExternalColumnSchema(wrapperColumnSchema)); + } + return new org.apache.carbondata.format.BucketingInfo(thriftColumnSchema, + bucketingInfo.getNumberOfBuckets()); + } + /* (non-Javadoc) * convert from wrapper to external tableinfo */ @@ -365,9 +381,23 @@ private DataType fromExternalToWrapperDataType(org.apache.carbondata.format.Data wrapperTableSchema.setListOfColumns(listOfColumns); wrapperTableSchema.setSchemaEvalution( fromExternalToWrapperSchemaEvolution(externalTableSchema.getSchema_evolution())); + if (externalTableSchema.isSetBucketingInfo()) { + wrapperTableSchema.setBucketingInfo( + fromExternalToWarpperBucketingInfo(externalTableSchema.bucketingInfo)); + } return wrapperTableSchema; } + private BucketingInfo fromExternalToWarpperBucketingInfo( + org.apache.carbondata.format.BucketingInfo externalBucketInfo) { 
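On the query side, the driver's segment index cache is no longer keyed by task id alone: SegmentTaskIndexStore.TaskBucketHolder above combines the task number with the bucket number, so blocks written by the same task into different buckets each get their own B-tree entry. A minimal, self-contained sketch of that composite-key grouping (the class and file names below are illustrative stand-ins, not the CarbonData types):

    import java.util.*;

    // Sketch: two blocks from the same task but different buckets must land under
    // different cache keys, which is what a (taskNo, bucketNo) composite key gives.
    final class TaskBucketKey {
      final String taskNo;
      final String bucketNo;
      TaskBucketKey(String taskNo, String bucketNo) { this.taskNo = taskNo; this.bucketNo = bucketNo; }
      @Override public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskBucketKey)) return false;
        TaskBucketKey k = (TaskBucketKey) o;
        return taskNo.equals(k.taskNo) && bucketNo.equals(k.bucketNo);
      }
      @Override public int hashCode() { return 31 * taskNo.hashCode() + bucketNo.hashCode(); }

      public static void main(String[] args) {
        Map<TaskBucketKey, List<String>> blocksByKey = new HashMap<>();
        blocksByKey.computeIfAbsent(new TaskBucketKey("4", "0"), k -> new ArrayList<>())
            .add("part-0-4-0-999.carbondata");
        blocksByKey.computeIfAbsent(new TaskBucketKey("4", "1"), k -> new ArrayList<>())
            .add("part-0-4-1-999.carbondata");
        blocksByKey.computeIfAbsent(new TaskBucketKey("4", "0"), k -> new ArrayList<>())
            .add("part-1-4-0-999.carbondata");
        System.out.println(blocksByKey.size()); // 2: (task 4, bucket 0) and (task 4, bucket 1)
      }
    }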
+ List listOfColumns = new ArrayList(); + for (org.apache.carbondata.format.ColumnSchema externalColumnSchema : + externalBucketInfo.table_columns) { + listOfColumns.add(fromExternalToWrapperColumnSchema(externalColumnSchema)); + } + return new BucketingInfo(listOfColumns, externalBucketInfo.number_of_buckets); + } + /* (non-Javadoc) * convert from external to wrapper tableinfo */ diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java new file mode 100644 index 00000000000..75c888da0d6 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/BucketingInfo.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.core.carbon.metadata.schema; + +import java.io.Serializable; +import java.util.List; + +import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; + +/** + * Bucketing information + */ +public class BucketingInfo implements Serializable { + + private List listOfColumns; + + private int numberOfBuckets; + + public BucketingInfo(List listOfColumns, int numberOfBuckets) { + this.listOfColumns = listOfColumns; + this.numberOfBuckets = numberOfBuckets; + } + + public List getListOfColumns() { + return listOfColumns; + } + + public int getNumberOfBuckets() { + return numberOfBuckets; + } + +} diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java index d3e2e62c212..77666165b36 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/CarbonTable.java @@ -33,6 +33,7 @@ import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; @@ -71,6 +72,11 @@ public class CarbonTable implements Serializable { */ private Map> tableMeasuresMap; + /** + * table bucket map. 
+ */ + private Map tableBucketMap; + /** * tableUniqueName */ @@ -99,6 +105,7 @@ public class CarbonTable implements Serializable { public CarbonTable() { this.tableDimensionsMap = new HashMap>(); this.tableMeasuresMap = new HashMap>(); + this.tableBucketMap = new HashMap<>(); this.aggregateTablesName = new ArrayList(); this.createOrderColumn = new HashMap>(); } @@ -124,7 +131,10 @@ public void loadCarbonTable(TableInfo tableInfo) { for (TableSchema aggTable : aggregateTableList) { this.aggregateTablesName.add(aggTable.getTableName()); fillDimensionsAndMeasuresForTables(aggTable); + tableBucketMap.put(aggTable.getTableName(), aggTable.getBucketingInfo()); } + tableBucketMap.put(tableInfo.getFactTable().getTableName(), + tableInfo.getFactTable().getBucketingInfo()); } /** @@ -474,6 +484,10 @@ public List getChildren(String dimName, List d return null; } + public BucketingInfo getBucketingInfo(String tableName) { + return tableBucketMap.get(tableName); + } + /** * @return absolute table identifier */ diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java index 348f235071a..9beeff2dcdb 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/metadata/schema/table/TableSchema.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; import org.apache.carbondata.core.carbon.metadata.schema.SchemaEvolution; import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.constants.CarbonCommonConstants; @@ -62,6 +63,11 @@ public class TableSchema implements Serializable { */ private Map tableProperties; + /** + * Information about bucketing of fields and number of buckets + */ + private BucketingInfo bucketingInfo; + public TableSchema() { this.listOfColumns = new ArrayList(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); } @@ -202,4 +208,12 @@ public Map getTableProperties() { public void setTableProperties(Map tableProperties) { this.tableProperties = tableProperties; } + + public BucketingInfo getBucketingInfo() { + return bucketingInfo; + } + + public void setBucketingInfo(BucketingInfo bucketingInfo) { + this.bucketingInfo = bucketingInfo; + } } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java index 54e72667a04..cda971af3ca 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/path/CarbonTablePath.java @@ -215,9 +215,9 @@ public String getTableStatusFilePath() { * @return absolute path of data file stored in carbon data format */ public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo, - Integer taskNo, String factUpdateTimeStamp) { + Integer taskNo, int bucketNumber, String factUpdateTimeStamp) { return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName( - filePartNo, taskNo, factUpdateTimeStamp); + filePartNo, taskNo, bucketNumber, factUpdateTimeStamp); } /** @@ -230,14 +230,15 @@ public String getCarbonDataFilePath(String partitionId, String segmentId, Intege * @return full qualified carbon index path */ public String 
getCarbonIndexFilePath(final String taskId, final String partitionId, - final String segmentId) { + final String segmentId, final String bucketNumber) { String segmentDir = getSegmentDir(partitionId, segmentId); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)); CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { - return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT); + return file.getName().startsWith(taskId + "-" + bucketNumber) && file.getName() + .endsWith(INDEX_FILE_EXT); } }); return files[0].getAbsolutePath(); @@ -262,10 +263,10 @@ public String getCarbonDataDirectoryPath(String partitionId, String segmentId) { * @param factUpdateTimeStamp unique identifier to identify an update * @return gets data file name only with out path */ - public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, + public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, int bucketNumber, String factUpdateTimeStamp) { - return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp - + CARBON_DATA_EXT; + return DATA_PART_PREFIX + "-" + filePartNo + "-" + taskNo + "-" + bucketNumber + "-" + + factUpdateTimeStamp + CARBON_DATA_EXT; } /** @@ -275,8 +276,8 @@ public String getCarbonDataFileName(Integer filePartNo, Integer taskNo, * @param factUpdatedTimeStamp time stamp * @return filename */ - public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp) { - return taskNo + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT; + public String getCarbonIndexFileName(int taskNo, int bucketNumber, String factUpdatedTimeStamp) { + return taskNo + "-" + bucketNumber + "-" + factUpdatedTimeStamp + INDEX_FILE_EXT; } private String getSegmentDir(String partitionId, String segmentId) { @@ -351,6 +352,24 @@ public static String getTaskNo(String carbonDataFileName) { return fileName.substring(startIndex, endIndex); } + /** + * gets updated timestamp information from given carbon data file name + */ + public static String getBucketNo(String carbonFilePath) { + // Get the file name from path + String fileName = getFileName(carbonFilePath); + // + 1 for size of "-" + int firstDashPos = fileName.indexOf("-"); + int secondDash = fileName.indexOf("-", firstDashPos + 1); + int startIndex = fileName.indexOf("-", secondDash + 1) + 1; + int endIndex = fileName.indexOf("-", startIndex); + // to support backward compatibility + if (startIndex == -1 || endIndex == -1) { + return "0"; + } + return fileName.substring(startIndex, endIndex); + } + /** * Gets the file name from file path */ diff --git a/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java new file mode 100644 index 00000000000..1907687fdfd --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/partition/Partitioner.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
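The bucket number is now part of the physical file names: data files follow part-<filePartNo>-<taskNo>-<bucketNo>-<timestamp>.carbondata and index files <taskNo>-<bucketNo>-<timestamp>.carbonindex, and getBucketNo above reads the token after the third dash of a data file name, falling back to "0" for files written before bucketing existed. A standalone sketch of that parsing rule (not the CarbonTablePath code itself):

    class BucketNoFromFileName {
      // The bucket id is the token after the third '-' in
      // part-<filePartNo>-<taskNo>-<bucketNo>-<timestamp>.carbondata;
      // older file names without that token resolve to bucket "0".
      static String bucketNo(String fileName) {
        int first = fileName.indexOf('-');
        int second = fileName.indexOf('-', first + 1);
        int start = fileName.indexOf('-', second + 1) + 1;
        int end = fileName.indexOf('-', start);
        if (start == 0 || end == -1) {
          return "0"; // backward compatibility with pre-bucketing file names
        }
        return fileName.substring(start, end);
      }

      public static void main(String[] args) {
        System.out.println(bucketNo("part-3-4-2-999.carbondata")); // 2 (the bucket token)
        System.out.println(bucketNo("part-3-4-999.carbondata"));   // 0 (legacy name, fallback)
      }
    }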
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.partition; + +/** + * Partitions the data as per key + */ +public interface Partitioner { + + int getPartition(Key key); + +} diff --git a/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java new file mode 100644 index 00000000000..a702a6be902 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/core/partition/impl/HashPartitionerImpl.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.carbondata.core.partition.impl; + +import java.util.List; + +import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.partition.Partitioner; + +/** + * Hash partitioner implementation + */ +public class HashPartitionerImpl implements Partitioner { + + private int numberOfBuckets; + + private Hash[] hashes; + + public HashPartitionerImpl(List indexes, List columnSchemas, + int numberOfBuckets) { + this.numberOfBuckets = numberOfBuckets; + hashes = new Hash[indexes.size()]; + for (int i = 0; i < indexes.size(); i++) { + switch(columnSchemas.get(i).getDataType()) { + case SHORT: + case INT: + case LONG: + hashes[i] = new IntegralHash(indexes.get(i)); + break; + case DOUBLE: + case FLOAT: + case DECIMAL: + hashes[i] = new DecimalHash(indexes.get(i)); + break; + default: + hashes[i] = new StringHash(indexes.get(i)); + } + } + } + + @Override public int getPartition(Object[] objects) { + int hashCode = 0; + for (Hash hash : hashes) { + hashCode += hash.getHash(objects); + } + return (hashCode & Integer.MAX_VALUE) % numberOfBuckets; + } + + private interface Hash { + int getHash(Object[] value); + } + + private static class IntegralHash implements Hash { + + private int index; + + private IntegralHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + return value[index] != null ? Long.valueOf(value[index].toString()).hashCode() : 0; + } + } + + private static class DecimalHash implements Hash { + + private int index; + + private DecimalHash(int index) { + this.index = index; + } + + public int getHash(Object[] value) { + return value[index] != null ? 
Double.valueOf(value[index].toString()).hashCode() : 0; + } + } + + private static class StringHash implements Hash { + + private int index; + + private StringHash(int index) { + this.index = index; + } + + @Override public int getHash(Object[] value) { + return value[index] != null ? value[index].hashCode() : 0; + } + } +} diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java index 747862f3b1c..3415d92d022 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java @@ -467,7 +467,7 @@ private static void setBlockletIndex(FileFooter footer, * @return Index header object */ public static IndexHeader getIndexHeader(int[] columnCardinality, - List columnSchemaList) { + List columnSchemaList, int bucketNumber) { // create segment info object SegmentInfo segmentInfo = new SegmentInfo(); // set the number of columns @@ -482,6 +482,8 @@ public static IndexHeader getIndexHeader(int[] columnCardinality, indexHeader.setSegment_info(segmentInfo); // set the column names indexHeader.setTable_columns(columnSchemaList); + // set the bucket number + indexHeader.setBucket_id(bucketNumber); return indexHeader; } diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index 3980cb3cf08..fbbed76a426 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1093,7 +1093,7 @@ private static void fillCollumnSchemaListForComplexDims( * @return list of block info * @throws CarbonUtilException if any problem while reading */ - public static List readCarbonIndexFile(String taskId, + public static List readCarbonIndexFile(String taskId, String bucketNumber, List tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier) throws CarbonUtilException { // need to sort the block info list based for task in ascending order so @@ -1105,7 +1105,8 @@ public static List readCarbonIndexFile(String taskId, // geting the index file path //TODO need to pass proper partition number when partiton will be supported String carbonIndexFilePath = carbonTablePath - .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId()); + .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + bucketNumber); DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter(); try { // read the index info and return diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java index 7cb213cf740..722d030ed59 100644 --- a/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java +++ b/core/src/test/java/org/apache/carbondata/core/carbon/datastore/SegmentTaskIndexStoreTest.java @@ -90,6 +90,7 @@ private List getDataFileFooters() { new MockUp() { @Mock List readCarbonIndexFile(String taskId, + String bucketNumber, List tableBlockInfoList, AbsoluteTableIdentifier absoluteTableIdentifier) { return getDataFileFooters(); @@ -101,18 +102,17 @@ private List getDataFileFooters() { } }; - Map result = + Map result = taskIndexStore.loadAndGetTaskIdToSegmentsMap(new HashMap>() {{ put("SG100", Arrays.asList(tableBlockInfo)); 
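HashPartitionerImpl assigns each row to a bucket by summing the hash codes of its bucket-column values and taking the result modulo the number of buckets; the & Integer.MAX_VALUE mask clears the sign bit so a negative sum can never produce a negative bucket index. A simplified, self-contained sketch of that assignment, using Objects.hashCode in place of the type-specific Integral/Decimal/String hashes above:

    import java.util.Objects;

    // Simplified bucket assignment in the spirit of HashPartitionerImpl: sum the hashes
    // of the bucket-column values, mask the sign bit, then take modulo the bucket count.
    class SimpleHashBucketing {
      static int bucketFor(Object[] row, int[] bucketColumnIndexes, int numberOfBuckets) {
        int hash = 0;
        for (int index : bucketColumnIndexes) {
          hash += Objects.hashCode(row[index]); // null-safe: a null value hashes to 0
        }
        return (hash & Integer.MAX_VALUE) % numberOfBuckets;
      }

      public static void main(String[] args) {
        Object[] row = {"bob", 27, "china"};                   // illustrative row
        System.out.println(bucketFor(row, new int[] {0}, 4));  // bucket on column 0, 4 buckets
      }
    }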
}}, absoluteTableIdentifier); assertEquals(result.size(), 1); - assertTrue(result.containsKey(new String("100"))); - + assertTrue(result.containsKey(new SegmentTaskIndexStore.TaskBucketHolder("100", "0"))); } @Test public void checkExistenceOfSegmentBTree() { - Map result = + Map result = taskIndexStore.getSegmentBTreeIfExists(absoluteTableIdentifier, "SG100"); assertNull(result); } diff --git a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java index 437a13f6e6a..95dd5d71143 100644 --- a/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java +++ b/core/src/test/java/org/apache/carbondata/core/path/CarbonFormatDirectoryStructureTest.java @@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest { .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta")); assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/") .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex")); - assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, "999").replace("\\", "/") - .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-999.carbondata")); + assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4, 0, "999").replace("\\", "/") + .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4-0-999.carbondata")); } /** diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java index b3647a8a0f2..b7d3a01bd1f 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java @@ -160,7 +160,8 @@ public class CarbonMetadataUtilTest { indexHeader.setVersion(2); indexHeader.setSegment_info(segmentInfo); indexHeader.setTable_columns(columnSchemaList); - IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList); + indexHeader.setBucket_id(0); + IndexHeader indexheaderResult = getIndexHeader(columnCardinality, columnSchemaList, 0); assertEquals(indexHeader, indexheaderResult); } diff --git a/format/src/main/thrift/carbondata_index.thrift b/format/src/main/thrift/carbondata_index.thrift index e5fda5db11b..364a7e57aa1 100644 --- a/format/src/main/thrift/carbondata_index.thrift +++ b/format/src/main/thrift/carbondata_index.thrift @@ -32,6 +32,7 @@ struct IndexHeader{ 1: required i32 version; // version used for data compatibility 2: required list table_columns; // Description of columns in this file 3: required carbondata.SegmentInfo segment_info; // Segment info (will be same/repeated for all files in this segment) + 4: optional i32 bucket_id; //bucket number in which file contains } /** diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift index 377c3721138..775573cfd04 100644 --- a/format/src/main/thrift/schema.thrift +++ b/format/src/main/thrift/schema.thrift @@ -121,6 +121,14 @@ struct SchemaEvolution{ 1: required list schema_evolution_history; } +/** +* Bucketing information of fields on table +**/ +struct BucketingInfo{ + 1: required list table_columns; + 2: required i32 number_of_buckets; +} + /** * The description of table schema */ @@ -129,6 +137,7 @@ struct TableSchema{ 2: required list table_columns; // Columns in the table 3: required SchemaEvolution schema_evolution; // History of schema evolution of this table 
4: optional map tableProperties; // table properties configured bu the user + 5: optional BucketingInfo bucketingInfo; // bucketing information } struct TableInfo{ diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index aa995fa4dfe..b69df860f43 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -42,7 +42,9 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.carbon.path.CarbonStorePath; import org.apache.carbondata.core.carbon.path.CarbonTablePath; -import org.apache.carbondata.core.carbon.querystatistics.*; +import org.apache.carbondata.core.carbon.querystatistics.QueryStatistic; +import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsConstants; +import org.apache.carbondata.core.carbon.querystatistics.QueryStatisticsRecorder; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; @@ -319,7 +321,7 @@ private List getDataBlocksOfSegment(JobContext job, String segmentId) throws IndexBuilderException, IOException { QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); - Map segmentIndexMap = + Map segmentIndexMap = getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId); List resultFilterredBlocks = new LinkedList(); @@ -379,10 +381,20 @@ private List getTableBlockInfo(JobContext job, String segmentId) return tableBlockInfoList; } - private Map getSegmentAbstractIndexs(JobContext job, - AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) + /** + * It returns index for each task file. 
+ * @param job + * @param absoluteTableIdentifier + * @param segmentId + * @return + * @throws IOException + * @throws IndexBuilderException + */ + private Map getSegmentAbstractIndexs( + JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException, IndexBuilderException { - Map segmentIndexMap = SegmentTaskIndexStore.getInstance() + Map segmentIndexMap = + SegmentTaskIndexStore.getInstance() .getSegmentBTreeIfExists(absoluteTableIdentifier, segmentId); // if segment tree is not loaded, load the segment tree diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 8b87cad4d80..a4acd9c8516 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -45,7 +45,10 @@ public class CarbonInputSplit extends FileSplit private static final long serialVersionUID = 3520344046772190207L; public String taskId; + private String segmentId; + + private String bucketId; /* * Invalid segments that need to be removed in task side index */ @@ -61,6 +64,7 @@ public class CarbonInputSplit extends FileSplit public CarbonInputSplit() { segmentId = null; taskId = "0"; + bucketId = "0"; numberOfBlocklets = 0; invalidSegments = new ArrayList<>(); version = CarbonProperties.getInstance().getFormatVersion(); @@ -71,6 +75,7 @@ private CarbonInputSplit(String segmentId, Path path, long start, long length, S super(path, start, length, locations); this.segmentId = segmentId; this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName()); this.invalidSegments = new ArrayList<>(); this.version = version; } @@ -124,6 +129,7 @@ public String getSegmentId() { super.readFields(in); this.segmentId = in.readUTF(); this.version = ColumnarFormatVersion.valueOf(in.readShort()); + this.bucketId = in.readUTF(); int numInvalidSegment = in.readInt(); invalidSegments = new ArrayList<>(numInvalidSegment); for (int i = 0; i < numInvalidSegment; i++) { @@ -135,6 +141,7 @@ public String getSegmentId() { super.write(out); out.writeUTF(segmentId); out.writeShort(version.number()); + out.writeUTF(bucketId); out.writeInt(invalidSegments.size()); for (String invalidSegment : invalidSegments) { out.writeUTF(invalidSegment); @@ -166,6 +173,10 @@ public void setVersion(ColumnarFormatVersion version) { this.version = version; } + public String getBucketId() { + return bucketId; + } + @Override public int compareTo(Distributable o) { CarbonInputSplit other = (CarbonInputSplit) o; int compareResult = 0; @@ -193,6 +204,13 @@ public void setVersion(ColumnarFormatVersion version) { if (firstTaskId != otherTaskId) { return firstTaskId - otherTaskId; } + + int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1)); + int otherBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath2)); + if (firstBucketNo != otherBucketNo) { + return firstBucketNo - otherBucketNo; + } + // compare the part no of both block info int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1)); int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2)); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java index 
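CarbonInputSplit now carries the bucket id parsed from the file name, serializes it between the format version and the invalid-segment list, and compareTo orders splits by task number, then bucket number, then part number, so all blocks of one bucket within a task stay adjacent. A small sketch of that three-level ordering (the int[] records stand in for splits; the field layout is illustrative):

    import java.util.*;

    // Sketch of the ordering used once the bucket number joins the comparison:
    // task number first, then bucket number, then part number.
    class BlockOrdering {
      static final Comparator<int[]> TASK_BUCKET_PART =
          Comparator.<int[]>comparingInt(b -> b[0])   // task no
              .thenComparingInt(b -> b[1])            // bucket no
              .thenComparingInt(b -> b[2]);           // part no

      public static void main(String[] args) {
        List<int[]> blocks = new ArrayList<>(Arrays.asList(
            new int[] {4, 1, 0}, new int[] {4, 0, 1}, new int[] {3, 2, 0}, new int[] {4, 0, 0}));
        blocks.sort(TASK_BUCKET_PART);
        for (int[] b : blocks) {
          System.out.println("task=" + b[0] + " bucket=" + b[1] + " part=" + b[2]);
        }
        // prints task 3 first, then task 4's bucket 0 (parts 0 and 1), then task 4's bucket 1
      }
    }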
a13d6bab399..26b5252607e 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -42,19 +42,19 @@ public class CarbonMultiBlockSplit extends InputSplit implements Writable { private List splitList; /* - * The location of all wrapped splits belong to the same node + * The locations of all wrapped splits */ - private String location; + private String[] locations; public CarbonMultiBlockSplit() { splitList = null; - location = null; + locations = null; } public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List splitList, - String location) throws IOException { + String[] locations) throws IOException { this.splitList = splitList; - this.location = location; + this.locations = locations; } /** @@ -76,7 +76,7 @@ public long getLength() throws IOException, InterruptedException { @Override public String[] getLocations() throws IOException, InterruptedException { - return new String[]{location}; + return locations; } @Override @@ -86,7 +86,10 @@ public void write(DataOutput out) throws IOException { for (CarbonInputSplit split: splitList) { split.write(out); } - out.writeUTF(location); + out.writeInt(locations.length); + for (int i = 0; i < locations.length; i++) { + out.writeUTF(locations[i]); + } } @Override @@ -99,7 +102,11 @@ public void readFields(DataInput in) throws IOException { split.readFields(in); splitList.add(split); } - location = in.readUTF(); + int len = in.readInt(); + locations = new String[len]; + for (int i = 0; i < len; i++) { + locations[i] = in.readUTF(); + } } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java index c238e10c9ad..82bcf1c6a4c 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/internal/index/impl/InMemoryBTreeIndex.java @@ -100,10 +100,10 @@ public List filter(JobContext job, FilterResolverIntf filter) return result; } - private Map getSegmentAbstractIndexs(JobContext job, - AbsoluteTableIdentifier identifier) + private Map getSegmentAbstractIndexs( + JobContext job, AbsoluteTableIdentifier identifier) throws IOException, IndexBuilderException { - Map segmentIndexMap = + Map segmentIndexMap = SegmentTaskIndexStore.getInstance().getSegmentBTreeIfExists(identifier, segment.getId()); // if segment tree is not loaded, load the segment tree @@ -153,7 +153,8 @@ private List getDataBlocksOfSegment(JobContext job, QueryStatisticsRecorder recorder = CarbonTimeStatisticsFactory.createDriverRecorder(); QueryStatistic statistic = new QueryStatistic(); - Map segmentIndexMap = getSegmentAbstractIndexs(job, identifier); + Map segmentIndexMap = + getSegmentAbstractIndexs(job, identifier); List resultFilterredBlocks = new LinkedList(); diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index 213712e4e18..c63b43d1f48 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) { def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean + 
def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt + + def bucketColumns: String = options.getOrElse("bucketcolumns", "") + + def isBucketingEnabled: Boolean = options.contains("bucketcolumns") && + options.contains("bucketnumber") + def toMap: Map[String, String] = options } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index ccaf9e368fd..99dc853ba84 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -302,7 +302,7 @@ class CarbonMergerRDD[K, V]( } if (blockletCount != 0) { val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier, - carbonInputSplits.asJava, nodeName) + carbonInputSplits.asJava, Array(nodeName)) result.add(new CarbonSparkPartition(id, i, multiBlockSplit)) i += 1 } diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 2705f942a8d..5d972bfa66b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -65,6 +65,8 @@ class CarbonScanRDD( private val readSupport = SparkReadSupport.readSupportClass + private val bucketedTable = carbonTable.getBucketingInfo(carbonTable.getFactTableName) + @transient private val jobId = new JobID(jobTrackerId, id) @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) @@ -95,36 +97,54 @@ class CarbonScanRDD( var noOfTasks = 0 if (!splits.isEmpty) { - // create a list of block based on split - val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) - - // get the list of executors and map blocks to executors based on locality - val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext) - - // divide the blocks among the tasks of the nodes as per the data locality - val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, - parallelism, activeNodes.toList.asJava) statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) statisticRecorder.recordStatisticsForDriver(statistic, queryId) statistic = new QueryStatistic() - var i = 0 - // Create Spark Partition for each task and assign blocks - nodeBlockMapping.asScala.foreach { case (node, blockList) => - blockList.asScala.foreach { blocksPerTask => - val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) - if (blocksPerTask.size() != 0) { - val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node) - val partition = new CarbonSparkPartition(id, i, multiBlockSplit) - result.add(partition) - i += 1 + // If bucketing is enabled on table then partitions should be grouped based on buckets. 
+ if (bucketedTable != null) { + var i = 0 + val bucketed = + splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).groupBy(f => f.getBucketId) + (0 until bucketedTable.getNumberOfBuckets).map { bucketId => + val bucketPartitions = bucketed.getOrElse(bucketId.toString, Nil) + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, + bucketPartitions.asJava, + bucketPartitions.flatMap(_.getLocations).toArray) + val partition = new CarbonSparkPartition(id, i, multiBlockSplit) + i += 1 + result.add(partition) + } + } else { + // create a list of block based on split + val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) + + // get the list of executors and map blocks to executors based on locality + val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext) + + // divide the blocks among the tasks of the nodes as per the data locality + val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, + parallelism, activeNodes.toList.asJava) + var i = 0 + // Create Spark Partition for each task and assign blocks + nodeBlockMapping.asScala.foreach { case (node, blockList) => + blockList.asScala.foreach { blocksPerTask => + val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) + if (blocksPerTask.size() != 0) { + val multiBlockSplit = + new CarbonMultiBlockSplit(identifier, splits.asJava, Array(node)) + val partition = new CarbonSparkPartition(id, i, multiBlockSplit) + result.add(partition) + i += 1 + } } } + noOfNodes = nodeBlockMapping.size } noOfBlocks = splits.size - noOfNodes = nodeBlockMapping.size noOfTasks = result.size() statistic = new QueryStatistic() @@ -155,58 +175,68 @@ class CarbonScanRDD( val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId) val format = prepareInputFormatForExecutor(attemptContext.getConfiguration) val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value - val model = format.getQueryModel(inputSplit, attemptContext) - val reader = { - if (vectorReader) { - val carbonRecordReader = createVectorizedCarbonRecordReader(model) - if (carbonRecordReader == null) { - new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration)) + val iterator = if (inputSplit.getAllSplits.size() > 0) { + val model = format.getQueryModel(inputSplit, attemptContext) + val reader = { + if (vectorReader) { + val carbonRecordReader = createVectorizedCarbonRecordReader(model) + if (carbonRecordReader == null) { + new CarbonRecordReader(model, + format.getReadSupportClass(attemptContext.getConfiguration)) + } else { + carbonRecordReader + } } else { - carbonRecordReader + new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration)) } - } else { - new CarbonRecordReader(model, format.getReadSupportClass(attemptContext.getConfiguration)) } - } - reader.initialize(inputSplit, attemptContext) + reader.initialize(inputSplit, attemptContext) + val queryStartTime = System.currentTimeMillis - val queryStartTime = System.currentTimeMillis + new Iterator[Any] { + private var havePair = false + private var finished = false + private var count = 0 - val iterator = new Iterator[Any] { - private var havePair = false - private var finished = false - private var count = 0 - - context.addTaskCompletionListener { context => - logStatistics(queryStartTime, count) - reader.close() - } + context.addTaskCompletionListener { context => + logStatistics(queryStartTime, count) + reader.close() + } - override def hasNext: Boolean = { - if 
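For a bucketed table, CarbonScanRDD no longer plans partitions from data locality alone: splits are grouped by their bucket id and every bucket from 0 to numberOfBuckets - 1 becomes exactly one Spark partition whose preferred locations are the union of its splits' hosts. A bucket may own no splits at all, which is why compute guards on getAllSplits.size() and otherwise returns an empty iterator. A self-contained sketch of that grouping rule (Split and the host strings are illustrative stand-ins):

    import java.util.*;
    import java.util.stream.Collectors;

    // Sketch: one planned partition per bucket id, even when a bucket has no splits.
    class BucketedPlanning {
      static final class Split {
        final String bucketId;
        final String[] hosts;
        Split(String bucketId, String... hosts) { this.bucketId = bucketId; this.hosts = hosts; }
      }

      static Map<Integer, List<Split>> plan(List<Split> splits, int numberOfBuckets) {
        Map<String, List<Split>> byBucket =
            splits.stream().collect(Collectors.groupingBy(s -> s.bucketId));
        Map<Integer, List<Split>> partitions = new LinkedHashMap<>();
        for (int bucket = 0; bucket < numberOfBuckets; bucket++) {
          partitions.put(bucket,
              byBucket.getOrDefault(Integer.toString(bucket), Collections.emptyList()));
        }
        return partitions;
      }

      public static void main(String[] args) {
        List<Split> splits = Arrays.asList(
            new Split("0", "host1"), new Split("1", "host2"), new Split("0", "host3"));
        // 4 buckets -> 4 partitions; buckets 2 and 3 are planned but empty.
        plan(splits, 4).forEach((bucket, list) -> System.out.println(bucket + " -> " + list.size()));
      }
    }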
(context.isInterrupted) { - throw new TaskKilledException + override def hasNext: Boolean = { + if (context.isInterrupted) { + throw new TaskKilledException + } + if (!finished && !havePair) { + finished = !reader.nextKeyValue + if (finished) { + reader.close() + } + havePair = !finished + } + !finished } - if (!finished && !havePair) { - finished = !reader.nextKeyValue - if (finished) { - reader.close() + + override def next(): Any = { + if (!hasNext) { + throw new java.util.NoSuchElementException("End of stream") } - havePair = !finished + havePair = false + val value = reader.getCurrentValue + count += 1 + value } - !finished } + } else { + new Iterator[Any] { + override def hasNext: Boolean = false - override def next(): Any = { - if (!hasNext) { - throw new java.util.NoSuchElementException("End of stream") - } - havePair = false - val value = reader.getCurrentValue - count += 1 - value + override def next(): Any = throw new java.util.NoSuchElementException("End of stream") } } + + iterator.asInstanceOf[Iterator[InternalRow]] } diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala index a5088df90d7..461633dc067 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala @@ -223,8 +223,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String] , tableName: String, fields: Seq[Field], partitionCols: Seq[PartitionerField], - tableProperties: Map[String, String]): TableModel - = { + tableProperties: Map[String, String], + bucketFields: Option[BucketFields]): TableModel = { fields.zipWithIndex.foreach { x => x._1.schemaOrdinal = x._2 @@ -268,7 +268,8 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser { Option(noDictionaryDims), Option(noInvertedIdxCols), groupCols, - Some(colProps)) + Some(colProps), + bucketFields: Option[BucketFields]) } /** diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index f646f1db2ba..ec064ed74cf 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -29,7 +29,7 @@ import org.apache.carbondata.common.factory.CarbonCommonFactory import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.metadata.datatype.DataType import org.apache.carbondata.core.carbon.metadata.encoder.Encoding -import org.apache.carbondata.core.carbon.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry} +import org.apache.carbondata.core.carbon.metadata.schema.{BucketingInfo, SchemaEvolution, SchemaEvolutionEntry} import org.apache.carbondata.core.carbon.metadata.schema.table.{CarbonTable, TableInfo, TableSchema} import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -50,7 +50,9 @@ case class TableModel( highcardinalitydims: Option[Seq[String]], noInvertedIdxCols: Option[Seq[String]], columnGroups: Seq[String], 
- colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None) + colProps: Option[util.Map[String, + util.List[ColumnProperty]]] = None, + bucketFields: Option[BucketFields]) case class Field(column: String, var dataType: Option[String], name: Option[String], children: Option[List[Field]], parent: String = null, @@ -69,6 +71,8 @@ case class Partitioner(partitionClass: String, partitionColumn: Array[String], p case class PartitionerField(partitionColumn: String, dataType: Option[String], columnComment: String) +case class BucketFields(bucketColumns: Seq[String], numberOfBuckets: Int) + case class DataLoadTableFileMapping(table: String, loadPath: String) case class CarbonMergerMapping(storeLocation: String, @@ -300,6 +304,27 @@ class TableNewProcessor(cm: TableModel) { x => tablePropertiesMap.put(x._1, x._2) } tableSchema.setTableProperties(tablePropertiesMap) + if (cm.bucketFields.isDefined) { + val bucketCols = cm.bucketFields.get.bucketColumns.map { b => + val col = allColumns.find(_.getColumnName.equalsIgnoreCase(b)) + col match { + case Some(colSchema: ColumnSchema) => + if (colSchema.isDimensionColumn && !colSchema.isComplex) { + colSchema + } else { + LOGGER.error(s"Bucket field must be dimension column and " + + s"should not be measure or complex column: ${colSchema.getColumnName}") + sys.error(s"Bucket field must be dimension column and " + + s"should not be measure or complex column: ${colSchema.getColumnName}") + } + case _ => + LOGGER.error(s"Bucket field is not present in table columns") + sys.error(s"Bucket field is not present in table columns") + } + } + tableSchema.setBucketingInfo( + new BucketingInfo(bucketCols.asJava, cm.bucketFields.get.numberOfBuckets)) + } tableSchema.setTableName(cm.tableName) tableSchema.setListOfColumns(allColumns.asJava) tableSchema.setSchemaEvalution(schemaEvol) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala index 16e35f4b76c..7318e2786b4 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonSqlParser.scala @@ -159,6 +159,7 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { var ifNotExistPresent: Boolean = false var dbName: Option[String] = None var tableName: String = "" + var bucketFields: Option[BucketFields] = None try { @@ -252,6 +253,13 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { case Token("TOK_LIKETABLE", child :: Nil) => likeTableName = child.getChild(0).getText() + case Token("TOK_ALTERTABLE_BUCKETS", + Token("TOK_TABCOLNAME", list)::numberOfBuckets) => + val cols = list.map(_.getText) + if (cols != null) { + bucketFields = Some(BucketFields(cols, + numberOfBuckets.head.getText.toInt)) + } case _ => // Unsupport features } @@ -267,7 +275,8 @@ class CarbonSqlParser() extends CarbonDDLSqlParser { tableName, fields, partitionCols, - tableProperties) + tableProperties, + bucketFields) // get logical plan. 
CreateTable(tableModel) diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index b02d467fd83..3b87e41e3c4 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -46,5 +46,12 @@ class CarbonOption(options: Map[String, String]) { def useKettle: Boolean = options.getOrElse("useKettle", "true").toBoolean + def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt + + def bucketColumns: String = options.getOrElse("bucketcolumns", "") + + def isBucketingEnabled: Boolean = options.contains("bucketcolumns") && + options.contains("bucketnumber") + def toMap: Map[String, String] = options } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala index 8a946c00597..d03c90c92b7 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala @@ -24,7 +24,7 @@ import scala.language.implicitConversions import org.apache.hadoop.fs.Path import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.execution.CarbonLateDecodeStrategy -import org.apache.spark.sql.execution.command.{CreateTable, Field} +import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field} import org.apache.spark.sql.optimizer.CarbonLateDecodeRule import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.{DecimalType, StructType} @@ -108,7 +108,7 @@ class CarbonSource extends CreatableRelationProvider val dbName: String = parameters.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME) val tableName: String = parameters.getOrElse("tableName", "default_table") - + val options = new CarbonOption(parameters) try { CarbonEnv.get.carbonMetastore.lookupRelation(Option(dbName), tableName)(sparkSession) CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName" @@ -132,7 +132,15 @@ class CarbonSource extends CreatableRelationProvider } val map = scala.collection.mutable.Map[String, String]() parameters.foreach { x => map.put(x._1, x._2) } - val cm = TableCreator.prepareTableModel(false, Option(dbName), tableName, fields, Nil, map) + val bucketFields = { + if (options.isBucketingEnabled) { + Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber)) + } else { + None + } + } + val cm = TableCreator.prepareTableModel(false, Option(dbName), + tableName, fields, Nil, bucketFields, map) CreateTable(cm, false).run(sparkSession) CarbonEnv.get.carbonMetastore.storePath + s"/$dbName/$tableName" case ex: Exception => diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala index 362c9511a77..530e70efae3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/TableCreator.scala @@ -21,7 +21,7 @@ import java.util.regex.{Matcher, Pattern} import scala.collection.mutable.{LinkedHashSet, Map} -import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField, TableModel} +import org.apache.spark.sql.execution.command.{BucketFields, ColumnProperty, Field, PartitionerField, 
TableModel} import org.apache.carbondata.core.carbon.metadata.datatype.DataType import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -442,6 +442,7 @@ object TableCreator { def prepareTableModel(ifNotExistPresent: Boolean, dbName: Option[String] , tableName: String, fields: Seq[Field], partitionCols: Seq[PartitionerField], + bucketFields: Option[BucketFields], tableProperties: Map[String, String]): TableModel = { @@ -483,7 +484,8 @@ object TableCreator { Option(noDictionaryDims), Option(noInvertedIdxCols), groupCols, - Some(colProps)) + Some(colProps), + bucketFields) } } diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index fe8bbe7602a..de768c02ee3 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -17,24 +17,28 @@ package org.apache.spark.sql.execution +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.sql._ -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, _} import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.optimizer.CarbonDecoderRelation import org.apache.spark.sql.sources.{BaseRelation, Filter} import org.apache.spark.sql.types.{AtomicType, IntegerType} +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo +import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD +import org.apache.carbondata.spark.util.CarbonScalaUtil /** * Carbon strategy for late decode (convert dictionary key to value as late as possible), which @@ -248,20 +252,21 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { metadata: Map[String, String], needDecoder: ArrayBuffer[AttributeReference], updateRequestedColumns: Seq[AttributeReference]): DataSourceScanExec = { + val table = relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation] if (supportBatchedDataSource(relation.relation.sqlContext, updateRequestedColumns) && needDecoder.isEmpty) { BatchedDataSourceScanExec( output, scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder), relation.relation, - UnknownPartitioning(0), + getPartitioning(table.carbonTable, updateRequestedColumns), metadata, relation.metastoreTableIdentifier) } else { RowDataSourceScanExec(output, scanBuilder(updateRequestedColumns, candidatePredicates, pushedFilters, needDecoder), relation.relation, - UnknownPartitioning(0), + getPartitioning(table.carbonTable, updateRequestedColumns), metadata, relation.metastoreTableIdentifier) } @@ -288,6 +293,35 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy { } } + private def getPartitioning(carbonTable: CarbonTable, + output: Seq[AttributeReference]): Partitioning = { + val info: BucketingInfo = carbonTable.getBucketingInfo(carbonTable.getFactTableName) + if (info != null) { + val cols = info.getListOfColumns.asScala + val sortColumn = carbonTable. + getDimensionByTableName(carbonTable.getFactTableName).get(0).getColName + val numBuckets = info.getNumberOfBuckets + val bucketColumns = cols.flatMap { n => + val attrRef = output.find(_.name.equalsIgnoreCase(n.getColumnName)) + attrRef match { + case Some(attr) => + Some(AttributeReference(attr.name, + CarbonScalaUtil.convertCarbonToSparkDataType(n.getDataType), + attr.nullable, + attr.metadata)(attr.exprId, attr.qualifier)) + case _ => None + } + } + if (bucketColumns.size == cols.size) { + HashPartitioning(bucketColumns, numBuckets) + } else { + UnknownPartitioning(0) + } + } else { + UnknownPartitioning(0) + } + } + protected[sql] def selectFilters( relation: BaseRelation, diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala index 5a91ad1e9e3..342cabc2f5c 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala @@ -24,10 +24,11 @@ import org.apache.spark.sql.catalyst.parser.ParserUtils._ import org.apache.spark.sql.catalyst.parser.SqlBaseParser.{ColTypeListContext, CreateTableContext, TablePropertyListContext} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.SparkSqlAstBuilder -import org.apache.spark.sql.execution.command.{CreateTable, Field, TableModel} +import org.apache.spark.sql.execution.command.{BucketFields, CreateTable, Field, TableModel} import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution} import org.apache.spark.sql.types.DataType +import org.apache.carbondata.spark.CarbonOption import org.apache.carbondata.spark.exception.MalformedCarbonCommandException import org.apache.carbondata.spark.util.CommonUtil @@ -132,13 +133,22 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) { if (!CommonUtil.validateTblProperties(properties.asJava.asScala, fields)) { throw new MalformedCarbonCommandException("Invalid table properties") } + val options = new CarbonOption(properties) + val bucketFields = { + if (options.isBucketingEnabled) { + Some(BucketFields(options.bucketColumns.split(","), options.bucketNumber)) + } else { + None + } + } // prepare table model of the collected tokens val tableModel: TableModel = parser.prepareTableModel(ifNotExists, name.database, name.table, fields, Seq(), - properties.asJava.asScala) + properties.asJava.asScala, + bucketFields) CreateTable(tableModel) } else { diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala new file mode 100644 index 00000000000..c480d309d6d --- /dev/null +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.carbondata.bucketing + +import java.io.File + +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.common.util.QueryTest +import org.apache.spark.sql.execution.command.LoadTable +import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.scalatest.BeforeAndAfterAll + +import org.apache.carbondata.core.carbon.metadata.CarbonMetadata +import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.util.CarbonProperties + +class TableBucketingTestCase extends QueryTest with BeforeAndAfterAll { + + override def beforeAll { + + // clean data folder + clean + + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") + spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") + spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS t4") + spark.sql("DROP TABLE IF EXISTS t5") + spark.sql("DROP TABLE IF EXISTS t6") + spark.sql("DROP TABLE IF EXISTS t7") + spark.sql("DROP TABLE IF EXISTS t8") + } + + test("test create table with buckets") { + spark.sql( + """ + CREATE TABLE t4 + (ID Int, date Timestamp, country String, + name String, phonetype String, serialname String, salary Int) + USING org.apache.spark.sql.CarbonSource + OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t4") + """) + LoadTable(Some("default"), "t4", "./src/test/resources/dataDiff.csv", Nil, + Map(("use_kettle", "false"))).run(spark) + val table: CarbonTable = CarbonMetadata.getInstance().getCarbonTable("default_t4") + if (table != null && table.getBucketingInfo("t4") != null) { + assert(true) + } else { + assert(false, "Bucketing info does not exist") + } + } + + test("test create table with no bucket join of carbon tables") { + spark.sql( + """ + CREATE TABLE t5 + (ID Int, date Timestamp, country String, + name String, phonetype String, serialname String, salary Int) + USING org.apache.spark.sql.CarbonSource + OPTIONS("tableName"="t5") + """) + LoadTable(Some("default"), "t5", "./src/test/resources/dataDiff.csv", Nil, + Map(("use_kettle", "false"))).run(spark) + + val plan = spark.sql( + """ + |select t1.*, t2.* + |from t5 t1, t5 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(shuffleExists, "shuffle should exist on non bucket tables") + } + + test("test create table with bucket join of carbon tables") { + spark.sql( + """ + CREATE TABLE t6 + (ID Int, date Timestamp, country String, + name String, phonetype String, serialname String, salary Int) + USING org.apache.spark.sql.CarbonSource + OPTIONS("bucketnumber"="4", "bucketcolumns"="name", 
"tableName"="t6") + """) + LoadTable(Some("default"), "t6", "./src/test/resources/dataDiff.csv", Nil, + Map(("use_kettle", "false"))).run(spark) + + val plan = spark.sql( + """ + |select t1.*, t2.* + |from t6 t1, t6 t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + test("test create table with bucket join of carbon table and parquet table") { + spark.sql( + """ + CREATE TABLE t7 + (ID Int, date Timestamp, country String, + name String, phonetype String, serialname String, salary Int) + USING org.apache.spark.sql.CarbonSource + OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t7") + """) + LoadTable(Some("default"), "t7", "./src/test/resources/dataDiff.csv", Nil, + Map(("use_kettle", "false"))).run(spark) + + spark.sql("DROP TABLE IF EXISTS bucketed_parquet_table") + spark.sql("select * from t7").write + .format("parquet") + .bucketBy(4, "name") + .saveAsTable("bucketed_parquet_table") + + val plan = spark.sql( + """ + |select t1.*, t2.* + |from t7 t1, bucketed_parquet_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(!shuffleExists, "shuffle should not exist on bucket tables") + } + + test("test create table with bucket join of carbon table and non bucket parquet table") { + spark.sql( + """ + CREATE TABLE t8 + (ID Int, date Timestamp, country String, + name String, phonetype String, serialname String, salary Int) + USING org.apache.spark.sql.CarbonSource + OPTIONS("bucketnumber"="4", "bucketcolumns"="name", "tableName"="t8") + """) + LoadTable(Some("default"), "t8", "./src/test/resources/dataDiff.csv", Nil, + Map(("use_kettle", "false"))).run(spark) + + spark.sql("DROP TABLE IF EXISTS parquet_table") + spark.sql("select * from t8").write + .format("parquet") + .saveAsTable("parquet_table") + + val plan = spark.sql( + """ + |select t1.*, t2.* + |from t8 t1, parquet_table t2 + |where t1.name = t2.name + """.stripMargin).queryExecution.executedPlan + var shuffleExists = false + plan.collect { + case s: ShuffleExchange => shuffleExists = true + } + assert(shuffleExists, "shuffle should exist on non bucket tables") + } + + override def afterAll { + spark.sql("DROP TABLE IF EXISTS t3") + spark.sql("DROP TABLE IF EXISTS t4") + spark.sql("DROP TABLE IF EXISTS t5") + spark.sql("DROP TABLE IF EXISTS t6") + spark.sql("DROP TABLE IF EXISTS t7") + spark.sql("DROP TABLE IF EXISTS t8") + // clean data folder + clean + } +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java index 20013cec5b5..26300d69e85 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; public class CarbonDataLoadConfiguration { @@ -36,6 +37,8 @@ public class CarbonDataLoadConfiguration { private String taskNo; + private BucketingInfo bucketingInfo; + private Map dataLoadProperties = 
new HashMap<>(); public int getDimensionCount() { @@ -141,4 +144,12 @@ public Object getDataLoadProperty(String key, Object defaultValue) { public Object getDataLoadProperty(String key) { return dataLoadProperties.get(key); } + + public BucketingInfo getBucketingInfo() { + return bucketingInfo; + } + + public void setBucketingInfo(BucketingInfo bucketingInfo) { + this.bucketingInfo = bucketingInfo; + } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java index a5388d98729..63147c9fc2a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java @@ -37,6 +37,7 @@ import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorStepImpl; +import org.apache.carbondata.processing.newflow.steps.DataConverterProcessorWithBucketingStepImpl; import org.apache.carbondata.processing.newflow.steps.DataWriterProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl; import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl; @@ -54,6 +55,15 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String sto CarbonIterator[] inputIterators) throws Exception { CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation); + if (configuration.getBucketingInfo() != null) { + return buildInternalForBucketing(inputIterators, configuration); + } else { + return buildInternal(inputIterators, configuration); + } + } + + private AbstractDataLoadProcessorStep buildInternal(CarbonIterator[] inputIterators, + CarbonDataLoadConfiguration configuration) { // 1. Reads the data input iterators and parses the data. AbstractDataLoadProcessorStep inputProcessorStep = new InputProcessorStepImpl(configuration, inputIterators); @@ -70,6 +80,24 @@ public AbstractDataLoadProcessorStep build(CarbonLoadModel loadModel, String sto return writerProcessorStep; } + private AbstractDataLoadProcessorStep buildInternalForBucketing(CarbonIterator[] inputIterators, + CarbonDataLoadConfiguration configuration) throws Exception { + // 1. Reads the data input iterators and parses the data. + AbstractDataLoadProcessorStep inputProcessorStep = + new InputProcessorStepImpl(configuration, inputIterators); + // 2. Converts the data like dictionary or non dictionary or complex objects depends on + // data types and configurations. + AbstractDataLoadProcessorStep converterProcessorStep = + new DataConverterProcessorWithBucketingStepImpl(configuration, inputProcessorStep); + // 3. Sorts the data which are part of key (all dimensions except complex types) + AbstractDataLoadProcessorStep sortProcessorStep = + new SortProcessorStepImpl(configuration, converterProcessorStep); + // 4. Writes the sorted data in carbondata format. 
+ AbstractDataLoadProcessorStep writerProcessorStep = + new DataWriterProcessorStepImpl(configuration, sortProcessorStep); + return writerProcessorStep; + } + private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadModel, String storeLocation) throws Exception { if (!new File(storeLocation).mkdirs()) { @@ -165,6 +193,7 @@ private CarbonDataLoadConfiguration createConfiguration(CarbonLoadModel loadMode } } configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()])); + configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName())); return configuration; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java index daf37fbf0a6..06893103c37 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRow.java @@ -28,6 +28,8 @@ public class CarbonRow { private Object[] data; + public short bucketNumber; + public CarbonRow(Object[] data) { this.data = data; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java new file mode 100644 index 00000000000..3ca14974def --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.processing.newflow.sort.impl; + +import java.io.File; +import java.util.Iterator; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.newflow.sort.Sorter; +import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger; +import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; +import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger; +import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; + +/** + * It parallely reads data from array of iterates and do merge sort. + * First it sorts the data and write to temp files. These temp files will be merge sorted to get + * final merge sort result. + * This step is specifically for bucketing, it sorts each bucket data separately and write to + * temp files. 
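In the initialize() method below, the configured carbon.sort.size is divided evenly across the buckets, with a floor of 100 rows per bucket, and each bucket then gets its own SortDataRows instance and temp-file location. A minimal sketch of that sizing rule, assuming 500000 as the configured sort size (the actual default is not read from this patch):

  // Illustrative per-bucket sort buffer sizing; 500000 is only an assumed carbon.sort.size.
  val configuredSortSize = 500000
  val numberOfBuckets = 4
  val perBucketBuffer = math.max(configuredSortSize / numberOfBuckets, 100)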
+ */ +public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(ParallelReadMergeSorterImpl.class.getName()); + + private SortParameters sortParameters; + + private SortIntermediateFileMerger intermediateFileMerger; + + private ExecutorService executorService; + + private BucketingInfo bucketingInfo; + + private DataField[] inputDataFields; + + private int sortBufferSize; + + public ParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields, + BucketingInfo bucketingInfo) { + this.inputDataFields = inputDataFields; + this.bucketingInfo = bucketingInfo; + } + + @Override public void initialize(SortParameters sortParameters) { + this.sortParameters = sortParameters; + intermediateFileMerger = new SortIntermediateFileMerger(sortParameters); + int buffer = Integer.parseInt(CarbonProperties.getInstance() + .getProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)); + sortBufferSize = buffer/bucketingInfo.getNumberOfBuckets(); + if (sortBufferSize < 100) { + sortBufferSize = 100; + } + } + + @Override public Iterator[] sort(Iterator[] iterators) + throws CarbonDataLoadingException { + SortDataRows[] sortDataRows = new SortDataRows[bucketingInfo.getNumberOfBuckets()]; + try { + for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) { + SortParameters parameters = sortParameters.getCopy(); + parameters.setPartitionID(i + ""); + setTempLocation(parameters); + parameters.setBufferSize(sortBufferSize); + sortDataRows[i] = new SortDataRows(parameters, intermediateFileMerger); + sortDataRows[i].initialize(); + } + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + this.executorService = Executors.newFixedThreadPool(iterators.length); + final int batchSize = CarbonProperties.getInstance().getBatchSize(); + try { + for (int i = 0; i < iterators.length; i++) { + executorService.submit(new SortIteratorThread(iterators[i], sortDataRows)); + } + executorService.shutdown(); + executorService.awaitTermination(2, TimeUnit.DAYS); + processRowToNextStep(sortDataRows, sortParameters); + } catch (Exception e) { + throw new CarbonDataLoadingException("Problem while shutdown the server ", e); + } + try { + intermediateFileMerger.finish(); + } catch (CarbonDataWriterException e) { + throw new CarbonDataLoadingException(e); + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + + Iterator[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()]; + for (int i = 0; i < bucketingInfo.getNumberOfBuckets(); i++) { + batchIterator[i] = new MergedDataIterator(String.valueOf(i), batchSize); + } + + return batchIterator; + } + + private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) { + String storeLocation = CarbonDataProcessorUtil + .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), + String.valueOf(sortParameters.getTaskNo()), bucketId, + sortParameters.getSegmentId() + "", false); + // Set the data file location + String dataFolderLocation = + storeLocation + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION; + SingleThreadFinalSortFilesMerger finalMerger = + new SingleThreadFinalSortFilesMerger(dataFolderLocation, sortParameters.getTableName(), + sortParameters.getDimColCount(), sortParameters.getComplexDimColCount(), + sortParameters.getMeasureColCount(), sortParameters.getNoDictionaryCount(), + 
sortParameters.getAggType(), sortParameters.getNoDictionaryDimnesionColumn(), + sortParameters.isUseKettle()); + return finalMerger; + } + + @Override public void close() { + intermediateFileMerger.close(); + } + + /** + * Below method will be used to process data to next step + */ + private boolean processRowToNextStep(SortDataRows[] sortDataRows, SortParameters parameters) + throws CarbonDataLoadingException { + if (null == sortDataRows || sortDataRows.length == 0) { + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + LOGGER.info("Number of Records was Zero"); + String logMessage = "Summary: Carbon Sort Key Step: Read: " + 0 + ": Write: " + 0; + LOGGER.info(logMessage); + return false; + } + + try { + for (int i = 0; i < sortDataRows.length; i++) { + // start sorting + sortDataRows[i].startSorting(); + } + // check any more rows are present + LOGGER.info("Record Processed For table: " + parameters.getTableName()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordSortRowsStepTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() + .recordDictionaryValuesTotalTime(parameters.getPartitionID(), System.currentTimeMillis()); + return false; + } catch (CarbonSortKeyAndGroupByException e) { + throw new CarbonDataLoadingException(e); + } + } + + private void setTempLocation(SortParameters parameters) { + String carbonDataDirectoryPath = CarbonDataProcessorUtil + .getLocalDataFolderLocation(parameters.getDatabaseName(), + parameters.getTableName(), parameters.getTaskNo(), + parameters.getPartitionID(), parameters.getSegmentId(), false); + parameters.setTempFileLocation( + carbonDataDirectoryPath + File.separator + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); + } + + /** + * This thread iterates the iterator and adds the rows to @{@link SortDataRows} + */ + private static class SortIteratorThread implements Callable { + + private Iterator iterator; + + private SortDataRows[] sortDataRows; + + public SortIteratorThread(Iterator iterator, SortDataRows[] sortDataRows) { + this.iterator = iterator; + this.sortDataRows = sortDataRows; + } + + @Override public Void call() throws CarbonDataLoadingException { + try { + while (iterator.hasNext()) { + CarbonRowBatch batch = iterator.next(); + Iterator batchIterator = batch.getBatchIterator(); + int i = 0; + while (batchIterator.hasNext()) { + CarbonRow row = batchIterator.next(); + if (row != null) { + SortDataRows sortDataRow = sortDataRows[row.bucketNumber]; + synchronized (sortDataRow) { + sortDataRow.addRow(row.getData()); + } + } + } + } + } catch (Exception e) { + LOGGER.error(e); + throw new CarbonDataLoadingException(e); + } + return null; + } + + } + + private class MergedDataIterator extends CarbonIterator { + + private String partitionId; + + private int batchSize; + + private boolean firstRow = true; + + public MergedDataIterator(String partitionId, int batchSize) { + this.partitionId = partitionId; + this.batchSize = batchSize; + } + + private SingleThreadFinalSortFilesMerger finalMerger; + + @Override public boolean hasNext() { + if (firstRow) { + firstRow = false; + finalMerger = getFinalMerger(partitionId); + finalMerger.startFinalMerge(); + } + return finalMerger.hasNext(); + } + + @Override public CarbonRowBatch next() { + int counter = 0; + CarbonRowBatch rowBatch = new CarbonRowBatch(); + while (finalMerger.hasNext() && counter < batchSize) { + rowBatch.addRow(new CarbonRow(finalMerger.next())); + counter++; + } + return 
rowBatch; + } + } +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java new file mode 100644 index 00000000000..16b203ae8bc --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.processing.newflow.steps; + +import java.io.File; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.carbon.metadata.schema.BucketingInfo; +import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.partition.Partitioner; +import org.apache.carbondata.core.partition.impl.HashPartitionerImpl; +import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.processing.constants.LoggerAction; +import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep; +import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration; +import org.apache.carbondata.processing.newflow.DataField; +import org.apache.carbondata.processing.newflow.constants.DataLoadProcessorConstants; +import org.apache.carbondata.processing.newflow.converter.RowConverter; +import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl; +import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; +import org.apache.carbondata.processing.newflow.row.CarbonRow; +import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; +import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger; + +/** + * Replace row data fields with dictionary values if column is configured dictionary encoded. + * And nondictionary columns as well as complex columns will be converted to byte[]. 
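The converter step below additionally looks up the bucket columns among its input fields, asks a HashPartitionerImpl for a bucket number per row, and records it on CarbonRow.bucketNumber so the sorter can route rows per bucket. HashPartitionerImpl itself is added by this patch but not shown in this excerpt; the following is only a rough sketch of hash bucketing over the selected column values, not the actual implementation, which may hash per data type.

  // Rough sketch: combine the hash codes of the bucket-column values and map
  // the result into [0, numBuckets).
  def bucketFor(row: Array[AnyRef], bucketColumnIndexes: Seq[Int], numBuckets: Int): Int = {
    val hash = bucketColumnIndexes.foldLeft(0) { (h, idx) =>
      31 * h + (if (row(idx) == null) 0 else row(idx).hashCode())
    }
    (hash & Integer.MAX_VALUE) % numBuckets
  }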
+ */ +public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoadProcessorStep { + + private RowConverter converter; + + private Partitioner partitioner; + + public DataConverterProcessorWithBucketingStepImpl(CarbonDataLoadConfiguration configuration, + AbstractDataLoadProcessorStep child) { + super(configuration, child); + } + + @Override + public DataField[] getOutput() { + return child.getOutput(); + } + + @Override + public void initialize() throws CarbonDataLoadingException { + child.initialize(); + BadRecordsLogger badRecordLogger = createBadRecordLogger(); + converter = new RowConverterImpl(child.getOutput(), configuration, badRecordLogger); + converter.initialize(); + List indexes = new ArrayList<>(); + List columnSchemas = new ArrayList<>(); + DataField[] inputDataFields = getOutput(); + BucketingInfo bucketingInfo = configuration.getBucketingInfo(); + for (int i = 0; i < inputDataFields.length; i++) { + for (int j = 0; j < bucketingInfo.getListOfColumns().size(); j++) { + if (inputDataFields[i].getColumn().getColName() + .equals(bucketingInfo.getListOfColumns().get(j).getColumnName())) { + indexes.add(i); + columnSchemas.add(inputDataFields[i].getColumn().getColumnSchema()); + break; + } + } + } + partitioner = + new HashPartitionerImpl(indexes, columnSchemas, bucketingInfo.getNumberOfBuckets()); + } + + /** + * Create the iterator using child iterator. + * + * @param childIter + * @return new iterator with step specific processing. + */ + @Override + protected Iterator getIterator(final Iterator childIter) { + return new CarbonIterator() { + RowConverter localConverter = converter.createCopyForNewThread(); + @Override public boolean hasNext() { + return childIter.hasNext(); + } + + @Override public CarbonRowBatch next() { + return processRowBatch(childIter.next(), localConverter); + } + }; + } + + /** + * Process the batch of rows as per the step logic. + * + * @param rowBatch + * @return processed row. 
+ */ + protected CarbonRowBatch processRowBatch(CarbonRowBatch rowBatch, RowConverter localConverter) { + CarbonRowBatch newBatch = new CarbonRowBatch(); + Iterator batchIterator = rowBatch.getBatchIterator(); + while (batchIterator.hasNext()) { + CarbonRow next = batchIterator.next(); + CarbonRow convertRow = localConverter.convert(next); + convertRow.bucketNumber = (short) partitioner.getPartition(next.getData()); + newBatch.addRow(convertRow); + } + return newBatch; + } + + @Override + protected CarbonRow processRow(CarbonRow row) { + throw new UnsupportedOperationException(); + } + + private BadRecordsLogger createBadRecordLogger() { + boolean badRecordsLogRedirect = false; + boolean badRecordConvertNullDisable = false; + boolean badRecordsLoggerEnable = Boolean.parseBoolean( + configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ENABLE) + .toString()); + Object bad_records_action = + configuration.getDataLoadProperty(DataLoadProcessorConstants.BAD_RECORDS_LOGGER_ACTION) + .toString(); + if (null != bad_records_action) { + LoggerAction loggerAction = null; + try { + loggerAction = LoggerAction.valueOf(bad_records_action.toString().toUpperCase()); + } catch (IllegalArgumentException e) { + loggerAction = LoggerAction.FORCE; + } + switch (loggerAction) { + case FORCE: + badRecordConvertNullDisable = false; + break; + case REDIRECT: + badRecordsLogRedirect = true; + badRecordConvertNullDisable = true; + break; + case IGNORE: + badRecordsLogRedirect = false; + badRecordConvertNullDisable = true; + break; + } + } + CarbonTableIdentifier identifier = + configuration.getTableIdentifier().getCarbonTableIdentifier(); + BadRecordsLogger badRecordsLogger = new BadRecordsLogger(identifier.getBadRecordLoggerKey(), + identifier.getTableName() + '_' + System.currentTimeMillis(), getBadLogStoreLocation( + identifier.getDatabaseName() + File.separator + identifier.getTableName() + File.separator + + configuration.getTaskNo()), badRecordsLogRedirect, badRecordsLoggerEnable, + badRecordConvertNullDisable); + return badRecordsLogger; + } + + private String getBadLogStoreLocation(String storeLocation) { + String badLogStoreLocation = + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC); + badLogStoreLocation = badLogStoreLocation + File.separator + storeLocation; + + return badLogStoreLocation; + } + + @Override + public void close() { + super.close(); + if (converter != null) { + converter.finish(); + } + } +} diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java index 48492d349da..8318530547a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java @@ -53,8 +53,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { private KeyGenerator keyGenerator; - private CarbonFactHandler dataHandler; - private int noDictionaryCount; private int complexDimensionCount; @@ -71,57 +69,66 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { private int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex(); - private String storeLocation; - public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep 
child) { super(configuration, child); } - @Override - public DataField[] getOutput() { + @Override public DataField[] getOutput() { return child.getOutput(); } - @Override - public void initialize() throws CarbonDataLoadingException { + @Override public void initialize() throws CarbonDataLoadingException { child.initialize(); - CarbonTableIdentifier tableIdentifier = - configuration.getTableIdentifier().getCarbonTableIdentifier(); + } - storeLocation = CarbonDataProcessorUtil + private String getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + String storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), - configuration.getPartitionId(), configuration.getSegmentId() + "", false); - - if (!(new File(storeLocation).mkdirs())) { - LOGGER.error("Local data load folder location does not exist: " + storeLocation); - return; - } + tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, + configuration.getSegmentId() + "", false); + new File(storeLocation).mkdirs(); + return storeLocation; } - @Override - public Iterator[] execute() throws CarbonDataLoadingException { + @Override public Iterator[] execute() throws CarbonDataLoadingException { Iterator[] iterators = child.execute(); - String tableName = configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName(); + CarbonTableIdentifier tableIdentifier = + configuration.getTableIdentifier().getCarbonTableIdentifier(); + String tableName = tableIdentifier.getTableName(); try { - CarbonFactDataHandlerModel dataHandlerModel = - CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, storeLocation); + CarbonFactDataHandlerModel dataHandlerModel = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, + getStoreLocation(tableIdentifier, String.valueOf(0)), 0); noDictionaryCount = dataHandlerModel.getNoDictionaryCount(); complexDimensionCount = configuration.getComplexDimensionCount(); measureCount = dataHandlerModel.getMeasureCount(); segmentProperties = dataHandlerModel.getSegmentProperties(); keyGenerator = segmentProperties.getDimensionKeyGenerator(); - dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(dataHandlerModel, - CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); - dataHandler.initialise(); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), System.currentTimeMillis()); + int i = 0; for (Iterator iterator : iterators) { + String storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel + .createCarbonFactDataHandlerModel(configuration, storeLocation, i); + CarbonFactHandler dataHandler = null; + boolean rowsNotExist = true; while (iterator.hasNext()) { - processBatch(iterator.next()); + if (rowsNotExist) { + rowsNotExist = false; + dataHandler = CarbonFactHandlerFactory + .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); + dataHandler.initialise(); + } + processBatch(iterator.next(), dataHandler); + } + if (!rowsNotExist) { + finish(tableName, dataHandler); } + i++; } } catch (CarbonDataWriterException e) { @@ -135,9 +142,11 @@ public Iterator[] execute() throws CarbonDataLoadingException { return null; } - @Override - public void close() { - String tableName = 
configuration.getTableIdentifier().getCarbonTableIdentifier().getTableName(); + @Override public void close() { + + } + + private void finish(String tableName, CarbonFactHandler dataHandler) { try { dataHandler.finish(); } catch (Exception e) { @@ -149,7 +158,7 @@ public void close() { + writeCounter; LOGGER.info(logMessage); CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter); - processingComplete(); + processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), System.currentTimeMillis()); @@ -157,7 +166,7 @@ public void close() { .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); } - private void processingComplete() throws CarbonDataLoadingException { + private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { if (null != dataHandler) { try { dataHandler.closeHandler(); @@ -171,7 +180,8 @@ private void processingComplete() throws CarbonDataLoadingException { } } - private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingException { + private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) + throws CarbonDataLoadingException { Iterator iterator = batch.getBatchIterator(); try { while (iterator.hasNext()) { @@ -208,8 +218,7 @@ private void processBatch(CarbonRowBatch batch) throws CarbonDataLoadingExceptio } } - @Override - protected CarbonRow processRow(CarbonRow row) { + @Override protected CarbonRow processRow(CarbonRow row) { return null; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java index 99b7894dad8..ef43751bc1b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java @@ -30,6 +30,7 @@ import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; import org.apache.carbondata.processing.newflow.sort.Sorter; import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterImpl; +import org.apache.carbondata.processing.newflow.sort.impl.ParallelReadMergeSorterWithBucketingImpl; import org.apache.carbondata.processing.newflow.sort.impl.UnsafeParallelReadMergeSorterImpl; import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters; @@ -60,7 +61,15 @@ public void initialize() throws CarbonDataLoadingException { CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)); if (offheapsort) { sorter = new UnsafeParallelReadMergeSorterImpl(child.getOutput()); - } else sorter = new ParallelReadMergeSorterImpl(child.getOutput()); + } else { + sorter = new ParallelReadMergeSorterImpl(child.getOutput()); + } + if (configuration.getBucketingInfo() != null) { + sorter = new ParallelReadMergeSorterWithBucketingImpl(child.getOutput(), + configuration.getBucketingInfo()); + } else { + sorter = new ParallelReadMergeSorterImpl(child.getOutput()); + } sorter.initialize(sortParameters); } diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java index 6254cce5cd1..c24472bf818 100644 --- 
a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortParameters.java @@ -121,6 +121,34 @@ public class SortParameters { */ private boolean useKettle = true; + public SortParameters getCopy() { + SortParameters parameters = new SortParameters(); + parameters.tempFileLocation = tempFileLocation; + parameters.sortBufferSize = sortBufferSize; + parameters.measureColCount = measureColCount; + parameters.dimColCount = dimColCount; + parameters.complexDimColCount = complexDimColCount; + parameters.fileBufferSize = fileBufferSize; + parameters.numberOfIntermediateFileToBeMerged = numberOfIntermediateFileToBeMerged; + parameters.fileWriteBufferSize = fileWriteBufferSize; + parameters.observer = observer; + parameters.sortTempFileNoOFRecordsInCompression = sortTempFileNoOFRecordsInCompression; + parameters.isSortFileCompressionEnabled = isSortFileCompressionEnabled; + parameters.prefetch = prefetch; + parameters.bufferSize = bufferSize; + parameters.databaseName = databaseName; + parameters.tableName = tableName; + parameters.aggType = aggType; + parameters.noDictionaryCount = noDictionaryCount; + parameters.partitionID = partitionID; + parameters.segmentId = segmentId; + parameters.taskNo = taskNo; + parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn; + parameters.numberOfCores = numberOfCores; + parameters.useKettle = useKettle; + return parameters; + } + public String getTempFileLocation() { return tempFileLocation; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 2d468aca51d..f6a729da483 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -274,6 +274,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { private boolean useKettle; + private int bucketNumber; + /** * CarbonFactDataHandler constructor */ @@ -293,6 +295,7 @@ public CarbonFactDataHandlerColumnar(CarbonFactDataHandlerModel carbonFactDataHa this.aggKeyBlock = new boolean[columnStoreCount]; this.isNoDictionary = new boolean[columnStoreCount]; + this.bucketNumber = carbonFactDataHandlerModel.getBucketId(); this.isUseInvertedIndex = new boolean[columnStoreCount]; if (null != carbonFactDataHandlerModel.getIsUseInvertedIndex()) { for (int i = 0; i < isUseInvertedIndex.length; i++) { @@ -1495,6 +1498,7 @@ private CarbonDataWriterVo getDataWriterVo(int[] keyBlockSize) { carbonDataWriterVo.setColCardinality(colCardinality); carbonDataWriterVo.setSegmentProperties(segmentProperties); carbonDataWriterVo.setTableBlocksize(tableBlockSize); + carbonDataWriterVo.setBucketNumber(bucketNumber); return carbonDataWriterVo; } diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index c7d9d29da69..51663fc7ba3 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -186,13 +186,15 @@ public void setBlockSizeInMB(int blockSize) { */ 
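Taken together with the DataWriterProcessorStepImpl changes above, each sorted-bucket iterator now gets its own store location and its own fact handler, and the bucket id is carried through CarbonFactDataHandlerModel and CarbonDataWriterVo into the writer. A conceptual sketch of that per-bucket write loop follows; FactHandler here is a stand-in type, not the real CarbonFactHandler API.

  // Toy model of the per-bucket write loop: one handler per bucket, writing
  // that bucket's sorted rows into its own file set.
  trait FactHandler {
    def initialise(): Unit
    def addDataToStore(row: Array[AnyRef]): Unit
    def finish(): Unit
    def closeHandler(): Unit
  }
  def writeBuckets(sortedBuckets: Seq[Iterator[Array[AnyRef]]],
      handlerFor: Int => FactHandler): Unit =
    sortedBuckets.zipWithIndex.foreach { case (rows, bucketId) =>
      val handler = handlerFor(bucketId) // carries bucketId and a per-bucket store location
      handler.initialise()
      rows.foreach(handler.addDataToStore)
      handler.finish()
      handler.closeHandler()
    }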
private boolean useKettle = true; + private int bucketId = 0; + /** * Create the model using @{@link CarbonDataLoadConfiguration} * @param configuration * @return CarbonFactDataHandlerModel */ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel( - CarbonDataLoadConfiguration configuration, String storeLocation) { + CarbonDataLoadConfiguration configuration, String storeLocation, int bucketId) { CarbonTableIdentifier identifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); @@ -291,6 +293,7 @@ public static CarbonFactDataHandlerModel createCarbonFactDataHandlerModel( } else { carbonFactDataHandlerModel.setMdKeyIndex(measureCount); } + carbonFactDataHandlerModel.bucketId = bucketId; return carbonFactDataHandlerModel; } @@ -558,5 +561,9 @@ public boolean isUseKettle() { public void setUseKettle(boolean useKettle) { this.useKettle = useKettle; } + + public int getBucketId() { + return bucketId; + } } diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java index 2c8267288a7..5697cb61490 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/SingleThreadFinalSortFilesMerger.java @@ -150,6 +150,10 @@ public boolean accept(File pathname) { */ private void startSorting(File[] files) throws CarbonDataWriterException { this.fileCounter = files.length; + if (fileCounter == 0) { + LOGGER.info("No files to merge sort"); + return; + } this.fileBufferSize = CarbonDataProcessorUtil .getFileBufferSize(this.fileCounter, CarbonProperties.getInstance(), CarbonCommonConstants.CONSTANT_SIZE_TEN); diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index a1e7311fe6c..019539262fc 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -271,6 +271,7 @@ public void initializeWriter() throws CarbonDataWriterException { initFileCount(); String carbonDataFileName = carbonTablePath .getCarbonDataFileName(fileCount, dataWriterVo.getCarbonDataFileAttributes().getTaskId(), + dataWriterVo.getBucketNumber(), dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); String actualFileNameVal = carbonDataFileName + CarbonCommonConstants.FILE_INPROGRESS_STATUS; FileData fileData = new FileData(actualFileNameVal, dataWriterVo.getStoreLocation()); @@ -423,12 +424,13 @@ public void closeWriter() throws CarbonDataWriterException { */ private void writeIndexFile() throws IOException, CarbonDataWriterException { // get the header - IndexHeader indexHeader = - CarbonMetadataUtil.getIndexHeader(localCardinality, thriftColumnSchemaList); + IndexHeader indexHeader = CarbonMetadataUtil + .getIndexHeader(localCardinality, thriftColumnSchemaList, dataWriterVo.getBucketNumber()); // get the block index info thrift List blockIndexThrift = CarbonMetadataUtil.getBlockIndexInfo(blockIndexInfoList); String fileName = dataWriterVo.getStoreLocation() + File.separator + carbonTablePath .getCarbonIndexFileName(dataWriterVo.getCarbonDataFileAttributes().getTaskId(), + dataWriterVo.getBucketNumber(), 
dataWriterVo.getCarbonDataFileAttributes().getFactTimeStamp()); CarbonIndexFileWriter writer = new CarbonIndexFileWriter(); // open file diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java index 6e0287d40a8..e46430b7bfe 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonDataWriterVo.java @@ -66,6 +66,8 @@ public class CarbonDataWriterVo { private int tableBlocksize; + private int bucketNumber; + /** * @return the storeLocation */ @@ -318,4 +320,11 @@ public void setTableBlocksize(int tableBlocksize) { this.tableBlocksize = tableBlocksize; } + public int getBucketNumber() { + return bucketNumber; + } + + public void setBucketNumber(int bucketNumber) { + this.bucketNumber = bucketNumber; + } }
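As the AbstractFactDataWriter changes show, the bucket number is now passed to CarbonTablePath when building both the data and index file names, and into the index header, so the query side can associate each file with its bucket. The exact naming pattern is defined in CarbonTablePath, which this excerpt does not show; the sketch below only illustrates the idea of inserting the bucket id as an extra name component and is not the actual format.

  // Illustrative only: bucket id as an extra component of per-task file names.
  def dataFileName(fileCount: Int, taskNo: String, bucketId: Int, factTimeStamp: String): String =
    s"part-$fileCount-$taskNo-$bucketId-$factTimeStamp.carbondata"
  def indexFileName(taskNo: String, bucketId: Int, factTimeStamp: String): String =
    s"$taskNo-$bucketId-$factTimeStamp.carbonindex"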