Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.rowSet;

import org.apache.drill.exec.record.TupleMetadata;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.vector.BaseValueVector;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;

/**
 * Builds a result set (series of zero or more row sets) based on a defined
 * schema which may evolve (expand) over time. Automatically rolls "overflow"
 * rows over when a batch fills.
 * <p>
 * Many of the methods in this interface verify that the loader is
 * in the proper state. For example, an exception is thrown if the caller
 * attempts to save a row before starting a batch. However, the per-column
 * write methods are checked only through assertions that should be enabled
 * during testing, but will be disabled during production.
 *
 * @see VectorContainerWriter the class which this class replaces
 */

public interface ResultSetLoader {

  // Interface constants are implicitly public, static and final;
  // redundant modifiers removed.
  int DEFAULT_ROW_COUNT = BaseValueVector.INITIAL_VALUE_ALLOCATION;

  /**
   * Current schema version. The version increments by one each time
   * a column is added.
   *
   * @return the current schema version
   */

  int schemaVersion();

  /**
   * Adjust the number of rows to produce in the next batch. Takes
   * effect after the next call to {@link #startBatch()}.
   *
   * @param count target batch row count
   */

  void setTargetRowCount(int count);

  /**
   * The number of rows produced by this loader (as configured in the loader
   * options.)
   *
   * @return the target row count for batches that this loader produces
   */

  int targetRowCount();

  /**
   * The largest vector size produced by this loader (as specified by
   * the value vector limit.)
   *
   * @return the largest vector size. Attempting to extend a vector beyond
   * this limit causes automatic vector overflow and terminates the
   * in-flight batch, even if the batch has not yet reached the target
   * row count
   */

  int targetVectorSize();

  /**
   * Total number of batches created. Includes the current batch if
   * the row count in this batch is non-zero.
   *
   * @return the number of batches produced including the current
   * one
   */

  int batchCount();

  /**
   * Total number of rows loaded for all previous batches and the
   * current batch.
   *
   * @return total row count
   */

  int totalRowCount();

  /**
   * Start a new row batch. Valid only when first started, or after the
   * previous batch has been harvested.
   */

  void startBatch();

  /**
   * Writer for the top-level tuple (the entire row). Valid only when
   * the mutator is actively writing a batch (after <tt>startBatch()</tt>
   * but before <tt>harvest()</tt>.)
   *
   * @return writer for the top-level columns
   */

  RowSetLoader writer();

  /**
   * Reports whether the loader is in a state in which rows can be
   * written: a batch has been started and has not yet been harvested.
   *
   * @return true if the loader can currently accept rows, false otherwise
   */

  boolean writeable();

  /**
   * Load a row using column values passed as variable-length arguments. Expects
   * map values to be represented as an array.
   * A schema of (a:int, b:map(c:varchar)) would be
   * set as <br><tt>loadRow(10, new Object[] {"foo"});</tt><br>
   * Values of arrays can be expressed as a Java
   * array. A schema of (a:int, b:int[]) can be set as<br>
   * <tt>loadRow(10, new int[] {100, 200});</tt><br>.
   * Primarily for testing, too slow for production code.
   * <p>
   * If the row consists of a single map or list, then the one value will be an
   * <tt>Object</tt> array, creating an ambiguity. Use <tt>writer().set(0, value);</tt>
   * in this case.
   *
   * @param values column values in column index order
   * @return this loader
   */

  ResultSetLoader setRow(Object...values);

  /**
   * Return the output container, primarily to obtain the schema
   * and set of vectors. Depending on when this is called, the
   * data may or may not be populated: call
   * {@link #harvest()} to obtain the container for a batch.
   * <p>
   * This method is useful when the schema is known and fixed.
   * After declaring the schema, call this method to get the container
   * that holds the vectors for use in planning projection, etc.
   * <p>
   * If the result set schema changes, then a call to this method will
   * return the latest schema. But, if the schema changes during the
   * overflow row, then this method will not see those changes until
   * after harvesting the current batch. (This avoids the appearance
   * of phantom columns in the output since the new column won't
   * appear until the next batch.)
   * <p>
   * Never count on the data in the container; it may be empty, half
   * written, or inconsistent. Always call
   * {@link #harvest()} to obtain the container for a batch.
   *
   * @return the output container including schema and value
   * vectors
   */

  VectorContainer outputContainer();

  /**
   * Harvest the current row batch, and reset the mutator
   * to the start of the next row batch (which may already contain
   * an overflow row.)
   * <p>
   * The schema of the returned container is defined as:
   * <ul>
   * <li>The schema as passed in via the loader options, plus</li>
   * <li>Columns added dynamically during write, minus</li>
   * <li>Any columns not included in the project list, minus</li>
   * <li>Any columns added in the overflow row.</li>
   * </ul>
   * That is, column order is as defined by the initial schema and column
   * additions. In particular, the schema order is <b>not</b> defined by
   * the projection list. (Another mechanism is required to reorder columns
   * for the actual projection.)
   *
   * @return the row batch to send downstream
   */

  VectorContainer harvest();

  /**
   * The schema of the harvested batch. Valid until the start of the
   * next batch.
   *
   * @return the extended schema of the harvested batch which includes
   * any allocation hints used when creating the batch
   */

  TupleMetadata harvestSchema();

  /**
   * Called after all rows are returned, whether because no more data is
   * available, or the caller wishes to cancel the current row batch
   * and complete.
   */

  void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.vector.accessor.impl;
package org.apache.drill.exec.physical.rowSet;

import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.vector.ValueVector;

/**
* Abstract base class for column readers and writers that
* implements the mechanism for binding accessors to a row
* index. The row index is implicit: index a row, then
* column accessors pull out columns from that row.
* Interface for a cache that implements "vector persistence" across
* multiple result set loaders. Allows a single scan operator to offer
* the same set of vectors even when data is read by a set of readers.
*/

public abstract class AbstractColumnAccessor {

public interface RowIndex {
int batch();
int index();
}

protected RowIndex vectorIndex;

protected void bind(RowIndex rowIndex) {
this.vectorIndex = rowIndex;
}

public abstract void bind(RowIndex rowIndex, ValueVector vector);
/**
 * Cache that provides "vector persistence" across multiple result set
 * loaders, letting a single scan operator hand downstream operators the
 * same set of vectors even as data is read by a succession of readers.
 */

public interface ResultVectorCache {

  /**
   * @return the allocator used when creating vectors held by this cache
   */
  BufferAllocator allocator();

  /**
   * Returns the cached vector for the given column schema, presumably
   * creating and caching one if none exists yet — confirm exact
   * create-vs-reuse semantics against the implementation.
   *
   * @param colSchema materialized schema of the requested column
   * @return the value vector associated with that column schema
   */
  ValueVector addOrGet(MaterializedField colSchema);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.rowSet;

import org.apache.drill.exec.vector.accessor.TupleWriter;

/**
 * Interface for writing values to a row set. Only available for newly-created
 * single row sets.
 * <p>
 * Typical usage:
 *
 * <pre><code>
 * void writeABatch() {
 *   RowSetLoader writer = ...
 *   while (! writer.isFull()) {
 *     writer.start();
 *     writer.scalar(0).setInt(10);
 *     writer.scalar(1).setString("foo");
 *     ...
 *     writer.save();
 *   }
 * }</code></pre>
 * Alternative usage:
 *
 * <pre><code>
 * void writeABatch() {
 *   RowSetLoader writer = ...
 *   while (writer.start()) {
 *     writer.scalar(0).setInt(10);
 *     writer.scalar(1).setString("foo");
 *     ...
 *     writer.save();
 *   }
 * }</code></pre>
 *
 * The above writes until the batch is full, based on size or vector overflow.
 * That is, the details of vector overflow are hidden from the code that calls
 * the writer.
 */

public interface RowSetLoader extends TupleWriter {

  /**
   * @return the result set loader that owns this row writer
   */
  ResultSetLoader loader();

  /**
   * Write a row of values, given by Java objects. Object type must match
   * expected column type. Stops writing, and returns false, if any value causes
   * vector overflow. Value format:
   * <ul>
   * <li>For scalars, the value as a suitable Java type (int or Integer, say,
   * for <tt>INTEGER</tt> values.)</li>
   * <li>For scalar arrays, an array of a suitable Java primitive type for
   * scalars. For example, <tt>int[]</tt> for an <tt>INTEGER</tt> column.</li>
   * <li>For a Map, an <tt>Object</tt> array with values encoded as above.
   * (In fact, the list here is the same as the map format.)</li>
   * <li>For a list (repeated map, list of list), an <tt>Object</tt> array with
   * values encoded as above. (So, for a repeated map, an outer <tt>Object</tt>
   * map encodes the array, an inner one encodes the map members.)</li>
   * </ul>
   *
   * @param values
   *          variable-length argument list of column values
   */

  RowSetLoader addRow(Object... values);

  /**
   * Indicates that no more rows fit into the current row batch and that the row
   * batch should be harvested and sent downstream. Any overflow row is
   * automatically saved for the next cycle. The value is undefined when a batch
   * is not active.
   * <p>
   * Will be false on the first row, and all subsequent rows until either the
   * maximum number of rows are written, or a vector overflows. After that, will
   * return true. The method returns false as soon as any column writer
   * overflows even in the middle of a row write. That is, this writer does not
   * automatically handle overflow rows because that added complexity is seldom
   * needed for tests.
   *
   * @return true if another row can be written, false if not
   */

  boolean isFull();

  /**
   * The number of rows in the current row set. Does not count any overflow row
   * saved for the next batch.
   *
   * @return number of rows to be sent downstream
   */

  int rowCount();

  /**
   * The index of the current row. Same as the row count except in an overflow
   * row in which case the row index will revert to zero as soon as any vector
   * overflows. Note: this means that the index can change between columns in a
   * single row. Applications usually don't use this index directly; rely on the
   * writers to write to the proper location.
   *
   * @return the current write index
   */

  int rowIndex();

  /**
   * Prepare a new row for writing. Call this before each row.
   * <p>
   * Handles a very special case: that of discarding the last row written.
   * A reader can read a row into vectors, then "sniff" the row to check,
   * for example, against a filter. If the row is not wanted, simply omit
   * the call to <tt>save()</tt> and the next call to <tt>start()</tt> will
   * discard the unsaved row.
   * <p>
   * Note that the vectors still contain values in the
   * discarded position; just the various pointers are unset. If
   * the batch ends before the discarded values are overwritten, the
   * discarded values just exist at the end of the vector. Since vectors
   * start with garbage contents, the discarded values are simply a different
   * kind of garbage. But, if the client writes a new row, then the new
   * row overwrites the discarded row. This works because we only change
   * the tail part of a vector; never the internals.
   *
   * @return true if another row can be added, false if the batch is full
   */

  boolean start();

  /**
   * Saves the current row and moves to the next row. Failing to call this
   * method effectively abandons the in-flight row; something that may be useful
   * to recover from partially-written rows that turn out to contain errors.
   * Done automatically if using <tt>setRow()</tt>.
   */

  void save();
}
Loading