Skip to content

Commit

Permalink
feat: Support CometRowToColumnar operator
Browse files Browse the repository at this point in the history
  • Loading branch information
advancedxy committed Apr 8, 2024
1 parent 8a512ba commit 2c6a4f7
Show file tree
Hide file tree
Showing 12 changed files with 849 additions and 29 deletions.
19 changes: 19 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,25 @@ object CometConf {
"enabled when reading from Iceberg tables.")
.booleanConf
.createWithDefault(false)

val COMET_ROW_TO_COLUMNAR_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.rowToColumnar.enabled")
.internal()
.doc("Whether to enable row to columnar conversion in Comet. When this is turned on, " +
"Comet will convert row-based operators in spark.comet.rowToColumnar.sourceNodeList into " +
"columnar based before processing.")
.booleanConf
.createWithDefault(false)

val COMET_ROW_TO_COLUMNAR_SOURCE_NODE_LIST: ConfigEntry[Seq[String]] =
conf("spark.comet.rowToColumnar.sourceNodeList")
.doc(
"A comma-separated list of row-based data sources that will be converted to columnar " +
"format when 'spark.comet.rowToColumnar.enabled' is true")
.stringConf
.toSequence
.createWithDefault(Seq("Range,InMemoryTableScan"))

}

object ConfigHelpers {
Expand Down
17 changes: 17 additions & 0 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch
Expand Down Expand Up @@ -132,3 +134,18 @@ class NativeUtil {
new ColumnarBatch(arrayVectors.toArray, maxNumRows)
}
}

object NativeUtil {
def rootAsBatch(arrowRoot: VectorSchemaRoot): ColumnarBatch = {
rootAsBatch(arrowRoot, null)
}

def rootAsBatch(arrowRoot: VectorSchemaRoot, provider: DictionaryProvider): ColumnarBatch = {
val vectors = (0 until arrowRoot.getFieldVectors.size()).map { i =>
val vector = arrowRoot.getFieldVectors.get(i)
// Native shuffle always uses decimal128.
CometVector.getVector(vector, true, provider)
}
new ColumnarBatch(vectors.toArray, arrowRoot.getRowCount)
}
}
13 changes: 2 additions & 11 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import scala.collection.JavaConverters.collectionAsScalaIterableConverter

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
Expand All @@ -47,14 +45,7 @@ case class StreamReader(channel: ReadableByteChannel) extends AutoCloseable {
}

private def rootAsBatch(root: VectorSchemaRoot): ColumnarBatch = {
val columns = root.getFieldVectors.asScala.map { vector =>
// Native shuffle always uses decimal128.
CometVector.getVector(vector, true, arrowReader).asInstanceOf[ColumnVector]
}.toArray

val batch = new ColumnarBatch(columns)
batch.setNumRows(root.getRowCount)
batch
NativeUtil.rootAsBatch(root, arrowReader)
}

override def close(): Unit = {
Expand Down
Loading

0 comments on commit 2c6a4f7

Please sign in to comment.