Commit e52e641

lazy rdd iterator
QiangCai authored and gvramana committed Apr 11, 2017
1 parent d51387b commit e52e641
Showing 1 changed file with 81 additions and 19 deletions.
@@ -32,6 +32,7 @@ import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionWrap, RDD}
+import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.sql.Row
 import org.apache.spark.util.SparkUtil

@@ -408,19 +409,14 @@ class NewDataFrameLoaderRDD[K, V](
       val recordReaders = mutable.Buffer[CarbonIterator[Array[AnyRef]]]()
       val partitionIterator = firstParent[DataLoadPartitionWrap[Row]].iterator(theSplit, context)
       val serializer = SparkEnv.get.closureSerializer.newInstance()
-      var serializeBuffer: ByteBuffer = null
+      var serializeBytes: Array[Byte] = null
       while(partitionIterator.hasNext) {
         val value = partitionIterator.next()
-        val newInstance = {
-          if (serializeBuffer == null) {
-            serializeBuffer = serializer.serialize[RDD[Row]](value.rdd)
-          }
-          serializeBuffer.rewind()
-          serializer.deserialize[RDD[Row]](serializeBuffer)
+        if (serializeBytes == null) {
+          serializeBytes = serializer.serialize[RDD[Row]](value.rdd).array()
         }
-        recordReaders += new NewRddIterator(newInstance.iterator(value.partition, context),
-          carbonLoadModel,
-          context)
+        recordReaders += new LazyRddIterator(serializer, serializeBytes, value.partition,
+          carbonLoadModel, context)
       }

       val loader = new SparkPartitionLoader(model,
@@ -477,15 +473,16 @@ class NewRddIterator(rddIter: Iterator[Row],
     carbonLoadModel: CarbonLoadModel,
     context: TaskContext) extends CarbonIterator[Array[AnyRef]] {

-  val timeStampformatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
-    .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
-  val timeStampFormat = new SimpleDateFormat(timeStampformatString)
-  val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
     .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
-  val dateFormat = new SimpleDateFormat(dateFormatString)
-  val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-  val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-  val serializationNullFormat =
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
   def hasNext: Boolean = rddIter.hasNext

@@ -499,8 +496,73 @@ class NewRddIterator(rddIter: Iterator[Row],
     columns
   }

-  override def initialize: Unit = {
+  override def initialize(): Unit = {
     SparkUtil.setTaskContext(context)
   }

 }
+
+/**
+ * LazyRddIterator defers the rdd.iterator call until hasNext is first invoked.
+ * @param serializer serializer used to deserialize the wrapped RDD
+ * @param serializeBytes serialized bytes of the wrapped RDD[Row]
+ * @param partition the partition this iterator reads
+ * @param carbonLoadModel load model holding format and delimiter settings
+ * @param context the Spark task context
+ */
+class LazyRddIterator(serializer: SerializerInstance,
+    serializeBytes: Array[Byte],
+    partition: Partition,
+    carbonLoadModel: CarbonLoadModel,
+    context: TaskContext) extends CarbonIterator[Array[AnyRef]] {
+
+  private val timeStampformatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+  private val timeStampFormat = new SimpleDateFormat(timeStampformatString)
+  private val dateFormatString = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+  private val dateFormat = new SimpleDateFormat(dateFormatString)
+  private val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+  private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+  private val serializationNullFormat =
+    carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+
+  private var rddIter: Iterator[Row] = null
+  private var uninitialized = true
+  private var closed = false
+
+  def hasNext: Boolean = {
+    if (uninitialized) {
+      uninitialized = false
+      rddIter = serializer.deserialize[RDD[Row]](ByteBuffer.wrap(serializeBytes))
+        .iterator(partition, context)
+    }
+    if (closed) {
+      false
+    } else {
+      rddIter.hasNext
+    }
+  }
+
+  def next: Array[AnyRef] = {
+    val row = rddIter.next()
+    val columns = new Array[AnyRef](row.length)
+    for (i <- 0 until columns.length) {
+      columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+    }
+    columns
+  }
+
+  override def initialize(): Unit = {
+    SparkUtil.setTaskContext(context)
+  }
+
+  override def close(): Unit = {
+    closed = true
+    rddIter = null
+  }
+
+}
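
Why the change helps: the old loop deserialized the full RDD[Row] for every partition wrapper while building recordReaders, so each NewRddIterator held a live row iterator from the start of the task. Now serializer.serialize returns a ByteBuffer whose backing array is captured once with .array(), each LazyRddIterator keeps only that byte array, and ByteBuffer.wrap re-wraps it for deserialization on the first hasNext call; readers that are never consumed never pay the deserialization cost, and close() drops the reference (rddIter = null) so the deserialized RDD can be garbage-collected as soon as a reader finishes. Below is a minimal, self-contained sketch of the same lazy-deserialization pattern; it is illustrative only: Payload, LazyIterator, and LazyIteratorDemo are made-up names, and plain Java serialization stands in for Spark's closure serializer.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object LazyIteratorDemo {

  // Stand-in for the expensive-to-deserialize object (the RDD[Row] in this commit).
  case class Payload(rows: Seq[String])

  // Serialize once up front, as NewDataFrameLoaderRDD now does with serializeBytes.
  def serialize(p: Payload): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(p)
    oos.close()
    bos.toByteArray
  }

  // Mirrors LazyRddIterator: holds only the bytes, deserializes on first hasNext.
  class LazyIterator(bytes: Array[Byte]) extends Iterator[String] {
    private var inner: Iterator[String] = _
    private var uninitialized = true

    override def hasNext: Boolean = {
      if (uninitialized) {
        uninitialized = false
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
        inner = ois.readObject().asInstanceOf[Payload].rows.iterator
      }
      inner.hasNext
    }

    override def next(): String = inner.next()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(Payload(Seq("a", "b", "c")))
    // Building many iterators is cheap; only the one actually consumed deserializes.
    val iters = Seq.fill(3)(new LazyIterator(bytes))
    println(iters.head.toList) // List(a, b, c)
  }
}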
