[CARBONDATA-463] Extract code to spark-common. This closes apache#365
ravipesala committed Nov 30, 2016
2 parents 567fa51 + 66ccd30 commit d94b99f
Showing 89 changed files with 1,070 additions and 801 deletions.
@@ -63,7 +63,7 @@ object GenerateDictionaryExample {
dictFolderPath: String) {
val dataBaseName = carbonTableIdentifier.getDatabaseName
val tableName = carbonTableIdentifier.getTableName
-val carbonRelation = CarbonEnv.getInstance(cc).carbonCatalog.
+val carbonRelation = CarbonEnv.get.carbonMetastore.
lookupRelation1(Option(dataBaseName),
tableName) (cc).asInstanceOf[CarbonRelation]
val carbonTable = carbonRelation.tableMeta.carbonTable
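Note on this hunk: the example now resolves the table through CarbonEnv.get.carbonMetastore instead of CarbonEnv.getInstance(cc).carbonCatalog. A minimal sketch of the resolved call, assuming the CarbonContext cc and the identifiers from the surrounding method are in scope:

    // Hedged sketch: resolve the CarbonRelation through the relocated metastore accessor.
    val carbonRelation = CarbonEnv.get.carbonMetastore
      .lookupRelation1(Option(dataBaseName), tableName)(cc)
      .asInstanceOf[CarbonRelation]
    val carbonTable = carbonRelation.tableMeta.carbonTable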
2 changes: 1 addition & 1 deletion integration/spark-common/pom.xml
@@ -77,7 +77,7 @@
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
-<artifactId>spark-repl_${scala.binary.version}</artifactId>
+<artifactId>spark-sql_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
@@ -927,21 +927,18 @@ private static void createFlattenedListFromMap(List<Distributable> blockInfos,
* @param carbonStorePath
* @param dbName
* @param tableName
-* @param partitionCount
* @param segmentId
*/
public static void checkAndCreateCarbonDataLocation(String carbonStorePath, String dbName,
-String tableName, int partitionCount, String segmentId) {
+String tableName, String segmentId) {
CarbonTable carbonTable = CarbonMetadata.getInstance()
.getCarbonTable(dbName + CarbonCommonConstants.UNDERSCORE + tableName);
CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
CarbonTablePath carbonTablePath =
CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-for (int i = 0; i < partitionCount; i++) {
-String carbonDataDirectoryPath =
-carbonTablePath.getCarbonDataDirectoryPath(String.valueOf(i), segmentId);
-CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
-}
+String carbonDataDirectoryPath =
+carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
}

/**
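Note on this hunk: the per-partition loop is gone, so the data directory is always created under partition "0" and callers drop the partitionCount argument. A hedged before/after sketch of a call site (the enclosing utility class is not shown in this hunk; CarbonLoaderUtil and the variable names are assumptions):

    // Before (assumed): one folder created per partition.
    // CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath, dbName, tableName, partitionCount, segmentId)

    // After: a single folder under partition "0" for the segment.
    CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath, dbName, tableName, segmentId)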
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

import java.util.ArrayList;
import java.util.Collections;
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

import java.io.IOException;
import java.util.ArrayList;
@@ -38,7 +38,6 @@
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.CarbonUtilException;

-import org.apache.spark.sql.hive.TableMeta;

/**
* Utility Class for the Compaction Flow.
@@ -270,12 +269,12 @@ public static boolean createCompactionRequiredFile(String metaFolderPath,
public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
List<CarbonTableIdentifier> skipList) {
for (TableMeta table : tableMetas) {
-CarbonTable ctable = table.carbonTable();
+CarbonTable ctable = table.carbonTable;
String metadataPath = ctable.getMetaDataFilepath();
// check for the compaction required file and at the same time exclude the tables which are
// present in the skip list.
if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
-.contains(table.carbonTableIdentifier())) {
+.contains(table.carbonTableIdentifier)) {
return table;
}
}
@@ -45,7 +45,6 @@
import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
import org.apache.carbondata.core.load.LoadMetadataDetails;
import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.integration.spark.merger.CompactionType;
import org.apache.carbondata.lcm.locks.ICarbonLock;
import org.apache.carbondata.lcm.status.SegmentStatusManager;
import org.apache.carbondata.processing.model.CarbonLoadModel;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

import java.util.concurrent.Callable;

@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

/**
* This enum is used to define the types of Compaction.
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

import java.io.File;
import java.util.AbstractQueue;
@@ -17,28 +17,26 @@
* under the License.
*/

-/**
-* Project Name : Carbon
-* Module Name : CARBON Data Processor
-* Author : R00903928
-* Created Date : 21-Sep-2015
-* FileName : DeleteLoadFromMetadata.java
-* Description : Kettle step to generate MD Key
-* Class Version : 1.0
-*/
-package org.apache.carbondata.spark.load;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-
-public final class DeleteLoadFromMetadata {
-
-private static final LogService LOGGER =
-LogServiceFactory.getLogService(DeleteLoadFromMetadata.class.getName());
-
-private DeleteLoadFromMetadata() {
-
-}
+package org.apache.carbondata.spark.merger;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+
+public class TableMeta implements Serializable {
+
+private static final long serialVersionUID = -1749874611119829431L;
+
+public CarbonTableIdentifier carbonTableIdentifier;
+public String storePath;
+public CarbonTable carbonTable;
+
+public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+CarbonTable carbonTable) {
+this.carbonTableIdentifier = carbonTableIdentifier;
+this.storePath = storePath;
+this.carbonTable = carbonTable;
+}
+
}
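TableMeta now lives in org.apache.carbondata.spark.merger as a plain Serializable class with public fields, replacing the org.apache.spark.sql.hive.TableMeta import removed above; that is why call sites switch from table.carbonTable() to table.carbonTable. A hedged sketch of constructing and reading one from Scala (the identifier, store path and table handle are placeholders):

    // Hedged sketch: TableMeta is a serializable holder, so it can travel inside Spark closures.
    val meta = new TableMeta(carbonTableIdentifier, "/opt/carbon/store", carbonTable)
    val id = meta.carbonTableIdentifier   // public field access, no accessor method
    val path = meta.storePath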
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.carbondata.integration.spark.merger;
+package org.apache.carbondata.spark.merger;

import java.util.ArrayList;
import java.util.List;
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.spark

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.optimizer.AttributeReferenceWrapper

case class CarbonAliasDecoderRelation() {

val attrMap = new java.util.HashMap[AttributeReferenceWrapper, Attribute]

def put(key: Attribute, value: Attribute): Unit = {
attrMap.put(AttributeReferenceWrapper(key), value)
}

def getOrElse(key: Attribute, default: Attribute): Attribute = {
val value = attrMap.get(AttributeReferenceWrapper(key))
if (value == null) {
default
} else {
if (value.equals(key)) {
value
} else {
getOrElse(value, value)
}
}
}
}
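In this new helper, put records that one attribute is an alias of another, and getOrElse follows the chain until it reaches an attribute with no further mapping (or one that maps to itself). A hedged usage sketch with illustrative AttributeReference values:

    // Hedged sketch: aliasOuter -> aliasInner -> baseColumn
    val decoder = CarbonAliasDecoderRelation()
    decoder.put(aliasOuter, aliasInner)         // aliasOuter was defined over aliasInner
    decoder.put(aliasInner, baseColumn)         // aliasInner was defined over the base column
    decoder.getOrElse(aliasOuter, aliasOuter)   // walks the chain and returns baseColumn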
@@ -20,7 +20,7 @@ package org.apache.carbondata.spark
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.optimizer.{AttributeReferenceWrapper, CarbonAliasDecoderRelation}
+import org.apache.spark.sql.optimizer.AttributeReferenceWrapper
import org.apache.spark.sql.sources
import org.apache.spark.sql.types.StructType

@@ -29,6 +29,8 @@ class CarbonOption(options: Map[String, String]) {

def tableName: String = options.getOrElse("tableName", "default_table")

+def tablePath: String = s"$dbName/$tableName"
+
def tableId: String = options.getOrElse("tableId", "default_table_id")

def partitionCount: String = options.getOrElse("partitionCount", "1")
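The new tablePath member composes the relative table location from the database and table names. A hedged sketch (dbName is defined elsewhere in CarbonOption; the option keys shown here are assumptions):

    // Hedged sketch: tablePath is simply "<dbName>/<tableName>".
    val option = new CarbonOption(Map("dbName" -> "default", "tableName" -> "carbon_table"))
    option.tablePath   // expected to yield "default/carbon_table"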
@@ -19,7 +19,6 @@ package org.apache.carbondata.spark
import org.apache.carbondata.core.carbon.{CarbonTableIdentifier, ColumnIdentifier}
import org.apache.carbondata.core.carbon.metadata.schema.table.column.{CarbonDimension, ColumnSchema}


/**
* Column validator
*/
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.carbondata.spark

import scala.collection.mutable.HashMap
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.csv

import java.io.IOException
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.csv.newapi

import java.nio.charset.Charset
@@ -24,14 +25,41 @@ import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.{NewHadoopRDD, RDD}
+import org.apache.spark.util.FileUtils

-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants

/**
* create RDD use CarbonDataLoadInputFormat
*/
-private[csv] object CarbonTextFile {
+object CarbonTextFile {

+private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+def configSplitMaxSize(context: SparkContext, filePaths: String,
+hadoopConfiguration: Configuration): Unit = {
+val defaultParallelism = if (context.defaultParallelism < 1) {
+1
+} else {
+context.defaultParallelism
+}
+val spaceConsumed = FileUtils.getSpaceOccupied(filePaths)
+val blockSize =
+hadoopConfiguration.getLongBytes("dfs.blocksize", CarbonCommonConstants.CARBON_256MB)
+LOGGER.info("[Block Distribution]")
+// calculate new block size to allow use all the parallelism
+if (spaceConsumed < defaultParallelism * blockSize) {
+var newSplitSize: Long = spaceConsumed / defaultParallelism
+if (newSplitSize < CarbonCommonConstants.CARBON_16MB) {
+newSplitSize = CarbonCommonConstants.CARBON_16MB
+}
+hadoopConfiguration.set(FileInputFormat.SPLIT_MAXSIZE, newSplitSize.toString)
+LOGGER.info(s"totalInputSpaceConsumed: $spaceConsumed , " +
+s"defaultParallelism: $defaultParallelism")
+LOGGER.info(s"mapreduce.input.fileinputformat.split.maxsize: ${ newSplitSize.toString }")
+}
+}
private def newHadoopRDD(sc: SparkContext, location: String) = {
val hadoopConfiguration = new Configuration(sc.hadoopConfiguration)
hadoopConfiguration.setStrings(FileInputFormat.INPUT_DIR, location)
@@ -41,7 +69,7 @@ private[csv] object CarbonTextFile {
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec""".stripMargin)

-CarbonDataRDDFactory.configSplitMaxSize(sc, location, hadoopConfiguration)
+configSplitMaxSize(sc, location, hadoopConfiguration)
new NewHadoopRDD[LongWritable, Text](
sc,
classOf[TextInputFormat],
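With configSplitMaxSize moved into CarbonTextFile (and the object made public), the CSV reader no longer depends on CarbonDataRDDFactory. The method shrinks mapreduce.input.fileinputformat.split.maxsize when the input is smaller than defaultParallelism * blockSize, so every core still receives a split, with a 16 MB floor. A hedged sketch of calling it before building the Hadoop RDD (the SparkContext and path are placeholders):

    // Hedged sketch: tune the split size so a small CSV still fans out across the cluster.
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    CarbonTextFile.configSplitMaxSize(sc, "hdfs:///data/sample.csv", hadoopConf)
    // hadoopConf now carries the reduced split max size when the input is small enough.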
@@ -14,6 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.databricks.spark.csv.newapi

import com.databricks.spark.csv.{CarbonCsvRelation, CsvSchemaRDD}
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.Partitioner
import org.apache.carbondata.spark.Value
import org.apache.carbondata.spark.util.CarbonQueryUtil


class CarbonCleanFilesRDD[V: ClassTag](
sc: SparkContext,
valueClass: Value[V],
@@ -15,7 +15,6 @@
* limitations under the License.
*/


package org.apache.carbondata.spark.rdd

import java.lang.Long
@@ -28,12 +27,9 @@ import scala.util.Random

import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.DataLoadCoalescedRDD
-import org.apache.spark.rdd.DataLoadPartitionWrap
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.execution.command.Partitioner
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
import org.apache.spark.sql.Row
-import org.apache.spark.util.TaskContextUtil
+import org.apache.spark.util.SparkUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
@@ -175,7 +171,6 @@ class SparkPartitionLoader(model: CarbonLoadModel,
* @param carbonLoadModel Carbon load model which contain the load info
* @param storePath The store location
* @param kettleHomePath The kettle home path
-* @param partitioner Partitioner which specify how to partition
* @param columinar whether it is columinar
* @param loadCount Current load count
* @param tableCreationTime Time of creating table
@@ -191,7 +186,6 @@
carbonLoadModel: CarbonLoadModel,
storePath: String,
kettleHomePath: String,
-partitioner: Partitioner,
columinar: Boolean,
loadCount: Integer,
tableCreationTime: Long,
@@ -563,7 +557,7 @@ class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
context)
}
def initialize: Unit = {
-TaskContextUtil.setTaskContext(context)
+SparkUtil.setTaskContext(context)
}
}
/**
@@ -598,7 +592,7 @@ class RddIterator(rddIter: Iterator[Row],
}

def initialize: Unit = {
-TaskContextUtil.setTaskContext(context)
+SparkUtil.setTaskContext(context)
}

}
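Both iterator wrappers now re-attach the captured TaskContext through SparkUtil instead of the old TaskContextUtil helper. A hedged sketch of the pattern as it appears in these classes (context is the TaskContext captured when the iterator was created):

    // Hedged sketch: pin the captured TaskContext to the thread that consumes the iterator.
    def initialize: Unit = {
      SparkUtil.setTaskContext(context)
    }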