Skip to content

Commit

Permalink
Optimize the reading of numerical frame arrays in MSQ (#15175) (#15188)
Browse files Browse the repository at this point in the history
  • Loading branch information
LakshSingla authored Oct 18, 2023
1 parent d51fbba commit c13eb98
Show file tree
Hide file tree
Showing 20 changed files with 241 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
{
return new NumericArrayFieldSelector<Double>(memory, fieldPointer)
{
private static final int FIELD_SIZE = Byte.BYTES + Double.BYTES;
final SettableFieldPointer fieldPointer = new SettableFieldPointer();
final ColumnValueSelector<?> columnValueSelector =
DoubleFieldReader.forArray().makeColumnValueSelector(memory, fieldPointer);
Expand All @@ -45,7 +46,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
@Override
public Double getIndividualValueAtMemory(long position)
{
fieldPointer.setPosition(position);
fieldPointer.setPositionAndLength(position, FIELD_SIZE);
if (columnValueSelector.isNull()) {
return null;
}
Expand All @@ -55,7 +56,7 @@ public Double getIndividualValueAtMemory(long position)
@Override
public int getIndividualFieldSize()
{
return Byte.BYTES + Double.BYTES;
return FIELD_SIZE;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
{
return new NumericArrayFieldSelector<Float>(memory, fieldPointer)
{
private static final int FIELD_SIZE = Byte.BYTES + Float.BYTES;
final SettableFieldPointer fieldPointer = new SettableFieldPointer();
final ColumnValueSelector<?> columnValueSelector =
FloatFieldReader.forArray().makeColumnValueSelector(memory, fieldPointer);
Expand All @@ -45,7 +46,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
@Override
public Float getIndividualValueAtMemory(long position)
{
fieldPointer.setPosition(position);
fieldPointer.setPositionAndLength(position, FIELD_SIZE);
if (columnValueSelector.isNull()) {
return null;
}
Expand All @@ -55,7 +56,7 @@ public Float getIndividualValueAtMemory(long position)
@Override
public int getIndividualFieldSize()
{
return Byte.BYTES + Float.BYTES;
return FIELD_SIZE;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
{
return new NumericArrayFieldSelector<Long>(memory, fieldPointer)
{
private static final int FIELD_SIZE = Byte.BYTES + Long.BYTES;
final SettableFieldPointer fieldPointer = new SettableFieldPointer();
final ColumnValueSelector<?> columnValueSelector =
LongFieldReader.forArray().makeColumnValueSelector(memory, fieldPointer);
Expand All @@ -45,7 +46,7 @@ public ColumnValueSelector<?> makeColumnValueSelector(
@Override
public Long getIndividualValueAtMemory(long position)
{
fieldPointer.setPosition(position);
fieldPointer.setPositionAndLength(position, FIELD_SIZE);
if (columnValueSelector.isNull()) {
return null;
}
Expand All @@ -55,7 +56,7 @@ public Long getIndividualValueAtMemory(long position)
@Override
public int getIndividualFieldSize()
{
return Byte.BYTES + Long.BYTES;
return FIELD_SIZE;
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
import org.apache.druid.segment.ColumnValueSelector;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;

/**
* Base implementation of the column value selector that the concrete numeric field reader implementations inherit from.
Expand Down Expand Up @@ -66,12 +64,8 @@ public abstract class NumericArrayFieldSelector<ElementType extends Number> impl
/**
* Value of the row at the location beginning at {@link #currentFieldPosition}
*/
private final List<ElementType> currentRow = new ArrayList<>();

/**
* Nullity of the row at the location beginning at {@link #currentFieldPosition}
*/
private boolean currentRowIsNull;
@Nullable
private Number[] currentRow = null;

public NumericArrayFieldSelector(final Memory memory, final ReadableFieldPointer fieldPointer)
{
Expand All @@ -89,13 +83,7 @@ public void inspectRuntimeShape(RuntimeShapeInspector inspector)
@Override
public Object getObject()
{
final List<ElementType> currentArray = computeCurrentArray();

if (currentArray == null) {
return null;
}

return currentArray.toArray();
return computeCurrentArray();
}

@Override
Expand Down Expand Up @@ -143,34 +131,29 @@ public boolean isNull()
public abstract int getIndividualFieldSize();

@Nullable
private List<ElementType> computeCurrentArray()
private Number[] computeCurrentArray()
{
final long fieldPosition = fieldPointer.position();
final long fieldLength = fieldPointer.length();

if (fieldPosition != currentFieldPosition) {
updateCurrentArray(fieldPosition);
updateCurrentArray(fieldPosition, fieldLength);
}

this.currentFieldPosition = fieldPosition;

if (currentRowIsNull) {
return null;
}
return currentRow;

}

private void updateCurrentArray(final long fieldPosition)
private void updateCurrentArray(final long fieldPosition, final long fieldLength)
{
currentRow.clear();
currentRowIsNull = false;
currentRow = null;

long position = fieldPosition;
long limit = memory.getCapacity();

// Check the first byte, and if it is null, update the current value to null and return
if (isNull()) {
currentRowIsNull = true;
// Already set the currentRow to null
return;
}

Expand All @@ -179,9 +162,13 @@ private void updateCurrentArray(final long fieldPosition)
position++;
}

int numElements = numElements(fieldLength);
currentRow = new Number[numElements];

// Sanity check, to make sure that we see the rowTerminator at the end
boolean rowTerminatorSeen = false;

int curElement = 0;
while (position < limit) {
final byte kind = memory.getByte(position);

Expand All @@ -193,12 +180,26 @@ private void updateCurrentArray(final long fieldPosition)

// If terminator not seen, then read the field at that location, and increment the position by the element's field
// size to read the next element.
currentRow.add(getIndividualValueAtMemory(position));
currentRow[curElement] = getIndividualValueAtMemory(position);
position += getIndividualFieldSize();
curElement++;
}

if (!rowTerminatorSeen) {
if (!rowTerminatorSeen || curElement != numElements) {
throw DruidException.defensive("Unexpected end of field");
}
}

int numElements(long fieldSize)
{
if (fieldSize <= 1) {
throw DruidException.defensive("fieldSize should be greater than 1 for non null array elements");
}
// Remove one byte for the nullity byte, and one for the array terminator
long cumulativeFieldSize = fieldSize - Byte.BYTES - Byte.BYTES;
if (cumulativeFieldSize % getIndividualFieldSize() != 0) {
throw DruidException.defensive("cumulativeFieldSize should be a multiple of the individual fieldSize");
}
return Math.toIntExact(cumulativeFieldSize / getIndividualFieldSize());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,9 @@ public interface ReadableFieldPointer
* Starting position of the field.
*/
long position();

/**
* Length of the field.
*/
long length();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

/**
* A {@link ReadableFieldPointer} that is derived from a row-based frame.
*
* Returns the position and the length of a field at a particular position for the row that the rowPointer is pointing
* to at the time. It caches the values of the position and the length based on position of the rowPointer.
* This method is not thread-safe
*/
public class RowMemoryFieldPointer implements ReadableFieldPointer
{
Expand All @@ -32,6 +36,16 @@ public class RowMemoryFieldPointer implements ReadableFieldPointer
private final int fieldNumber;
private final int fieldCount;

// Caching of position() calls
private long rowWithCachedPosition = -1L;
private long cachedPosition = -1L;

// Caching of length() calls
// We cache the length() calls separately, because not all field types call length(), therefore it's wasteful to
// compute and cache length() if we are not reading it
private long rowWithCachedLength = -1L;
private long cachedLength = -1L;

public RowMemoryFieldPointer(
final Memory memory,
final ReadableFrameRowPointer rowPointer,
Expand All @@ -47,6 +61,63 @@ public RowMemoryFieldPointer(

@Override
public long position()
{
updatePosition();
return cachedPosition;
}

@Override
public long length()
{
updatePositionAndLength();
return cachedLength;
}

private void updatePosition()
{
long rowPointerPosition = rowPointer.position();
if (rowPointerPosition == rowWithCachedPosition) {
return;
}
// Update the cached position for position()
rowWithCachedPosition = rowPointerPosition;

// Update the start position
cachedPosition = startPosition(fieldNumber);
}

// Not all field types call length(), and therefore there's no need to cache the length of the field. This method
// updates both the position and the length.
private void updatePositionAndLength()
{
updatePosition();

// rowPointerPosition = rowPointer.position() = rowWithCachedPosition, since that was updated in the call to update
// position above
long rowPointerPosition = rowWithCachedPosition;

if (rowPointerPosition == rowWithCachedLength) {
return;
}
// Update the cached position for length()
rowWithCachedLength = rowPointerPosition;

if (fieldNumber == fieldCount - 1) {
// If the field is the last field in the row, then the length of the field would be the end of the row minus the
// start position of the field. End of the row is the start of the row plus the length of the row.
cachedLength = (rowPointerPosition + rowPointer.length()) - cachedPosition;
} else {
// Else the length of the field would be the difference between the start position of the given field and
// the subsequent field
cachedLength = startPosition(fieldNumber + 1) - cachedPosition;
}
}

/**
* Given a fieldNumber, computes the start position of the field. Requires a memory access to read the start position,
* therefore callers should cache the value for better efficiency.
*/
private long startPosition(int fieldNumber)
{
if (fieldNumber == 0) {
// First field starts after the field end pointers -- one integer per field.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,31 @@
package org.apache.druid.frame.field;

/**
* A simple {@link ReadableFieldPointer} that returns the position that was set on its object.
* A simple {@link ReadableFieldPointer} that returns the position and the length that was set on its object.
*/
public class SettableFieldPointer implements ReadableFieldPointer
{

long position = 0;
long length = -1;

public void setPosition(long position)
/**
* Sets the position and the length to be returned when interface's methods are called.
*/
public void setPositionAndLength(long position, long length)
{
this.position = position;
this.length = length;
}

@Override
public long position()
{
return position;
}

@Override
public long length()
{
return length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,13 @@
*/
public interface ReadableFrameRowPointer
{
/**
* Position of the start of the row relative to the start of the Frame
*/
long position();

/**
* Length of the row (in bytes)
*/
long length();
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public void test_makeColumnValueSelector_null()
writeToMemory(null);

final ColumnValueSelector<?> readSelector =
new ComplexFieldReader(SERDE).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
new ComplexFieldReader(SERDE).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION, -1));

Assert.assertNull(readSelector.getObject());
}
Expand All @@ -134,7 +134,7 @@ public void test_makeColumnValueSelector_aValue()
writeToMemory("foo");

final ColumnValueSelector<?> readSelector =
new ComplexFieldReader(SERDE).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION));
new ComplexFieldReader(SERDE).makeColumnValueSelector(memory, new ConstantFieldPointer(MEMORY_POSITION, -1));

Assert.assertEquals("foo", readSelector.getObject());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@
public class ConstantFieldPointer implements ReadableFieldPointer
{
private final long position;
private final long length;

public ConstantFieldPointer(long position)
public ConstantFieldPointer(long position, long length)
{
this.position = position;
this.length = length;
}

@Override
public long position()
{
return position;
}

@Override
public long length()
{
return length;
}
}
Loading

0 comments on commit c13eb98

Please sign in to comment.