|
27 | 27 | import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
|
28 | 28 | import org.elasticsearch.compute.operator.DriverContext;
|
29 | 29 | import org.elasticsearch.compute.operator.Operator;
|
30 |
| -import org.elasticsearch.core.Assertions; |
31 | 30 | import org.elasticsearch.core.Releasable;
|
32 | 31 | import org.elasticsearch.core.Releasables;
|
33 | 32 | import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
|
@@ -161,12 +160,6 @@ public int get(int i) {
|
161 | 160 | many.run();
|
162 | 161 | }
|
163 | 162 | }
|
164 |
| - if (Assertions.ENABLED) { |
165 |
| - for (int f = 0; f < fields.length; f++) { |
166 |
| - assert blocks[f].elementType() == ElementType.NULL || blocks[f].elementType() == fields[f].info.type |
167 |
| - : blocks[f].elementType() + " NOT IN (NULL, " + fields[f].info.type + ")"; |
168 |
| - } |
169 |
| - } |
170 | 163 | success = true;
|
171 | 164 | for (Block b : blocks) {
|
172 | 165 | valuesLoaded += b.getTotalValueCount();
|
@@ -233,6 +226,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
|
233 | 226 | BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
|
234 | 227 | if (columnAtATime != null) {
|
235 | 228 | blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs);
|
| 229 | + sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f); |
236 | 230 | } else {
|
237 | 231 | rowStrideReaders.add(
|
238 | 232 | new RowStrideReaderWork(
|
@@ -282,6 +276,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
|
282 | 276 | }
|
283 | 277 | for (RowStrideReaderWork work : rowStrideReaders) {
|
284 | 278 | blocks[work.offset] = work.build();
|
| 279 | + sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset); |
285 | 280 | }
|
286 | 281 | } finally {
|
287 | 282 | Releasables.close(rowStrideReaders);
|
@@ -385,6 +380,7 @@ void run() throws IOException {
|
385 | 380 | try (Block targetBlock = fieldTypeBuilders[f].build()) {
|
386 | 381 | target[f] = targetBlock.filter(backwards);
|
387 | 382 | }
|
| 383 | + sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f); |
388 | 384 | }
|
389 | 385 | }
|
390 | 386 |
|
@@ -561,6 +557,40 @@ protected Status status(long processNanos, int pagesProcessed, long rowsReceived
|
561 | 557 | return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted, valuesLoaded);
|
562 | 558 | }
|
563 | 559 |
|
| 560 | + /** |
| 561 | + * Quick checks for on the loaded block to make sure it looks reasonable. |
| 562 | + * @param loader the object that did the loading - we use it to make error messages if the block is busted |
| 563 | + * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has |
| 564 | + * @param block the block to sanity check |
| 565 | + * @param field offset into the {@link #fields} array for the block being loaded |
| 566 | + */ |
| 567 | + private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { |
| 568 | + if (block.getPositionCount() != expectedPositions) { |
| 569 | + throw new IllegalStateException( |
| 570 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 571 | + + " has [" |
| 572 | + + block.getPositionCount() |
| 573 | + + "] positions instead of [" |
| 574 | + + expectedPositions |
| 575 | + + "]" |
| 576 | + ); |
| 577 | + } |
| 578 | + if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { |
| 579 | + throw new IllegalStateException( |
| 580 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 581 | + + "'s element_type [" |
| 582 | + + block.elementType() |
| 583 | + + "] NOT IN (NULL, " |
| 584 | + + fields[field].info.type |
| 585 | + + ")" |
| 586 | + ); |
| 587 | + } |
| 588 | + } |
| 589 | + |
| 590 | + private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { |
| 591 | + return fields[field].info.name + "[" + loader + "]: " + block; |
| 592 | + } |
| 593 | + |
564 | 594 | public static class Status extends AbstractPageMappingOperator.Status {
|
565 | 595 | public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
566 | 596 | Operator.Status.class,
|
|
0 commit comments