Skip to content

Commit

Permalink
Refactor more operators to use applicate Page method helpers
Browse files Browse the repository at this point in the history
Avoids extra copies of Block array values
  • Loading branch information
pettyjamesm authored and highker committed Sep 21, 2020
1 parent 20a6c40 commit 14ba79c
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void addInput(Page page)
// we know the exact size required for the block
BlockBuilder blockBuilder = BOOLEAN.createFixedSizeBlockBuilder(page.getPositionCount());

Page probeJoinPage = new Page(page.getBlock(probeJoinChannel));
Page probeJoinPage = page.extractChannel(probeJoinChannel);

// update hashing strategy to use probe cursor
for (int position = 0; position < page.getPositionCount(); position++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.operator.ChannelSet.ChannelSetBuilder;
import com.facebook.presto.spi.plan.PlanNodeId;
Expand Down Expand Up @@ -122,8 +121,7 @@ public OperatorFactory duplicate()

private final OperatorContext operatorContext;
private final SetSupplier setSupplier;
private final int setChannel;
private final Optional<Integer> hashChannel;
private final int[] sourceChannels;

private final ChannelSetBuilder channelSetBuilder;

Expand All @@ -142,9 +140,14 @@ public SetBuilderOperator(
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.setSupplier = requireNonNull(setSupplier, "setProvider is null");
this.setChannel = setChannel;

this.hashChannel = requireNonNull(hashChannel, "hashChannel is null");
if (requireNonNull(hashChannel, "hashChannel is null").isPresent()) {
this.sourceChannels = new int[]{setChannel, hashChannel.get()};
}
else {
this.sourceChannels = new int[]{setChannel};
}

// Set builder is has a single channel which goes in channel 0, if hash is present, add a hachBlock to channel 1
Optional<Integer> channelSetHashChannel = hashChannel.isPresent() ? Optional.of(1) : Optional.empty();
this.channelSetBuilder = new ChannelSetBuilder(
Expand Down Expand Up @@ -195,10 +198,7 @@ public void addInput(Page page)
requireNonNull(page, "page is null");
checkState(!isFinished(), "Operator is already finished");

Block sourceBlock = page.getBlock(setChannel);
Page sourcePage = hashChannel.isPresent() ? new Page(sourceBlock, page.getBlock(hashChannel.get())) : new Page(sourceBlock);

unfinishedWork = channelSetBuilder.addPage(sourcePage);
unfinishedWork = channelSetBuilder.addPage(page.extractChannels(sourceChannels));
processUnfinishedWork();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.block.SortOrder;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.memory.context.LocalMemoryContext;
Expand Down Expand Up @@ -129,7 +128,7 @@ public OperatorFactory duplicate()
private final OperatorContext operatorContext;
private final LocalMemoryContext localUserMemoryContext;

private final List<Integer> outputChannels;
private final int[] outputChannels;

private final GroupByHash groupByHash;
private final GroupedTopNBuilder groupedTopNBuilder;
Expand Down Expand Up @@ -162,7 +161,7 @@ public TopNRowNumberOperator(
if (generateRowNumber) {
outputChannelsBuilder.add(outputChannels.size());
}
this.outputChannels = outputChannelsBuilder.build();
this.outputChannels = Ints.toArray(outputChannelsBuilder.build());

checkArgument(maxRowCountPerPartition > 0, "maxRowCountPerPartition must be > 0");

Expand Down Expand Up @@ -253,13 +252,7 @@ public Page getOutput()

Page output = null;
if (outputIterator.hasNext()) {
Page page = outputIterator.next();
// rewrite to expected column ordering
Block[] blocks = new Block[page.getChannelCount()];
for (int i = 0; i < outputChannels.size(); i++) {
blocks[i] = page.getBlock(outputChannels.get(i));
}
output = new Page(blocks);
output = outputIterator.next().extractChannels(outputChannels);
}
updateMemoryReservation();
return output;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -550,14 +550,9 @@ public Type getFinalType()
@Override
public void addInput(GroupByIdBlock groupIdsBlock, Page page)
{
Block[] blocks = new Block[page.getChannelCount() + 1];
for (int i = 0; i < page.getChannelCount(); i++) {
blocks[i] = page.getBlock(i);
}
// Add group id block
blocks[page.getChannelCount()] = groupIdsBlock;
groupCount = max(groupCount, groupIdsBlock.getGroupCount());
pagesIndex.addPage(new Page(blocks));
// Add group id block
pagesIndex.addPage(page.appendColumn(groupIdsBlock));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.operator.index;

import com.facebook.presto.common.Page;
import com.facebook.presto.common.block.Block;
import com.facebook.presto.common.function.SqlFunctionProperties;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.operator.OperatorFactory;
Expand Down Expand Up @@ -91,8 +90,7 @@ public DynamicTupleFilterFactory(

public OperatorFactory filterWithTuple(Page tuplePage)
{
Page filterTuple = getFilterTuple(tuplePage);
Supplier<PageProcessor> processor = createPageProcessor(filterTuple, OptionalInt.empty());
Supplier<PageProcessor> processor = createPageProcessor(tuplePage.extractChannels(tupleFilterChannels), OptionalInt.empty());
return new FilterAndProjectOperatorFactory(filterOperatorId, planNodeId, processor, outputTypes, new DataSize(0, BYTE), 0);
}

Expand All @@ -107,13 +105,4 @@ public Supplier<PageProcessor> createPageProcessor(Page filterTuple, OptionalInt
.collect(toImmutableList()),
initialBatchSize);
}

private Page getFilterTuple(Page tuplePage)
{
Block[] normalizedBlocks = new Block[tupleFilterChannels.length];
for (int i = 0; i < tupleFilterChannels.length; i++) {
normalizedBlocks[i] = tuplePage.getBlock(tupleFilterChannels[i]);
}
return new Page(normalizedBlocks);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,8 @@ public UnloadedIndexKeyRecordSet(
for (UpdateRequest request : requests) {
Page page = request.getPage();

Block[] distinctBlocks = new Block[distinctChannels.length];
for (int i = 0; i < distinctBlocks.length; i++) {
distinctBlocks[i] = page.getBlock(distinctChannels[i]);
}

// Move through the positions while advancing the cursors in lockstep
Work<GroupByIdBlock> work = groupByHash.getGroupIds(new Page(distinctBlocks));
Work<GroupByIdBlock> work = groupByHash.getGroupIds(page.extractChannels(distinctChannels));
boolean done = work.process();
// TODO: this class does not yield wrt memory limit; enable it
verify(done);
Expand Down

0 comments on commit 14ba79c

Please sign in to comment.