DataLoadCoalescedRDD
DataLoadPartitionCoalescer

concurrently read dataframe

add test case

fix comments

fix comments
QiangCai authored and jackylk committed Nov 29, 2016
1 parent 879bfe7 commit f8a0c87
Showing 17 changed files with 921 additions and 157 deletions.
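
The core of the change is in loadDataFrame: the old rdd.coalesce(numPartitions, shuffle = false) call is replaced by a locality-aware DataLoadCoalescedRDD so that each load task reads every DataFrame partition stored on its node concurrently. The new DataLoadPartitionCoalescer and DataLoadCoalescedRDD classes are among the 17 changed files but are not visible in this excerpt; the sketch below only illustrates the assumed grouping idea, using the public RDD.preferredLocations API rather than the coalescer's internals.

// A minimal, hypothetical sketch of locality-based grouping, assuming each
// parent partition's preferred locations name the node that holds its data.
// The real DataLoadPartitionCoalescer (added by this commit, not shown here)
// presumably also balances partitions that report no locality information.
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD

def groupPartitionsByHost(prev: RDD[_], nodes: Seq[String]): Map[String, Seq[Partition]] = {
  require(nodes.nonEmpty, "expected at least one node to load on")
  prev.partitions.toSeq.groupBy { p =>
    // fall back to the first known node when a partition has no preferred location
    prev.preferredLocations(p).headOption.getOrElse(nodes.head)
  }
}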
@@ -28,16 +28,20 @@ import scala.util.Random

import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.DataLoadCoalescedRDD
import org.apache.spark.rdd.DataLoadPartitionWrap
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.command.Partitioner
import org.apache.spark.sql.Row
import org.apache.spark.util.TaskContextUtil

import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory}
import org.apache.carbondata.processing.constants.DataProcessorConstants
import org.apache.carbondata.processing.csvreaderstep.JavaRddIterator
import org.apache.carbondata.processing.csvreaderstep.RddInputUtils
import org.apache.carbondata.processing.etl.DataLoadingException
import org.apache.carbondata.processing.graphgenerator.GraphGenerator
@@ -46,6 +50,7 @@ import org.apache.carbondata.spark.DataLoadResult
import org.apache.carbondata.spark.load._
import org.apache.carbondata.spark.splits.TableSplit
import org.apache.carbondata.spark.util.CarbonQueryUtil
import org.apache.carbondata.spark.util.CarbonScalaUtil

/**
* This partition class is used to split by TableSplit
@@ -125,6 +130,7 @@ class SparkPartitionLoader(model: CarbonLoadModel,
try {
CarbonLoaderUtil.executeGraph(model, storeLocation, storePath,
kettleHomePath)
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
} catch {
case e: DataLoadingException => if (e.getErrorCode ==
DataProcessorConstants.BAD_REC_FOUND) {
@@ -235,14 +241,11 @@ class DataFileLoaderRDD[K, V](
theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)

carbonLoadModel.setSegmentId(String.valueOf(loadCount))
setModelAndBlocksInfo()
val loader = new SparkPartitionLoader(model, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
loader.initialize
if (model.isRetentionRequest) {
recreateAggregationTableForRetention
} else if (model.isAggLoadRequest) {
@@ -495,7 +498,7 @@ class DataFrameLoaderRDD[K, V](
loadCount: Integer,
tableCreationTime: Long,
schemaLastUpdatedTime: Long,
prev: RDD[Row]) extends RDD[(K, V)](prev) {
prev: DataLoadCoalescedRDD[Row]) extends RDD[(K, V)](prev) {

sc.setLocalProperty("spark.scheduler.pool", "DDL")

@@ -509,18 +512,19 @@
theSplit.index
try {
loadMetadataDetails.setPartitionCount(partitionID)
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
carbonLoadModel.setPartitionId(partitionID)
carbonLoadModel.setSegmentId(String.valueOf(loadCount))
carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
val loader = new SparkPartitionLoader(carbonLoadModel, theSplit.index, storePath,
kettleHomePath, loadCount, loadMetadataDetails)
loader.initialize()
loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
loader.initialize
val rddIteratorKey = UUID.randomUUID().toString
try {
RddInputUtils.put(rddIteratorKey,
new RddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel))
new PartitionIterator(
firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context),
carbonLoadModel,
context))
carbonLoadModel.setRddIteratorKey(rddIteratorKey)
loader.run()
} finally {
@@ -548,77 +552,53 @@ class DataFrameLoaderRDD[K, V](
override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
}

class PartitionIterator(partitionIter: Iterator[DataLoadPartitionWrap[Row]],
carbonLoadModel: CarbonLoadModel,
context: TaskContext) extends JavaRddIterator[JavaRddIterator[Array[String]]] {
def hasNext: Boolean = partitionIter.hasNext
def next: JavaRddIterator[Array[String]] = {
val value = partitionIter.next
new RddIterator(value.rdd.iterator(value.partition, context),
carbonLoadModel,
context)
}
def initialize: Unit = {
TaskContextUtil.setTaskContext(context)
}
}
/**
* This class wraps a Scala Iterator as a Java-style Iterator.
* It also converts all column values to strings so the CSV data loading flow can be reused.
*
* @param rddIter
* @param carbonLoadModel
* @param context
*/
class RddIterator(rddIter: Iterator[Row],
carbonLoadModel: CarbonLoadModel) extends java.util.Iterator[Array[String]] {
carbonLoadModel: CarbonLoadModel,
context: TaskContext) extends JavaRddIterator[Array[String]] {

val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2

val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
def hasNext: Boolean = rddIter.hasNext

private def getString(value: Any, level: Int = 1): String = {
if (value == null) {
""
} else {
value match {
case s: String => s
case i: java.lang.Integer => i.toString
case d: java.lang.Double => d.toString
case t: java.sql.Timestamp => format format t
case d: java.sql.Date => format format d
case d: java.math.BigDecimal => d.toPlainString
case b: java.lang.Boolean => b.toString
case s: java.lang.Short => s.toString
case f: java.lang.Float => f.toString
case bs: Array[Byte] => new String(bs)
case s: scala.collection.Seq[Any] =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
}
val builder = new StringBuilder()
s.foreach { x =>
builder.append(getString(x, level + 1)).append(delimiter)
}
builder.substring(0, builder.length - 1)
case m: scala.collection.Map[Any, Any] =>
throw new Exception("Unsupported data type: Map")
case r: org.apache.spark.sql.Row =>
val delimiter = if (level == 1) {
delimiterLevel1
} else {
delimiterLevel2
}
val builder = new StringBuilder()
for (i <- 0 until r.length) {
builder.append(getString(r(i), level + 1)).append(delimiter)
}
builder.substring(0, builder.length - 1)
case other => other.toString
}
}
}

def next: Array[String] = {
val row = rddIter.next()
val columns = new Array[String](row.length)
for (i <- 0 until row.length) {
columns(i) = getString(row(i))
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
delimiterLevel1, delimiterLevel2, format)
}
columns
}

def remove(): Unit = {
def initialize: Unit = {
TaskContextUtil.setTaskContext(context)
}

}
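
With the coalesced RDD as parent, the reading step is handed a JavaRddIterator of JavaRddIterators: the outer PartitionIterator yields one RddIterator per wrapped DataFrame partition, and each inner iterator converts rows to string arrays for the CSV flow. The consuming code in the csvreaderstep package is not part of this excerpt; a hypothetical consumption loop, assuming only the hasNext/next/initialize members visible above, could look like this:

// Hypothetical consumer of the nested iterator built above; only the members
// visible in this diff (hasNext, next, initialize) are assumed to exist.
def consumePartitions(partitions: JavaRddIterator[JavaRddIterator[Array[String]]]): Unit = {
  partitions.initialize          // binds the Spark TaskContext to the reading thread
  while (partitions.hasNext) {
    val rows = partitions.next   // one RddIterator per original DataFrame partition
    // rows.initialize would matter if the rows were consumed on a separate thread
    while (rows.hasNext) {
      val columns: Array[String] = rows.next
      // hand columns to the CSV-based loading flow here
    }
  }
}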
@@ -30,7 +30,8 @@ import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.{SparkContext, SparkEnv, SparkException}
import org.apache.spark.sql.{CarbonEnv, DataFrame, SQLContext}
import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer}
import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel, CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -902,21 +903,30 @@ object CarbonDataRDDFactory {
}

def loadDataFrame(): Unit = {
var rdd = dataFrame.get.rdd
var numPartitions = DistributionUtil.getNodeList(sqlContext.sparkContext).length
numPartitions = Math.max(1, Math.min(numPartitions, rdd.partitions.length))
rdd = rdd.coalesce(numPartitions, shuffle = false)

status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
storePath,
kettleHomePath,
columinar,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
rdd).collect()
try {
val rdd = dataFrame.get.rdd
val nodeNumOfData = rdd.partitions.flatMap[String, Array[String]]{ p =>
DataLoadPartitionCoalescer.getPreferredLocs(rdd, p).map(_.host)
}.distinct.size
val nodes = DistributionUtil.ensureExecutorsByNumberAndGetNodeList(nodeNumOfData,
sqlContext.sparkContext)
val newRdd = new DataLoadCoalescedRDD[Row](rdd, nodes.toArray.distinct)

status = new DataFrameLoaderRDD(sqlContext.sparkContext,
new DataLoadResultImpl(),
carbonLoadModel,
storePath,
kettleHomePath,
columinar,
currentLoadCount,
tableCreationTime,
schemaLastUpdatedTime,
newRdd).collect()
} catch {
case ex: Exception =>
LOGGER.error(ex, "load data frame failed")
throw ex
}
}

CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
@@ -932,28 +942,32 @@ object CarbonDataRDDFactory {
loadDataFile()
}
val newStatusMap = scala.collection.mutable.Map.empty[String, String]
status.foreach { eachLoadStatus =>
val state = newStatusMap.get(eachLoadStatus._1)
state match {
case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
if eachLoadStatus._2.getLoadStatus ==
CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
case _ =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
if (status.nonEmpty) {
status.foreach { eachLoadStatus =>
val state = newStatusMap.get(eachLoadStatus._1)
state match {
case Some(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
case Some(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
if eachLoadStatus._2.getLoadStatus ==
CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
case _ =>
newStatusMap.put(eachLoadStatus._1, eachLoadStatus._2.getLoadStatus)
}
}
}

newStatusMap.foreach {
case (key, value) =>
if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
} else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
!loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
}
newStatusMap.foreach {
case (key, value) =>
if (value == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
} else if (value == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
!loadStatus.equals(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
}
}
} else {
loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
}

if (loadStatus != CarbonCommonConstants.STORE_LOADSTATUS_FAILURE &&
@@ -1116,6 +1130,4 @@ object CarbonDataRDDFactory {
CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
}
}


}
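
The status handling shown above (and its counterpart after loadDataFile) folds the per-task LoadMetadataDetails into one overall status; the new status.nonEmpty guard treats an empty result array, for example when the load RDD fails before any task reports, as a failed load rather than a silent success. Assuming loadStatus starts out as STORE_LOADSTATUS_SUCCESS, the computation over the merged map condenses to the following illustrative helper:

// Illustrative condensation of the merge rule above: any failure, or no statuses
// at all, fails the load; otherwise a partial success downgrades the result.
def overallStatus(statuses: Seq[String]): String = {
  if (statuses.isEmpty ||
      statuses.contains(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)) {
    CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
  } else if (statuses.contains(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
    CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
  } else {
    CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
  }
}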
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd

import java.io.{DataInputStream, InputStreamReader}
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.regex.Pattern

import scala.collection.mutable
@@ -42,6 +43,7 @@ import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
import org.apache.carbondata.processing.model.CarbonLoadModel
import org.apache.carbondata.spark.load.CarbonLoaderUtil
import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}
import org.apache.carbondata.spark.util.CarbonScalaUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil
import org.apache.carbondata.spark.util.GlobalDictionaryUtil._

@@ -157,7 +159,8 @@ case class DictionaryLoadModel(table: CarbonTableIdentifier,
isFirstLoad: Boolean,
hdfsTempLocation: String,
lockType: String,
zooKeeperUrl: String) extends Serializable
zooKeeperUrl: String,
serializationNullFormat: String) extends Serializable

case class ColumnDistinctValues(values: Array[String], rowCount: Long) extends Serializable

@@ -251,13 +254,17 @@ class CarbonBlockDistinctValuesCombineRDD(
val dimNum = model.dimensions.length
var row: Row = null
val rddIter = firstParent[Row].iterator(split, context)
val formatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
.CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
val format = new SimpleDateFormat(formatString)
// generate block distinct value set
while (rddIter.hasNext) {
row = rddIter.next()
if (row != null) {
rowCount += 1
for (i <- 0 until dimNum) {
dimensionParsers(i).parseString(row.getString(i))
dimensionParsers(i).parseString(CarbonScalaUtil.getString(row.get(i),
model.serializationNullFormat, model.delimiters(0), model.delimiters(1), format))
}
}
}
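
CarbonScalaUtil.getString lives in one of the changed files that is not visible in this excerpt; judging from the RddIterator.getString body it replaces in the first file, it presumably formats timestamps and dates with the supplied SimpleDateFormat, joins nested Seq and Row values with the complex-type delimiters, and maps null to the configured serialization null format. A rough sketch of that assumed behavior (Map and Row handling omitted):

// Assumed behavior only; the real CarbonScalaUtil.getString is not shown here.
import java.text.SimpleDateFormat

def toLoadString(value: Any, nullFormat: String, delimiterLevel1: String,
    delimiterLevel2: String, format: SimpleDateFormat, level: Int = 1): String = value match {
  case null => nullFormat
  case t: java.sql.Timestamp => format.format(t)
  case d: java.sql.Date => format.format(d)
  case s: Seq[_] =>
    // nested collections use the level-1 delimiter at the top and level-2 inside
    val delimiter = if (level == 1) delimiterLevel1 else delimiterLevel2
    s.map(toLoadString(_, nullFormat, delimiterLevel1, delimiterLevel2, format, level + 1))
      .mkString(delimiter)
  case other => other.toString
}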
