Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
public class ComparisonColumns implements Comparable<ComparisonColumns> {
private final Comparable[] _values;
private final int _comparableIndex;
public static final int SEALED_SEGMENT_COMPARISON_INDEX = -1;

public ComparisonColumns(Comparable[] values, int comparableIndex) {
_values = values;
Expand All @@ -37,10 +38,46 @@ public int getComparableIndex() {
return _comparableIndex;
}

/**
 * Compares this instance against {@code other} position by position, across every comparison column, without
 * modifying {@code _values}.
 *
 * <p>The first position at which the two sides differ decides the outcome:
 * <ul>
 *   <li>positions where both values are null are skipped;</li>
 *   <li>a null on this side paired with a non-null on the other side yields {@code -1};</li>
 *   <li>a non-null on this side paired with a null on the other side yields {@code 1};</li>
 *   <li>two non-null values are compared via {@code compareTo}, and any non-zero result is returned as-is.</li>
 * </ul>
 *
 * @param other the instance to compare against; its values are read at every index of this instance's values
 * @return a negative value, zero, or a positive value as described above; zero when no position differs
 */
public int compareToSealed(ComparisonColumns other) {
  for (int column = 0; column < _values.length; column++) {
    Comparable mine = _values[column];
    Comparable theirs = other.getValues()[column];

    if (mine == null) {
      if (theirs == null) {
        // Neither side has a value for this column, so it cannot decide the ordering
        continue;
      }
      // Only the other side has a value: the non-null side wins
      return -1;
    }
    if (theirs == null) {
      // Only this side has a value: the non-null side wins
      return 1;
    }

    int result = mine.compareTo(theirs);
    if (result != 0) {
      return result;
    }
  }
  // Every column was either equal or null on both sides
  return 0;
}

@Override
public int compareTo(ComparisonColumns other) {
// _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility.
// There is no attempt to guarantee behavior in the case where there are multiple non-null values
if (_comparableIndex == SEALED_SEGMENT_COMPARISON_INDEX) {
return compareToSealed(other);
}

// _comparisonColumns should have at most one non-null comparison value for newly ingested data. If not, it is
// the user's responsibility. There is no attempt to guarantee behavior in the case where there are multiple
// non-null values
int comparisonResult;

Comparable comparisonValue = _values[_comparableIndex];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
// comparison column values from the previous record, and the sole non-null comparison column value from
// the new record.
newRecord.putValue(column, previousRecord.getValue(column));
newRecord.removeNullValueField(column);
if (!_comparisonColumns.contains(column)) {
// Despite wanting to overwrite the values to comparison columns from prior records, we want to
// preserve for _this_ record which comparison column was non-null. Doing so will allow us to
// re-evaluate the same comparisons when reading a segment and during steady-state stream ingestion
newRecord.removeNullValueField(column);
}
} else if (!_comparisonColumns.contains(column)) {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
newRecord.putValue(column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ public static class MultiComparisonColumnReader implements UpsertUtils.Compariso

public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) {
_comparisonColumnReaders = new PinotSegmentColumnReader[comparisonColumns.size()];

for (int i = 0; i < comparisonColumns.size(); i++) {
_comparisonColumnReaders[i] = new PinotSegmentColumnReader(segment, comparisonColumns.get(i));
}
Expand All @@ -190,13 +191,14 @@ public Comparable getComparisonValue(int docId) {

for (int i = 0; i < _comparisonColumnReaders.length; i++) {
PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
Comparable comparisonValue = null;
if (!columnReader.isNull(docId)) {
comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
}
comparisonColumns[i] = comparisonValue;
}

// Note that the comparable index is negative here to indicate that this instance could be the argument to
// ComparisonColumns#compareTo, but should never call compareTo itself.
return new ComparisonColumns(comparisonColumns, -1);
return new ComparisonColumns(comparisonColumns, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ private void setup(UpsertConfig upsertConfigWithHash)
_schema = Schema.fromFile(new File(schemaResourceUrl.getFile()));
_tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName("testTable").setUpsertConfig(upsertConfigWithHash)
.setNullHandlingEnabled(true)
.build();
_recordTransformer = CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
Expand Down Expand Up @@ -138,6 +139,7 @@ private void testUpsertIngestion(UpsertConfig upsertConfig)
// Confirm that both comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205397L, _mutableSegmentImpl.getValue(2, "secondsSinceEpoch"));
Assert.assertEquals(1567205395L, _mutableSegmentImpl.getValue(2, "otherComparisonColumn"));
Assert.assertTrue(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2));

// bb
Assert.assertFalse(bitmap.contains(4));
Expand All @@ -146,6 +148,7 @@ private void testUpsertIngestion(UpsertConfig upsertConfig)
// Confirm that comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205396L, _mutableSegmentImpl.getValue(5, "secondsSinceEpoch"));
Assert.assertEquals(Long.MIN_VALUE, _mutableSegmentImpl.getValue(5, "otherComparisonColumn"));
Assert.assertTrue(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(5));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.upsert;

import java.util.Arrays;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
 * Unit tests for {@link ComparisonColumns#compareTo(ComparisonColumns)}.
 *
 * <p>Two modes are exercised: comparison with an explicit comparable index (as used for newly ingested data), and
 * comparison with {@link ComparisonColumns#SEALED_SEGMENT_COMPARISON_INDEX} (as used for values read back from a
 * sealed segment).
 */
public class ComparisonColumnsTest {
  /**
   * Resets every element of each given array to {@code null} so that the next scenario starts from a clean slate.
   *
   * @param comparables the arrays to clear in place
   */
  private void nullFill(Comparable[]... comparables) {
    for (Comparable[] comps : comparables) {
      Arrays.fill(comps, null);
    }
  }

  /**
   * Exercises {@code compareTo} when the instance being ingested carries an explicit comparable index. Besides the
   * comparison result, several scenarios also check the values held by the ingested instance after the comparison.
   */
  @Test
  public void testRealtimeComparison() {
    // The arrays are shared with the ComparisonColumns instances below, so each scenario mutates them in place
    Comparable[] newComparables = new Comparable[3];
    Comparable[] persistedComparables = new Comparable[3];
    ComparisonColumns alreadyPersisted = new ComparisonColumns(persistedComparables, 0);
    ComparisonColumns toBeIngested = new ComparisonColumns(newComparables, 0);

    // reject same col with smaller value
    newComparables[0] = 1;
    persistedComparables[0] = 2;
    int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, -1);

    // persist same col with equal value
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 2;
    persistedComparables[0] = 2;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 0);

    // persist same col with larger value
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 2;
    persistedComparables[0] = 1;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);
    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null});

    // persist doc with col which was previously null, even though its value is smaller than the previous non-null col
    nullFill(newComparables, persistedComparables);
    toBeIngested = new ComparisonColumns(newComparables, newComparables.length - 1);
    newComparables[newComparables.length - 1] = 1;
    persistedComparables[0] = 2;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);
    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, 1});

    // persist new doc where existing doc has multiple non-null comparison values
    nullFill(newComparables, persistedComparables);
    toBeIngested = new ComparisonColumns(newComparables, 1);
    newComparables[1] = 2;
    Arrays.fill(persistedComparables, 1);
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);
    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{1, 2, 1});

    // reject new doc where existing doc has multiple non-null comparison values
    nullFill(newComparables, persistedComparables);
    newComparables[1] = 0;
    Arrays.fill(persistedComparables, 1);
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, -1);
  }

  /**
   * Exercises {@code compareTo} when both instances are built with
   * {@link ComparisonColumns#SEALED_SEGMENT_COMPARISON_INDEX}.
   */
  @Test
  public void testSealedComparison() {
    // Remember to be cognizant of which scenarios are _actually_ possible in a sealed segment. The way in which docs
    // are compared during realtime ingestion dictates the possible scenarios of persisted rows. Ex. it is not
    // possible for 2 docs with the same primary key to have a mutually exclusive set of non-null values; if such a
    // scenario arose during realtime ingestion, the values would be merged such that the newly persisted doc would
    // have all non-null comparison values. We should avoid making tests pass for scenarios that are not intended to
    // be supported.
    Comparable[] newComparables = new Comparable[3];
    Comparable[] persistedComparables = new Comparable[3];
    ComparisonColumns alreadyPersisted =
        new ComparisonColumns(persistedComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);
    ComparisonColumns toBeIngested =
        new ComparisonColumns(newComparables, ComparisonColumns.SEALED_SEGMENT_COMPARISON_INDEX);

    // reject same col with smaller value
    newComparables[0] = 1;
    persistedComparables[0] = 2;
    int comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, -1);

    // persist same col with equal value
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 2;
    persistedComparables[0] = 2;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 0);
    // Verify unchanged comparables in the case of SEALED comparison. The expectation is spelled out explicitly:
    // newComparables is the very array handed to the constructor, so asserting getValues() against it would
    // presumably compare the array with itself and could never fail.
    Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null});

    // persist same col with larger value
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 2;
    persistedComparables[0] = 1;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);

    // reject doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has 2
    // null columns. The presence of null columns in one of the docs implies that it must have come before the doc
    // with non-null columns.
    nullFill(newComparables, persistedComparables);
    newComparables[1] = 1;
    persistedComparables[0] = 1;
    persistedComparables[2] = 1;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, -1);

    // persist doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has
    // the same non-null columns with a larger value in one of them
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 1;
    newComparables[2] = 2;
    persistedComparables[0] = 1;
    persistedComparables[2] = 1;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);

    // persist doc with non-null value where existing doc had null value in same column previously (but multiple
    // non-null in other columns)
    nullFill(newComparables, persistedComparables);
    newComparables[0] = 1;
    newComparables[1] = 1;
    newComparables[2] = 1;
    persistedComparables[0] = 1;
    persistedComparables[2] = 1;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);

    // reject doc where existing doc has all non-null comparison values, but _this_ doc has 2 null values.
    // The presence of null columns in one of the docs implies that it must have come before the doc with non-null
    // columns.
    nullFill(newComparables, persistedComparables);
    newComparables[1] = 1;
    Arrays.fill(persistedComparables, 1);
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, -1);

    // Persist doc where existing doc has all non-null comparison values, but _this_ doc has a larger value.
    nullFill(newComparables, persistedComparables);
    Arrays.fill(newComparables, 1);
    Arrays.fill(persistedComparables, 1);
    newComparables[1] = 2;
    comparisonResult = toBeIngested.compareTo(alreadyPersisted);
    Assert.assertEquals(comparisonResult, 1);
  }
}