From 5f6a56cac26be7e5307a01296ce9351b60be0e66 Mon Sep 17 00:00:00 2001 From: jackylk Date: Fri, 25 Nov 2016 18:07:20 +0800 Subject: [PATCH] change ScanRdd to use RecordReader fix style --- .../carbon/datastore/block/Distributable.java | 8 +- .../datastore/block/TableBlockInfo.java | 3 +- .../carbon/datastore/block/TableTaskInfo.java | 2 +- .../filter/FilterExpressionProcessor.java | 29 +- .../examples/DataFrameAPIExample.scala | 5 +- .../carbondata/hadoop/CarbonInputFormat.java | 334 +++++---------- .../carbondata/hadoop/CarbonInputSplit.java | 112 ++++- .../hadoop/CarbonMultiBlockSplit.java | 105 +++++ .../carbondata/hadoop/CarbonRecordReader.java | 34 +- .../hadoop/readsupport/CarbonReadSupport.java | 2 +- .../AbstractDictionaryDecodedReadSupport.java | 2 +- .../impl/ArrayWritableReadSupport.java | 2 +- .../readsupport/impl/RawDataReadSupport.java | 6 +- .../hadoop/util/CarbonInputFormatUtil.java | 14 + .../hadoop/ft/CarbonInputMapperTest.java | 15 +- .../spark/load/CarbonLoaderUtil.java | 99 +---- .../spark/merger/CarbonDataMergerUtil.java | 11 +- .../readsupport/SparkRowReadSupportImpl.java | 9 +- .../spark/util/LoadMetadataUtil.java | 4 +- .../spark/rdd/CarbonDataRDDFactory.scala | 20 +- .../spark/rdd/CarbonMergerRDD.scala | 151 +++---- .../carbondata/spark/rdd/CarbonScanRDD.scala | 387 ++++++++---------- .../carbondata/spark/rdd/Compactor.scala | 13 +- .../carbondata/spark/util/QueryPlanUtil.scala | 56 --- .../sql/CarbonDatasourceHadoopRelation.scala | 20 +- .../spark/sql/CarbonDatasourceRelation.scala | 3 +- ...CarbonOperators.scala => CarbonScan.scala} | 29 +- .../execution/command/carbonTableSchema.scala | 20 +- .../spark/sql/hive/DistributionUtil.scala | 21 +- .../AllDataTypesTestCaseAggregate.scala | 20 +- .../CompactionSystemLockFeatureTest.scala | 20 +- ...DataCompactionBoundaryConditionsTest.scala | 3 - ...DataCompactionCardinalityBoundryTest.scala | 10 +- .../DataCompactionLockTest.scala | 11 +- .../DataCompactionMinorThresholdTest.scala | 6 +- .../DataCompactionNoDictionaryTest.scala | 10 +- .../datacompaction/DataCompactionTest.scala | 14 +- .../MajorCompactionIgnoreInMinorTest.scala | 32 +- .../MajorCompactionStopsAfterCompaction.scala | 14 +- .../dataretention/DataRetentionTestCase.scala | 5 +- .../lcm/status/SegmentStatusManager.java | 160 +++----- 41 files changed, 790 insertions(+), 1031 deletions(-) create mode 100644 hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java delete mode 100644 integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala rename integration/spark/src/main/scala/org/apache/spark/sql/{CarbonOperators.scala => CarbonScan.scala} (92%) diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java index 817aafc7a1e..99d44591ffb 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/Distributable.java @@ -16,10 +16,12 @@ */ package org.apache.carbondata.core.carbon.datastore.block; +import java.io.IOException; + /** - * Abstract class which is maintains the locations of node. + * interface to get the locations of node. 
Used for making task distribution based on locality */ -public abstract class Distributable implements Comparable { +public interface Distributable extends Comparable { - public abstract String[] getLocations(); + String[] getLocations() throws IOException; } diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java index f8da9afb559..4bf0047b272 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableBlockInfo.java @@ -28,8 +28,7 @@ * class will be used to pass the block detail detail will be passed form driver * to all the executor to load the b+ tree */ -public class TableBlockInfo extends Distributable - implements Serializable, Comparable { +public class TableBlockInfo implements Distributable, Serializable { /** * serialization id diff --git a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java index 1f8caf0299d..7ce3a14b58f 100644 --- a/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java +++ b/core/src/main/java/org/apache/carbondata/core/carbon/datastore/block/TableTaskInfo.java @@ -27,7 +27,7 @@ /** * This class is responsible for maintaining the mapping of tasks of a node. */ -public class TableTaskInfo extends Distributable { +public class TableTaskInfo implements Distributable { private final List tableBlockInfoList; private final String taskId; diff --git a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java index 154186726a3..44a3a31ddd5 100644 --- a/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java +++ b/core/src/main/java/org/apache/carbondata/scan/filter/FilterExpressionProcessor.java @@ -29,7 +29,6 @@ import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder; import org.apache.carbondata.core.carbon.datastore.IndexKey; import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex; -import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties; import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.encoder.Encoding; @@ -127,12 +126,10 @@ public List getFilterredBlocks(DataRefNode btreeNode, FilterExecuter filterExecuter = FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(),null); while (startBlock != endBlock) { - addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock, - tableSegment.getSegmentProperties()); + addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock); startBlock = startBlock.getNextDataRefNode(); } - addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock, - tableSegment.getSegmentProperties()); + addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock); LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + ( System.currentTimeMillis() - startTimeInMillis) + " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan @@ -147,11 
+144,9 @@ public List getFilterredBlocks(DataRefNode btreeNode, * @param filterResolver * @param listOfDataBlocksToScan * @param dataRefNode - * @param segmentProperties */ private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, - List listOfDataBlocksToScan, DataRefNode dataRefNode, - SegmentProperties segmentProperties) { + List listOfDataBlocksToScan, DataRefNode dataRefNode) { BitSet bitSet = filterExecuter .isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue()); @@ -174,7 +169,7 @@ private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter, private FilterResolverIntf getFilterResolvertree(Expression expressionTree, AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException { FilterResolverIntf filterEvaluatorTree = - createFilterResolverTree(expressionTree, tableIdentifier, null); + createFilterResolverTree(expressionTree, tableIdentifier); traverseAndResolveTree(filterEvaluatorTree, tableIdentifier); return filterEvaluatorTree; } @@ -212,24 +207,22 @@ private void traverseAndResolveTree(FilterResolverIntf filterResolverTree, * @return */ private FilterResolverIntf createFilterResolverTree(Expression expressionTree, - AbsoluteTableIdentifier tableIdentifier, Expression intermediateExpression) { + AbsoluteTableIdentifier tableIdentifier) { ExpressionType filterExpressionType = expressionTree.getFilterExpressionType(); BinaryExpression currentExpression = null; switch (filterExpressionType) { case OR: currentExpression = (BinaryExpression) expressionTree; return new LogicalFilterResolverImpl( - createFilterResolverTree(currentExpression.getLeft(), tableIdentifier, - currentExpression), - createFilterResolverTree(currentExpression.getRight(), tableIdentifier, - currentExpression),currentExpression); + createFilterResolverTree(currentExpression.getLeft(), tableIdentifier), + createFilterResolverTree(currentExpression.getRight(), tableIdentifier), + currentExpression); case AND: currentExpression = (BinaryExpression) expressionTree; return new LogicalFilterResolverImpl( - createFilterResolverTree(currentExpression.getLeft(), tableIdentifier, - currentExpression), - createFilterResolverTree(currentExpression.getRight(), tableIdentifier, - currentExpression), currentExpression); + createFilterResolverTree(currentExpression.getLeft(), tableIdentifier), + createFilterResolverTree(currentExpression.getRight(), tableIdentifier), + currentExpression); case EQUALS: case IN: return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS, diff --git a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala index 49fb0da6629..97fa152bb1b 100644 --- a/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala +++ b/examples/spark/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala @@ -24,7 +24,7 @@ object DataFrameAPIExample { def main(args: Array[String]) { val cc = ExampleUtils.createCarbonContext("DataFrameAPIExample") - ExampleUtils.writeSampleCarbonFile(cc, "carbon1") + ExampleUtils.writeSampleCarbonFile(cc, "carbon1", 1000) // use datasource api to read val in = cc.read @@ -42,7 +42,8 @@ object DataFrameAPIExample { println(s"count after 2 load: $count") // use SQL to read - cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show + cc.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show + cc.sql("DROP TABLE IF EXISTS 
carbon1") } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java index 4a51f52d274..a44a78c60ff 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java @@ -25,12 +25,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.datastore.DataRefNode; @@ -50,7 +44,6 @@ import org.apache.carbondata.core.carbon.querystatistics.*; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.keygenerator.KeyGenException; -import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; @@ -61,7 +54,6 @@ import org.apache.carbondata.lcm.status.SegmentStatusManager; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.expression.Expression; -import org.apache.carbondata.scan.expression.exception.FilterUnsupportedException; import org.apache.carbondata.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.scan.filter.FilterUtil; import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf; @@ -74,6 +66,7 @@ import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; @@ -130,6 +123,11 @@ public static CarbonTable getCarbonTable(Configuration configuration) throws IOE return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr); } + public static void setTablePath(Configuration configuration, String tablePath) + throws IOException { + configuration.set(FileInputFormat.INPUT_DIR, tablePath); + } + /** * It sets unresolved filter expression. 
* @@ -137,26 +135,10 @@ public static CarbonTable getCarbonTable(Configuration configuration) throws IOE * @param filterExpression */ public static void setFilterPredicates(Configuration configuration, Expression filterExpression) { - try { - String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); - configuration.set(FILTER_PREDICATE, filterString); - } catch (Exception e) { - throw new RuntimeException("Error while setting filter expression to Job", e); + if (filterExpression == null) { + return; } - } - - /** - * It sets the resolved filter expression - * - * @param configuration - * @param filterExpression - */ - public static void setFilterPredicates(Configuration configuration, - FilterResolverIntf filterExpression) { try { - if (filterExpression == null) { - return; - } String filterString = ObjectSerializationUtil.convertObjectToString(filterExpression); configuration.set(FILTER_PREDICATE, filterString); } catch (Exception e) { @@ -164,7 +146,7 @@ public static void setFilterPredicates(Configuration configuration, } } - public static void setColumnProjection(CarbonProjection projection, Configuration configuration) { + public static void setColumnProjection(Configuration configuration, CarbonProjection projection) { if (projection == null || projection.isEmpty()) { return; } @@ -178,6 +160,10 @@ public static void setColumnProjection(CarbonProjection projection, Configuratio configuration.set(COLUMN_PROJECTION, columnString); } + public static String getColumnProjection(Configuration configuration) { + return configuration.get(COLUMN_PROJECTION); + } + public static void setCarbonReadSupport(Class readSupportClass, Configuration configuration) { if (readSupportClass != null) { @@ -191,30 +177,13 @@ public static CarbonTablePath getTablePath(Configuration configuration) throws I } /** - * Set List of segments to access + * Set list of segments to access */ public static void setSegmentsToAccess(Configuration configuration, List validSegments) { - configuration - .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments)); + configuration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, + CarbonUtil.getSegmentString(validSegments)); } - /** - * Below method will be used to set the segments details if - * segments are not added in the configuration - * - * @param job - * @param absoluteTableIdentifier - * @throws IOException - */ - private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier) - throws IOException { - if (getSegmentsFromConfiguration(job).length == 0) { - // Get the valid segments from the carbon store. 
- SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = - new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); - setSegmentsToAccess(job.getConfiguration(), validAndInvalidSegments.getValidSegments()); - } - } /** * {@inheritDoc} * Configurations FileInputFormat.INPUT_DIR @@ -224,42 +193,46 @@ private void addSegmentsIfEmpty(JobContext job, AbsoluteTableIdentifier absolute * @return List list of CarbonInputSplit * @throws IOException */ - @Override public List getSplits(JobContext job) throws IOException { - try { - CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); - Object filterPredicates = getFilterPredicates(job.getConfiguration()); - AbsoluteTableIdentifier absoluteTableIdentifier = - getAbsoluteTableIdentifier(job.getConfiguration()); - addSegmentsIfEmpty(job, absoluteTableIdentifier); - if (filterPredicates == null) { - return getSplitsNonFilter(job); - } else { - if (filterPredicates instanceof Expression) { - //process and resolve the expression. - CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable); - return getSplits(job, CarbonInputFormatUtil - .resolveFilter((Expression) filterPredicates, absoluteTableIdentifier)); - } else { - //It means user sets already resolved expression. - return getSplits(job, (FilterResolverIntf) filterPredicates); - } + @Override + public List getSplits(JobContext job) throws IOException { + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration()); + List invalidSegments = new ArrayList<>(); + + // get all valid segments and set them into the configuration + if (getSegmentsToAccess(job).length == 0) { + SegmentStatusManager.SegmentStatus segments = + SegmentStatusManager.getSegmentStatus(identifier); + setSegmentsToAccess(job.getConfiguration(), segments.getValidSegments()); + if (segments.getValidSegments().size() == 0) { + return new ArrayList<>(0); + } + + // remove entry in the segment index if there are invalid segments + invalidSegments.addAll(segments.getInvalidSegments()); + if (invalidSegments.size() > 0) { + SegmentTaskIndexStore.getInstance().removeTableBlocks(invalidSegments, identifier); } - } catch (Exception ex) { - throw new IOException(ex); } - } - /** - * the method will return the blocks to be scanned with blocklets info - * - * @param job - * @return - * @throws IOException - * @throws IndexBuilderException - */ - private List getSplitsNonFilter(JobContext job) - throws IOException, IndexBuilderException { - return getSplits(job, null); + // process and resolve the expression + Expression filter = getFilterPredicates(job.getConfiguration()); + CarbonTable carbonTable = getCarbonTable(job.getConfiguration()); + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + FilterResolverIntf filterInterface = CarbonInputFormatUtil.resolveFilter(filter, identifier); + List splits; + try { + // do block filtering and get split + splits = getSplits(job, filterInterface); + } catch (IndexBuilderException e) { + throw new IOException(e); + } + // pass the invalid segment to task side in order to remove index entry in task side + if (invalidSegments.size() > 0) { + for (InputSplit split : splits) { + ((CarbonInputSplit) split).setInvalidSegments(invalidSegments); + } + } + return splits; } private List getSplitsInternal(JobContext job) throws IOException { @@ -296,7 +269,7 @@ private List getSplits(JobContext job, FilterResolverIntf filterReso getAbsoluteTableIdentifier(job.getConfiguration()); //for 
each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : getSegmentsFromConfiguration(job)) { + for (String segmentNo : getSegmentsToAccess(job)) { List dataRefNodes = getDataBlocksOfSegment(job, filterExpressionProcessor, absoluteTableIdentifier, filterResolver, segmentNo); @@ -311,98 +284,23 @@ private List getSplits(JobContext job, FilterResolverIntf filterReso return result; } - /** - * get total number of rows. Same as count(*) - * - * @throws IOException - * @throws IndexBuilderException - */ - public long getRowCount(JobContext job) throws IOException, IndexBuilderException { - - long rowCount = 0; - AbsoluteTableIdentifier absoluteTableIdentifier = - getAbsoluteTableIdentifier(job.getConfiguration()); - // no of core to load the blocks in driver - addSegmentsIfEmpty(job, absoluteTableIdentifier); - int numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - try { - numberOfCores = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT)); - } catch (NumberFormatException e) { - numberOfCores = CarbonCommonConstants.NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT_DEFAULT_VALUE; - } - // creating a thread pool - ExecutorService threadPool = Executors.newFixedThreadPool(numberOfCores); - List>> loadedBlocks = - new ArrayList>>(); - //for each segment fetch blocks matching filter in Driver BTree - for (String segmentNo : getSegmentsFromConfiguration(job)) { - // submitting the task - loadedBlocks - .add(threadPool.submit(new BlocksLoaderThread(job, absoluteTableIdentifier, segmentNo))); - } - threadPool.shutdown(); - try { - threadPool.awaitTermination(1, TimeUnit.HOURS); - } catch (InterruptedException e) { - throw new IndexBuilderException(e); - } - try { - // adding all the rows of the blocks to get the total row - // count - for (Future> block : loadedBlocks) { - for (AbstractIndex abstractIndex : block.get().values()) { - rowCount += abstractIndex.getTotalNumberOfRows(); - } - } - } catch (InterruptedException | ExecutionException e) { - throw new IndexBuilderException(e); - } - return rowCount; - } - - /** - * {@inheritDoc} - * Configurations FileInputFormat.INPUT_DIR, CarbonInputFormat.INPUT_SEGMENT_NUMBERS - * are used to get table path to read. 
- * - * @return - * @throws IOException - */ - public FilterResolverIntf getResolvedFilter(Configuration configuration, - Expression filterExpression) - throws IOException, IndexBuilderException, QueryExecutionException { - if (filterExpression == null) { - return null; - } - FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor(); - AbsoluteTableIdentifier absoluteTableIdentifier = getAbsoluteTableIdentifier(configuration); - //get resolved filter - try { - return filterExpressionProcessor.getFilterResolver(filterExpression, absoluteTableIdentifier); - } catch (FilterUnsupportedException e) { - throw new QueryExecutionException(e.getMessage()); - } - } - - private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) - throws IOException { + private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) { String dirs = configuration.get(INPUT_DIR, ""); String[] inputPaths = StringUtils.split(dirs); if (inputPaths.length == 0) { - throw new IOException("No input paths specified in job"); + throw new InvalidPathException("No input paths specified in job"); } return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]); } - private Object getFilterPredicates(Configuration configuration) { + private Expression getFilterPredicates(Configuration configuration) { try { String filterExprString = configuration.get(FILTER_PREDICATE); if (filterExprString == null) { return null; } - Object filterExprs = ObjectSerializationUtil.convertStringToObject(filterExprString); - return filterExprs; + Object filter = ObjectSerializationUtil.convertStringToObject(filterExprString); + return (Expression) filter; } catch (IOException e) { throw new RuntimeException("Error while reading filter expression", e); } @@ -452,16 +350,13 @@ private List getDataBlocksOfSegment(JobContext job, * Below method will be used to get the table block info * * @param job job context - * @param absoluteTableIdentifier absolute table identifier * @param segmentId number of segment id * @return list of table block * @throws IOException */ - private List getTableBlockInfo(JobContext job, - AbsoluteTableIdentifier absoluteTableIdentifier, String segmentId) throws IOException { - // List fileStatusList = new LinkedList(); + private List getTableBlockInfo(JobContext job, String segmentId) + throws IOException { List tableBlockInfoList = new ArrayList(); - // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); // get file location of all files of given segment JobContext newJob = @@ -489,10 +384,8 @@ private Map getSegmentAbstractIndexs(JobContext job, // if segment tree is not loaded, load the segment tree if (segmentIndexMap == null) { - // List fileStatusList = new LinkedList(); List tableBlockInfoList = - getTableBlockInfo(job, absoluteTableIdentifier, segmentId); - // getFileStatusOfSegments(job, new int[]{ segmentId }, fileStatusList); + getTableBlockInfo(job, segmentId); Map> segmentToTableBlocksInfos = new HashMap<>(); segmentToTableBlocksInfos.put(segmentId, tableBlockInfoList); @@ -535,30 +428,33 @@ private List getDataBlocksOfIndex(AbstractIndex abstractIndex) { return blocks; } - @Override public RecordReader createRecordReader(InputSplit inputSplit, + @Override + public RecordReader createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException { Configuration configuration = taskAttemptContext.getConfiguration(); CarbonTable carbonTable = 
getCarbonTable(configuration); - QueryModel queryModel; - try { - CarbonQueryPlan queryPlan = - CarbonInputFormatUtil.createQueryPlan(carbonTable, configuration.get(COLUMN_PROJECTION)); - queryModel = - QueryModel.createModel(getAbsoluteTableIdentifier(configuration), queryPlan, carbonTable); - Object filterPredicates = getFilterPredicates(configuration); - if (filterPredicates != null) { - if (filterPredicates instanceof Expression) { - CarbonInputFormatUtil.processFilterExpression((Expression) filterPredicates, carbonTable); - queryModel.setFilterExpressionResolverTree(CarbonInputFormatUtil - .resolveFilter((Expression) filterPredicates, - getAbsoluteTableIdentifier(configuration))); - } else { - queryModel.setFilterExpressionResolverTree((FilterResolverIntf) filterPredicates); - } + AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration); + + // query plan includes projection column + String projection = getColumnProjection(configuration); + CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection); + QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable); + + // set the filter to the query model in order to filter blocklet before scan + Expression filter = getFilterPredicates(configuration); + CarbonInputFormatUtil.processFilterExpression(filter, carbonTable); + FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier); + queryModel.setFilterExpressionResolverTree(filterIntf); + + // update the file level index store if there are invalid segment + if (inputSplit instanceof CarbonMultiBlockSplit) { + CarbonMultiBlockSplit split = (CarbonMultiBlockSplit) inputSplit; + List invalidSegments = split.getAllSplits().get(0).getInvalidSegments(); + if (invalidSegments.size() > 0) { + queryModel.setInvalidSegmentIds(invalidSegments); } - } catch (Exception e) { - throw new IOException(e); } + CarbonReadSupport readSupport = getReadSupportClass(configuration); return new CarbonRecordReader(queryModel, readSupport); } @@ -586,17 +482,20 @@ private CarbonReadSupport getReadSupportClass(Configuration configuration) { return readSupport; } - @Override protected long computeSplitSize(long blockSize, long minSize, long maxSize) { + @Override + protected long computeSplitSize(long blockSize, long minSize, long maxSize) { return super.computeSplitSize(blockSize, minSize, maxSize); } - @Override protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { + @Override + protected int getBlockIndex(BlockLocation[] blkLocations, long offset) { return super.getBlockIndex(blkLocations, offset); } - @Override protected List listStatus(JobContext job) throws IOException { + @Override + protected List listStatus(JobContext job) throws IOException { List result = new ArrayList(); - String[] segmentsToConsider = getSegmentsFromConfiguration(job); + String[] segmentsToConsider = getSegmentsToAccess(job); if (segmentsToConsider.length == 0) { throw new IOException("No segments found"); } @@ -605,7 +504,8 @@ private CarbonReadSupport getReadSupportClass(Configuration configuration) { return result; } - @Override protected boolean isSplitable(JobContext context, Path filename) { + @Override + protected boolean isSplitable(JobContext context, Path filename) { try { // Don't split the file if it is local file system FileSystem fileSystem = filename.getFileSystem(context.getConfiguration()); @@ -626,7 +526,7 @@ private void getFileStatusOfSegments(JobContext job, String[] segmentsToConsider throw 
new IOException("No partitions/data found"); } - PathFilter inputFilter = getDataFileFilter(job); + PathFilter inputFilter = getDataFileFilter(); CarbonTablePath tablePath = getTablePath(job.getConfiguration()); // get tokens for all the required FileSystem for table path @@ -658,10 +558,9 @@ private void getFileStatusOfSegments(JobContext job, String[] segmentsToConsider } /** - * @param job * @return the PathFilter for Fact Files. */ - public PathFilter getDataFileFilter(JobContext job) { + private PathFilter getDataFileFilter() { return new CarbonPathFilter(getUpdateExtension()); } @@ -676,27 +575,14 @@ private String getUpdateExtension() { } /** - * @return updateExtension + * return valid segment to access */ - private String[] getSegmentsFromConfiguration(JobContext job) - throws IOException { + private String[] getSegmentsToAccess(JobContext job) throws IOException { String segmentString = job.getConfiguration().get(INPUT_SEGMENT_NUMBERS, ""); - // if no segments if (segmentString.trim().isEmpty()) { return new String[0]; } - - String[] segments = segmentString.split(","); - String[] segmentIds = new String[segments.length]; - int i = 0; - try { - for (; i < segments.length; i++) { - segmentIds[i] = segments[i]; - } - } catch (NumberFormatException e) { - throw new IOException("segment no:" + segments[i] + " should be integer"); - } - return segmentIds; + return segmentString.split(","); } /** @@ -709,28 +595,4 @@ private String[] getValidPartitions(JobContext job) { return new String[] { "0" }; } - /** - * Thread class to load the blocks - */ - private class BlocksLoaderThread implements Callable> { - // job - private JobContext job; - - // table identifier - private AbsoluteTableIdentifier absoluteTableIdentifier; - - // segment id - private String segmentId; - - private BlocksLoaderThread(JobContext job, AbsoluteTableIdentifier absoluteTableIdentifier, - String segmentId) { - this.job = job; - this.absoluteTableIdentifier = absoluteTableIdentifier; - this.segmentId = segmentId; - } - - @Override public Map call() throws Exception { - return getSegmentAbstractIndexs(job, absoluteTableIdentifier, segmentId); - } - } } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index ffcd663ff03..132ee439752 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -22,8 +22,13 @@ import java.io.DataOutput; import java.io.IOException; import java.io.Serializable; +import java.util.ArrayList; import java.util.List; +import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; +import org.apache.carbondata.core.carbon.datastore.block.Distributable; +import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; +import org.apache.carbondata.core.carbon.path.CarbonTablePath; import org.apache.carbondata.hadoop.internal.index.Block; import org.apache.hadoop.fs.Path; @@ -33,23 +38,36 @@ /** * Carbon input split to allow distributed read of CarbonInputFormat. 
*/ -public class CarbonInputSplit extends FileSplit implements Serializable, Writable, Block { +public class CarbonInputSplit extends FileSplit implements Distributable, Serializable, Writable, + Block { private static final long serialVersionUID = 3520344046772190207L; private String segmentId; - /** + public String taskId; + + /* + * Invalid segments that need to be removed in task side index + */ + private List invalidSegments; + + /* * Number of BlockLets in a block */ - private int numberOfBlocklets = 0; + private int numberOfBlocklets; - public CarbonInputSplit() { - super(null, 0, 0, new String[0]); + public CarbonInputSplit() { + segmentId = null; + taskId = "0"; + numberOfBlocklets = 0; + invalidSegments = new ArrayList<>(); } - public CarbonInputSplit(String segmentId, Path path, long start, long length, + private CarbonInputSplit(String segmentId, Path path, long start, long length, String[] locations) { super(path, start, length, locations); this.segmentId = segmentId; + this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName()); + this.invalidSegments = new ArrayList<>(); } public CarbonInputSplit(String segmentId, Path path, long start, long length, @@ -67,16 +85,33 @@ public String getSegmentId() { return segmentId; } - @Override public void readFields(DataInput in) throws IOException { - + @Override + public void readFields(DataInput in) throws IOException { super.readFields(in); this.segmentId = in.readUTF(); - + int numInvalidSegment = in.readInt(); + invalidSegments = new ArrayList<>(numInvalidSegment); + for (int i = 0; i < numInvalidSegment; i++) { + invalidSegments.add(in.readUTF()); + } } - @Override public void write(DataOutput out) throws IOException { + @Override + public void write(DataOutput out) throws IOException { super.write(out); out.writeUTF(segmentId); + out.writeInt(invalidSegments.size()); + for (String invalidSegment: invalidSegments) { + out.writeUTF(invalidSegment); + } + } + + public List getInvalidSegments(){ + return invalidSegments; + } + + public void setInvalidSegments(List invalidSegments) { + this.invalidSegments = invalidSegments; } /** @@ -87,6 +122,63 @@ public int getNumberOfBlocklets() { return numberOfBlocklets; } + @Override + public int compareTo(Distributable o) { + CarbonInputSplit other = (CarbonInputSplit)o; + int compareResult = 0; + // get the segment id + // converr seg ID to double. 
+ + double seg1 = Double.parseDouble(segmentId); + double seg2 = Double.parseDouble(other.getSegmentId()); + if (seg1 - seg2 < 0) { + return -1; + } + if (seg1 - seg2 > 0) { + return 1; + } + + // Comparing the time task id of the file to other + // if both the task id of the file is same then we need to compare the + // offset of + // the file + String filePath1 = this.getPath().getName(); + String filePath2 = other.getPath().getName(); + if (CarbonTablePath.isCarbonDataFile(filePath1)) { + int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1)); + int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2)); + if (firstTaskId != otherTaskId) { + return firstTaskId - otherTaskId; + } + // compare the part no of both block info + int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1)); + int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2)); + compareResult = firstPartNo - SecondPartNo; + } else { + compareResult = filePath1.compareTo(filePath2); + } + if (compareResult != 0) { + return compareResult; + } + return 0; + } + + public static List createBlocks(List splitList) { + List tableBlockInfoList = new ArrayList<>(); + for (CarbonInputSplit split : splitList) { + BlockletInfos blockletInfos = new BlockletInfos(split.getNumberOfBlocklets(), 0, + split.getNumberOfBlocklets()); + try { + tableBlockInfoList.add( + new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(), + split.getLocations(), split.getLength(), blockletInfos)); + } catch (IOException e) { + throw new RuntimeException("fail to get location of split: " + split, e); + } + } + return tableBlockInfoList; + } + @Override public String getBlockPath() { return getPath().getName(); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java new file mode 100644 index 00000000000..a13d6bab399 --- /dev/null +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonMultiBlockSplit.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.carbondata.hadoop; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.InputSplit; + +/** + * This class wraps multiple blocks belong to a same node to one split. + * So the scanning task will scan multiple blocks. This is an optimization for concurrent query. 
+ */ +public class CarbonMultiBlockSplit extends InputSplit implements Writable { + + /* + * Splits (HDFS Blocks) for task to scan. + */ + private List splitList; + + /* + * The location of all wrapped splits belong to the same node + */ + private String location; + + public CarbonMultiBlockSplit() { + splitList = null; + location = null; + } + + public CarbonMultiBlockSplit(AbsoluteTableIdentifier identifier, List splitList, + String location) throws IOException { + this.splitList = splitList; + this.location = location; + } + + /** + * Return all splits for scan + * @return split list for scan + */ + public List getAllSplits() { + return splitList; + } + + @Override + public long getLength() throws IOException, InterruptedException { + long total = 0; + for (InputSplit split: splitList) { + total += split.getLength(); + } + return total; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{location}; + } + + @Override + public void write(DataOutput out) throws IOException { + // write number of splits and then write all splits + out.writeInt(splitList.size()); + for (CarbonInputSplit split: splitList) { + split.write(out); + } + out.writeUTF(location); + } + + @Override + public void readFields(DataInput in) throws IOException { + // read all splits + int numSplit = in.readInt(); + splitList = new ArrayList<>(numSplit); + for (int i = 0; i < numSplit; i++) { + CarbonInputSplit split = new CarbonInputSplit(); + split.readFields(in); + splitList.add(split); + } + location = in.readUTF(); + } + +} diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java index 443922c3409..91e6e46a681 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java @@ -25,7 +25,6 @@ import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.cache.dictionary.Dictionary; -import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport; @@ -33,7 +32,6 @@ import org.apache.carbondata.scan.executor.QueryExecutorFactory; import org.apache.carbondata.scan.executor.exception.QueryExecutionException; import org.apache.carbondata.scan.model.QueryModel; -import org.apache.carbondata.scan.result.BatchResult; import org.apache.carbondata.scan.result.iterator.ChunkRowIterator; import org.apache.hadoop.mapreduce.InputSplit; @@ -59,22 +57,28 @@ public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport readSuppor this.queryExecutor = QueryExecutorFactory.getQueryExecutor(); } - @Override public void initialize(InputSplit split, TaskAttemptContext context) + @Override + public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException { - CarbonInputSplit carbonInputSplit = (CarbonInputSplit) split; - List tableBlockInfoList = new ArrayList(); - BlockletInfos blockletInfos = new BlockletInfos(carbonInputSplit.getNumberOfBlocklets(), 0, - carbonInputSplit.getNumberOfBlocklets()); - tableBlockInfoList.add( - new TableBlockInfo(carbonInputSplit.getPath().toString(), carbonInputSplit.getStart(), - carbonInputSplit.getSegmentId(), carbonInputSplit.getLocations(), - 
carbonInputSplit.getLength(), blockletInfos)); + // The input split can contain single HDFS block or multiple blocks, so firstly get all the + // blocks and then set them in the query model. + List splitList; + if (inputSplit instanceof CarbonInputSplit) { + splitList = new ArrayList<>(1); + splitList.add((CarbonInputSplit) inputSplit); + } else if (inputSplit instanceof CarbonMultiBlockSplit){ + // contains multiple blocks, this is an optimization for concurrent query. + CarbonMultiBlockSplit multiBlockSplit = (CarbonMultiBlockSplit) inputSplit; + splitList = multiBlockSplit.getAllSplits(); + } else { + throw new RuntimeException("unsupported input split type: " + inputSplit); + } + List tableBlockInfoList = CarbonInputSplit.createBlocks(splitList); queryModel.setTableBlockInfos(tableBlockInfoList); - readSupport - .intialize(queryModel.getProjectionColumns(), queryModel.getAbsoluteTableIdentifier()); + readSupport.initialize(queryModel.getProjectionColumns(), + queryModel.getAbsoluteTableIdentifier()); try { - carbonIterator = - new ChunkRowIterator((CarbonIterator) queryExecutor.execute(queryModel)); + carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel)); } catch (QueryExecutionException e) { throw new InterruptedException(e.getMessage()); } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java index d3419c20066..1b7f577097d 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/CarbonReadSupport.java @@ -31,7 +31,7 @@ public interface CarbonReadSupport { * * @param carbonColumns */ - public void intialize(CarbonColumn[] carbonColumns, + public void initialize(CarbonColumn[] carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier); public T readRow(Object[] data); diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java index 6832789d8ff..fa8ba6ea027 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/AbstractDictionaryDecodedReadSupport.java @@ -51,7 +51,7 @@ public abstract class AbstractDictionaryDecodedReadSupport implements CarbonR * @param carbonColumns * @param absoluteTableIdentifier */ - @Override public void intialize(CarbonColumn[] carbonColumns, + @Override public void initialize(CarbonColumn[] carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier) { this.carbonColumns = carbonColumns; dictionaries = new Dictionary[carbonColumns.length]; diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java index dae83a16b56..4fbcb059222 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/ArrayWritableReadSupport.java @@ -26,7 +26,7 @@ public class ArrayWritableReadSupport implements CarbonReadSupport { - @Override public void intialize(CarbonColumn[] carbonColumns, + @Override public void initialize(CarbonColumn[] 
carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier) { } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java index 578ab4399c8..59b45adb8e8 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/readsupport/impl/RawDataReadSupport.java @@ -24,7 +24,8 @@ public class RawDataReadSupport implements CarbonReadSupport { - @Override public void intialize(CarbonColumn[] carbonColumns, + @Override + public void initialize(CarbonColumn[] carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier) { } @@ -34,7 +35,8 @@ public class RawDataReadSupport implements CarbonReadSupport { * @param data * @return */ - @Override public Object[] readRow(Object[] data) { + @Override + public Object[] readRow(Object[] data) { return data; } diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java index 0d94da453c0..75e004da811 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java @@ -19,12 +19,14 @@ package org.apache.carbondata.hadoop.util; +import java.io.IOException; import java.util.List; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure; +import org.apache.carbondata.hadoop.CarbonInputFormat; import org.apache.carbondata.scan.expression.Expression; import org.apache.carbondata.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.scan.filter.resolver.FilterResolverIntf; @@ -33,6 +35,11 @@ import org.apache.carbondata.scan.model.QueryMeasure; import org.apache.carbondata.scan.model.QueryModel; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + + /** * Utility class */ @@ -81,6 +88,13 @@ public static CarbonQueryPlan createQueryPlan(CarbonTable carbonTable, String co return plan; } + public static CarbonInputFormat createCarbonInputFormat(AbsoluteTableIdentifier identifier, + Job job) throws IOException { + CarbonInputFormat carbonInputFormat = new CarbonInputFormat<>(); + FileInputFormat.addInputPath(job, new Path(identifier.getTablePath())); + return carbonInputFormat; + } + private static void addQueryMeasure(CarbonQueryPlan plan, int order, CarbonMeasure measure) { QueryMeasure queryMeasure = new QueryMeasure(measure.getColName()); queryMeasure.setQueryOrder(order); diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java index 1c21a50709d..8693646912c 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/ft/CarbonInputMapperTest.java @@ -47,7 +47,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.junit.Assert; -import org.junit.Before; 
import org.junit.Test; public class CarbonInputMapperTest extends TestCase { @@ -61,8 +60,8 @@ public class CarbonInputMapperTest extends TestCase { try { String outPath = "target/output"; runJob(outPath, null, null); - Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000); - Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 7); + Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 7, countTheColumns(outPath)); } catch (Exception e) { Assert.assertTrue("failed", false); e.printStackTrace(); @@ -79,8 +78,8 @@ public class CarbonInputMapperTest extends TestCase { carbonProjection.addColumn("salary"); runJob(outPath, carbonProjection, null); - Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 1000); - Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3); + Assert.assertEquals("Count lines are not matching", 1000, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); } catch (Exception e) { Assert.assertTrue("failed", false); } @@ -97,8 +96,8 @@ public class CarbonInputMapperTest extends TestCase { new EqualToExpression(new ColumnExpression("country", DataType.STRING), new LiteralExpression("france", DataType.STRING)); runJob(outPath, carbonProjection, expression); - Assert.assertTrue("Count lines are not matching", countTheLines(outPath) == 101); - Assert.assertTrue("Column count are not matching", countTheColumns(outPath) == 3); + Assert.assertEquals("Count lines are not matching", 101, countTheLines(outPath)); + Assert.assertEquals("Column count are not matching", 3, countTheColumns(outPath)); } catch (Exception e) { Assert.assertTrue("failed", false); } @@ -171,7 +170,7 @@ private void runJob(String outPath, CarbonProjection projection, Expression filt job.setOutputFormatClass(TextOutputFormat.class); AbsoluteTableIdentifier abs = StoreCreator.getAbsoluteTableIdentifier(); if (projection != null) { - CarbonInputFormat.setColumnProjection(projection, job.getConfiguration()); + CarbonInputFormat.setColumnProjection(job.getConfiguration(), projection); } if (filter != null) { CarbonInputFormat.setFilterPredicates(job.getConfiguration(), filter); diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 3d670a2f6d1..f2a1f9f85ee 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -50,9 +50,7 @@ import org.apache.carbondata.core.carbon.CarbonDataLoadSchema; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.ColumnIdentifier; -import org.apache.carbondata.core.carbon.datastore.block.BlockletInfos; import org.apache.carbondata.core.carbon.datastore.block.Distributable; -import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo; import org.apache.carbondata.core.carbon.metadata.CarbonMetadata; import org.apache.carbondata.core.carbon.metadata.datatype.DataType; import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable; @@ -236,9 +234,7 @@ public static void deletePartialLoadDataIfExist(CarbonLoadModel loadModel, final boolean isCompactionFlow) throws IOException { CarbonTable carbonTable = 
loadModel.getCarbonDataLoadSchema().getCarbonTable(); String metaDataLocation = carbonTable.getMetaDataFilepath(); - SegmentStatusManager segmentStatusManager = - new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier()); - final LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation); + final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(loadModel.getStorePath(), carbonTable.getCarbonTableIdentifier()); @@ -404,10 +400,7 @@ public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails load absoluteTableIdentifier.getCarbonTableIdentifier()); String tableStatusPath = carbonTablePath.getTableStatusFilePath(); - - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); - - ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); + ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier); try { if (carbonLock.lockWithRetries()) { @@ -416,7 +409,7 @@ public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails load + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); String loadEnddate = readCurrentTime(); loadMetadataDetails.setTimestamp(loadEnddate); @@ -434,7 +427,7 @@ public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails load } listOfLoadFolderDetails.add(loadMetadataDetails); - segmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails + SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()])); status = true; @@ -579,9 +572,13 @@ public static Map> getRequiredExecutors( List flattenedList = new ArrayList(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); for (Distributable blockInfo : blockInfos) { - for (String eachNode : blockInfo.getLocations()) { - NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode); - flattenedList.add(nbr); + try { + for (String eachNode : blockInfo.getLocations()) { + NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode); + flattenedList.add(nbr); + } + } catch (IOException e) { + throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e); } } // sort the flattened data. 
@@ -646,7 +643,7 @@ public static Map> nodeBlockMapping(List blockInfos, // put the blocks in the set uniqueBlocks.add(blockInfo); - for (String eachNode : blockInfo.getLocations()) { - NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode); - flattenedList.add(nbr); - nodeList.add(eachNode); + try { + for (String eachNode : blockInfo.getLocations()) { + NodeBlockRelation nbr = new NodeBlockRelation(blockInfo, eachNode); + flattenedList.add(nbr); + nodeList.add(eachNode); + } + } catch (IOException e) { + throw new RuntimeException("error getting location of block: " + blockInfo.toString(), e); } } } @@ -953,66 +954,6 @@ public static String[] getConfiguredLocalDirs(SparkConf conf) { return Utils.getConfiguredLocalDirs(conf); } - /** - * method to distribute the blocklets of a block in multiple blocks - * @param blockInfoList - * @param defaultParallelism - * @return - */ - public static List distributeBlockLets(List blockInfoList, - int defaultParallelism) { - String blockletDistributionString = CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION, - CarbonCommonConstants.ENABLE_BLOCKLET_DISTRIBUTION_DEFAULTVALUE); - boolean isBlockletDistributionEnabled = Boolean.parseBoolean(blockletDistributionString); - LOGGER.info("No.Of Blocks before Blocklet distribution: " + blockInfoList.size()); - List tableBlockInfos = new ArrayList(); - if (blockInfoList.size() < defaultParallelism && isBlockletDistributionEnabled) { - for (TableBlockInfo tableBlockInfo : blockInfoList) { - int noOfBlockLets = tableBlockInfo.getBlockletInfos().getNoOfBlockLets(); - LOGGER.info( - "No.Of blocklet : " + noOfBlockLets + ".Minimum blocklets required for distribution : " - + minBlockLetsReqForDistribution); - if (noOfBlockLets < minBlockLetsReqForDistribution) { - tableBlockInfos.add(tableBlockInfo); - continue; - } - TableBlockInfo tableBlockInfo1 = null; - int rem = noOfBlockLets % minBlockLetsReqForDistribution; - int count = noOfBlockLets / minBlockLetsReqForDistribution; - if (rem > 0) { - count = count + 1; - } - for (int i = 0; i < count; i++) { - BlockletInfos blockletInfos = new BlockletInfos(); - blockletInfos.setStartBlockletNumber(i * minBlockLetsReqForDistribution); - blockletInfos.setNumberOfBlockletToScan(minBlockLetsReqForDistribution); - blockletInfos.setNoOfBlockLets(blockletInfos.getNoOfBlockLets()); - tableBlockInfo1 = - new TableBlockInfo(tableBlockInfo.getFilePath(), tableBlockInfo.getBlockOffset(), - tableBlockInfo.getSegmentId(), tableBlockInfo.getLocations(), - tableBlockInfo.getBlockLength(), blockletInfos); - tableBlockInfos.add(tableBlockInfo1); - } - //if rem is greater than 0 then for the last block - if (rem > 0) { - tableBlockInfo1.getBlockletInfos().setNumberOfBlockletToScan(rem); - } - } - } - if (tableBlockInfos.size() == 0) { - { - for (TableBlockInfo tableBlockInfo : blockInfoList) { - tableBlockInfos.add(tableBlockInfo); - } - LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size()); - return tableBlockInfos; - } - } - LOGGER.info("No.Of Blocks after Blocklet distribution: " + tableBlockInfos.size()); - return tableBlockInfos; - } - /** * This will update the old table status details before clean files to the latest table status. 
* @param oldList diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java index 6e8ab0c49e9..9be4d475c8e 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java @@ -133,12 +133,7 @@ public static boolean updateLoadMetadataWithMergeStatus(List { - @Override public void intialize(CarbonColumn[] carbonColumns, + @Override public void initialize(CarbonColumn[] carbonColumns, AbsoluteTableIdentifier absoluteTableIdentifier) { - super.intialize(carbonColumns, absoluteTableIdentifier); - //can intialize and generate schema here. + super.initialize(carbonColumns, absoluteTableIdentifier); + //can initialize and generate schema here. } @Override public Row readRow(Object[] data) { @@ -52,6 +52,9 @@ public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSuppor case TIMESTAMP: data[i] = new Timestamp((long) data[i] / 1000); break; + case LONG: + data[i] = data[i]; + break; default: } } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) { diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java index 6b565454aba..11cf9f844ee 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java @@ -44,9 +44,7 @@ public static boolean isLoadDeletionRequired(CarbonLoadModel loadModel) { .getCarbonTable(loadModel.getDatabaseName() + '_' + loadModel.getTableName()); String metaDataLocation = table.getMetaDataFilepath(); - SegmentStatusManager segmentStatusManager = - new SegmentStatusManager(table.getAbsoluteTableIdentifier()); - LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metaDataLocation); + LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); if (details != null && details.length != 0) { for (LoadMetadataDetails oneRow : details) { if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneRow.getLoadStatus()) diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 4edfa6b9dde..8445440ceeb 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -93,9 +93,8 @@ object CarbonDataRDDFactory { // Delete the records based on data val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance .getCarbonTable(databaseName + "_" + tableName) - val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier) val loadMetadataDetailsArray = - segmentStatusManager.readLoadMetadata(table.getMetaDataFilepath()).toList + SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath).toList val resultMap = new CarbonDeleteLoadByDateRDD( sc.sparkContext, new DeletedLoadResultImpl(), @@ -1055,10 +1054,7 @@ object CarbonDataRDDFactory { def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = { val metadataPath = 
model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath - val segmentStatusManager = - new SegmentStatusManager( - model.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier) - val details = segmentStatusManager.readLoadMetadata(metadataPath) + val details = SegmentStatusManager.readLoadMetadata(metadataPath) model.setLoadMetadataDetails(details.toList.asJava) } @@ -1070,9 +1066,7 @@ object CarbonDataRDDFactory { if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) { val loadMetadataFilePath = CarbonLoaderUtil .extractLoadMetadataFileLocation(carbonLoadModel) - val segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier) - val details = segmentStatusManager - .readLoadMetadata(loadMetadataFilePath) + val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath) val carbonTableStatusLock = CarbonLockFactory .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.TABLE_STATUS_LOCK) @@ -1089,17 +1083,16 @@ object CarbonDataRDDFactory { LOGGER.info("Table status lock has been successfully acquired.") // read latest table status again. - val latestMetadata = segmentStatusManager.readLoadMetadata(loadMetadataFilePath) + val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath) // update the metadata details from old to new status. val latestStatus = CarbonLoaderUtil - .updateLoadMetadataFromOldToNew(details, latestMetadata) + .updateLoadMetadataFromOldToNew(details, latestMetadata) CarbonLoaderUtil.writeLoadMetadata( carbonLoadModel.getCarbonDataLoadSchema, carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, latestStatus - ) + carbonLoadModel.getTableName, latestStatus) } else { val errorMsg = "Clean files request is failed for " + s"${ carbonLoadModel.getDatabaseName }." 
+ @@ -1109,7 +1102,6 @@ object CarbonDataRDDFactory { LOGGER.audit(errorMsg) LOGGER.error(errorMsg) throw new Exception(errorMsg + " Please try after some time.") - } } finally { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK) diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 6c2e99317b9..4e820c60eb6 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -21,8 +21,11 @@ import java.util import java.util.{Collections, List} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.Random +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark._ import org.apache.spark.rdd.RDD @@ -32,22 +35,19 @@ import org.apache.spark.sql.hive.DistributionUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} -import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, -TableBlockInfo, TableTaskInfo, TaskBlockInfo} +import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo} import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter -import org.apache.carbondata.core.carbon.path.CarbonTablePath import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException} -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} -import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, -CarbonCompactionUtil, RowResultMerger} +import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit} +import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil +import org.apache.carbondata.integration.spark.merger.{CarbonCompactionExecutor, CarbonCompactionUtil, RowResultMerger} import org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.processing.util.CarbonDataProcessorUtil import org.apache.carbondata.scan.result.iterator.RawResultIterator import org.apache.carbondata.spark.MergeResult import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.splits.TableSplit -import org.apache.carbondata.spark.util.QueryPlanUtil class CarbonMergerRDD[K, V]( @@ -103,17 +103,17 @@ class CarbonMergerRDD[K, V]( var mergeNumber = "" var exec: CarbonCompactionExecutor = null try { - var dataloadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition] // get destination segment properties as sent from driver which is of last segment. - val segmentProperties = new SegmentProperties(carbonMergerMapping.maxSegmentColumnSchemaList - .asJava, + val segmentProperties = new SegmentProperties( + carbonMergerMapping.maxSegmentColumnSchemaList.asJava, carbonMergerMapping.maxSegmentColCardinality) // sorting the table block info List. 
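The clean-files path in CarbonDataRDDFactory above follows a fixed protocol: acquire the table status lock, re-read the latest metadata under the lock, merge the old details into it, write the result, and always release the lock in a finally block. A generic sketch of that protocol; StatusLock and the function parameters are hypothetical stand-ins, not Carbon APIs:

// Take the lock, re-read the freshest status under it, merge, write, and
// always unlock. Returns false when the lock could not be acquired so the
// caller can report and retry later.
trait StatusLock {
  def lockWithRetries(): Boolean
  def unlock(): Unit
}

def updateTableStatus[S](lock: StatusLock)
                        (readLatest: () => S, merge: S => S, write: S => Unit): Boolean = {
  try {
    if (lock.lockWithRetries()) {
      val latest = readLatest() // always merge against the latest on-disk status
      write(merge(latest))
      true
    } else {
      false
    }
  } finally {
    lock.unlock()
  }
}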
- val tableBlockInfoList = carbonSparkPartition.tableBlockInfos + val splitList = carbonSparkPartition.split.value.getAllSplits + val tableBlockInfoList = CarbonInputSplit.createBlocks(splitList) Collections.sort(tableBlockInfoList) @@ -214,86 +214,46 @@ class CarbonMergerRDD[K, V]( override def getPreferredLocations(split: Partition): Seq[String] = { val theSplit = split.asInstanceOf[CarbonSparkPartition] - theSplit.locations.filter(_ != "localhost") + theSplit.split.value.getLocations.filter(_ != "localhost") } override def getPartitions: Array[Partition] = { - val startTime = System.currentTimeMillis() val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier( storePath, new CarbonTableIdentifier(databaseName, factTableName, tableId) ) - val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = - QueryPlanUtil.createCarbonInputFormat(absoluteTableIdentifier) + val jobConf: JobConf = new JobConf(new Configuration) + val job: Job = new Job(jobConf) + val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job) var defaultParallelism = sparkContext.defaultParallelism val result = new util.ArrayList[Partition](defaultParallelism) // mapping of the node and block list. - var nodeMapping: util.Map[String, util.List[Distributable]] = new + var nodeBlockMapping: util.Map[String, util.List[Distributable]] = new util.HashMap[String, util.List[Distributable]] - var noOfBlocks = 0 - - val taskInfoList = new util.ArrayList[Distributable] - - var blocksOfLastSegment: List[TableBlockInfo] = null + val noOfBlocks = 0 + var carbonInputSplits = mutable.Seq[CarbonInputSplit]() // for each valid segment. for (eachSeg <- carbonMergerMapping.validSegments) { // map for keeping the relation of a task and its blocks. - val taskIdMapping: util.Map[String, util.List[TableBlockInfo]] = new - util.HashMap[String, util.List[TableBlockInfo]] - job.getConfiguration.set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg) // get splits - val splits = carbonInputFormat.getSplits(job) - val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) - - // take the blocks of one segment. - val blocksOfOneSegment = carbonInputSplits.map(inputSplit => - new TableBlockInfo(inputSplit.getPath.toString, - inputSplit.getStart, inputSplit.getSegmentId, - inputSplit.getLocations, inputSplit.getLength - ) - ) - - // keep on assigning till last one is reached. - if (null != blocksOfOneSegment && blocksOfOneSegment.nonEmpty) { - blocksOfLastSegment = blocksOfOneSegment.asJava - } - - // populate the task and its block mapping. - blocksOfOneSegment.foreach(tableBlockInfo => { - val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(tableBlockInfo.getFilePath) - val blockList = taskIdMapping.get(taskNo) - if (null == blockList) { - val blockListTemp = new util.ArrayList[TableBlockInfo]() - blockListTemp.add(tableBlockInfo) - taskIdMapping.put(taskNo, blockListTemp) - } else { - blockList.add(tableBlockInfo) - } - } - ) - - noOfBlocks += blocksOfOneSegment.size - taskIdMapping.asScala.foreach( - entry => - taskInfoList.add(new TableTaskInfo(entry._1, entry._2).asInstanceOf[Distributable]) - ) + val splits = format.getSplits(job) + carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) } // prepare the details required to extract the segment properties using last segment. 
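In the new getPartitions above, splits are collected segment by segment: the segment id is set on the job configuration (CarbonInputFormat.INPUT_SEGMENT_NUMBERS) before each getSplits call. A minimal sketch of that loop against the generic Hadoop API; the format instance and the configuration key are passed in as assumptions rather than taken from Carbon classes:

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat, InputSplit, Job}

// Collect splits for every segment by re-running getSplits with the segment
// id set on the job configuration. `segmentKey` stands in for
// CarbonInputFormat.INPUT_SEGMENT_NUMBERS.
def collectSplits(format: InputFormat[_, _],
                  segments: Seq[String],
                  segmentKey: String): Seq[InputSplit] = {
  val job = Job.getInstance(new Configuration())
  segments.flatMap { segment =>
    job.getConfiguration.set(segmentKey, segment)
    format.getSplits(job).asScala
  }
}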
- if (null != blocksOfLastSegment && blocksOfLastSegment.size > 0) { - val lastBlockInfo = blocksOfLastSegment.get(blocksOfLastSegment.size - 1) - + if (null != carbonInputSplits && carbonInputSplits.nonEmpty) { + val carbonInputSplit = carbonInputSplits.last var dataFileFooter: DataFileFooter = null try { - dataFileFooter = CarbonUtil.readMetadatFile(lastBlockInfo.getFilePath, - lastBlockInfo.getBlockOffset, lastBlockInfo.getBlockLength) + dataFileFooter = CarbonUtil.readMetadatFile(carbonInputSplit.getPath.toString(), + carbonInputSplit.getStart, carbonInputSplit.getLength) } catch { case e: CarbonUtilException => logError("Exception in preparing the data file footer for compaction " + e.getMessage) @@ -306,16 +266,17 @@ class CarbonMergerRDD[K, V]( .toList } // send complete list of blocks to the mapping util. - nodeMapping = CarbonLoaderUtil.nodeBlockMapping(taskInfoList, -1) + nodeBlockMapping = CarbonLoaderUtil.nodeBlockMapping( + carbonInputSplits.map(_.asInstanceOf[Distributable]).asJava, -1) val confExecutors = confExecutorsTemp.toInt - val requiredExecutors = if (nodeMapping.size > confExecutors) { + val requiredExecutors = if (nodeBlockMapping.size > confExecutors) { confExecutors - } else { nodeMapping.size() } + } else { nodeBlockMapping.size() } CarbonContext.ensureExecutors(sparkContext, requiredExecutors) - logInfo("No.of Executors required=" + requiredExecutors - + " , spark.executor.instances=" + confExecutors - + ", no.of.nodes where data present=" + nodeMapping.size()) + logInfo("No.of Executors required=" + requiredExecutors + + " , spark.executor.instances=" + confExecutors + + ", no.of.nodes where data present=" + nodeBlockMapping.size()) var nodes = DistributionUtil.getNodeList(sparkContext) var maxTimes = 30 while (nodes.length < requiredExecutors && maxTimes > 0) { @@ -327,24 +288,23 @@ class CarbonMergerRDD[K, V]( defaultParallelism = sparkContext.defaultParallelism var i = 0 - val nodeTaskBlocksMap: util.Map[String, util.List[NodeInfo]] = new util.HashMap[String, util - .List[NodeInfo]]() + val nodeTaskBlocksMap = new util.HashMap[String, util.List[NodeInfo]]() // Create Spark Partition for each task and assign blocks - nodeMapping.asScala.foreach { entry => - - val taskBlockList: List[NodeInfo] = new util.ArrayList[NodeInfo](0) - nodeTaskBlocksMap.put(entry._1, taskBlockList) - - val list = new util.ArrayList[TableBlockInfo] - entry._2.asScala.foreach(taskInfo => { - val blocksPerNode = taskInfo.asInstanceOf[TableTaskInfo] - list.addAll(blocksPerNode.getTableBlockInfoList) - taskBlockList - .add(new NodeInfo(blocksPerNode.getTaskId, blocksPerNode.getTableBlockInfoList.size)) - }) - if (list.size() != 0) { - result.add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, list)) + nodeBlockMapping.asScala.foreach { case (nodeName, blockList) => + val taskBlockList = new util.ArrayList[NodeInfo](0) + nodeTaskBlocksMap.put(nodeName, taskBlockList) + var blockletCount = 0 + blockList.asScala.foreach { taskInfo => + val blocksPerNode = taskInfo.asInstanceOf[CarbonInputSplit] + blockletCount = blockletCount + blocksPerNode.getNumberOfBlocklets + taskBlockList.add( + NodeInfo(blocksPerNode.taskId, blocksPerNode.getNumberOfBlocklets)) + } + if (blockletCount != 0) { + val multiBlockSplit = new CarbonMultiBlockSplit(absoluteTableIdentifier, + carbonInputSplits.asJava, nodeName) + result.add(new CarbonSparkPartition(id, i, multiBlockSplit)) i += 1 } } @@ -360,17 +320,14 @@ class CarbonMergerRDD[K, V]( val noOfNodes = nodes.length val noOfTasks = result.size - 
logInfo(s"Identified no.of.Blocks: $noOfBlocks," - + s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks" - ) - logInfo("Time taken to identify Blocks to scan: " + (System - .currentTimeMillis() - startTime) - ) - for (j <- 0 until result.size) { - val cp = result.get(j).asInstanceOf[CarbonSparkPartition] - logInfo(s"Node: " + cp.locations.toSeq.mkString(",") - + ", No.Of Blocks: " + cp.tableBlockInfos.size - ) + logInfo(s"Identified no.of.Blocks: $noOfBlocks," + + s"parallelism: $defaultParallelism , no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks") + logInfo("Time taken to identify Blocks to scan : " + (System.currentTimeMillis() - startTime)) + for (j <- 0 until result.size ) { + val multiBlockSplit = result.get(j).asInstanceOf[CarbonSparkPartition].split.value + val splitList = multiBlockSplit.getAllSplits + logInfo(s"Node: ${multiBlockSplit.getLocations.mkString(",")}, No.Of Blocks: " + + s"${CarbonInputSplit.createBlocks(splitList).size}") } result.toArray(new Array[Partition](result.size)) } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala index 7798e5c51c6..e8d73993851 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala @@ -17,49 +17,43 @@ package org.apache.carbondata.spark.rdd +import java.text.SimpleDateFormat import java.util +import java.util.Date import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.InputSplit -import org.apache.hadoop.mapreduce.Job -import org.apache.spark.{Partition, SparkContext, TaskContext} +import org.apache.hadoop.mapreduce.{InputSplit, Job, JobID} +import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext, TaskKilledException} +import org.apache.spark.mapred.CarbonHadoopMapReduceUtil import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hive.DistributionUtil -import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory -import org.apache.carbondata.core.cache.dictionary.Dictionary -import org.apache.carbondata.core.carbon.datastore.block.{BlockletInfos, TableBlockInfo} -import org.apache.carbondata.core.carbon.datastore.SegmentTaskIndexStore +import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier +import org.apache.carbondata.core.carbon.datastore.block.Distributable +import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants} import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory -import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit} -import org.apache.carbondata.lcm.status.SegmentStatusManager -import org.apache.carbondata.scan.executor.QueryExecutor -import org.apache.carbondata.scan.executor.QueryExecutorFactory +import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit, CarbonProjection} +import org.apache.carbondata.hadoop.readsupport.impl.RawDataReadSupport import org.apache.carbondata.scan.expression.Expression -import org.apache.carbondata.scan.model.QueryModel -import 
org.apache.carbondata.scan.result.BatchResult -import org.apache.carbondata.scan.result.iterator.ChunkRowIterator -import org.apache.carbondata.spark.RawValue import org.apache.carbondata.spark.load.CarbonLoaderUtil -import org.apache.carbondata.spark.util.QueryPlanUtil - -class CarbonSparkPartition(rddId: Int, val idx: Int, - val locations: Array[String], - val tableBlockInfos: util.List[TableBlockInfo]) +class CarbonSparkPartition( + val rddId: Int, + val idx: Int, + @transient val multiBlockSplit: CarbonMultiBlockSplit) extends Partition { + val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit) + override val index: Int = idx - // val serializableHadoopSplit = new SerializableWritable[Array[String]](locations) - override def hashCode(): Int = { - 41 * (41 + rddId) + idx - } + override def hashCode(): Int = 41 * (41 + rddId) + idx } /** @@ -68,169 +62,135 @@ class CarbonSparkPartition(rddId: Int, val idx: Int, * level filtering in driver side. */ class CarbonScanRDD[V: ClassTag]( - sc: SparkContext, - queryModel: QueryModel, + @transient sc: SparkContext, + columnProjection: Seq[Attribute], filterExpression: Expression, - keyClass: RawValue[V], - @transient conf: Configuration, - tableCreationTime: Long, - schemaLastUpdatedTime: Long, - baseStoreLocation: String) - extends RDD[V](sc, Nil) { + identifier: AbsoluteTableIdentifier, + @transient carbonTable: CarbonTable) + extends RDD[V](sc, Nil) + with CarbonHadoopMapReduceUtil + with Logging { + + private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "") + private val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + formatter.format(new Date()) + } + @transient private val jobId = new JobID(jobTrackerId, id) + @transient val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) override def getPartitions: Array[Partition] = { - var defaultParallelism = sparkContext.defaultParallelism - val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder() - val (carbonInputFormat: CarbonInputFormat[Array[Object]], job: Job) = - QueryPlanUtil.createCarbonInputFormat(queryModel.getAbsoluteTableIdentifier) + val job = Job.getInstance(new Configuration()) + val format = prepareInputFormatForDriver(job.getConfiguration) // initialise query_id for job - job.getConfiguration.set("query.id", queryModel.getQueryId) - - val result = new util.ArrayList[Partition](defaultParallelism) - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val validAndInvalidSegments = new SegmentStatusManager(queryModel.getAbsoluteTableIdentifier) - .getValidAndInvalidSegments - // set filter resolver tree - try { - // before applying filter check whether segments are available in the table. 
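The rewritten CarbonSparkPartition above keeps its CarbonMultiBlockSplit behind a SerializableWritable, because Hadoop splits are Writable but not java.io.Serializable, and marks the raw constructor value @transient. A sketch of the same wrapping, with a stock Hadoop FileSplit standing in for CarbonMultiBlockSplit:

import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.{Partition, SerializableWritable}

// The raw Writable split is @transient; only the SerializableWritable wrapper
// travels with the partition when Spark serializes it to executors.
class HadoopSplitPartition(rddId: Int, idx: Int, @transient val rawSplit: FileSplit)
  extends Partition {

  val split = new SerializableWritable[FileSplit](rawSplit)

  override val index: Int = idx
  override def hashCode(): Int = 41 * (41 + rddId) + idx
}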
- if (!validAndInvalidSegments.getValidSegments.isEmpty) { - val filterResolver = carbonInputFormat - .getResolvedFilter(job.getConfiguration, filterExpression) - CarbonInputFormat.setFilterPredicates(job.getConfiguration, filterResolver) - queryModel.setFilterExpressionResolverTree(filterResolver) - CarbonInputFormat - .setSegmentsToAccess(job.getConfiguration, - validAndInvalidSegments.getValidSegments - ) - SegmentTaskIndexStore.getInstance() - .removeTableBlocks(validAndInvalidSegments.getInvalidSegments, - queryModel.getAbsoluteTableIdentifier - ) - } - } catch { - case e: Exception => - LOGGER.error(e) - sys.error(s"Exception occurred in query execution :: ${ e.getMessage }") - } + job.getConfiguration.set("query.id", queryId) + // get splits - val splits = carbonInputFormat.getSplits(job) + val splits = format.getSplits(job) + val result = distributeSplits(splits) + result + } + + private def distributeSplits(splits: util.List[InputSplit]): Array[Partition] = { + // this function distributes the split based on following logic: + // 1. based on data locality, to make split balanced on all available nodes + // 2. if the number of split for one + + var statistic = new QueryStatistic() + val statisticRecorder = CarbonTimeStatisticsFactory.createDriverRecorder() + val parallelism = sparkContext.defaultParallelism + val result = new util.ArrayList[Partition](parallelism) + var noOfBlocks = 0 + var noOfNodes = 0 + var noOfTasks = 0 + if (!splits.isEmpty) { - val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit]) - queryModel.setInvalidSegmentIds(validAndInvalidSegments.getInvalidSegments) - val blockListTemp = carbonInputSplits.map(inputSplit => - new TableBlockInfo(inputSplit.getPath.toString, - inputSplit.getStart, inputSplit.getSegmentId, - inputSplit.getLocations, inputSplit.getLength, - new BlockletInfos(inputSplit.getNumberOfBlocklets, 0, inputSplit.getNumberOfBlocklets) - ) - ) - var activeNodes = Array[String]() - if (blockListTemp.nonEmpty) { - activeNodes = DistributionUtil - .ensureExecutorsAndGetNodeList(blockListTemp.toArray, sparkContext) - } - defaultParallelism = sparkContext.defaultParallelism - val blockList = CarbonLoaderUtil. 
- distributeBlockLets(blockListTemp.asJava, defaultParallelism).asScala - - if (blockList.nonEmpty) { - var statistic = new QueryStatistic() - // group blocks to nodes, tasks - val nodeBlockMapping = - CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, defaultParallelism, - activeNodes.toList.asJava - ) - statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) - statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId()) - statistic = new QueryStatistic() - var i = 0 - // Create Spark Partition for each task and assign blocks - nodeBlockMapping.asScala.foreach { entry => - entry._2.asScala.foreach { blocksPerTask => { - val tableBlockInfo = blocksPerTask.asScala.map(_.asInstanceOf[TableBlockInfo]) - if (blocksPerTask.size() != 0) { - result - .add(new CarbonSparkPartition(id, i, Seq(entry._1).toArray, tableBlockInfo.asJava)) - i += 1 - } + // create a list of block based on split + val blockList = splits.asScala.map(_.asInstanceOf[Distributable]) + + // get the list of executors and map blocks to executors based on locality + val activeNodes = DistributionUtil.ensureExecutorsAndGetNodeList(blockList, sparkContext) + + // divide the blocks among the tasks of the nodes as per the data locality + val nodeBlockMapping = CarbonLoaderUtil.nodeBlockTaskMapping(blockList.asJava, -1, + parallelism, activeNodes.toList.asJava) + + statistic.addStatistics(QueryStatisticsConstants.BLOCK_ALLOCATION, System.currentTimeMillis) + statisticRecorder.recordStatisticsForDriver(statistic, queryId) + statistic = new QueryStatistic() + + var i = 0 + // Create Spark Partition for each task and assign blocks + nodeBlockMapping.asScala.foreach { case (node, blockList) => + blockList.asScala.foreach { blocksPerTask => + val splits = blocksPerTask.asScala.map(_.asInstanceOf[CarbonInputSplit]) + if (blocksPerTask.size() != 0) { + val multiBlockSplit = new CarbonMultiBlockSplit(identifier, splits.asJava, node) + val partition = new CarbonSparkPartition(id, i, multiBlockSplit) + result.add(partition) + i += 1 } - } - } - val noOfBlocks = blockList.size - val noOfNodes = nodeBlockMapping.size - val noOfTasks = result.size() - logInfo(s"Identified no.of.Blocks: $noOfBlocks," - + s"parallelism: $defaultParallelism , " + - s"no.of.nodes: $noOfNodes, no.of.tasks: $noOfTasks" - ) - statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION, - System.currentTimeMillis) - statisticRecorder.recordStatisticsForDriver(statistic, queryModel.getQueryId()) - statisticRecorder.logStatisticsAsTableDriver() - result.asScala.foreach { r => - val cp = r.asInstanceOf[CarbonSparkPartition] - logInfo(s"Node: ${ cp.locations.toSeq.mkString(",") }" + - s", No.Of Blocks: ${ cp.tableBlockInfos.size() }" - ) } - } else { - logInfo("No blocks identified to scan") } - } else { - logInfo("No valid segments found to scan") + + noOfBlocks = splits.size + noOfNodes = nodeBlockMapping.size + noOfTasks = result.size() + + statistic = new QueryStatistic() + statistic.addStatistics(QueryStatisticsConstants.BLOCK_IDENTIFICATION, + System.currentTimeMillis) + statisticRecorder.recordStatisticsForDriver(statistic, queryId) + statisticRecorder.logStatisticsAsTableDriver() } + logInfo( + s""" + | Identified no.of.blocks: $noOfBlocks, + | no.of.tasks: $noOfTasks, + | no.of.nodes: $noOfNodes, + | parallelism: $parallelism + """.stripMargin) result.toArray(new Array[Partition](result.size())) } - override def compute(thepartition: Partition, context: TaskContext): Iterator[V] = { - val 
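distributeSplits above delegates node assignment to CarbonLoaderUtil.nodeBlockTaskMapping after resolving the active executors. A much simplified, hypothetical sketch of that kind of locality grouping (this is not the actual CarbonLoaderUtil algorithm): each split goes to the least-loaded of its preferred nodes, so work stays local and roughly balanced.

import scala.collection.mutable

// Assign every split to the least-loaded of its preferred locations; splits
// with no locality information fall into a catch-all bucket.
def groupByLocality[S](splits: Seq[S], locationsOf: S => Seq[String]): Map[String, Seq[S]] = {
  val load = mutable.Map.empty[String, Int].withDefaultValue(0)
  val assignment = mutable.Map.empty[String, Vector[S]].withDefaultValue(Vector.empty)
  splits.foreach { split =>
    val candidates = locationsOf(split)
    val node = if (candidates.isEmpty) "no-locality" else candidates.minBy(load)
    load(node) += 1
    assignment(node) = assignment(node) :+ split
  }
  assignment.toMap
}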
LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val iter = new Iterator[V] { - var rowIterator: CarbonIterator[Array[Any]] = _ - var queryStartTime: Long = 0 - val queryExecutor = QueryExecutorFactory.getQueryExecutor() - try { - context.addTaskCompletionListener(context => { - clearDictionaryCache(queryModel.getColumnToDictionaryMapping) - logStatistics() - queryExecutor.finish - }) - val carbonSparkPartition = thepartition.asInstanceOf[CarbonSparkPartition] - if (!carbonSparkPartition.tableBlockInfos.isEmpty) { - queryModel.setQueryId(queryModel.getQueryId + "_" + carbonSparkPartition.idx) - // fill table block info - queryModel.setTableBlockInfos(carbonSparkPartition.tableBlockInfos) - queryStartTime = System.currentTimeMillis - val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) - logInfo("*************************" + carbonPropertiesFilePath) - if (null == carbonPropertiesFilePath) { - System.setProperty("carbon.properties.filepath", - System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") - } - // execute query - rowIterator = new ChunkRowIterator( - queryExecutor.execute(queryModel). - asInstanceOf[CarbonIterator[BatchResult]]).asInstanceOf[CarbonIterator[Array[Any]]] + override def compute(split: Partition, context: TaskContext): Iterator[V] = { + val carbonPropertiesFilePath = System.getProperty("carbon.properties.filepath", null) + if (null == carbonPropertiesFilePath) { + System.setProperty("carbon.properties.filepath", + System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties" + ) + } - } - } catch { - case e: Exception => - LOGGER.error(e) - if (null != e.getMessage) { - sys.error(s"Exception occurred in query execution :: ${ e.getMessage }") - } else { - sys.error("Exception occurred in query execution.Please check logs.") - } - } + val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) + val attemptContext = newTaskAttemptContext(new Configuration(), attemptId) + val format = prepareInputFormatForExecutor(attemptContext.getConfiguration) + val inputSplit = split.asInstanceOf[CarbonSparkPartition].split.value + val reader = format.createRecordReader(inputSplit, attemptContext) + reader.initialize(inputSplit, attemptContext) + + val queryStartTime = System.currentTimeMillis - var havePair = false - var finished = false - var recordCount = 0 + new Iterator[V] { + private var havePair = false + private var finished = false + private var count = 0 + + context.addTaskCompletionListener { context => + logStatistics(queryStartTime, count) + reader.close() + } override def hasNext: Boolean = { + if (context.isInterrupted) { + throw new TaskKilledException + } if (!finished && !havePair) { - finished = (null == rowIterator) || (!rowIterator.hasNext) + finished = !reader.nextKeyValue + if (finished) { + reader.close() + } havePair = !finished } !finished @@ -241,68 +201,55 @@ class CarbonScanRDD[V: ClassTag]( throw new java.util.NoSuchElementException("End of stream") } havePair = false - recordCount += 1 - keyClass.getValue(rowIterator.next()) + val value: V = reader.getCurrentValue + count += 1 + value } + } + } - def clearDictionaryCache(columnToDictionaryMap: java.util.Map[String, Dictionary]) = { - if (null != columnToDictionaryMap) { - org.apache.carbondata.spark.util.CarbonQueryUtil - .clearColumnDictionaryCache(columnToDictionaryMap) - } - } + private def prepareInputFormatForDriver(conf: Configuration): CarbonInputFormat[V] = { + 
CarbonInputFormat.setCarbonTable(conf, carbonTable) + createInputFormat(conf) + } - def logStatistics(): Unit = { - if (null != queryModel.getStatisticsRecorder) { - var queryStatistic = new QueryStatistic() - queryStatistic - .addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART, - System.currentTimeMillis - queryStartTime - ) - queryModel.getStatisticsRecorder.recordStatistics(queryStatistic) - // result size - queryStatistic = new QueryStatistic() - queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount) - queryModel.getStatisticsRecorder.recordStatistics(queryStatistic) - // print executor query statistics for each task_id - queryModel.getStatisticsRecorder.logStatisticsAsTableExecutor() - } - } + private def prepareInputFormatForExecutor(conf: Configuration): CarbonInputFormat[V] = { + CarbonInputFormat.setCarbonReadSupport(classOf[RawDataReadSupport], conf) + createInputFormat(conf) + } + + private def createInputFormat(conf: Configuration): CarbonInputFormat[V] = { + val format = new CarbonInputFormat[V] + CarbonInputFormat.setTablePath(conf, identifier.getTablePath) + CarbonInputFormat.setFilterPredicates(conf, filterExpression) + val projection = new CarbonProjection + columnProjection.foreach { attr => + projection.addColumn(attr.name) } + CarbonInputFormat.setColumnProjection(conf, projection) + format + } - iter + def logStatistics(queryStartTime: Long, recordCount: Int): Unit = { + var queryStatistic = new QueryStatistic() + queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART, + System.currentTimeMillis - queryStartTime) + val statisticRecorder = CarbonTimeStatisticsFactory.createExecutorRecorder(queryId) + statisticRecorder.recordStatistics(queryStatistic) + // result size + queryStatistic = new QueryStatistic() + queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, recordCount) + statisticRecorder.recordStatistics(queryStatistic) + // print executor query statistics for each task_id + statisticRecorder.logStatisticsAsTableExecutor() } /** * Get the preferred locations where to launch this task. 
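The rewritten compute consumes the Hadoop RecordReader through a Scala Iterator, buffering one nextKeyValue() answer so that hasNext stays idempotent and closing the reader as soon as the input is exhausted. The same pattern, extracted as a standalone sketch over generic Hadoop types:

import org.apache.hadoop.mapreduce.RecordReader

// Wrap a RecordReader in an Iterator: hasNext advances the reader at most
// once per element (havePair flags a buffered answer), and the reader is
// closed the moment nextKeyValue reports end of input.
def asIterator[K, V](reader: RecordReader[K, V]): Iterator[V] = new Iterator[V] {
  private var havePair = false
  private var finished = false

  override def hasNext: Boolean = {
    if (!finished && !havePair) {
      finished = !reader.nextKeyValue()
      if (finished) {
        reader.close()
      }
      havePair = !finished
    }
    !finished
  }

  override def next(): V = {
    if (!hasNext) {
      throw new java.util.NoSuchElementException("End of stream")
    }
    havePair = false
    reader.getCurrentValue
  }
}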
*/ - override def getPreferredLocations(partition: Partition): Seq[String] = { - val theSplit = partition.asInstanceOf[CarbonSparkPartition] - val firstOptionLocation = theSplit.locations.filter(_ != "localhost") - val tableBlocks = theSplit.tableBlockInfos - // node name and count mapping - val blockMap = new util.LinkedHashMap[String, Integer]() - - tableBlocks.asScala.foreach(tableBlock => tableBlock.getLocations.foreach( - location => { - if (!firstOptionLocation.exists(location.equalsIgnoreCase(_))) { - val currentCount = blockMap.get(location) - if (currentCount == null) { - blockMap.put(location, 1) - } else { - blockMap.put(location, currentCount + 1) - } - } - } - ) - ) - - val sortedList = blockMap.entrySet().asScala.toSeq.sortWith((nodeCount1, nodeCount2) => { - nodeCount1.getValue > nodeCount2.getValue - } - ) - - val sortedNodesList = sortedList.map(nodeCount => nodeCount.getKey).take(2) - firstOptionLocation ++ sortedNodesList + override def getPreferredLocations(split: Partition): Seq[String] = { + val theSplit = split.asInstanceOf[CarbonSparkPartition] + val firstOptionLocation = theSplit.split.value.getLocations.filter(_ != "localhost") + firstOptionLocation } } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index 9c9be8d363f..5fdbc5d1ab8 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -73,17 +73,8 @@ object Compactor { maxSegmentColumnSchemaList = null ) carbonLoadModel.setStorePath(carbonMergerMapping.storePath) - val segmentStatusManager = new SegmentStatusManager(new AbsoluteTableIdentifier - (CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION), - new CarbonTableIdentifier(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, - carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId - ) - ) - ) - carbonLoadModel.setLoadMetadataDetails(segmentStatusManager - .readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava - ) + carbonLoadModel.setLoadMetadataDetails( + SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava) var execInstance = "1" // in case of non dynamic executor allocation, number of executors are fixed. if (sc.sparkContext.getConf.contains("spark.executor.instances")) { diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala deleted file mode 100644 index c55c8075a52..00000000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/QueryPlanUtil.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.spark.util - -import scala.reflect.ClassTag - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapreduce.Job -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - -import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier -import org.apache.carbondata.hadoop.CarbonInputFormat - - -/** - * All the utility functions for carbon plan creation - */ -object QueryPlanUtil { - - /** - * createCarbonInputFormat from query model - */ - def createCarbonInputFormat(absoluteTableIdentifier: AbsoluteTableIdentifier) : - (CarbonInputFormat[Array[Object]], Job) = { - val carbonInputFormat = new CarbonInputFormat[Array[Object]]() - val jobConf: JobConf = new JobConf(new Configuration) - val job: Job = new Job(jobConf) - FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) - (carbonInputFormat, job) - } - - def createCarbonInputFormat[V: ClassTag](absoluteTableIdentifier: AbsoluteTableIdentifier, - conf: Configuration) : CarbonInputFormat[V] = { - val carbonInputFormat = new CarbonInputFormat[V]() - val job: Job = new Job(conf) - FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath)) - carbonInputFormat - } -} diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala index c9d2a0f74db..ca0ad58dcb6 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala @@ -38,14 +38,13 @@ import org.apache.spark.sql.types.StructType import org.apache.spark.util.SerializableConfiguration import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier -import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.carbon.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection} -import org.apache.carbondata.hadoop.util.SchemaReader +import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader} import org.apache.carbondata.scan.expression.logical.AndExpression import org.apache.carbondata.spark.{CarbonFilters, CarbonOption} import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl import org.apache.carbondata.spark.util.CarbonScalaUtil.CarbonSparkUtil -import org.apache.carbondata.spark.util.QueryPlanUtil private[sql] case class CarbonDatasourceHadoopRelation( @@ -104,7 +103,7 @@ private[sql] case class CarbonDatasourceHadoopRelation( val projection = new CarbonProjection requiredColumns.foreach(projection.addColumn) - CarbonInputFormat.setColumnProjection(projection, conf) + CarbonInputFormat.setColumnProjection(conf, projection) CarbonInputFormat.setCarbonReadSupport(classOf[SparkRowReadSupportImpl], conf) new CarbonHadoopFSRDD[Row](sqlContext.sparkContext, @@ -145,12 +144,11 @@ class 
CarbonHadoopFSRDD[V: ClassTag]( context: TaskContext): Iterator[V] = { val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf.value, attemptId) - val inputFormat = QueryPlanUtil.createCarbonInputFormat(identifier, - hadoopAttemptContext.getConfiguration - ) + val job: Job = new Job(hadoopAttemptContext.getConfiguration) + val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, job) hadoopAttemptContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) val reader = - inputFormat.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, + format.createRecordReader(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, hadoopAttemptContext ) reader.initialize(split.asInstanceOf[CarbonHadoopFSPartition].carbonSplit.value, @@ -186,11 +184,9 @@ class CarbonHadoopFSRDD[V: ClassTag]( override protected def getPartitions: Array[Partition] = { val jobContext = newJobContext(conf.value, jobId) - val carbonInputFormat = QueryPlanUtil.createCarbonInputFormat(identifier, - jobContext.getConfiguration - ) + val format = CarbonInputFormatUtil.createCarbonInputFormat(identifier, new Job(conf.value)) jobContext.getConfiguration.set(FileInputFormat.INPUT_DIR, identifier.getTablePath) - val splits = carbonInputFormat.getSplits(jobContext).toArray + val splits = format.getSplits(jobContext).toArray val carbonInputSplits = splits .map(f => new SerializableWritable(f.asInstanceOf[CarbonInputSplit])) carbonInputSplits.zipWithIndex.map(f => new CarbonHadoopFSPartition(id, f._2, f._1)) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala index 069e1060d87..a06d5cba44c 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala @@ -272,9 +272,8 @@ case class CarbonRelation( private var sizeInBytesLocalValue = 0L def sizeInBytes: Long = { - val tableStatusNewLastUpdatedTime = new SegmentStatusManager( + val tableStatusNewLastUpdatedTime = SegmentStatusManager.getTableStatusLastModifiedTime( tableMeta.carbonTable.getAbsoluteTableIdentifier) - .getTableStatusLastModifiedTime if (tableStatusLastUpdateTime != tableStatusNewLastUpdatedTime) { val tablePath = CarbonStorePath.getCarbonTablePath( tableMeta.storePath, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala similarity index 92% rename from integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala rename to integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala index c105cae7027..6580c4f6695 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonOperators.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonScan.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.hive.CarbonMetastoreCatalog import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.scan.model._ -import org.apache.carbondata.spark.{CarbonFilters, RawValue, RawValueImpl} +import org.apache.carbondata.spark.CarbonFilters import org.apache.carbondata.spark.rdd.CarbonScanRDD case class CarbonScan( @@ -136,14 +136,11 @@ case class 
CarbonScan( selectedMsrs.foreach(plan.addMeasure) } - def inputRdd: CarbonScanRDD[Array[Any]] = { val conf = new Configuration() val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier - val model = QueryModel.createModel( - absoluteTableIdentifier, buildCarbonPlan, carbonTable) - val kv: RawValue[Array[Any]] = new RawValueImpl + // setting queryid buildCarbonPlan.setQueryId(ocRaw.getConf("queryId", System.nanoTime() + "")) @@ -151,19 +148,15 @@ case class CarbonScan( .getTableCreationTime(relationRaw.databaseName, relationRaw.tableName) val schemaLastUpdatedTime = carbonCatalog .getSchemaLastUpdatedTime(relationRaw.databaseName, relationRaw.tableName) - val big = new CarbonScanRDD( + new CarbonScanRDD( ocRaw.sparkContext, - model, + attributesRaw, buildCarbonPlan.getFilterExpression, - kv, - conf, - tableCreationTime, - schemaLastUpdatedTime, - carbonCatalog.storePath) - big + absoluteTableIdentifier, + carbonTable + ) } - override def outputsUnsafeRows: Boolean = (attributesNeedToDecode.size() == 0) && useUnsafeCoversion @@ -174,12 +167,14 @@ case class CarbonScan( new Iterator[InternalRow] { override def hasNext: Boolean = iter.hasNext - override def next(): InternalRow = + override def next(): InternalRow = { + val value = iter.next if (outUnsafeRows) { - unsafeProjection(new GenericMutableRow(iter.next())) + unsafeProjection(new GenericMutableRow(value)) } else { - new GenericMutableRow(iter.next()) + new GenericMutableRow(value) } + } } } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index a6b4ec5bcb3..74b0dd2b39c 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -924,10 +924,9 @@ private[sql] case class DeleteLoadsById( } val path = carbonTable.getMetaDataFilepath - val segmentStatusManager = - new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) try { - val invalidLoadIds = segmentStatusManager.updateDeletionStatus(loadids.asJava, path).asScala + val invalidLoadIds = SegmentStatusManager.updateDeletionStatus( + carbonTable.getAbsoluteTableIdentifier, loadids.asJava, path).asScala if (invalidLoadIds.isEmpty) { @@ -986,8 +985,6 @@ private[sql] case class DeleteLoadsByLoadDate( val carbonTable = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance() .getCarbonTable(dbName + '_' + tableName) - val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) - if (null == carbonTable) { var relation = CarbonEnv.getInstance(sqlContext).carbonCatalog .lookupRelation1(identifier, None)(sqlContext).asInstanceOf[CarbonRelation] @@ -995,8 +992,9 @@ private[sql] case class DeleteLoadsByLoadDate( val path = carbonTable.getMetaDataFilepath() try { - val invalidLoadTimestamps = segmentStatusManager - .updateDeletionStatus(loadDate, path, timeObj.asInstanceOf[java.lang.Long]).asScala + val invalidLoadTimestamps = SegmentStatusManager.updateDeletionStatus( + carbonTable.getAbsoluteTableIdentifier, loadDate, path, + timeObj.asInstanceOf[java.lang.Long]).asScala if (invalidLoadTimestamps.isEmpty) { LOGGER.audit(s"Delete segment by date is successfull for $dbName.$tableName.") } @@ -1328,12 +1326,8 @@ private[sql] case class ShowLoads( if (carbonTable == null) { sys.error(s"$databaseName.$tableName is not found") 
} - val path = carbonTable.getMetaDataFilepath() - - val segmentStatusManager = new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier) - - val loadMetadataDetailsArray = segmentStatusManager.readLoadMetadata(path) - + val path = carbonTable.getMetaDataFilepath + val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path) if (loadMetadataDetailsArray.nonEmpty) { val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP) diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala index 0c13293e4a1..25c36c5db4f 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.sql.hive - import java.net.{InetAddress, InterfaceAddress, NetworkInterface} import scala.collection.JavaConverters._ @@ -44,12 +43,9 @@ object DistributionUtil { * localhost for retriving executor list */ def getNodeList(sparkContext: SparkContext): Array[String] = { - - val arr = - sparkContext.getExecutorMemoryStatus.map { - kv => - kv._1.split(":")(0) - }.toSeq + val arr = sparkContext.getExecutorMemoryStatus.map { kv => + kv._1.split(":")(0) + }.toSeq val localhostIPs = getLocalhostIPs val selectedLocalIPList = localhostIPs.filter(arr.contains(_)) @@ -109,10 +105,9 @@ object DistributionUtil { * @param sparkContext * @return */ - def ensureExecutorsAndGetNodeList(blockList: Array[Distributable], - sparkContext: SparkContext): - Array[String] = { - val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.toSeq.asJava) + def ensureExecutorsAndGetNodeList(blockList: Seq[Distributable], + sparkContext: SparkContext): Seq[String] = { + val nodeMapping = CarbonLoaderUtil.getRequiredExecutors(blockList.asJava) var confExecutorsTemp: String = null if (sparkContext.getConf.contains("spark.executor.instances")) { confExecutorsTemp = sparkContext.getConf.get("spark.executor.instances") @@ -131,7 +126,9 @@ object DistributionUtil { } val requiredExecutors = if (nodeMapping.size > confExecutors) { confExecutors - } else { nodeMapping.size() } + } else { + nodeMapping.size() + } val startTime = System.currentTimeMillis() CarbonContext.ensureExecutors(sparkContext, requiredExecutors) diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala index cc00c47ec83..f02d4e7be9d 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/AllDataTypesTestCaseAggregate.scala @@ -35,10 +35,9 @@ import org.apache.carbondata.core.util.CarbonProperties class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll { override def beforeAll { - + clean val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../") .getCanonicalPath - sql("create table if not exists Carbon_automation_test (imei string,deviceInformationId int,MAC string,deviceColor string,device_backColor string,modelId string,marketName string,AMSize string,ROMSize string,CUPAudit string,CPIClocked string,series string,productionDate timestamp,bomCode string,internalModels 
string, deliveryTime string, channelsId string, channelsName string , deliveryAreaId string, deliveryCountry string, deliveryProvince string, deliveryCity string,deliveryDistrict string, deliveryStreet string, oxSingleNumber string, ActiveCheckTime string, ActiveAreaId string, ActiveCountry string, ActiveProvince string, Activecity string, ActiveDistrict string, ActiveStreet string, ActiveOperatorId string, Active_releaseId string, Active_EMUIVersion string, Active_operaSysVersion string, Active_BacVerNumber string, Active_BacFlashVer string, Active_webUIVersion string, Active_webUITypeCarrVer string,Active_webTypeDataVerNumber string, Active_operatorsVersion string, Active_phonePADPartitionedVersions string, Latest_YEAR int, Latest_MONTH int, Latest_DAY int, Latest_HOUR string, Latest_areaId string, Latest_country string, Latest_province string, Latest_city string, Latest_district string, Latest_street string, Latest_releaseId string, Latest_EMUIVersion string, Latest_operaSysVersion string, Latest_BacVerNumber string, Latest_BacFlashVer string, Latest_webUIVersion string, Latest_webUITypeCarrVer string, Latest_webTypeDataVerNumber string, Latest_operatorsVersion string, Latest_phonePADPartitionedVersions string, Latest_operatorId string, gamePointDescription string, gamePointId int,contractNumber int) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_INCLUDE'='Latest_MONTH,Latest_DAY,deviceInformationId')");
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
@@ -52,11 +51,14 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
 
   }
 
+  def clean{
+    sql("drop table if exists Carbon_automation_test")
+    sql("drop table if exists Carbon_automation_hive")
+    sql("drop table if exists Carbon_automation_test_hive")
+  }
+
   override def afterAll {
-    sql("drop table Carbon_automation_test")
-    sql("drop table Carbon_automation_hive")
-    sql("drop table Carbon_automation_test_hive")
-
+    clean
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
   }
@@ -425,10 +427,10 @@ class AllDataTypesTestCaseAggregate extends QueryTest with BeforeAndAfterAll {
   })
 
   //TC_103
-  test("select variance(deviceInformationId) as a from Carbon_automation_test")({
+  test("select variance(deviceInformationId) as a from carbon_automation_test")({
     checkAnswer(
-      sql("select variance(deviceInformationId) as a from Carbon_automation_test"),
-      sql("select variance(deviceInformationId) as a from Carbon_automation_hive"))
+      sql("select variance(deviceInformationId) as a from carbon_automation_test"),
+      sql("select variance(deviceInformationId) as a from carbon_automation_hive"))
   })
 
   //TC_105
   test("select var_samp(deviceInformationId) as a from Carbon_automation_test")({
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
index 7343a814c8f..924d91a3067 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSystemLockFeatureTest.scala
@@ -113,26 +113,22 @@ class CompactionSystemLockFeatureTest extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table table2")
     // check for table 1.
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier1 = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr")
-    )
-    )
+      new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table1", "rrr"))
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier1)
+      .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
 
     // check for table 2.
-    val segmentStatusManager2: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier2 = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1")
-    )
-    )
+      new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "table2", "rrr1"))
     // merged segment should not be there
-    val segments2 = segmentStatusManager2.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments2 = SegmentStatusManager.getSegmentStatus(identifier2)
+      .getValidSegments.asScala.toList
     assert(segments2.contains("0.1"))
     assert(!segments2.contains("0"))
     assert(!segments2.contains("1"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 80a23206fa9..f5039a7a5f5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -43,9 +43,6 @@ class DataCompactionBoundaryConditionsTest extends QueryTest with BeforeAndAfter
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "boundarytest".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
-
   override def beforeAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "2,2")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index d780efef8f7..f77ec9b86c3 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -86,13 +86,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
         CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-        new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+        new CarbonTableIdentifier(
+          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
       )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index eb889d64f07..7ec6431baf5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -105,16 +105,11 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
    * Compaction should fail as lock is being held purposefully
    */
   test("check if compaction is failed or not.") {
-
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
-      absoluteTableIdentifier
-    )
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-
+    val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
+      .getValidSegments.asScala.toList
     if (!segments.contains("0.1")) {
       assert(true)
-    }
-    else {
+    } else {
       assert(false)
     }
   }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index fbb39d81df9..15ed78b3b43 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -45,8 +45,7 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
   val carbonTableIdentifier: CarbonTableIdentifier =
     new CarbonTableIdentifier("default", "minorthreshold".toLowerCase(), "1")
 
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-      AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier))
+  val identifier = new AbsoluteTableIdentifier(storeLocation, carbonTableIdentifier)
 
   override def beforeAll {
     CarbonProperties.getInstance()
@@ -96,7 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
 
     sql("clean files for table minorthreshold")
 
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+      .getValidSegments.asScala.toList
 
     assert(segments.contains("0.2"))
     assert(!segments.contains("0.1"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index c7be22ff83b..570bb729590 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -38,14 +38,10 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
 
   // return segment details
   def getSegments(databaseName : String, tableName : String, tableId : String): List[String] = {
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)
-    )
-    )
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
-    segments
+      new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId))
+    SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
   }
 
   val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3eef8b7b2d9..137cebc3dd1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -84,13 +84,12 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
         CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
         new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
       )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
@@ -131,15 +130,14 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
     // delete merged segments
     sql("clean files for table normalcompaction")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
       new CarbonTableIdentifier(
         CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
     )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments().getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+      .getValidSegments.asScala.toList
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     assert(!segments.contains("2"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 99e3d569e27..a1664a6b9e6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -97,14 +97,13 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
         CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
         new CarbonTableIdentifier(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
       )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
       segments.foreach(seg => System.out.println( "valid segment is =" + seg)
       )
@@ -129,15 +128,14 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     // delete merged segments
     sql("clean files for table ignoremajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
       new CarbonTableIdentifier(
         CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
     )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+      .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
@@ -156,13 +154,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     catch {
       case _:Throwable => assert(true)
     }
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-      CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(
-        CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-    )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -170,7 +161,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
         CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
     // status should remain as compacted.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
 
@@ -185,13 +176,6 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
       "DELETE SEGMENTS FROM TABLE ignoremajor where STARTTIME before" +
         " '2222-01-01 19:35:01'"
     )
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
-      CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-      new CarbonTableIdentifier(
-        CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
-    )
-    )
     val carbontablePath = CarbonStorePath
       .getCarbonTablePath(CarbonProperties.getInstance
         .getProperty(CarbonCommonConstants.STORE_LOCATION),
@@ -199,7 +183,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
         CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
       )
       .getMetadataDirectoryPath
-    var segs = segmentStatusManager.readLoadMetadata(carbontablePath)
+    val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
     // status should remain as compacted for segment 2.
     assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3745e110f6b..25087a7da61 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -87,14 +87,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     var noOfRetries = 0
     while (!status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val identifier = new AbsoluteTableIdentifier(
        CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
        new CarbonTableIdentifier(
          CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
      )
-      )
-      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+      val segments = SegmentStatusManager.getSegmentStatus(identifier)
+        .getValidSegments.asScala.toList
      segments.foreach(seg => System.out.println( "valid segment is =" + seg)
      )
@@ -119,14 +118,13 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     // delete merged segments
     sql("clean files for table stopmajor")
 
-    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-        AbsoluteTableIdentifier(
+    val identifier = new AbsoluteTableIdentifier(
       CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
       new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
     )
-    )
     // merged segment should not be there
-    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+    val segments = SegmentStatusManager.getSegmentStatus(identifier)
+      .getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
index bed64284ef3..6d3cdeca73b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala
@@ -55,7 +55,6 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
     AbsoluteTableIdentifier(storeLocation,
       new CarbonTableIdentifier(
         CarbonCommonConstants.DATABASE_DEFAULT_NAME, "DataRetentionTable".toLowerCase(), "300"))
-  val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(absoluteTableIdentifierForRetention)
   val carbonTablePath = CarbonStorePath
     .getCarbonTablePath(absoluteTableIdentifierForRetention.getStorePath,
       absoluteTableIdentifierForRetention.getCarbonTableIdentifier).getMetadataDirectoryPath
@@ -133,8 +132,8 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("RetentionTest_DeleteSegmentsByLoadTime") {
-    val segments: Array[LoadMetadataDetails] = segmentStatusManager
-      .readLoadMetadata(carbonTablePath)
+    val segments: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(carbonTablePath)
     // check segment length, it should be 3 (loads)
     if (segments.length != 2) {
       assert(false)
diff --git a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
index e3b74ad8922..4036a8b6af2 100644
--- a/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/apache/carbondata/lcm/status/SegmentStatusManager.java
@@ -60,29 +60,23 @@ public class SegmentStatusManager {
   private static final LogService LOG =
       LogServiceFactory.getLogService(SegmentStatusManager.class.getName());
 
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-  public SegmentStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier) {
-    this.absoluteTableIdentifier = absoluteTableIdentifier;
-  }
-
   /**
    * This will return the lock object used to lock the table status file before updation.
    *
    * @return
    */
-  public ICarbonLock getTableStatusLock() {
-    return CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier.getCarbonTableIdentifier(),
+  public static ICarbonLock getTableStatusLock(AbsoluteTableIdentifier identifier) {
+    return CarbonLockFactory.getCarbonLockObj(identifier.getCarbonTableIdentifier(),
         LockUsage.TABLE_STATUS_LOCK);
   }
 
   /**
    * This method will return last modified time of tablestatus file
    */
-  public long getTableStatusLastModifiedTime() throws IOException {
-    String tableStatusPath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier()).getTableStatusFilePath();
+  public static long getTableStatusLastModifiedTime(AbsoluteTableIdentifier identifier)
+      throws IOException {
+    String tableStatusPath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+        identifier.getCarbonTableIdentifier()).getTableStatusFilePath();
     if (!FileFactory.isFileExist(tableStatusPath, FileFactory.getFileType(tableStatusPath))) {
       return 0L;
     } else {
@@ -97,15 +91,15 @@ public long getTableStatusLastModifiedTime() throws IOException {
    * @return
    * @throws IOException
    */
-  public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOException {
+  public static SegmentStatus getSegmentStatus(AbsoluteTableIdentifier identifier)
+      throws IOException {
     // @TODO: move reading LoadStatus file to separate class
-    List listOfValidSegments = new ArrayList(10);
-    List listOfValidUpdatedSegments = new ArrayList(10);
-    List listOfInvalidSegments = new ArrayList(10);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
+    List validSegments = new ArrayList(10);
+    List validUpdatedSegments = new ArrayList(10);
+    List invalidSegments = new ArrayList(10);
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier.getStorePath(),
+        identifier.getCarbonTableIdentifier());
     String dataPath = carbonTablePath.getTableStatusFilePath();
     DataInputStream dataInputStream = null;
     Gson gsonObjectToRead = new Gson();
@@ -114,9 +108,7 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOExcepti
     LoadMetadataDetails[] loadFolderDetailsArray;
     try {
       if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
-
         dataInputStream = fileOperation.openForRead();
-
         BufferedReader buffReader = new BufferedReader(
             new InputStreamReader(dataInputStream, CarbonCommonConstants.DEFAULT_CHARSET));
@@ -126,40 +118,31 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOExcepti
         List loadFolderDetails = Arrays.asList(loadFolderDetailsArray);
 
         for (LoadMetadataDetails loadMetadataDetails : loadFolderDetails) {
-          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.MARKED_FOR_UPDATE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+          String loadStatus = loadMetadataDetails.getLoadStatus();
+          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equalsIgnoreCase(
+                  loadStatus)) {
             // check for merged loads.
             if (null != loadMetadataDetails.getMergedLoadName()) {
-              if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
-                listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+              if (!validSegments.contains(loadMetadataDetails.getMergedLoadName())) {
+                validSegments.add(loadMetadataDetails.getMergedLoadName());
              }
              // if merged load is updated then put it in updated list
-              if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-                listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+              if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+                validUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
              }
              continue;
            }
-
-            if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-
-              listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+            if (CarbonCommonConstants.MARKED_FOR_UPDATE.equalsIgnoreCase(loadStatus)) {
+              validUpdatedSegments.add(loadMetadataDetails.getLoadName());
            }
-            listOfValidSegments.add(loadMetadataDetails.getLoadName());
-          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.SEGMENT_COMPACTED
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-              || CarbonCommonConstants.MARKED_FOR_DELETE
-              .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
-            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+            validSegments.add(loadMetadataDetails.getLoadName());
+          } else if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(loadStatus)
+              || CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadStatus)) {
+            invalidSegments.add(loadMetadataDetails.getLoadName());
          }
-
        }
      }
    } catch (IOException e) {
@@ -167,7 +150,6 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOExcepti
      throw e;
    } finally {
      try {
-
        if (null != dataInputStream) {
          dataInputStream.close();
        }
@@ -175,10 +157,9 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOExcepti
        LOG.error(e);
        throw e;
      }
-
    }
-    return new ValidAndInvalidSegmentsInfo(listOfValidSegments, listOfValidUpdatedSegments,
-        listOfInvalidSegments);
+
+    return new SegmentStatus(validSegments, validUpdatedSegments, invalidSegments);
  }
 
  /**
@@ -187,7 +168,7 @@ public ValidAndInvalidSegmentsInfo getValidAndInvalidSegments() throws IOExcepti
   * @param tableFolderPath
   * @return
   */
-  public LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
+  public static LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
    Gson gsonObjectToRead = new Gson();
    DataInputStream dataInputStream = null;
    BufferedReader buffReader = null;
@@ -223,13 +204,9 @@ public LoadMetadataDetails[] readLoadMetadata(String tableFolderPath) {
   *
   * @return
   */
-  private String readCurrentTime() {
+  private static String readCurrentTime() {
    SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-    String date = null;
-
-    date = sdf.format(new Date());
-
-    return date;
+    return sdf.format(new Date());
  }
 
  /**
@@ -240,8 +217,7 @@ private String readCurrentTime() {
   * @return -1 if first arg is less than second arg, 1 if first arg is greater than second arg,
   * 0 otherwise
   */
-  private Integer compareDateValues(Long loadValue, Long userValue) {
-
+  private static Integer compareDateValues(Long loadValue, Long userValue) {
    return loadValue.compareTo(userValue);
  }
 
@@ -252,10 +228,9 @@ private Integer compareDateValues(Long loadValue, Long userValue) {
   * @param tableFolderPath
   * @return
   */
-  public List updateDeletionStatus(List loadIds, String tableFolderPath)
-      throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
+  public static List updateDeletionStatus(AbsoluteTableIdentifier identifier,
+      List loadIds, String tableFolderPath) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
    ICarbonLock carbonDeleteSegmentLock =
        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
    ICarbonLock carbonTableStatusLock =
@@ -267,9 +242,8 @@ public List updateDeletionStatus(List loadIds, String tableFolde
 
    if (carbonDeleteSegmentLock.lockWithRetries()) {
      LOG.info("Delete segment lock has been successfully acquired");
-      CarbonTablePath carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-              absoluteTableIdentifier.getCarbonTableIdentifier());
+      CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+          identifier.getStorePath(), identifier.getCarbonTableIdentifier());
      String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
      LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
      if (!FileFactory.isFileExist(dataLoadLocation, FileFactory.getFileType(dataLoadLocation))) {
@@ -335,10 +309,9 @@ public List updateDeletionStatus(List loadIds, String tableFolde
   * @param tableFolderPath
   * @return
   */
-  public List updateDeletionStatus(String loadDate, String tableFolderPath,
-      Long loadStartTime) throws Exception {
-    CarbonTableIdentifier carbonTableIdentifier =
-        absoluteTableIdentifier.getCarbonTableIdentifier();
+  public static List updateDeletionStatus(AbsoluteTableIdentifier identifier,
+      String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
+    CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
    ICarbonLock carbonDeleteSegmentLock =
        CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK);
    ICarbonLock carbonTableStatusLock =
@@ -350,9 +323,8 @@ public List updateDeletionStatus(String loadDate, String tableFolderPath
 
    if (carbonDeleteSegmentLock.lockWithRetries()) {
      LOG.info("Delete segment lock has been successfully acquired");
-      CarbonTablePath carbonTablePath = CarbonStorePath
-          .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-              absoluteTableIdentifier.getCarbonTableIdentifier());
+      CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(
+          identifier.getStorePath(), identifier.getCarbonTableIdentifier());
      String dataLoadLocation = carbonTablePath.getTableStatusFilePath();
 
      LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
@@ -422,7 +394,7 @@ public List updateDeletionStatus(String loadDate, String tableFolderPath
   * @param listOfLoadFolderDetailsArray
   * @throws IOException
   */
-  public void writeLoadDetailsIntoFile(String dataLoadLocation,
+  public static void writeLoadDetailsIntoFile(String dataLoadLocation,
      LoadMetadataDetails[] listOfLoadFolderDetailsArray) throws IOException {
    AtomicFileOperations fileWrite =
        new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
@@ -458,7 +430,7 @@ public void writeLoadDetailsIntoFile(String dataLoadLocation,
   * @param invalidLoadIds
   * @return invalidLoadIds
   */
-  public List updateDeletionStatus(List loadIds,
+  public static List updateDeletionStatus(List loadIds,
      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List invalidLoadIds) {
    for (String loadId : loadIds) {
      boolean loadFound = false;
@@ -502,7 +474,7 @@ public List updateDeletionStatus(List loadIds,
   * @param invalidLoadTimestamps
   * @return invalidLoadTimestamps
   */
-  public List updateDeletionStatus(String loadDate,
+  public static List updateDeletionStatus(String loadDate,
      LoadMetadataDetails[] listOfLoadFolderDetailsArray, List invalidLoadTimestamps,
      Long loadStartTime) {
    // For each load timestamp loop through data and if the
@@ -543,7 +515,7 @@ public List updateDeletionStatus(String loadDate,
   *
   * @param streams - streams to close.
   */
-  private void closeStreams(Closeable... streams) {
+  private static void closeStreams(Closeable... streams) {
    // Added if to avoid NullPointerException in case one stream is being passed as null
    if (null != streams) {
      for (Closeable stream : streams) {
@@ -566,7 +538,7 @@ private void closeStreams(Closeable... streams) {
   * @return
   */
 
-  public List updateLatestTableStatusDetails(
+  public static List updateLatestTableStatusDetails(
      LoadMetadataDetails[] oldMetadata, LoadMetadataDetails[] newMetadata) {
    List newListMetadata =
@@ -584,7 +556,7 @@ public List updateLatestTableStatusDetails(
   *
   * @param loadMetadata
   */
-  public void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
+  public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
    // update status only if the segment is not marked for delete
    if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
      loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
@@ -593,28 +565,28 @@ public void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
 
  }
 
-  public static class ValidAndInvalidSegmentsInfo {
-    private final List listOfValidSegments;
-    private final List listOfValidUpdatedSegments;
-    private final List listOfInvalidSegments;
-
-    private ValidAndInvalidSegmentsInfo(List listOfValidSegments,
-        List listOfValidUpdatedSegments, List listOfInvalidUpdatedSegments) {
-      this.listOfValidSegments = listOfValidSegments;
-      this.listOfValidUpdatedSegments = listOfValidUpdatedSegments;
-      this.listOfInvalidSegments = listOfInvalidUpdatedSegments;
-    }
+  public static class SegmentStatus {
+    private final List validSegments;
+    private final List validUpdatedSegments;
+    private final List invalidSegments;
 
-    public List getInvalidSegments() {
-      return listOfInvalidSegments;
+    private SegmentStatus(List validSegments, List validUpdatedSegments,
+        List invalidSegments) {
+      this.validSegments = validSegments;
+      this.validUpdatedSegments = validUpdatedSegments;
+      this.invalidSegments = invalidSegments;
    }
 
    public List getValidSegments() {
-      return listOfValidSegments;
+      return validSegments;
    }
 
    public List getUpadtedSegments() {
-      return listOfValidUpdatedSegments;
+      return validUpdatedSegments;
+    }
+
+    public List getInvalidSegments() {
+      return invalidSegments;
    }
  }
 }
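
With SegmentStatusManager reduced to static helpers, callers pass the AbsoluteTableIdentifier explicitly instead of constructing a manager per table. A minimal caller sketch of the pattern the tests above now follow; the import path for AbsoluteTableIdentifier and CarbonTableIdentifier is assumed from the module layout in this patch, and the store path, database, table name and table id are placeholder values:

// Sketch only, not part of the patch.
import scala.collection.JavaConverters._

import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
import org.apache.carbondata.lcm.status.SegmentStatusManager

object SegmentStatusExample {
  def main(args: Array[String]): Unit = {
    // Placeholder store location and table coordinates for illustration only.
    val storePath = "/tmp/carbon/store"
    val identifier = new AbsoluteTableIdentifier(
      storePath,
      new CarbonTableIdentifier("default", "example_table", "1"))

    // Old pattern (removed by this patch):
    //   new SegmentStatusManager(identifier).getValidAndInvalidSegments.getValidSegments
    // New pattern: a stateless static call that returns a SegmentStatus holder.
    val status = SegmentStatusManager.getSegmentStatus(identifier)
    val validSegments = status.getValidSegments.asScala.toList
    val invalidSegments = status.getInvalidSegments.asScala.toList

    println(s"valid segments: $validSegments, invalid segments: $invalidSegments")
  }
}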