|
30 | 30 | import io.trino.spi.function.WindowAccumulator; |
31 | 31 | import io.trino.spi.function.WindowIndex; |
32 | 32 | import io.trino.spi.type.Type; |
33 | | -import it.unimi.dsi.fastutil.ints.IntArrayList; |
34 | 33 |
|
35 | 34 | import java.util.List; |
36 | 35 |
|
37 | 36 | import static com.google.common.base.Preconditions.checkArgument; |
38 | 37 | import static com.google.common.base.Preconditions.checkState; |
39 | 38 | import static io.trino.spi.type.BooleanType.BOOLEAN; |
40 | | -import static java.lang.Math.min; |
| 39 | +import static java.lang.Math.toIntExact; |
41 | 40 | import static java.util.Objects.requireNonNull; |
42 | 41 |
|
43 | 42 | public class DistinctWindowAccumulator |
@@ -128,39 +127,46 @@ public void addInput(WindowIndex index, int startPosition, int endPosition) |
128 | 127 |
|
129 | 128 | private void indexCurrentPage(Page page) |
130 | 129 | { |
| 130 | + long initialGroupCount = hash.getGroupCount(); |
131 | 131 | Work<Block> work = hash.markDistinctRows(page); |
132 | 132 | checkState(work.process()); |
133 | 133 | Block distinctMask = work.getResult(); |
134 | 134 |
|
135 | 135 | int positionCount = distinctMask.getPositionCount(); |
136 | 136 | checkArgument(positionCount == page.getPositionCount(), "Page position count does not match distinct mask position count"); |
137 | | - PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(argumentTypes, positionCount); |
| 137 | + |
| 138 | + int distinctPositions = toIntExact(hash.getGroupCount() - initialGroupCount); |
| 139 | + if (distinctPositions == 0) { |
| 140 | + return; |
| 141 | + } |
| 142 | + PagesIndex pagesIndex = pagesIndexFactory.newPagesIndex(argumentTypes, distinctPositions); |
138 | 143 |
|
139 | 144 | if (distinctMask instanceof RunLengthEncodedBlock) { |
140 | | - if (test(distinctMask, 0)) { |
141 | | - // all positions selected |
142 | | - pagesIndex.addPage(page); |
143 | | - } |
| 145 | + // all positions selected |
| 146 | + checkState(test(distinctMask, 0), "all positions must be distinct"); |
| 147 | + pagesIndex.addPage(page); |
144 | 148 | } |
145 | 149 | else { |
146 | | - IntArrayList selectedPositions = new IntArrayList(min(128, positionCount)); |
| 150 | + int[] selectedPositions = new int[distinctPositions]; |
| 151 | + int selectedIndex = 0; |
147 | 152 | for (int position = 0; position < positionCount; position++) { |
148 | 153 | if (test(distinctMask, position)) { |
149 | | - selectedPositions.add(position); |
| 154 | + selectedPositions[selectedIndex++] = position; |
150 | 155 | } |
151 | 156 | } |
| 157 | + checkState(selectedIndex == selectedPositions.length, "Invalid positions in distinct mask"); |
152 | 158 |
|
153 | 159 | Block[] filteredBlocks = new Block[argumentChannels.size()]; |
154 | 160 | for (int channel = 0; channel < argumentChannels.size(); channel++) { |
155 | | - filteredBlocks[channel] = page.getBlock(channel).copyPositions(selectedPositions.elements(), 0, selectedPositions.size()); |
| 161 | + filteredBlocks[channel] = page.getBlock(channel).copyPositions(selectedPositions, 0, selectedPositions.length); |
156 | 162 | } |
157 | | - pagesIndex.addPage(new Page(selectedPositions.size(), filteredBlocks)); |
| 163 | + pagesIndex.addPage(new Page(selectedPositions.length, filteredBlocks)); |
158 | 164 | } |
159 | 165 | int selectedPositionsCount = pagesIndex.getPositionCount(); |
160 | | - if (selectedPositionsCount > 0) { |
161 | | - PagesWindowIndex selectedWindowIndex = new PagesWindowIndex(pagesIndex, 0, selectedPositionsCount); |
162 | | - delegate.addInput(selectedWindowIndex, 0, selectedPositionsCount - 1); |
163 | | - } |
| 166 | + checkState(selectedPositionsCount == distinctPositions, "unexpected pagesIndex positions: %s <> %s", selectedPositionsCount, distinctPositions); |
| 167 | + |
| 168 | + PagesWindowIndex selectedWindowIndex = new PagesWindowIndex(pagesIndex, 0, selectedPositionsCount); |
| 169 | + delegate.addInput(selectedWindowIndex, 0, selectedPositionsCount - 1); |
164 | 170 | } |
165 | 171 |
|
166 | 172 | private static boolean test(Block block, int position) |
|
0 commit comments