-
Notifications
You must be signed in to change notification settings - Fork 28.6k
[SPARK-27650][SQL] separate the row iterator functionality from ColumnarBatch #24546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.spark.sql.execution.vectorized; | ||
|
||
import org.apache.spark.sql.catalyst.InternalRow; | ||
import org.apache.spark.sql.vectorized.ColumnarBatch; | ||
|
||
import java.util.Iterator; | ||
import java.util.NoSuchElementException; | ||
|
||
/** | ||
* This class provides a row view of a {@link ColumnarBatch}, so that Spark can access the data | ||
* row by row | ||
*/ | ||
public final class ColumnarBatchRowView { | ||
|
||
private final ColumnarBatch batch; | ||
|
||
// Staging row returned from `getRow`. | ||
private final MutableColumnarRow row; | ||
|
||
public ColumnarBatchRowView(ColumnarBatch batch) { | ||
this.batch = batch; | ||
this.row = new MutableColumnarRow(batch.columns()); | ||
} | ||
|
||
/** | ||
* Returns an iterator over the rows in this batch. | ||
*/ | ||
public Iterator<InternalRow> rowIterator() { | ||
final int maxRows = batch.numRows(); | ||
final MutableColumnarRow row = new MutableColumnarRow(batch.columns()); | ||
return new Iterator<InternalRow>() { | ||
int rowId = 0; | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return rowId < maxRows; | ||
} | ||
|
||
@Override | ||
public InternalRow next() { | ||
if (rowId >= maxRows) { | ||
throw new NoSuchElementException(); | ||
} | ||
row.rowId = rowId++; | ||
return row; | ||
} | ||
|
||
@Override | ||
public void remove() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Returns the row in this batch at `rowId`. Returned row is reused across calls. | ||
*/ | ||
public InternalRow getRow(int rowId) { | ||
assert(rowId >= 0 && rowId < batch.numRows()); | ||
row.rowId = rowId; | ||
return row; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,25 +16,17 @@ | |
*/ | ||
package org.apache.spark.sql.vectorized; | ||
|
||
import java.util.*; | ||
|
||
import org.apache.spark.annotation.Evolving; | ||
import org.apache.spark.sql.catalyst.InternalRow; | ||
import org.apache.spark.sql.execution.vectorized.MutableColumnarRow; | ||
|
||
/** | ||
* This class wraps multiple ColumnVectors as a row-wise table. It provides a row view of this | ||
* batch so that Spark can access the data row by row. Instance of it is meant to be reused during | ||
* the entire data loading process. | ||
* This class wraps multiple {@link ColumnVector}s as a table-like data batch. Instance of it is | ||
* meant to be reused during the entire data loading process. | ||
*/ | ||
@Evolving | ||
public final class ColumnarBatch { | ||
private int numRows; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Review comment: Is it still proper to carry this info here now? Row-wise access isn't at […] (the rest of this comment is missing from the page capture). There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Reply: Spark needs to know the row count to read the columnar data. |
||
private final ColumnVector[] columns; | ||
|
||
// Staging row returned from `getRow`. | ||
private final MutableColumnarRow row; | ||
|
||
/** | ||
* Called to close all the columns in this batch. It is not valid to access the data after | ||
* calling this. This must be called at the end to clean up memory allocations. | ||
|
@@ -45,36 +37,6 @@ public void close() { | |
} | ||
} | ||
|
||
/** | ||
* Returns an iterator over the rows in this batch. | ||
*/ | ||
public Iterator<InternalRow> rowIterator() { | ||
final int maxRows = numRows; | ||
final MutableColumnarRow row = new MutableColumnarRow(columns); | ||
return new Iterator<InternalRow>() { | ||
int rowId = 0; | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return rowId < maxRows; | ||
} | ||
|
||
@Override | ||
public InternalRow next() { | ||
if (rowId >= maxRows) { | ||
throw new NoSuchElementException(); | ||
} | ||
row.rowId = rowId++; | ||
return row; | ||
} | ||
|
||
@Override | ||
public void remove() { | ||
throw new UnsupportedOperationException(); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Sets the number of rows in this batch. | ||
*/ | ||
|
@@ -98,16 +60,13 @@ public void setNumRows(int numRows) { | |
public ColumnVector column(int ordinal) { return columns[ordinal]; } | ||
|
||
/** | ||
* Returns the row in this batch at `rowId`. Returned row is reused across calls. | ||
* Returns all the columns of this batch. | ||
*/ | ||
public InternalRow getRow(int rowId) { | ||
assert(rowId >= 0 && rowId < numRows); | ||
row.rowId = rowId; | ||
return row; | ||
public ColumnVector[] columns() { | ||
return columns; | ||
} | ||
|
||
public ColumnarBatch(ColumnVector[] columns) { | ||
this.columns = columns; | ||
this.row = new MutableColumnarRow(columns); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it make sense to create another constructor that does not allocate `MutableColumnarRow`, as an optimization? Most of the use cases immediately call `rowIterator()`, which obviously never calls `getRow()`.