|
27 | 27 | import org.elasticsearch.compute.operator.AbstractPageMappingOperator;
|
28 | 28 | import org.elasticsearch.compute.operator.DriverContext;
|
29 | 29 | import org.elasticsearch.compute.operator.Operator;
|
30 |
| -import org.elasticsearch.core.Assertions; |
31 | 30 | import org.elasticsearch.core.Releasable;
|
32 | 31 | import org.elasticsearch.core.Releasables;
|
33 | 32 | import org.elasticsearch.index.fieldvisitor.StoredFieldLoader;
|
@@ -158,12 +157,6 @@ public int get(int i) {
|
158 | 157 | many.run();
|
159 | 158 | }
|
160 | 159 | }
|
161 |
| - if (Assertions.ENABLED) { |
162 |
| - for (int f = 0; f < fields.length; f++) { |
163 |
| - assert blocks[f].elementType() == ElementType.NULL || blocks[f].elementType() == fields[f].info.type |
164 |
| - : blocks[f].elementType() + " NOT IN (NULL, " + fields[f].info.type + ")"; |
165 |
| - } |
166 |
| - } |
167 | 160 | success = true;
|
168 | 161 | return page.appendBlocks(blocks);
|
169 | 162 | } catch (IOException e) {
|
@@ -227,6 +220,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
|
227 | 220 | BlockLoader.ColumnAtATimeReader columnAtATime = field.columnAtATime(ctx);
|
228 | 221 | if (columnAtATime != null) {
|
229 | 222 | blocks[f] = (Block) columnAtATime.read(loaderBlockFactory, docs);
|
| 223 | + sanityCheckBlock(columnAtATime, docs.count(), blocks[f], f); |
230 | 224 | } else {
|
231 | 225 | rowStrideReaders.add(
|
232 | 226 | new RowStrideReaderWork(
|
@@ -276,6 +270,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
|
276 | 270 | }
|
277 | 271 | for (RowStrideReaderWork work : rowStrideReaders) {
|
278 | 272 | blocks[work.offset] = work.build();
|
| 273 | + sanityCheckBlock(work.reader, docs.count(), blocks[work.offset], work.offset); |
279 | 274 | }
|
280 | 275 | } finally {
|
281 | 276 | Releasables.close(rowStrideReaders);
|
@@ -379,6 +374,7 @@ void run() throws IOException {
|
379 | 374 | try (Block targetBlock = fieldTypeBuilders[f].build()) {
|
380 | 375 | target[f] = targetBlock.filter(backwards);
|
381 | 376 | }
|
| 377 | + sanityCheckBlock(rowStride[f], docs.getPositionCount(), target[f], f); |
382 | 378 | }
|
383 | 379 | }
|
384 | 380 |
|
@@ -555,6 +551,40 @@ protected Status status(long processNanos, int pagesProcessed, long rowsReceived
|
555 | 551 | return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted);
|
556 | 552 | }
|
557 | 553 |
|
| 554 | + /** |
| 555 | + * Quick checks for on the loaded block to make sure it looks reasonable. |
| 556 | + * @param loader the object that did the loading - we use it to make error messages if the block is busted |
| 557 | + * @param expectedPositions how many positions the block should have - it's as many as the incoming {@link Page} has |
| 558 | + * @param block the block to sanity check |
| 559 | + * @param field offset into the {@link #fields} array for the block being loaded |
| 560 | + */ |
| 561 | + private void sanityCheckBlock(Object loader, int expectedPositions, Block block, int field) { |
| 562 | + if (block.getPositionCount() != expectedPositions) { |
| 563 | + throw new IllegalStateException( |
| 564 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 565 | + + " has [" |
| 566 | + + block.getPositionCount() |
| 567 | + + "] positions instead of [" |
| 568 | + + expectedPositions |
| 569 | + + "]" |
| 570 | + ); |
| 571 | + } |
| 572 | + if (block.elementType() != ElementType.NULL && block.elementType() != fields[field].info.type) { |
| 573 | + throw new IllegalStateException( |
| 574 | + sanityCheckBlockErrorPrefix(loader, block, field) |
| 575 | + + "'s element_type [" |
| 576 | + + block.elementType() |
| 577 | + + "] NOT IN (NULL, " |
| 578 | + + fields[field].info.type |
| 579 | + + ")" |
| 580 | + ); |
| 581 | + } |
| 582 | + } |
| 583 | + |
| 584 | + private String sanityCheckBlockErrorPrefix(Object loader, Block block, int field) { |
| 585 | + return fields[field].info.name + "[" + loader + "]: " + block; |
| 586 | + } |
| 587 | + |
558 | 588 | public static class Status extends AbstractPageMappingOperator.Status {
|
559 | 589 | public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(
|
560 | 590 | Operator.Status.class,
|
|
0 commit comments