diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index d1ff644ab9d..2b3979f049e 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -224,16 +224,13 @@ private static void cleanDeletedFactFile(String loadFolderPath) {
   /**
    * @param loadModel
    * @param storeLocation
-   * @param partitionCount
    * @param isForceDelete
    * @param details
    * @return
    *
    */
   public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
-      String storeLocation, int partitionCount, boolean isForceDelete,
-      LoadMetadataDetails[] details) {
-    String path = null;
+      String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List deletedLoads = new ArrayList(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -242,12 +239,8 @@ public static boolean deleteLoadFoldersFromFileSystem(CarbonLoadModel loadModel,
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
-          boolean deletionStatus = false;
-
-          for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
-            path = getSegmentPath(loadModel, storeLocation, partitionId, oneLoad);
-            deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
-          }
+          String path = getSegmentPath(loadModel, storeLocation, 0, oneLoad);
+          boolean deletionStatus = physicalFactAndMeasureMetadataDeletion(path);
           if (deletionStatus) {
             isDeleted = true;
             oneLoad.setVisibility("false");
diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
index 9be4d475c8e..84e6c00a561 100644
--- a/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -219,12 +219,11 @@ public static boolean updateLoadMetadataWithMergeStatus(List
   identifySegmentsToBeMerged(String storeLocation,
-      CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
+      CarbonLoadModel carbonLoadModel, long compactionSize,
       List segments, CompactionType compactionType) {
     List sortedSegments = new ArrayList(segments);
@@ -245,7 +244,7 @@ public static List identifySegmentsToBeMerged(String storeL
     if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
     } else {
       listOfSegmentsToBeMerged =
@@ -399,13 +398,12 @@ private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segD
    * @param compactionSize
    * @param listOfSegmentsAfterPreserve
    * @param carbonLoadModel
-   * @param partitionCount
    * @param storeLocation
    * @return
    */
   private static List identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
+      CarbonLoadModel carbonLoadModel, String storeLocation) {
     List segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -423,7 +421,7 @@
private static List identifySegmentsToBeMergedBasedOnSize( String segId = segment.getLoadName(); // variable to store one segment size across partition. long sizeOfOneSegmentAcrossPartition = - getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId); + getSizeOfSegment(storeLocation, tableIdentifier, segId); // if size of a segment is greater than the Major compaction size. then ignore it. if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) { @@ -460,30 +458,19 @@ private static List identifySegmentsToBeMergedBasedOnSize( } /** - * For calculating the size of a segment across all partition. - * @param partitionCount + * For calculating the size of the specified segment * @param storeLocation * @param tableIdentifier * @param segId * @return */ - private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation, + private static long getSizeOfSegment(String storeLocation, CarbonTableIdentifier tableIdentifier, String segId) { - long sizeOfOneSegmentAcrossPartition = 0; - // calculate size across partitions - for (int partition = 0; partition < partitionCount; partition++) { - - String loadPath = CarbonLoaderUtil - .getStoreLocation(storeLocation, tableIdentifier, segId, partition + ""); - - CarbonFile segmentFolder = - FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); - - long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder); - - sizeOfOneSegmentAcrossPartition += sizeOfEachSegment; - } - return sizeOfOneSegmentAcrossPartition; + String loadPath = CarbonLoaderUtil + .getStoreLocation(storeLocation, tableIdentifier, segId, "0"); + CarbonFile segmentFolder = + FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); + return getSizeOfFactFileInLoad(segmentFolder); } /** @@ -691,9 +678,6 @@ public static Map> combineNodeBlockMaps( /** * Removing the already merged segments from list. - * @param segments - * @param loadsToMerge - * @return */ public static List filterOutNewlyAddedSegments( List segments, @@ -701,17 +685,11 @@ public static List filterOutNewlyAddedSegments( // take complete list of segments. List list = new ArrayList<>(segments); - - List trimmedList = - new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - // sort list CarbonDataMergerUtil.sortSegments(list); // first filter out newly added segments. 
- trimmedList = list.subList(0, list.indexOf(lastSeg) + 1); - - return trimmedList; + return list.subList(0, list.indexOf(lastSeg) + 1); } diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java index 72e7b085412..e05be7d06e4 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java @@ -19,30 +19,18 @@ package org.apache.carbondata.spark.partition.api.impl; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Properties; -import org.apache.carbondata.common.logging.LogService; -import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.scan.model.CarbonQueryPlan; import org.apache.carbondata.spark.partition.api.DataPartitioner; import org.apache.carbondata.spark.partition.api.Partition; -import org.apache.spark.sql.execution.command.Partitioner; public final class QueryPartitionHelper { - private static final LogService LOGGER = - LogServiceFactory.getLogService(QueryPartitionHelper.class.getName()); private static QueryPartitionHelper instance = new QueryPartitionHelper(); - private Properties properties; - private String defaultPartitionerClass; private Map partitionerMap = new HashMap(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); private Map loadBalancerMap = @@ -56,86 +44,11 @@ public static QueryPartitionHelper getInstance() { return instance; } - /** - * Read the properties from CSVFilePartitioner.properties - */ - private static Properties loadProperties() { - Properties properties = new Properties(); - - File file = new File("DataPartitioner.properties"); - FileInputStream fis = null; - try { - if (file.exists()) { - fis = new FileInputStream(file); - - properties.load(fis); - } - } catch (Exception e) { - LOGGER - .error(e, e.getMessage()); - } finally { - if (null != fis) { - try { - fis.close(); - } catch (IOException e) { - LOGGER.error(e, - e.getMessage()); - } - } - } - - return properties; - - } - - private void checkInitialization(String tableUniqueName, Partitioner partitioner) { - //Initialise if not done earlier - - //String nodeListString = null; - if (properties == null) { - properties = loadProperties(); - - // nodeListString = properties.getProperty("nodeList", "master,slave1,slave2,slave3"); - - defaultPartitionerClass = properties.getProperty("partitionerClass", - "org.apache.carbondata.spark.partition.api.impl.SampleDataPartitionerImpl"); - - LOGGER.info(this.getClass().getSimpleName() + " is using following configurations."); - LOGGER.info("partitionerClass : " + defaultPartitionerClass); - LOGGER.info("nodeList : " + Arrays.toString(partitioner.nodeList())); - } - - if (partitionerMap.get(tableUniqueName) == null) { - DataPartitioner dataPartitioner; - try { - dataPartitioner = - (DataPartitioner) Class.forName(partitioner.partitionClass()).newInstance(); - dataPartitioner.initialize("", new String[0], partitioner); - - List partitions = dataPartitioner.getAllPartitions(); - DefaultLoadBalancer loadBalancer = - new DefaultLoadBalancer(Arrays.asList(partitioner.nodeList()), partitions); 
- partitionerMap.put(tableUniqueName, dataPartitioner); - loadBalancerMap.put(tableUniqueName, loadBalancer); - } catch (ClassNotFoundException e) { - LOGGER.error(e, - e.getMessage()); - } catch (InstantiationException e) { - LOGGER.error(e, - e.getMessage()); - } catch (IllegalAccessException e) { - LOGGER.error(e, - e.getMessage()); - } - } - } - /** * Get partitions applicable for query based on filters applied in query */ - public List getPartitionsForQuery(CarbonQueryPlan queryPlan, Partitioner partitioner) { + public List getPartitionsForQuery(CarbonQueryPlan queryPlan) { String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName(); - checkInitialization(tableUniqueName, partitioner); DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); @@ -143,38 +56,22 @@ public List getPartitionsForQuery(CarbonQueryPlan queryPlan, Partitio return queryPartitions; } - public List getAllPartitions(String databaseName, String tableName, - Partitioner partitioner) { + public List getAllPartitions(String databaseName, String tableName) { String tableUniqueName = databaseName + '_' + tableName; - checkInitialization(tableUniqueName, partitioner); DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); return dataPartitioner.getAllPartitions(); } - public void removePartition(String databaseName, String tableName) { - String tableUniqueName = databaseName + '_' + tableName; - partitionerMap.remove(tableUniqueName); - } - /** * Get the node name where the partition is assigned to. */ - public String getLocation(Partition partition, String databaseName, String tableName, - Partitioner partitioner) { + public String getLocation(Partition partition, String databaseName, String tableName) { String tableUniqueName = databaseName + '_' + tableName; - checkInitialization(tableUniqueName, partitioner); DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName); return loadBalancer.getNodeForPartitions(partition); } - public String[] getPartitionedColumns(String databaseName, String tableName, - Partitioner partitioner) { - String tableUniqueName = databaseName + '_' + tableName; - checkInitialization(tableUniqueName, partitioner); - DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName); - return dataPartitioner.getPartitionedColumns(); - } } diff --git a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java index 7393b6734f4..d2e716f1055 100644 --- a/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java +++ b/integration/spark/src/main/java/org/apache/carbondata/spark/util/CarbonQueryUtil.java @@ -21,19 +21,12 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.constants.CarbonCommonConstants; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; -import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory; -import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType; import org.apache.carbondata.core.load.LoadMetadataDetails; -import org.apache.carbondata.core.util.CarbonUtil; import org.apache.carbondata.scan.model.CarbonQueryPlan; import 
org.apache.carbondata.spark.partition.api.Partition; import org.apache.carbondata.spark.partition.api.impl.DefaultLoadBalancer; @@ -42,7 +35,7 @@ import org.apache.carbondata.spark.splits.TableSplit; import org.apache.commons.lang3.StringUtils; -import org.apache.spark.sql.execution.command.Partitioner; + /** * This utilty parses the Carbon query plan to actual query model object. */ @@ -52,21 +45,20 @@ private CarbonQueryUtil() { } - /** * It creates the one split for each region server. */ public static synchronized TableSplit[] getTableSplits(String databaseName, String tableName, - CarbonQueryPlan queryPlan, Partitioner partitioner) throws IOException { + CarbonQueryPlan queryPlan) throws IOException { //Just create splits depends on locations of region servers List allPartitions = null; if (queryPlan == null) { allPartitions = - QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName, partitioner); + QueryPartitionHelper.getInstance().getAllPartitions(databaseName, tableName); } else { allPartitions = - QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan, partitioner); + QueryPartitionHelper.getInstance().getPartitionsForQuery(queryPlan); } TableSplit[] splits = new TableSplit[allPartitions.size()]; for (int i = 0; i < splits.length; i++) { @@ -74,7 +66,7 @@ public static synchronized TableSplit[] getTableSplits(String databaseName, Stri List locations = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); Partition partition = allPartitions.get(i); String location = QueryPartitionHelper.getInstance() - .getLocation(partition, databaseName, tableName, partitioner); + .getLocation(partition, databaseName, tableName); locations.add(location); splits[i].setPartition(partition); splits[i].setLocations(locations); @@ -86,14 +78,12 @@ public static synchronized TableSplit[] getTableSplits(String databaseName, Stri /** * It creates the one split for each region server. */ - public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath, String[] nodeList, - int partitionCount) throws Exception { + public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath) throws Exception { //Just create splits depends on locations of region servers - FileType fileType = FileFactory.getFileType(sourcePath); DefaultLoadBalancer loadBalancer = null; - List allPartitions = getAllFilesForDataLoad(sourcePath, fileType, partitionCount); - loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions); + List allPartitions = getAllFilesForDataLoad(sourcePath); + loadBalancer = new DefaultLoadBalancer(new ArrayList(), allPartitions); TableSplit[] tblSplits = new TableSplit[allPartitions.size()]; for (int i = 0; i < tblSplits.length; i++) { tblSplits[i] = new TableSplit(); @@ -107,55 +97,6 @@ public static TableSplit[] getTableSplitsForDirectLoad(String sourcePath, String return tblSplits; } - /** - * It creates the one split for each region server. 
- */ - public static TableSplit[] getPartitionSplits(String sourcePath, String[] nodeList, - int partitionCount) throws Exception { - - //Just create splits depends on locations of region servers - FileType fileType = FileFactory.getFileType(sourcePath); - DefaultLoadBalancer loadBalancer = null; - List allPartitions = getAllPartitions(sourcePath, fileType, partitionCount); - loadBalancer = new DefaultLoadBalancer(Arrays.asList(nodeList), allPartitions); - TableSplit[] splits = new TableSplit[allPartitions.size()]; - for (int i = 0; i < splits.length; i++) { - splits[i] = new TableSplit(); - List locations = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); - Partition partition = allPartitions.get(i); - String location = loadBalancer.getNodeForPartitions(partition); - locations.add(location); - splits[i].setPartition(partition); - splits[i].setLocations(locations); - } - return splits; - } - - public static void getAllFiles(String sourcePath, List partitionsFiles, FileType fileType) - throws Exception { - - if (!FileFactory.isFileExist(sourcePath, fileType, false)) { - throw new Exception("Source file doesn't exist at path: " + sourcePath); - } - - CarbonFile file = FileFactory.getCarbonFile(sourcePath, fileType); - if (file.isDirectory()) { - CarbonFile[] fileNames = file.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile pathname) { - return true; - } - }); - for (int i = 0; i < fileNames.length; i++) { - getAllFiles(fileNames[i].getPath(), partitionsFiles, fileType); - } - } else { - // add only csv files - if (file.getName().endsWith("csv")) { - partitionsFiles.add(file.getPath()); - } - } - } - /** * split sourcePath by comma */ @@ -169,64 +110,22 @@ public static void splitFilePath(String sourcePath, List partitionsFiles } } - private static List getAllFilesForDataLoad(String sourcePath, FileType fileType, - int partitionCount) throws Exception { + private static List getAllFilesForDataLoad(String sourcePath) throws Exception { List files = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA); List partitionList = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); Map> partitionFiles = new HashMap>(); - for (int i = 0; i < partitionCount; i++) { - partitionFiles.put(i, new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN)); - partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles.get(i))); - } - for (int i = 0; i < files.size(); i++) { - partitionFiles.get(i % partitionCount).add(files.get(i)); - } - return partitionList; - } + partitionFiles.put(0, new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN)); + partitionList.add(new PartitionMultiFileImpl(0 + "", partitionFiles.get(0))); - private static List getAllPartitions(String sourcePath, FileType fileType, - int partitionCount) throws Exception { - List files = new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); - splitFilePath(sourcePath, files, CarbonCommonConstants.COMMA); - int[] numberOfFilesPerPartition = getNumberOfFilesPerPartition(files.size(), partitionCount); - int startIndex = 0; - int endIndex = 0; - List partitionList = - new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); - if (numberOfFilesPerPartition != null) { - for (int i = 0; i < numberOfFilesPerPartition.length; i++) { - List partitionFiles = - new ArrayList(CarbonCommonConstants.CONSTANT_SIZE_TEN); - endIndex += numberOfFilesPerPartition[i]; - for (int j = startIndex; j < endIndex; j++) { - partitionFiles.add(files.get(j)); - } - 
startIndex += numberOfFilesPerPartition[i]; - partitionList.add(new PartitionMultiFileImpl(i + "", partitionFiles)); - } + for (int i = 0; i < files.size(); i++) { + partitionFiles.get(i % 1).add(files.get(i)); } return partitionList; } - private static int[] getNumberOfFilesPerPartition(int numberOfFiles, int partitionCount) { - int div = numberOfFiles / partitionCount; - int mod = numberOfFiles % partitionCount; - int[] numberOfNodeToScan = null; - if (div > 0) { - numberOfNodeToScan = new int[partitionCount]; - Arrays.fill(numberOfNodeToScan, div); - } else if (mod > 0) { - numberOfNodeToScan = new int[mod]; - } - for (int i = 0; i < mod; i++) { - numberOfNodeToScan[i] = numberOfNodeToScan[i] + 1; - } - return numberOfNodeToScan; - } - public static List getListOfSlices(LoadMetadataDetails[] details) { List slices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); if (null != details) { @@ -240,16 +139,4 @@ public static List getListOfSlices(LoadMetadataDetails[] details) { return slices; } - /** - * This method will clear the dictionary cache for a given map of columns and dictionary cache - * mapping - * - * @param columnToDictionaryMap - */ - public static void clearColumnDictionaryCache(Map columnToDictionaryMap) { - for (Map.Entry entry : columnToDictionaryMap.entrySet()) { - CarbonUtil.clearDictionaryCache(entry.getValue()); - } - } - } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala index 51d65e3f21d..5a02bfda888 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonCleanFilesRDD.scala @@ -39,7 +39,7 @@ class CarbonCleanFilesRDD[V: ClassTag]( sc.setLocalProperty("spark.scheduler.pool", "DDL") override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner) + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map(s => new CarbonLoadPartition(id, s._2, s._1)) } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala deleted file mode 100644 index 68548931ad0..00000000000 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataFrameRDD.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package org.apache.carbondata.spark.rdd
-
-import org.apache.spark.sql.{CarbonContext, DataFrame, Row}
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-
-class CarbonDataFrameRDD(val cc: CarbonContext, logicalPlan: LogicalPlan)
-  extends DataFrame(cc, logicalPlan) {
-
-  override def collect(): Array[Row] = {
-
-    // executing the query
-    val rows: Array[Row] = super.collect()
-
-    // result
-    rows
-
-  }
-
-}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 0b23657f571..87b5673f3b8 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -200,11 +200,10 @@ class DataFileLoaderRDD[K, V](
     // for table split partition
     var splits = Array[TableSplit]()
     if (carbonLoadModel.isDirectLoad) {
-      splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath,
-        partitioner.nodeList, partitioner.partitionCount)
+      splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath)
     } else {
       splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName, null, partitioner)
+        carbonLoadModel.getTableName, null)
     }
     splits.zipWithIndex.map { case (split, index) =>
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8445440ceeb..6c096070c4e 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -15,7 +15,6 @@
  * limitations under the License.
*/ - package org.apache.carbondata.spark.rdd import java.util @@ -30,23 +29,20 @@ import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} -import org.apache.spark.{util => _, _} +import org.apache.spark.{SparkContext, SparkEnv, SparkException} import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext} -import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, -CompactionModel, Partitioner} +import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner} import org.apache.spark.sql.hive.DistributionUtil import org.apache.spark.util.{FileUtils, SplitUtils} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.{CarbonDataLoadSchema, CarbonTableIdentifier} import org.apache.carbondata.core.carbon.datastore.block.{Distributable, TableBlockInfo} -import org.apache.carbondata.core.carbon.metadata.CarbonMetadata import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails} import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, -CompactionType} +import org.apache.carbondata.integration.spark.merger.{CarbonCompactionUtil, CompactionCallable, CompactionType} import org.apache.carbondata.lcm.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.lcm.status.SegmentStatusManager import org.apache.carbondata.processing.etl.DataLoadingException @@ -55,6 +51,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep import org.apache.carbondata.spark._ import org.apache.carbondata.spark.load._ import org.apache.carbondata.spark.merger.CarbonDataMergerUtil +import org.apache.carbondata.spark.partition.api.Partition import org.apache.carbondata.spark.splits.TableSplit import org.apache.carbondata.spark.util.{CarbonQueryUtil, LoadMetadataUtil} @@ -67,17 +64,6 @@ object CarbonDataRDDFactory { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) - def mergeCarbonData( - sqlContext: SQLContext, - carbonLoadModel: CarbonLoadModel, - storeLocation: String, - storePath: String, - partitioner: Partitioner) { - val table = CarbonMetadata.getInstance() - .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName) - val metaDataPath: String = table.getMetaDataFilepath - } - def deleteLoadByDate( sqlContext: SQLContext, schema: CarbonDataLoadSchema, @@ -86,8 +72,7 @@ object CarbonDataRDDFactory { storePath: String, dateField: String, dateFieldActualName: String, - dateValue: String, - partitioner: Partitioner) { + dateValue: String) { val sc = sqlContext // Delete the records based on data @@ -103,7 +88,6 @@ object CarbonDataRDDFactory { dateField, dateFieldActualName, dateValue, - partitioner, table.getFactTableName, tableName, storePath, @@ -205,8 +189,10 @@ object CarbonDataRDDFactory { def alterTableForCompaction(sqlContext: SQLContext, alterTableModel: AlterTableModel, - carbonLoadModel: CarbonLoadModel, partitioner: Partitioner, storePath: String, - kettleHomePath: String, storeLocation: String): Unit = { + carbonLoadModel: CarbonLoadModel, + storePath: 
String, + kettleHomePath: String, + storeLocation: String): Unit = { var compactionSize: Long = 0 var compactionType: CompactionType = CompactionType.MINOR_COMPACTION if (alterTableModel.compactionType.equalsIgnoreCase("major")) { @@ -250,7 +236,6 @@ object CarbonDataRDDFactory { LOGGER.info("System level compaction lock is enabled.") handleCompactionForSystemLocking(sqlContext, carbonLoadModel, - partitioner, storePath, kettleHomePath, storeLocation, @@ -271,7 +256,6 @@ object CarbonDataRDDFactory { try { startCompactionThreads(sqlContext, carbonLoadModel, - partitioner, storePath, kettleHomePath, storeLocation, @@ -295,7 +279,6 @@ object CarbonDataRDDFactory { def handleCompactionForSystemLocking(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - partitioner: Partitioner, storePath: String, kettleHomePath: String, storeLocation: String, @@ -312,7 +295,6 @@ object CarbonDataRDDFactory { try { startCompactionThreads(sqlContext, carbonLoadModel, - partitioner, storePath, kettleHomePath, storeLocation, @@ -351,7 +333,6 @@ object CarbonDataRDDFactory { def executeCompaction(carbonLoadModel: CarbonLoadModel, storePath: String, compactionModel: CompactionModel, - partitioner: Partitioner, executor: ExecutorService, sqlContext: SQLContext, kettleHomePath: String, @@ -365,7 +346,6 @@ object CarbonDataRDDFactory { var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( storePath, carbonLoadModel, - partitioner.partitionCount, compactionModel.compactionSize, segList, compactionModel.compactionType @@ -386,7 +366,6 @@ object CarbonDataRDDFactory { compactionModel, kettleHomePath, carbonLoadModel, - partitioner, storeLocation ) @@ -417,7 +396,6 @@ object CarbonDataRDDFactory { loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged( storePath, carbonLoadModel, - partitioner.partitionCount, compactionModel.compactionSize, segList, compactionModel.compactionType @@ -439,7 +417,6 @@ object CarbonDataRDDFactory { compactionModel: CompactionModel, kettleHomePath: String, carbonLoadModel: CarbonLoadModel, - partitioner: Partitioner, storeLocation: String): Unit = { loadsToMerge.asScala.foreach(seg => { @@ -449,7 +426,6 @@ object CarbonDataRDDFactory { val compactionCallableModel = CompactionCallableModel(storePath, carbonLoadModel, - partitioner, storeLocation, compactionModel.carbonTable, kettleHomePath, @@ -468,7 +444,6 @@ object CarbonDataRDDFactory { def startCompactionThreads(sqlContext: SQLContext, carbonLoadModel: CarbonLoadModel, - partitioner: Partitioner, storePath: String, kettleHomePath: String, storeLocation: String, @@ -499,7 +474,6 @@ object CarbonDataRDDFactory { executeCompaction(carbonLoadModel: CarbonLoadModel, storePath: String, compactionModel: CompactionModel, - partitioner: Partitioner, executor, sqlContext, kettleHomePath, storeLocation ) triggeredCompactionStatus = true @@ -550,7 +524,6 @@ object CarbonDataRDDFactory { executeCompaction(newCarbonLoadModel, newCarbonLoadModel.getStorePath, newcompactionModel, - partitioner, executor, sqlContext, kettleHomePath, storeLocation ) } catch { @@ -672,7 +645,6 @@ object CarbonDataRDDFactory { handleCompactionForSystemLocking(sqlContext, carbonLoadModel, - partitioner, storePath, kettleHomePath, storeLocation, @@ -691,7 +663,6 @@ object CarbonDataRDDFactory { try { startCompactionThreads(sqlContext, carbonLoadModel, - partitioner, storePath, kettleHomePath, storeLocation, @@ -728,7 +699,7 @@ object CarbonDataRDDFactory { s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }") } // Check if any 
load need to be deleted before loading new data - deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, partitioner, storePath, + deleteLoadsAndUpdateMetadata(carbonLoadModel, carbonTable, storePath, isForceDeletion = false) if (null == carbonLoadModel.getLoadMetadataDetails) { readLoadMetadataDetails(carbonLoadModel, storePath) @@ -793,9 +764,7 @@ object CarbonDataRDDFactory { var splits = Array[TableSplit]() if (carbonLoadModel.isDirectLoad) { // get all table Splits, this part means files were divide to different partitions - splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath, - partitioner.nodeList, partitioner.partitionCount - ) + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) // get all partition blocks from file list blocksGroupBy = splits.map { split => @@ -813,8 +782,7 @@ object CarbonDataRDDFactory { } else { // get all table Splits,when come to this, means data have been partition splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, null, partitioner - ) + carbonLoadModel.getTableName, null) // get all partition blocks from factFilePath/uniqueID/ blocksGroupBy = splits.map { split => @@ -1060,7 +1028,7 @@ object CarbonDataRDDFactory { def deleteLoadsAndUpdateMetadata( carbonLoadModel: CarbonLoadModel, - table: CarbonTable, partitioner: Partitioner, + table: CarbonTable, storePath: String, isForceDeletion: Boolean) { if (LoadMetadataUtil.isLoadDeletionRequired(carbonLoadModel)) { @@ -1073,8 +1041,7 @@ object CarbonDataRDDFactory { // Delete marked loads val isUpdationRequired = DeleteLoadFolders - .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, - partitioner.partitionCount, isForceDeletion, details) + .deleteLoadFoldersFromFileSystem(carbonLoadModel, storePath, isForceDeletion, details) if (isUpdationRequired) { try { @@ -1113,20 +1080,17 @@ object CarbonDataRDDFactory { def dropTable( sc: SparkContext, schema: String, - table: String, - partitioner: Partitioner) { + table: String) { val v: Value[Array[Object]] = new ValueImpl() - new CarbonDropTableRDD(sc, v, schema, table, partitioner).collect + new CarbonDropTableRDD(sc, v, schema, table).collect } def cleanFiles( sc: SparkContext, carbonLoadModel: CarbonLoadModel, - storePath: String, - partitioner: Partitioner) { + storePath: String) { val table = org.apache.carbondata.core.carbon.metadata.CarbonMetadata.getInstance .getCarbonTable(carbonLoadModel.getDatabaseName + "_" + carbonLoadModel.getTableName) - val metaDataPath: String = table.getMetaDataFilepath val carbonCleanFilesLock = CarbonLockFactory .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier, LockUsage.CLEAN_FILES_LOCK @@ -1136,7 +1100,6 @@ object CarbonDataRDDFactory { LOGGER.info("Clean files lock has been successfully acquired.") deleteLoadsAndUpdateMetadata(carbonLoadModel, table, - partitioner, storePath, isForceDeletion = true) } else { diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala index 15f9b4465ca..343a6023264 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadByDateRDD.scala @@ -37,7 +37,6 @@ class CarbonDeleteLoadByDateRDD[K, V]( dateField: String, dateFieldActualName: String, dateValue: 
String, - partitioner: Partitioner, factTableName: String, dimTableName: String, storePath: String, @@ -47,7 +46,7 @@ class CarbonDeleteLoadByDateRDD[K, V]( sc.setLocalProperty("spark.scheduler.pool", "DDL") override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner) + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {s => new CarbonLoadPartition(id, s._2, s._1) } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala index a78c67b5b32..26e1abc962f 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDeleteLoadRDD.scala @@ -38,7 +38,7 @@ class CarbonDeleteLoadRDD[V: ClassTag]( sc.setLocalProperty("spark.scheduler.pool", "DDL") override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner) + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map {f => new CarbonLoadPartition(id, f._2, f._1) } diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala index abdeaf5fb6b..47689bdc95f 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropTableRDD.scala @@ -30,14 +30,13 @@ class CarbonDropTableRDD[V: ClassTag]( sc: SparkContext, valueClass: Value[V], databaseName: String, - tableName: String, - partitioner: Partitioner) + tableName: String) extends RDD[V](sc, Nil) { sc.setLocalProperty("spark.scheduler.pool", "DDL") override def getPartitions: Array[Partition] = { - val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null, partitioner) + val splits = CarbonQueryUtil.getTableSplits(databaseName, tableName, null) splits.zipWithIndex.map { s => new CarbonLoadPartition(id, s._2, s._1) } @@ -46,9 +45,6 @@ class CarbonDropTableRDD[V: ClassTag]( override def compute(theSplit: Partition, context: TaskContext): Iterator[V] = { val iter = new Iterator[V] { - val split = theSplit.asInstanceOf[CarbonLoadPartition] - - val partitionCount = partitioner.partitionCount // TODO: Clear Btree from memory var havePair = false diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 4e820c60eb6..249f2cdae24 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -29,13 +29,13 @@ import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.spark._ import org.apache.spark.rdd.RDD -import org.apache.spark.sql.CarbonContext +import org.apache.spark.sql.CarbonEnv import org.apache.spark.sql.execution.command.{CarbonMergerMapping, NodeInfo} import org.apache.spark.sql.hive.DistributionUtil import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, 
CarbonTableIdentifier} -import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TableBlockInfo, TaskBlockInfo} +import org.apache.carbondata.core.carbon.datastore.block.{Distributable, SegmentProperties, TaskBlockInfo} import org.apache.carbondata.core.carbon.metadata.blocklet.DataFileFooter import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CarbonUtilException} @@ -273,7 +273,7 @@ class CarbonMergerRDD[K, V]( val requiredExecutors = if (nodeBlockMapping.size > confExecutors) { confExecutors } else { nodeBlockMapping.size() } - CarbonContext.ensureExecutors(sparkContext, requiredExecutors) + CarbonEnv.ensureExecutors(sparkContext, requiredExecutors) logInfo("No.of Executors required=" + requiredExecutors + " , spark.executor.instances=" + confExecutors + ", no.of.nodes where data present=" + nodeBlockMapping.size()) diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala index 5fdbc5d1ab8..6c6076e57be 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala @@ -40,7 +40,6 @@ object Compactor { def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = { val storePath = compactionCallableModel.storePath - val partitioner = compactionCallableModel.partitioner val storeLocation = compactionCallableModel.storeLocation val carbonTable = compactionCallableModel.carbonTable val kettleHomePath = compactionCallableModel.kettleHomePath @@ -60,7 +59,6 @@ object Compactor { val mergeLoadStartTime = CarbonLoaderUtil.readCurrentTime() val carbonMergerMapping = CarbonMergerMapping(storeLocation, storePath, - partitioner, carbonTable.getMetaDataFilepath, mergedLoadName, kettleHomePath, diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 86c12f874f6..3a393ed49d2 100644 --- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -74,11 +74,10 @@ class NewCarbonDataLoadRDD[K, V]( var splits: Array[TableSplit] = null if (carbonLoadModel.isDirectLoad) { - splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath, - partitioner.nodeList, partitioner.partitionCount) + splits = CarbonQueryUtil.getTableSplitsForDirectLoad(carbonLoadModel.getFactFilePath) } else { splits = CarbonQueryUtil.getTableSplits(carbonLoadModel.getDatabaseName, - carbonLoadModel.getTableName, null, partitioner) + carbonLoadModel.getTableName, null) } splits.zipWithIndex.map { s => diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala index 5d2221d59ff..ba970833b68 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonContext.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.querystatistics.{QueryStatistic, QueryStatisticsConstants} import 
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory} -import org.apache.carbondata.spark.rdd.CarbonDataFrameRDD class CarbonContext( val sc: SparkContext, @@ -134,7 +133,7 @@ class CarbonContext( val logicPlan: LogicalPlan = parseSql(sql) statistic.addStatistics(QueryStatisticsConstants.SQL_PARSE, System.currentTimeMillis()) recorder.recordStatisticsForDriver(statistic, queryId) - val result = new CarbonDataFrameRDD(this, logicPlan) + val result = new DataFrame(this, logicPlan) // We force query optimization to happen right away instead of letting it happen lazily like // when using the query DSL. This is so DDL commands behave as expected. This is only @@ -183,29 +182,4 @@ object CarbonContext { cache(sc) = cc } - /** - * - * Requesting the extra executors other than the existing ones. - * - * @param sc - * @param numExecutors - * @return - */ - final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = { - sc.schedulerBackend match { - case b: CoarseGrainedSchedulerBackend => - val requiredExecutors = numExecutors - b.numExistingExecutors - LOGGER - .info(s"number of executors is =$numExecutors existing executors are =" + - s"${ b.numExistingExecutors }" - ) - if (requiredExecutors > 0) { - b.requestExecutors(requiredExecutors) - } - true - case _ => - false - } - - } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala index 9f6b0b4ddd6..be779547c3d 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala @@ -17,15 +17,19 @@ package org.apache.spark.sql +import org.apache.spark.SparkContext +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.sql.hive.{CarbonMetastoreCatalog, HiveContext} +import org.apache.carbondata.common.logging.LogServiceFactory + /** * Carbon Environment for unified context */ case class CarbonEnv(hiveContext: HiveContext, carbonCatalog: CarbonMetastoreCatalog) object CarbonEnv { - val className = classOf[CarbonEnv].getCanonicalName + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) var carbonEnv: CarbonEnv = _ def getInstance(sqlContext: SQLContext): CarbonEnv = { @@ -36,6 +40,29 @@ object CarbonEnv { } carbonEnv } + + /** + * + * Requesting the extra executors other than the existing ones. 
+ * + * @param sc + * @param numExecutors + * @return + */ + final def ensureExecutors(sc: SparkContext, numExecutors: Int): Boolean = { + sc.schedulerBackend match { + case b: CoarseGrainedSchedulerBackend => + val requiredExecutors = numExecutors - b.numExistingExecutors + LOGGER.info(s"number of executors is =$numExecutors existing executors are =" + + s"${ b.numExistingExecutors }") + if (requiredExecutors > 0) { + b.requestExecutors(requiredExecutors) + } + true + case _ => + false + } + } } diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala index 22cc548d131..b7673dbdb43 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala @@ -99,7 +99,6 @@ case class DataLoadTableFileMapping(table: String, loadPath: String) case class CarbonMergerMapping(storeLocation: String, storePath: String, - partitioner: Partitioner, metadataFilePath: String, mergedLoadName: String, kettleHomePath: String, @@ -126,7 +125,6 @@ case class CompactionModel(compactionSize: Long, case class CompactionCallableModel(storePath: String, carbonLoadModel: CarbonLoadModel, - partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String, @@ -493,7 +491,6 @@ private[sql] case class AlterTableCompaction(alterTableModel: AlterTableModel) e .alterTableForCompaction(sqlContext, alterTableModel, carbonLoadModel, - partitioner, relation.tableMeta.storePath, kettleHomePath, storeLocation @@ -1162,8 +1159,7 @@ private[sql] case class DeleteLoadByDate( CarbonEnv.getInstance(sqlContext).carbonCatalog.storePath, level, actualColName, - dateValue, - relation.tableMeta.partitioner) + dateValue) LOGGER.audit(s"The delete load by date $dateValue is successful for $dbName.$tableName.") Seq.empty } @@ -1201,8 +1197,7 @@ private[sql] case class CleanFiles( CarbonDataRDDFactory.cleanFiles( sqlContext.sparkContext, carbonLoadModel, - relation.tableMeta.storePath, - relation.tableMeta.partitioner) + relation.tableMeta.storePath) LOGGER.audit(s"Clean files request is successfull for $dbName.$tableName.") } catch { case ex: Exception => diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala index 25c36c5db4f..02453bd1345 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala @@ -21,15 +21,12 @@ import java.net.{InetAddress, InterfaceAddress, NetworkInterface} import scala.collection.JavaConverters._ import org.apache.spark.SparkContext -import org.apache.spark.sql.CarbonContext +import org.apache.spark.sql.{CarbonContext, CarbonEnv} import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.carbon.datastore.block.Distributable import org.apache.carbondata.spark.load.CarbonLoaderUtil -/** - * - */ object DistributionUtil { @transient val LOGGER = LogServiceFactory.getLogService(CarbonContext.getClass.getName) @@ -131,7 +128,7 @@ object DistributionUtil { } val startTime = System.currentTimeMillis() - CarbonContext.ensureExecutors(sparkContext, requiredExecutors) + CarbonEnv.ensureExecutors(sparkContext, requiredExecutors) 
     var nodes = DistributionUtil.getNodeList(sparkContext)
     var maxTimes = 30
     while (nodes.length < requiredExecutors && maxTimes > 0) {