[GLUTEN-7600][VL] Prepare test case for the removal of workaround code for empty schema batches #7601

Merged · 10 commits · Oct 21, 2024
4 changes: 2 additions & 2 deletions .github/workflows/velox_backend_cache.yml
```diff
@@ -46,7 +46,7 @@ jobs:
           df -a
           yum install ccache -y
           bash dev/ci-velox-buildstatic-centos-7.sh
-      - name: Save ccache
+      - name: Save Ccache
         uses: actions/cache/save@v3
         id: ccache
         with:
@@ -76,7 +76,7 @@ jobs:
           df -a
           bash dev/ci-velox-buildshared-centos-8.sh
           ccache -s
-      - name: Save ccache
+      - name: Save Ccache
         uses: actions/cache/save@v3
         id: ccache
         with:
```
```diff
@@ -270,9 +270,11 @@ case class ColumnarPartialProjectExec(original: ProjectExec, child: SparkPlan)(
     val proj = MutableProjection.create(replacedAliasUdf, projectAttributes.toSeq)
     val numRows = childData.numRows()
     val start = System.currentTimeMillis()
-    val arrowBatch = if (childData.numCols() == 0 || ColumnarBatches.isHeavyBatch(childData)) {
+    val arrowBatch = if (childData.numCols() == 0) {
       childData
-    } else ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+    } else {
+      ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), childData)
+    }
     c2a += System.currentTimeMillis() - start
 
     val schema =
```
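For context on why the zero-column branch exists: an empty-schema batch carries only a row count, so there is no column data for `ColumnarBatches.load` to convert to Arrow, and the batch can be passed through unchanged. A minimal illustration using vanilla Spark's `ColumnarBatch` constructor (not part of the PR, just a sketch):

```scala
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}

object ZeroColumnBatchDemo extends App {
  // An empty-schema batch: no column vectors, just a meaningful row count.
  val batch = new ColumnarBatch(Array.empty[ColumnVector], 5)
  assert(batch.numCols() == 0)
  assert(batch.numRows() == 5)
  // Nothing here is Arrow-loadable, which is why the new code above
  // returns such batches as-is instead of calling load().
}
```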
```diff
@@ -16,7 +16,7 @@
  */
 package org.apache.gluten.execution
 
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.extension.ValidationResult
 import org.apache.gluten.iterator.Iterators
@@ -26,7 +26,7 @@ import org.apache.gluten.vectorized.NativeColumnarToRowJniWrapper
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeRow}
 import org.apache.spark.sql.execution.{BroadcastUtils, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types._
@@ -140,53 +140,49 @@ object VeloxColumnarToRowExec
 
     if (batch.numRows == 0) {
       batch.close()
-      Iterator.empty
-    } else if (
-      batch.numCols() > 0 &&
-      !ColumnarBatches.isLightBatch(batch)
-    ) {
-      // Fallback to ColumnarToRow of vanilla Spark.
-      val localOutput = output
-      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-      batch.rowIterator().asScala.map(toUnsafe)
-    } else if (output.isEmpty) {
+      return Iterator.empty
+    }
+
+    if (output.isEmpty) {
       numInputBatches += 1
       numOutputRows += batch.numRows()
       val rows = ColumnarBatches.emptyRowIterator(batch.numRows()).asScala
       batch.close()
-      rows
-    } else {
-      val cols = batch.numCols()
-      val rows = batch.numRows()
-      val beforeConvert = System.currentTimeMillis()
-      val batchHandle = ColumnarBatches.getNativeHandle(batch)
-      var info =
-        jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
-
-      convertTime += (System.currentTimeMillis() - beforeConvert)
-
-      new Iterator[InternalRow] {
-        var rowId = 0
-        var baseLength = 0
-        val row = new UnsafeRow(cols)
-
-        override def hasNext: Boolean = {
-          rowId < rows
-        }
+      return rows
+    }
+
+    VeloxColumnarBatches.checkVeloxBatch(batch)
+
+    val cols = batch.numCols()
+    val rows = batch.numRows()
+    val beforeConvert = System.currentTimeMillis()
+    val batchHandle = ColumnarBatches.getNativeHandle(batch)
+    var info =
+      jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, 0)
+
+    convertTime += (System.currentTimeMillis() - beforeConvert)
+
+    new Iterator[InternalRow] {
+      var rowId = 0
+      var baseLength = 0
+      val row = new UnsafeRow(cols)
+
+      override def hasNext: Boolean = {
+        rowId < rows
+      }
 
-        override def next: UnsafeRow = {
-          if (rowId == baseLength + info.lengths.length) {
-            baseLength += info.lengths.length
-            val before = System.currentTimeMillis()
-            info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
-            convertTime += (System.currentTimeMillis() - before)
-          }
-          val (offset, length) =
-            (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
-          row.pointTo(null, info.memoryAddress + offset, length)
-          rowId += 1
-          row
+      override def next: UnsafeRow = {
+        if (rowId == baseLength + info.lengths.length) {
+          baseLength += info.lengths.length
+          val before = System.currentTimeMillis()
+          info = jniWrapper.nativeColumnarToRowConvert(c2rId, batchHandle, rowId)
+          convertTime += (System.currentTimeMillis() - before)
+        }
+        val (offset, length) =
+          (info.offsets(rowId - baseLength), info.lengths(rowId - baseLength))
+        row.pointTo(null, info.memoryAddress + offset, length)
+        rowId += 1
+        row
       }
     }
   }
```
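The convert loop above pages through the native buffer: each `nativeColumnarToRowConvert` call materializes one slice of rows, and the iterator re-invokes it once `rowId` runs past the slice, presumably so native memory stays bounded regardless of batch size. A self-contained sketch of that paging pattern, with a stub standing in for the JNI wrapper (`SliceInfo` and `stubConvert` are illustrative names, not Gluten APIs):

```scala
object PagingDemo extends App {
  // Stand-in for the JNI wrapper's per-slice result: one length per converted row.
  final case class SliceInfo(lengths: Array[Int])

  // Stand-in for nativeColumnarToRowConvert: converts up to sliceSize rows from startRow.
  def stubConvert(startRow: Int, totalRows: Int, sliceSize: Int): SliceInfo =
    SliceInfo(Array.fill(math.min(sliceSize, totalRows - startRow))(16))

  def pagedRows(totalRows: Int, sliceSize: Int): Iterator[Int] = new Iterator[Int] {
    private var rowId = 0
    private var baseLength = 0
    private var info = stubConvert(0, totalRows, sliceSize)

    override def hasNext: Boolean = rowId < totalRows

    override def next(): Int = {
      // Current slice exhausted: ask the converter for the next slice at rowId.
      if (rowId == baseLength + info.lengths.length) {
        baseLength += info.lengths.length
        info = stubConvert(rowId, totalRows, sliceSize)
      }
      val length = info.lengths(rowId - baseLength) // per-row byte length within the slice
      rowId += 1
      length
    }
  }

  // 10 rows in slices of 4: the converter is invoked for rows 0, 4 and 8.
  assert(pagedRows(10, 4).length == 10)
}
```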
```diff
@@ -69,8 +69,8 @@ object EmptySchemaWorkaround
       case p =>
         if (fallbackOnEmptySchema(p)) {
           if (p.children.exists(_.output.isEmpty)) {
-            // Some backends are not eligible to offload plan with zero-column input.
-            // If any child have empty output, mark the plan and that child as UNSUPPORTED.
+            // Some backends are not capable to offload plan with zero-column input.
+            // If any child has empty output, mark the plan and that child as UNSUPPORTED.
             FallbackTags.add(p, "at least one of its children has empty output")
             p.children.foreach {
               child =>
```
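The comment above states the rule this PR keeps in place: when any input would have an empty schema, both the plan and the zero-output child are tagged so neither is offloaded. A rough sketch of that shape (`FallbackTags.add(plan, reason)` is the call visible in the diff; the child-side reason string and the filter are assumptions, since the diff cuts off inside the foreach):

```scala
if (p.children.exists(_.output.isEmpty)) {
  // Tag the parent...
  FallbackTags.add(p, "at least one of its children has empty output")
  // ...and every zero-output child, so the whole group falls back together.
  p.children.filter(_.output.isEmpty).foreach {
    child => FallbackTags.add(child, "it has empty output")
  }
}
```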
```diff
@@ -31,7 +31,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.utils.SparkArrowUtil
 
-import com.google.common.base.Preconditions
 import org.apache.arrow.c.ArrowSchema
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.mapreduce.TaskAttemptContext
@@ -74,7 +73,7 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase
     new OutputWriter {
       override def write(row: InternalRow): Unit = {
         val batch = row.asInstanceOf[FakeRow].batch
-        Preconditions.checkState(ColumnarBatches.isLightBatch(batch))
+        ColumnarBatches.checkOffloaded(batch)
         ColumnarBatches.retain(batch)
         val batchHandle = {
           if (batch.numCols == 0) {
```
```diff
@@ -922,6 +922,18 @@ class MiscOperatorSuite extends VeloxWholeStageTransformerSuite with AdaptiveSpa
     }
   }
 
+  test("count(1) on csv scan") {
+    val df = runAndCompare("select count(1) from student") {
+      val filePath = rootPath + "/datasource/csv/student.csv"
+      val df = spark.read
+        .format("csv")
+        .option("header", "true")
+        .load(filePath)
+      df.createOrReplaceTempView("student")
+    }
+    checkLengthAndPlan(df, 1)
+  }
+
   test("combine small batches before shuffle") {
     val minBatchSize = 15
     withSQLConf(
```

Review comment on lines +927 to +932 (Member Author): nit: The table creation code is duplicated in different cases. Refactor required.
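One way to address that review note is to hoist the view creation into a helper so each test only states its query. A sketch under the suite shown above (the helper name `createStudentCsvView` is invented for illustration; `spark`, `rootPath`, `runAndCompare` and `checkLengthAndPlan` are the suite's own members):

```scala
// Hypothetical helper that deduplicates the temp-view setup across test cases.
private def createStudentCsvView(): Unit = {
  spark.read
    .format("csv")
    .option("header", "true")
    .load(rootPath + "/datasource/csv/student.csv")
    .createOrReplaceTempView("student")
}

test("count(1) on csv scan") {
  val df = runAndCompare("select count(1) from student") {
    createStudentCsvView()
  }
  checkLengthAndPlan(df, 1)
}
```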
4 changes: 2 additions & 2 deletions cpp/velox/compute/VeloxRuntime.h
```diff
@@ -66,8 +66,6 @@ class VeloxRuntime final : public Runtime {
     return iter->getMetrics(exportNanos);
   }
 
-  std::shared_ptr<VeloxDataSource> createDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> schema);
-
   std::shared_ptr<ShuffleReader> createShuffleReader(
       std::shared_ptr<arrow::Schema> schema,
       ShuffleReaderOptions options) override;
@@ -78,6 +76,8 @@
 
   void dumpConf(const std::string& path) override;
 
+  std::shared_ptr<VeloxDataSource> createDataSource(const std::string& filePath, std::shared_ptr<arrow::Schema> schema);
+
   std::shared_ptr<const facebook::velox::core::PlanNode> getVeloxPlan() {
     return veloxPlan_;
   }
```
```diff
@@ -85,15 +85,15 @@ private static BatchType identifyBatchType(ColumnarBatch batch) {
   }
 
   /** Heavy batch: Data is readable from JVM and formatted as Arrow data. */
-  public static boolean isHeavyBatch(ColumnarBatch batch) {
+  static boolean isHeavyBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.HEAVY;
   }
 
   /**
    * Light batch: Data is not readable from JVM, a long int handle (which is a pointer usually) is
    * used to bind the batch to a native side implementation.
    */
-  public static boolean isLightBatch(ColumnarBatch batch) {
+  static boolean isLightBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.LIGHT;
   }
```
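With `isHeavyBatch`/`isLightBatch` now package-private, code outside the package relies on the assertion-style helpers seen elsewhere in this PR: `ColumnarBatches.checkOffloaded` for light batches (as in the writer change above) and `ColumnarBatches.load` to materialize a heavy, JVM-readable batch (as in `ColumnarPartialProjectExec`). A sketch of the light-batch side, with all calls taken from this PR's diffs (the wrapper method is illustrative, and the throw-on-mismatch semantics of `checkOffloaded` is an assumption based on the `Preconditions` check it replaces):

```scala
import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.spark.sql.vectorized.ColumnarBatch

// Assumed contract: checkOffloaded fails fast when the batch is not
// native-backed, mirroring the Preconditions.checkState it replaces.
def nativeHandleOf(batch: ColumnarBatch): Long = {
  ColumnarBatches.checkOffloaded(batch)
  ColumnarBatches.getNativeHandle(batch)
}
```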
```diff
@@ -122,5 +122,3 @@ case class GlutenColumnarRule(
   }
 
 }
-
-object ColumnarOverrides {}
```