Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,40 @@ public interface ExecConstants {
String USE_DYNAMIC_UDFS_KEY = "exec.udf.use_dynamic";
BooleanValidator USE_DYNAMIC_UDFS = new BooleanValidator(USE_DYNAMIC_UDFS_KEY, true);


String QUERY_TRANSIENT_STATE_UPDATE_KEY = "exec.query.progress.update";
BooleanValidator QUERY_TRANSIENT_STATE_UPDATE = new BooleanValidator(QUERY_TRANSIENT_STATE_UPDATE_KEY, true);

String PERSISTENT_TABLE_UMASK = "exec.persistent_table.umask";
StringValidator PERSISTENT_TABLE_UMASK_VALIDATOR = new StringValidator(PERSISTENT_TABLE_UMASK, "002");

/**
* Enables batch iterator (operator) validation. Validation is normally enabled
* only when assertions are enabled. This option enables iterator validation even
* if assertions are not enabled. That is, it allows iterator validation even on
* a "production" Drill instance.
*/
String ENABLE_ITERATOR_VALIDATION_OPTION = "debug.validate_iterators";
BooleanValidator ENABLE_ITERATOR_VALIDATOR = new BooleanValidator(ENABLE_ITERATOR_VALIDATION_OPTION, false);

/**
* Boot-time config option to enable validation. Primarily used for tests.
* If true, overrrides the above. (That is validation is done if assertions are on,
* if the above session option is set to true, or if this config option is set to true.
*/

String ENABLE_ITERATOR_VALIDATION = "drill.exec.debug.validate_iterators";

/**
* When iterator validation is enabled, additionally validates the vectors in
* each batch passed to each iterator.
*/
String ENABLE_VECTOR_VALIDATION_OPTION = "debug.validate_vectors";
BooleanValidator ENABLE_VECTOR_VALIDATOR = new BooleanValidator(ENABLE_VECTOR_VALIDATION_OPTION, false);

/**
* Boot-time config option to enable vector validation. Primarily used for
* tests. Add the following to the command line to enable:<br>
* <tt>-ea -Ddrill.exec.debug.validate_vectors=true</tt>
*/
String ENABLE_VECTOR_VALIDATION = "drill.exec.debug.validate_vectors";
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -26,6 +26,7 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.FragmentRoot;
import org.apache.drill.exec.physical.base.PhysicalOperator;
Expand Down Expand Up @@ -69,9 +70,16 @@ public static RootExec getExec(FragmentContext context, FragmentRoot root) throw
Preconditions.checkNotNull(root);
Preconditions.checkNotNull(context);

if (AssertionUtil.isAssertionsEnabled()) {
// Enable iterator (operator) validation if assertions are enabled (debug mode)
// or if in production mode and the ENABLE_ITERATOR_VALIDATION option is set
// to true.

if (AssertionUtil.isAssertionsEnabled() ||
context.getOptionSet().getOption(ExecConstants.ENABLE_ITERATOR_VALIDATOR) ||
context.getConfig().getBoolean(ExecConstants.ENABLE_ITERATOR_VALIDATION)) {
root = IteratorValidatorInjector.rewritePlanWithIteratorValidator(context, root);
}

final ImplCreator creator = new ImplCreator();
Stopwatch watch = Stopwatch.createStarted();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
******************************************************************************/
package org.apache.drill.exec.physical.impl.validate;

import java.util.ArrayList;
import java.util.List;

import org.apache.drill.exec.record.SimpleVectorWrapper;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.FixedWidthVector;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.NullableVector;
import org.apache.drill.exec.vector.RepeatedVarCharVector;
import org.apache.drill.exec.vector.UInt4Vector;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.exec.vector.VariableWidthVector;
import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
import org.apache.drill.exec.vector.complex.RepeatedFixedWidthVectorLike;


/**
* Validate a batch of value vectors. It is not possible to validate the
* data, but we can validate the structure, especially offset vectors.
* Only handles single (non-hyper) vectors at present. Current form is
* self-contained. Better checks can be done by moving checks inside
* vectors or by exposing more metadata from vectors.
*/

public class BatchValidator {
private static final org.slf4j.Logger logger =
org.slf4j.LoggerFactory.getLogger(BatchValidator.class);

public static final int MAX_ERRORS = 100;

private final int rowCount;
private final VectorAccessible batch;
private final List<String> errorList;
private int errorCount;

public BatchValidator(VectorAccessible batch) {
rowCount = batch.getRecordCount();
this.batch = batch;
errorList = null;
}

public BatchValidator(VectorAccessible batch, boolean captureErrors) {
rowCount = batch.getRecordCount();
this.batch = batch;
if (captureErrors) {
errorList = new ArrayList<>();
} else {
errorList = null;
}
}

public void validate() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a thought. Is there a way to enable these checks (and fail if invalid) for pre-commit tests as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great idea! Added a config option that forces vector validation. Add the following to the pom.xml file in the Surefire options:

{code}
-Ddrill.exec.debug.validate_vectors=true
{code}

Will try this out and enable the checks as a different JIRA ticket and PR.

if (batch.getRecordCount() == 0) {
return;
}
for (VectorWrapper<? extends ValueVector> w : batch) {
validateWrapper(w);
}
}

private void validateWrapper(VectorWrapper<? extends ValueVector> w) {
if (w instanceof SimpleVectorWrapper) {
validateVector(w.getValueVector());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mentioned above that HyperVectorWrapper is not validated. Can you open a ticket for the functionality to-be-implemented in this validator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. See DRILL-5526.

}

private void validateVector(ValueVector vector) {
String name = vector.getField().getName();
if (vector instanceof NullableVector) {
validateNullableVector(name, (NullableVector) vector);
} else if (vector instanceof VariableWidthVector) {
validateVariableWidthVector(name, (VariableWidthVector) vector, rowCount);
} else if (vector instanceof FixedWidthVector) {
validateFixedWidthVector(name, (FixedWidthVector) vector);
} else if (vector instanceof BaseRepeatedValueVector) {
validateRepeatedVector(name, (BaseRepeatedValueVector) vector);
} else {
logger.debug("Don't know how to validate vector: " + name + " of class " + vector.getClass().getSimpleName());
}
}

private void validateVariableWidthVector(String name, VariableWidthVector vector, int entryCount) {

// Offsets are in the derived classes. Handle only VarChar for now.

if (vector instanceof VarCharVector) {
validateVarCharVector(name, (VarCharVector) vector, entryCount);
} else {
logger.debug("Don't know how to validate vector: " + name + " of class " + vector.getClass().getSimpleName());
}
}

private void validateVarCharVector(String name, VarCharVector vector, int entryCount) {
// int dataLength = vector.getAllocatedByteCount(); // Includes offsets and data.
int dataLength = vector.getBuffer().capacity();
validateOffsetVector(name + "-offsets", vector.getOffsetVector(), entryCount, dataLength);
}

private void validateRepeatedVector(String name, BaseRepeatedValueVector vector) {

int dataLength = Integer.MAX_VALUE;
if (vector instanceof RepeatedVarCharVector) {
dataLength = ((RepeatedVarCharVector) vector).getOffsetVector().getValueCapacity();
} else if (vector instanceof RepeatedFixedWidthVectorLike) {
dataLength = ((BaseDataValueVector) ((BaseRepeatedValueVector) vector).getDataVector()).getBuffer().capacity();
}
int itemCount = validateOffsetVector(name + "-offsets", vector.getOffsetVector(), rowCount, dataLength);

// Special handling of repeated VarChar vectors
// The nested data vectors are not quite exactly like top-level vectors.

@SuppressWarnings("resource")
ValueVector dataVector = vector.getDataVector();
if (dataVector instanceof VariableWidthVector) {
validateVariableWidthVector(name + "-data", (VariableWidthVector) dataVector, itemCount);
}
}

private int validateOffsetVector(String name, UInt4Vector offsetVector, int valueCount, int maxOffset) {
if (valueCount == 0) {
return 0;
}
UInt4Vector.Accessor accessor = offsetVector.getAccessor();

// First value must be zero in current version.

int prevOffset = accessor.get(0);
if (prevOffset != 0) {
error(name, offsetVector, "Offset (0) must be 0 but was " + prevOffset);
}

// Note <= comparison: offset vectors have (n+1) entries.

for (int i = 1; i <= valueCount; i++) {
int offset = accessor.get(i);
if (offset < prevOffset) {
error(name, offsetVector, "Decreasing offsets at (" + (i-1) + ", " + i + ") = (" + prevOffset + ", " + offset + ")");
} else if (offset > maxOffset) {
error(name, offsetVector, "Invalid offset at index " + i + " = " + offset + " exceeds maximum of " + maxOffset);
}
prevOffset = offset;
}
return prevOffset;
}

private void error(String name, ValueVector vector, String msg) {
if (errorCount == 0) {
logger.error("Found one or more vector errors from " + batch.getClass().getSimpleName());
}
errorCount++;
if (errorCount >= MAX_ERRORS) {
return;
}
String fullMsg = "Column " + name + " of type " + vector.getClass().getSimpleName( ) + ": " + msg;
logger.error(fullMsg);
if (errorList != null) {
errorList.add(fullMsg);
}
}

private void validateNullableVector(String name, NullableVector vector) {
// Can't validate at this time because the bits vector is in each
// generated subtype.

// Validate a VarChar vector because it is common.

if (vector instanceof NullableVarCharVector) {
@SuppressWarnings("resource")
VarCharVector values = ((NullableVarCharVector) vector).getValuesVector();
validateVarCharVector(name + "-values", values, rowCount);
}
}

private void validateFixedWidthVector(String name, FixedWidthVector vector) {
// TODO Auto-generated method stub

}

/**
* Obtain the list of errors. For use in unit-testing this class.
* @return the list of errors found, or null if error capture was
* not enabled
*/

public List<String> errors() { return errorList; }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand Down Expand Up @@ -94,6 +94,11 @@ private enum ValidationState {
/** High-level IterOutcome sequence state. */
private ValidationState validationState = ValidationState.INITIAL_NO_SCHEMA;

/**
* Enable/disable per-batch vector validation. Enable only to debug vector
* corruption issues.
*/
private boolean validateBatches;

public IteratorValidatorBatchIterator(RecordBatch incoming) {
this.incoming = incoming;
Expand All @@ -103,6 +108,11 @@ public IteratorValidatorBatchIterator(RecordBatch incoming) {
logger.trace( "[#{}; on {}]: Being constructed.", instNum, batchTypeName);
}


public void enableBatchValidation(boolean option) {
validateBatches = option;
}

@Override
public String toString() {
return
Expand Down Expand Up @@ -224,6 +234,7 @@ public IterOutcome next() {
// above).
// OK_NEW_SCHEMA moves to have-seen-schema state.
validationState = ValidationState.HAVE_SCHEMA;
validateBatch();
break;
case OK:
// OK is allowed as long as OK_NEW_SCHEMA was seen, except if terminated
Expand All @@ -234,6 +245,7 @@ public IterOutcome next() {
"next() returned %s without first returning %s [#%d, %s]",
batchState, OK_NEW_SCHEMA, instNum, batchTypeName));
}
validateBatch();
// OK doesn't change high-level state.
break;
case NONE:
Expand Down Expand Up @@ -326,6 +338,12 @@ public IterOutcome next() {
}
}

private void validateBatch() {
if (validateBatches) {
new BatchValidator(incoming).validate();
}
}

@Override
public WritableBatch getWritableBatch() {
validateReadState("getWritableBatch()");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
Expand All @@ -20,6 +20,7 @@
import java.util.List;

import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.config.IteratorValidator;
import org.apache.drill.exec.physical.impl.BatchCreator;
Expand All @@ -35,6 +36,13 @@ public IteratorValidatorBatchIterator getBatch(FragmentContext context, Iterator
List<RecordBatch> children)
throws ExecutionSetupException {
Preconditions.checkArgument(children.size() == 1);
return new IteratorValidatorBatchIterator(children.iterator().next());
RecordBatch child = children.iterator().next();
IteratorValidatorBatchIterator iter = new IteratorValidatorBatchIterator(child);
boolean validateBatches = context.getOptionSet().getOption(ExecConstants.ENABLE_VECTOR_VALIDATOR) ||
context.getConfig().getBoolean(ExecConstants.ENABLE_VECTOR_VALIDATION);
iter.enableBatchValidation(validateBatches);
logger.trace("Iterator validation enabled for " + child.getClass().getSimpleName() +
(validateBatches ? " with vector validation" : ""));
return iter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public class SystemOptionManager extends BaseOptionManager implements OptionMana
ExecConstants.QUERY_PROFILE_DEBUG_VALIDATOR,
ExecConstants.USE_DYNAMIC_UDFS,
ExecConstants.QUERY_TRANSIENT_STATE_UPDATE,
ExecConstants.PERSISTENT_TABLE_UMASK_VALIDATOR
ExecConstants.PERSISTENT_TABLE_UMASK_VALIDATOR,
ExecConstants.ENABLE_ITERATOR_VALIDATOR,
ExecConstants.ENABLE_VECTOR_VALIDATOR
};
final Map<String, OptionValidator> tmp = new HashMap<>();
for (final OptionValidator validator : validators) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ public void setup(OperatorContext context, OutputMutator outputMutator) throws E
}

// setup Input using InputStream
logger.trace("Opening file {}", split.getPath());
stream = dfs.openPossiblyCompressedStream(split.getPath());
input = new TextInput(settings, stream, readBuffer, split.getStart(), split.getStart() + split.getLength());

Expand Down
Loading