Fix: Fix null handling in CometVector implementations #2643
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,11 +16,10 @@ | |
| // under the License. | ||
|
|
||
| use arrow::array::{make_array, Array, ArrayRef, GenericListArray, Int32Array, OffsetSizeTrait}; | ||
| use arrow::datatypes::{DataType, Field, Schema}; | ||
| use arrow::datatypes::{DataType, Schema}; | ||
| use arrow::{ | ||
| array::{as_primitive_array, Capacities, MutableArrayData}, | ||
| buffer::{NullBuffer, OffsetBuffer}, | ||
| datatypes::ArrowNativeType, | ||
| record_batch::RecordBatch, | ||
| }; | ||
| use datafusion::common::{ | ||
|
|
@@ -198,114 +197,124 @@ fn array_insert<O: OffsetSizeTrait>( | |
| pos_array: &ArrayRef, | ||
| legacy_mode: bool, | ||
| ) -> DataFusionResult<ColumnarValue> { | ||
| // The code is based on the implementation of the array_append from the Apache DataFusion | ||
| // https://github.com/apache/datafusion/blob/main/datafusion/functions-nested/src/concat.rs#L513 | ||
| // | ||
| // This code is also based on the implementation of the array_insert from the Apache Spark | ||
| // https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala#L4713 | ||
| // Implementation aligned with Arrow's half-open offset ranges and Spark semantics. | ||
|
|
||
| let values = list_array.values(); | ||
| let offsets = list_array.offsets(); | ||
| let values_data = values.to_data(); | ||
| let item_data = items_array.to_data(); | ||
|
|
||
| // Estimate capacity (original values + inserted items upper bound) | ||
| let new_capacity = Capacities::Array(values_data.len() + item_data.len()); | ||
|
|
||
| let mut mutable_values = | ||
| MutableArrayData::with_capacities(vec![&values_data, &item_data], true, new_capacity); | ||
|
|
||
| let mut new_offsets = vec![O::usize_as(0)]; | ||
| let mut new_nulls = Vec::<bool>::with_capacity(list_array.len()); | ||
| // New offsets and top-level list validity bitmap | ||
| let mut new_offsets = Vec::with_capacity(list_array.len() + 1); | ||
| new_offsets.push(O::usize_as(0)); | ||
| let mut list_valid = Vec::<bool>::with_capacity(list_array.len()); | ||
|
|
||
| let pos_data: &Int32Array = as_primitive_array(&pos_array); // Spark supports only i32 for positions | ||
| // Spark supports only Int32 position indices | ||
| let pos_data: &Int32Array = as_primitive_array(&pos_array); | ||
|
|
||
| for (row_index, offset_window) in offsets.windows(2).enumerate() { | ||
| let pos = pos_data.values()[row_index]; | ||
| let start = offset_window[0].as_usize(); | ||
| let end = offset_window[1].as_usize(); | ||
| let is_item_null = items_array.is_null(row_index); | ||
| for (row_index, window) in offsets.windows(2).enumerate() { | ||
| let start = window[0].as_usize(); | ||
| let end = window[1].as_usize(); | ||
| let len = end - start; | ||
| let pos = pos_data.value(row_index); | ||
|
||
|
|
||
| if list_array.is_null(row_index) { | ||
| // In Spark if value of the array is NULL than nothing happens | ||
| mutable_values.extend_nulls(1); | ||
| new_offsets.push(new_offsets[row_index] + O::one()); | ||
| new_nulls.push(false); | ||
| // Top-level list row is NULL: do not write any child values and do not advance offset | ||
| new_offsets.push(new_offsets[row_index]); | ||
| list_valid.push(false); | ||
| continue; | ||
| } | ||
|
|
||
| if pos == 0 { | ||
| return Err(DataFusionError::Internal( | ||
| "Position for array_insert should be greter or less than zero".to_string(), | ||
| "Position for array_insert should be greater or less than zero".to_string(), | ||
| )); | ||
| } | ||
|
|
||
| if (pos > 0) || ((-pos).as_usize() < (end - start + 1)) { | ||
| let corrected_pos = if pos > 0 { | ||
| (pos - 1).as_usize() | ||
| } else { | ||
| end - start - (-pos).as_usize() + if legacy_mode { 0 } else { 1 } | ||
| }; | ||
| let new_array_len = std::cmp::max(end - start + 1, corrected_pos); | ||
| if new_array_len > MAX_ROUNDED_ARRAY_LENGTH { | ||
| return Err(DataFusionError::Internal(format!( | ||
| "Max array length in Spark is {MAX_ROUNDED_ARRAY_LENGTH:?}, but got {new_array_len:?}" | ||
| ))); | ||
| } | ||
| let final_len: usize; | ||
|
|
||
| if (start + corrected_pos) <= end { | ||
| mutable_values.extend(0, start, start + corrected_pos); | ||
| if pos > 0 { | ||
| // Positive index (1-based) | ||
| let pos1 = pos as usize; | ||
| if pos1 <= len + 1 { | ||
| // In-range insertion (including appending to end) | ||
| let corrected = pos1 - 1; // 0-based insertion point | ||
| mutable_values.extend(0, start, start + corrected); | ||
| mutable_values.extend(1, row_index, row_index + 1); | ||
| mutable_values.extend(0, start + corrected_pos, end); | ||
| new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len)); | ||
| mutable_values.extend(0, start + corrected, end); | ||
| final_len = len + 1; | ||
| } else { | ||
| // Beyond end: pad with nulls then insert | ||
| let corrected = pos1 - 1; | ||
| let padding = corrected - len; | ||
| mutable_values.extend(0, start, end); | ||
| mutable_values.extend_nulls(new_array_len - (end - start)); | ||
| mutable_values.extend_nulls(padding); | ||
| mutable_values.extend(1, row_index, row_index + 1); | ||
| // In that case spark actualy makes array longer than expected; | ||
| // For example, if pos is equal to 5, len is eq to 3, than resulted len will be 5 | ||
| new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len) + O::one()); | ||
| final_len = corrected + 1; // equals pos1 | ||
| } | ||
| } else { | ||
| // This comment is takes from the Apache Spark source code as is: | ||
| // special case- if the new position is negative but larger than the current array size | ||
| // place the new item at start of array, place the current array contents at the end | ||
| // and fill the newly created array elements inbetween with a null | ||
| let base_offset = if legacy_mode { 1 } else { 0 }; | ||
| let new_array_len = (-pos + base_offset).as_usize(); | ||
| if new_array_len > MAX_ROUNDED_ARRAY_LENGTH { | ||
| return Err(DataFusionError::Internal(format!( | ||
| "Max array length in Spark is {MAX_ROUNDED_ARRAY_LENGTH:?}, but got {new_array_len:?}" | ||
| ))); | ||
| } | ||
| mutable_values.extend(1, row_index, row_index + 1); | ||
| mutable_values.extend_nulls(new_array_len - (end - start + 1)); | ||
| mutable_values.extend(0, start, end); | ||
| new_offsets.push(new_offsets[row_index] + O::usize_as(new_array_len)); | ||
| } | ||
| if is_item_null { | ||
| if (start == end) || (values.is_null(row_index)) { | ||
| new_nulls.push(false) | ||
| // Negative index (1-based from the end) | ||
| let k = (-pos) as usize; | ||
|
|
||
| if k <= len { | ||
| // In-range negative insertion | ||
| // Non-legacy: -1 behaves like append to end (corrected = len - k + 1) | ||
| // Legacy: -1 behaves like insert before the last element (corrected = len - k) | ||
| let base_offset = if legacy_mode { 0 } else { 1 }; | ||
| let corrected = len - k + base_offset; | ||
| mutable_values.extend(0, start, start + corrected); | ||
| mutable_values.extend(1, row_index, row_index + 1); | ||
| mutable_values.extend(0, start + corrected, end); | ||
| final_len = len + 1; | ||
| } else { | ||
| new_nulls.push(true) | ||
| // Negative index beyond the start (Spark-specific behavior): | ||
| // Place item first, then pad with nulls, then append the original array. | ||
| // Final length = k + base_offset, where base_offset = 1 in legacy mode, otherwise 0. | ||
| let base_offset = if legacy_mode { 1 } else { 0 }; | ||
| let target_len = k + base_offset; | ||
| let padding = target_len.saturating_sub(len + 1); | ||
| mutable_values.extend(1, row_index, row_index + 1); // insert item first | ||
| mutable_values.extend_nulls(padding); // pad nulls | ||
| mutable_values.extend(0, start, end); // append original values | ||
| final_len = target_len; | ||
| } | ||
| } else { | ||
| new_nulls.push(true) | ||
| } | ||
|
|
||
| if final_len > MAX_ROUNDED_ARRAY_LENGTH { | ||
| return Err(DataFusionError::Internal(format!( | ||
| "Max array length in Spark is {MAX_ROUNDED_ARRAY_LENGTH}, but got {final_len}" | ||
| ))); | ||
| } | ||
|
|
||
| let prev = new_offsets[row_index].as_usize(); | ||
| new_offsets.push(O::usize_as(prev + final_len)); | ||
| list_valid.push(true); | ||
| } | ||
|
|
||
| let data = make_array(mutable_values.freeze()); | ||
| let data_type = match list_array.data_type() { | ||
| DataType::List(field) => field.data_type(), | ||
| DataType::LargeList(field) => field.data_type(), | ||
| let child = make_array(mutable_values.freeze()); | ||
|
|
||
| // Reuse the original list element field (name/type/nullability) | ||
| let elem_field = match list_array.data_type() { | ||
| DataType::List(field) => Arc::clone(field), | ||
| DataType::LargeList(field) => Arc::clone(field), | ||
| _ => unreachable!(), | ||
| }; | ||
| let new_array = GenericListArray::<O>::try_new( | ||
| Arc::new(Field::new("item", data_type.clone(), true)), | ||
|
|
||
| // Build the resulting list array | ||
| let new_list = GenericListArray::<O>::try_new( | ||
| elem_field, | ||
| OffsetBuffer::new(new_offsets.into()), | ||
| data, | ||
| Some(NullBuffer::new(new_nulls.into())), | ||
| child, | ||
| Some(NullBuffer::new(list_valid.into())), | ||
| )?; | ||
|
|
||
| Ok(ColumnarValue::Array(Arc::new(new_array))) | ||
| Ok(ColumnarValue::Array(Arc::new(new_list))) | ||
| } | ||
|
|
||
| impl Display for ArrayInsert { | ||
|
|
@@ -442,4 +451,37 @@ mod test { | |
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_array_insert_bug_repro_null_item_pos1_fixed() -> Result<()> { | ||
| use arrow::array::{Array, ArrayRef, Int32Array, ListArray}; | ||
| use arrow::datatypes::Int32Type; | ||
|
|
||
| // row0 = [0, null, 0] | ||
| // row1 = [1, null, 1] | ||
| let list = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ | ||
| Some(vec![Some(0), None, Some(0)]), | ||
| Some(vec![Some(1), None, Some(1)]), | ||
| ]); | ||
|
|
||
| let positions = Int32Array::from(vec![1, 1]); | ||
| let items = Int32Array::from(vec![None, None]); | ||
|
|
||
| let ColumnarValue::Array(result) = array_insert( | ||
| &list, | ||
| &(Arc::new(items) as ArrayRef), | ||
| &(Arc::new(positions) as ArrayRef), | ||
| false, // legacy_mode = false | ||
| )? | ||
| else { | ||
| unreachable!() | ||
| }; | ||
|
|
||
| let expected = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![ | ||
| Some(vec![None, Some(0), None, Some(0)]), | ||
| Some(vec![None, Some(1), None, Some(1)]), | ||
| ]); | ||
| assert_eq!(&result.to_data(), &expected.to_data()); | ||
| Ok(()) | ||
| } | ||
| } | ||
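For reference, a short Spark-side sketch of the array_insert semantics the rewritten kernel above is meant to match. Object and session names are illustrative, and the expected results in the comments assume Spark 3.5 defaults (non-legacy negative-index behaviour); this is a reference sketch, not part of the change itself.

import org.apache.spark.sql.SparkSession

object ArrayInsertSemanticsDemo extends App {
  val spark = SparkSession.builder().master("local[1]").appName("array_insert_demo").getOrCreate()

  // Positive index in range: insert before the 1-based position.
  spark.sql("SELECT array_insert(array(1, 2, 3), 2, 99)").show(false)  // [1, 99, 2, 3]
  // Positive index past the end: pad with NULLs up to pos - 1, then insert.
  spark.sql("SELECT array_insert(array(1, 2, 3), 6, 99)").show(false)  // [1, 2, 3, NULL, NULL, 99]
  // Negative index in range: -1 appends at the end in non-legacy mode.
  spark.sql("SELECT array_insert(array(1, 2, 3), -1, 99)").show(false) // [1, 2, 3, 99]
  // Negative index beyond the start: item first, NULL padding, then the original values.
  spark.sql("SELECT array_insert(array(1, 2, 3), -6, 99)").show(false) // [99, NULL, NULL, 1, 2, 3]

  spark.stop()
}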
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.Path | |
| import org.apache.spark.sql.CometTestBase | ||
| import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper | ||
| import org.apache.spark.sql.functions._ | ||
| import org.apache.spark.sql.types.ArrayType | ||
|
|
||
| import org.apache.comet.CometSparkSessionExtensions.{isSpark35Plus, isSpark40Plus} | ||
| import org.apache.comet.DataTypeSupport.isComplexType | ||
|
|
@@ -777,4 +778,28 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp | |
| } | ||
| } | ||
| } | ||
|
|
||
| test("array_reverse 2") { | ||
andygrove marked this conversation as resolved.
|
||
| // This test validates data correctness for array<binary> columns with nullable elements. | ||
| // See https://github.com/apache/datafusion-comet/issues/2612 | ||
| withTempDir { dir => | ||
|
Contributor
Remind me again why this is a good test for the changes in this PR?
Contributor
Author
It’s exercising a long-standing null-handling issue in Comet. Minimal reproducible snippet:
sql("select cast(array(null) as array<binary>) as c1").write
  .mode("overwrite")
  .save("/tmp/parquet/t1")
sql("select c1, reverse(c1) from parquet.`/tmp/parquet/t1`").show
Why this happens:
Note the scan is CometNativeScan. The bug is the mismatch between Comet’s columnar getters and Spark’s expectations when nulls are present. Relevant generated code (codegenStageId=1) highlights:
/* 047 */ for (int project_i_0 = 0; project_i_0 < project_numElements_1; project_i_0++) {
/* 048 */ int project_j_0 = project_numElements_1 - project_i_0 - 1;
/* 049 */ project_arrayData_0.update(project_i_0, project_expr_0_0.getBinary(project_j_0));
/* 050 */ }
/* 051 */ project_value_2 = project_arrayData_0;
Observation 1: When constructing the reversed array, Spark’s code directly calls getBinary(j) and does not check element nullability at this point. It relies on getBinary(j) returning null for null elements.
/* 099 */ for (int project_index_2 = 0; project_index_2 < project_numElements_3; project_index_2++) {
/* 100 */ if (project_tmpInput_2.isNullAt(project_index_2)) {
/* 101 */ columnartorow_mutableStateArray_4[3].setNull8Bytes(project_index_2);
/* 102 */ } else {
/* 103 */ columnartorow_mutableStateArray_4[3].write(project_index_2, project_tmpInput_2.getBinary(project_index_2));
/* 104 */ }
Observation 2: Spark uses isNullAt to mark nulls, and only calls getBinary(i) for non-null elements. Therefore, Comet must return null from getBinary(i) when the element is null; returning an empty byte array leads to [[]] instead of [NULL]. This PR makes Comet’s ColumnVector getters (getBinary, getUTF8String, getArray, getMap, getDecimal) return null when isNullAt(i) is true to fix this bug.
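To make the guard concrete, here is a minimal, self-contained sketch of the pattern described above. The names are illustrative only and this is not Comet’s actual CometVector code: the point is simply that every value getter checks isNullAt first and returns null for null slots instead of a default such as an empty byte array.

final class NullAwareBinaryColumn(values: Array[Array[Byte]], nulls: Array[Boolean]) {
  def isNullAt(rowId: Int): Boolean = nulls(rowId)

  def getBinary(rowId: Int): Array[Byte] =
    // Spark's generated code may call getBinary(j) without a prior isNullAt(j)
    // check (see the reverse() loop above), so the getter itself must surface
    // NULL rather than an empty Array[Byte].
    if (isNullAt(rowId)) null else values(rowId)
}

object NullAwareBinaryColumnDemo extends App {
  val col = new NullAwareBinaryColumn(Array(null, Array[Byte](1, 2)), Array(true, false))
  println(col.getBinary(0))        // prints null, not an empty array
  println(col.getBinary(1).length) // prints 2
}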
Contributor
Author
FYI, the full generated code:
/* 001 */ public Object generate(Object[] references) {
/* 002 */ return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */ private Object[] references;
/* 008 */ private scala.collection.Iterator[] inputs;
/* 009 */ private int columnartorow_batchIdx_0;
/* 010 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] columnartorow_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[1];
/* 011 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] columnartorow_mutableStateArray_3 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[3];
/* 012 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] columnartorow_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1];
/* 013 */ private scala.collection.Iterator[] columnartorow_mutableStateArray_0 = new scala.collection.Iterator[1];
/* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[] columnartorow_mutableStateArray_4 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter[4];
/* 015 */
/* 016 */ public GeneratedIteratorForCodegenStage1(Object[] references) {
/* 017 */ this.references = references;
/* 018 */ }
/* 019 */
/* 020 */ public void init(int index, scala.collection.Iterator[] inputs) {
/* 021 */ partitionIndex = index;
/* 022 */ this.inputs = inputs;
/* 023 */ columnartorow_mutableStateArray_0[0] = inputs[0];
/* 024 */
/* 025 */ columnartorow_mutableStateArray_3[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 026 */ columnartorow_mutableStateArray_4[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[0], 8);
/* 027 */ columnartorow_mutableStateArray_3[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1, 32);
/* 028 */ columnartorow_mutableStateArray_4[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[1], 8);
/* 029 */ columnartorow_mutableStateArray_3[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(2, 64);
/* 030 */ columnartorow_mutableStateArray_4[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[2], 8);
/* 031 */ columnartorow_mutableStateArray_4[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter(columnartorow_mutableStateArray_3[2], 8);
/* 032 */
/* 033 */ }
/* 034 */
/* 035 */ private void project_doConsume_0(ArrayData project_expr_0_0, boolean project_exprIsNull_0_0) throws java.io.IOException {
/* 036 */ // common sub-expressions
/* 037 */
/* 038 */ boolean project_isNull_2 = project_exprIsNull_0_0;
/* 039 */ ArrayData project_value_2 = null;
/* 040 */
/* 041 */ if (!project_exprIsNull_0_0) {
/* 042 */ final int project_numElements_1 = project_expr_0_0.numElements();
/* 043 */
/* 044 */ ArrayData project_arrayData_0 = ArrayData.allocateArrayData(
/* 045 */ -1, project_numElements_1, " reverse failed.");
/* 046 */
/* 047 */ for (int project_i_0 = 0; project_i_0 < project_numElements_1; project_i_0++) {
/* 048 */ int project_j_0 = project_numElements_1 - project_i_0 - 1;
/* 049 */ project_arrayData_0.update(project_i_0, project_expr_0_0.getBinary(project_j_0));
/* 050 */ }
/* 051 */ project_value_2 = project_arrayData_0;
/* 052 */
/* 053 */ }
/* 054 */ columnartorow_mutableStateArray_3[2].reset();
/* 055 */
/* 056 */ columnartorow_mutableStateArray_3[2].zeroOutNullBytes();
/* 057 */
/* 058 */ if (project_exprIsNull_0_0) {
/* 059 */ columnartorow_mutableStateArray_3[2].setNullAt(0);
/* 060 */ } else {
/* 061 */ // Remember the current cursor so that we can calculate how many bytes are
/* 062 */ // written later.
/* 063 */ final int project_previousCursor_1 = columnartorow_mutableStateArray_3[2].cursor();
/* 064 */
/* 065 */ final ArrayData project_tmpInput_1 = project_expr_0_0;
/* 066 */ if (project_tmpInput_1 instanceof UnsafeArrayData) {
/* 067 */ columnartorow_mutableStateArray_3[2].write((UnsafeArrayData) project_tmpInput_1);
/* 068 */ } else {
/* 069 */ final int project_numElements_2 = project_tmpInput_1.numElements();
/* 070 */ columnartorow_mutableStateArray_4[2].initialize(project_numElements_2);
/* 071 */
/* 072 */ for (int project_index_1 = 0; project_index_1 < project_numElements_2; project_index_1++) {
/* 073 */ if (project_tmpInput_1.isNullAt(project_index_1)) {
/* 074 */ columnartorow_mutableStateArray_4[2].setNull8Bytes(project_index_1);
/* 075 */ } else {
/* 076 */ columnartorow_mutableStateArray_4[2].write(project_index_1, project_tmpInput_1.getBinary(project_index_1));
/* 077 */ }
/* 078 */
/* 079 */ }
/* 080 */ }
/* 081 */
/* 082 */ columnartorow_mutableStateArray_3[2].setOffsetAndSizeFromPreviousCursor(0, project_previousCursor_1);
/* 083 */ }
/* 084 */
/* 085 */ if (project_isNull_2) {
/* 086 */ columnartorow_mutableStateArray_3[2].setNullAt(1);
/* 087 */ } else {
/* 088 */ // Remember the current cursor so that we can calculate how many bytes are
/* 089 */ // written later.
/* 090 */ final int project_previousCursor_2 = columnartorow_mutableStateArray_3[2].cursor();
/* 091 */
/* 092 */ final ArrayData project_tmpInput_2 = project_value_2;
/* 093 */ if (project_tmpInput_2 instanceof UnsafeArrayData) {
/* 094 */ columnartorow_mutableStateArray_3[2].write((UnsafeArrayData) project_tmpInput_2);
/* 095 */ } else {
/* 096 */ final int project_numElements_3 = project_tmpInput_2.numElements();
/* 097 */ columnartorow_mutableStateArray_4[3].initialize(project_numElements_3);
/* 098 */
/* 099 */ for (int project_index_2 = 0; project_index_2 < project_numElements_3; project_index_2++) {
/* 100 */ if (project_tmpInput_2.isNullAt(project_index_2)) {
/* 101 */ columnartorow_mutableStateArray_4[3].setNull8Bytes(project_index_2);
/* 102 */ } else {
/* 103 */ columnartorow_mutableStateArray_4[3].write(project_index_2, project_tmpInput_2.getBinary(project_index_2));
/* 104 */ }
/* 105 */
/* 106 */ }
/* 107 */ }
/* 108 */
/* 109 */ columnartorow_mutableStateArray_3[2].setOffsetAndSizeFromPreviousCursor(1, project_previousCursor_2);
/* 110 */ }
/* 111 */ append((columnartorow_mutableStateArray_3[2].getRow()));
/* 112 */
/* 113 */ }
/* 114 */
/* 115 */ private void columnartorow_nextBatch_0() throws java.io.IOException {
/* 116 */ if (columnartorow_mutableStateArray_0[0].hasNext()) {
/* 117 */ columnartorow_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)columnartorow_mutableStateArray_0[0].next();
/* 118 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* numInputBatches */).add(1);
/* 119 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(columnartorow_mutableStateArray_1[0].numRows());
/* 120 */ columnartorow_batchIdx_0 = 0;
/* 121 */ columnartorow_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) columnartorow_mutableStateArray_1[0].column(0);
/* 122 */
/* 123 */ }
/* 124 */ }
/* 125 */
/* 126 */ protected void processNext() throws java.io.IOException {
/* 127 */ if (columnartorow_mutableStateArray_1[0] == null) {
/* 128 */ columnartorow_nextBatch_0();
/* 129 */ }
/* 130 */ while ( columnartorow_mutableStateArray_1[0] != null) {
/* 131 */ int columnartorow_numRows_0 = columnartorow_mutableStateArray_1[0].numRows();
/* 132 */ int columnartorow_localEnd_0 = columnartorow_numRows_0 - columnartorow_batchIdx_0;
/* 133 */ for (int columnartorow_localIdx_0 = 0; columnartorow_localIdx_0 < columnartorow_localEnd_0; columnartorow_localIdx_0++) {
/* 134 */ int columnartorow_rowIdx_0 = columnartorow_batchIdx_0 + columnartorow_localIdx_0;
/* 135 */ boolean columnartorow_isNull_0 = columnartorow_mutableStateArray_2[0].isNullAt(columnartorow_rowIdx_0);
/* 136 */ ArrayData columnartorow_value_0 = columnartorow_isNull_0 ? null : (columnartorow_mutableStateArray_2[0].getArray(columnartorow_rowIdx_0));
/* 137 */
/* 138 */ project_doConsume_0(columnartorow_value_0, columnartorow_isNull_0);
/* 139 */ if (shouldStop()) { columnartorow_batchIdx_0 = columnartorow_rowIdx_0 + 1; return; }
/* 140 */ }
/* 141 */ columnartorow_batchIdx_0 = columnartorow_numRows_0;
/* 142 */ columnartorow_mutableStateArray_1[0] = null;
/* 143 */ columnartorow_nextBatch_0();
/* 144 */ }
/* 145 */ }
/* 146 */
/* 147 */ }
||
| val path = new Path(dir.toURI.toString, "test.parquet") | ||
| val filename = path.toString | ||
| val random = new Random(42) | ||
| withSQLConf(CometConf.COMET_ENABLED.key -> "false") { | ||
| val schemaOptions = | ||
| SchemaGenOptions(generateArray = true, generateStruct = false, generateMap = false) | ||
| val dataOptions = DataGenOptions(allowNull = true, generateNegativeZero = false) | ||
| ParquetGenerator.makeParquetFile(random, spark, filename, 100, schemaOptions, dataOptions) | ||
| } | ||
| withTempView("t1") { | ||
| val table = spark.read.parquet(filename) | ||
| table.createOrReplaceTempView("t1") | ||
| for (field <- table.schema.fields.filter(_.dataType.isInstanceOf[ArrayType])) { | ||
| val sql = s"SELECT ${field.name}, reverse(${field.name}) FROM t1 ORDER BY ${field.name}" | ||
| checkSparkAnswer(sql) | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
The version fixed by ChatGPT :)