@@ -113,9 +113,17 @@ case class NativeParquetInsertIntoHiveTableExec(
new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
override def newTaskInstance(): WriteTaskStatsTracker = {
new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
override def newRow(_filePath: String, _row: InternalRow): Unit = {}
override def newRow(filePath: String, row: InternalRow): Unit = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.newRow(filePath, row)
}
}

override def closeFile(filePath: String): Unit = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.closeFile(filePath)
}

val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
for (_ <- 0L until outputFileStat.numRows) {
super.newRow(filePath, null)
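The hunk above gates per-row stats tracking on ParquetSinkTaskContext.get.isNative: when the file is written by the native engine, newRow becomes a no-op and closeFile replays the row count reported by the native writer into the base tracker, so Spark's output metrics still add up. Below is a minimal, self-contained sketch of that replay-at-close pattern, separate from the diff; RowStatsTracker, NativeFileStat and the queue are illustrative stand-ins, not the Spark or Blaze classes.

import scala.collection.mutable

case class NativeFileStat(path: String, numRows: Long, numBytes: Long)

class RowStatsTracker {
  private var rows = 0L
  def newRow(filePath: String): Unit = { rows += 1 }
  def closeFile(filePath: String): Unit = println(s"$filePath: $rows rows")
}

class NativeAwareTracker(isNative: Boolean, finished: mutable.Queue[NativeFileStat])
    extends RowStatsTracker {
  // JVM write path: rows stream through the JVM, so count them as they arrive
  override def newRow(filePath: String): Unit =
    if (!isNative) super.newRow(filePath)

  // native path: rows never pass through the JVM, so replay the reported count at close
  override def closeFile(filePath: String): Unit = {
    if (isNative) {
      val stat = finished.dequeue()
      for (_ <- 0L until stat.numRows) super.newRow(filePath)
    }
    super.closeFile(filePath)
  }
}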
@@ -147,12 +155,23 @@ case class NativeParquetInsertIntoHiveTableExec(
mutable.ArrayBuffer.empty

override def newPartition(partitionValues: InternalRow): Unit = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.newPartition(partitionValues)
}
partitions.append(partitionValues)
}

override def newRow(_row: InternalRow): Unit = {}
override def newRow(row: InternalRow): Unit = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.newRow(row)
}
}

override def getFinalStats(): WriteTaskStats = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.getFinalStats()
}

val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
BasicWriteTaskStats(
partitions = partitions,
@@ -179,9 +198,17 @@ case class NativeParquetInsertIntoHiveTableExec(
new BasicWriteJobStatsTracker(serializableHadoopConf, metrics) {
override def newTaskInstance(): WriteTaskStatsTracker = {
new BasicWriteTaskStatsTracker(serializableHadoopConf.value) {
override def newRow(_row: InternalRow): Unit = {}
override def newRow(row: InternalRow): Unit = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.newRow(row)
}
}

override def getFinalStats(): WriteTaskStats = {
if (!ParquetSinkTaskContext.get.isNative) {
return super.getFinalStats()
}

val outputFileStat = ParquetSinkTaskContext.get.processedOutputFiles.remove()
BasicWriteTaskStats(
numPartitions = 1,
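The two hunks above apply the same gate to newPartition, newRow, and getFinalStats: for a native write, the task's final stats are assembled from what the native writer queued for its output files instead of from JVM-side counters. A small illustrative sketch of that assembly step, reusing NativeFileStat from the sketch above (TaskWriteStats is a hypothetical stand-in for Spark's BasicWriteTaskStats):

case class TaskWriteStats(numFiles: Int, numBytes: Long, numRows: Long)

// assumes the native writer has queued one NativeFileStat per file it produced
def finalStatsFromNative(fileStats: Seq[NativeFileStat]): TaskWriteStats =
  TaskWriteStats(
    numFiles = fileStats.size,
    numBytes = fileStats.map(_.numBytes).sum,
    numRows = fileStats.map(_.numRows).sum)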
@@ -165,6 +165,7 @@ class BlazeMapredParquetOutputFormat
case class OutputFileStat(path: String, numRows: Long, numBytes: Long)

class ParquetSinkTaskContext {
var isNative: Boolean = false
val processingOutputFiles = new LinkedBlockingDeque[String]()
val processedOutputFiles = new util.ArrayDeque[OutputFileStat]()
}
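This hunk only adds a mutable isNative flag to the per-task context; the implementation of ParquetSinkTaskContext.get is not shown in the diff. One common way such a per-task holder is implemented is a ThreadLocal, sketched below purely for illustration (SinkTaskContextSketch is a hypothetical name, not the Blaze class):

class SinkTaskContextSketch {
  // false by default, so ordinary JVM write paths keep the stock tracker behaviour
  var isNative: Boolean = false
}

object SinkTaskContextSketch {
  private val holder = new ThreadLocal[SinkTaskContextSketch] {
    override def initialValue(): SinkTaskContextSketch = new SinkTaskContextSketch
  }
  // each task thread sees its own context instance
  def get: SinkTaskContextSketch = holder.get()
}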
@@ -97,6 +97,9 @@ abstract class NativeParquetSinkBase(
inputRDD.isShuffleReadFull,
(partition, context) => {

// mark for native parquet sink
ParquetSinkTaskContext.get.isNative = true

// init hadoop fs
val resourceId = s"NativeParquetSinkExec:${UUID.randomUUID().toString}"
JniBridge.resourcesMap.put(
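This last hunk flips the flag at the top of the task body, before the native Parquet writer starts, so every stats-tracker callback issued for the task observes isNative == true. Continuing the illustrative sketch from above (runSinkTask and writeNatively are hypothetical):

def runSinkTask(writeNatively: () => Unit): Unit = {
  // mark the task as native before any rows are written
  SinkTaskContextSketch.get.isNative = true
  writeNatively()
}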