Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -550,12 +550,12 @@ private Comparable getComparisonValue(GenericRow row) {
"Documents must have exactly 1 non-null comparison column value");

comparableIndex = i;

Object comparisonValue = row.getValue(columnName);
Preconditions.checkState(comparisonValue instanceof Comparable,
"Upsert comparison column: %s must be comparable", columnName);
comparisonValues[i] = (Comparable) comparisonValue;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change means that we always set a non-null value for each index of comparisonValues, whereas previously all indices != comparableIndex would be null.

Object comparisonValue = row.getValue(columnName);
Preconditions.checkState(comparisonValue instanceof Comparable,
"Upsert comparison column: %s must be comparable", columnName);
comparisonValues[i] = (Comparable) comparisonValue;
}
Preconditions.checkState(comparableIndex != -1, "Documents must have exactly 1 non-null comparison column value");
return new ComparisonColumns(comparisonValues, comparableIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,27 +46,31 @@ public int compareToSealed(ComparisonColumns other) {
- if all values between the two sets of Comparables are equal (compareTo == 0), keep _values as-is and return 0
*/
for (int i = 0; i < _values.length; i++) {
Comparable comparisonValue = _values[i];
Comparable otherComparisonValue = other.getValues()[i];
if (comparisonValue == null && otherComparisonValue == null) {
continue;
int comparisonResult = compareToIndex(other, i);
if (comparisonResult != 0) {
return comparisonResult;
}
}

return 0;
}

// Always keep the record with non-null value, or that with the greater comparisonResult
if (comparisonValue == null) {
// implies comparisonValue == null && otherComparisonValue != null
return -1;
} else if (otherComparisonValue == null) {
// implies comparisonValue != null && otherComparisonValue == null
return 1;
} else {
int comparisonResult = comparisonValue.compareTo(otherComparisonValue);
if (comparisonResult != 0) {
return comparisonResult;
}
/**
 * Compares the value this instance holds at {@code comparableIndex} with the value {@code other} holds at the
 * same index, by calling {@code compareTo} on this instance's value.
 *
 * No null check is performed here on either value.
 * NOTE(review): presumably callers guarantee every index holds a non-null value - confirm before relying on it.
 *
 * @param other the comparison columns whose value at {@code comparableIndex} is compared against
 * @param comparableIndex the index, in both value arrays, of the values to compare
 * @return the result of {@code compareTo} between this instance's value and {@code other}'s value at that index
 */
private int compareToIndex(ComparisonColumns other, int comparableIndex) {
Comparable otherComparisonValue = other.getValues()[comparableIndex];
return _values[comparableIndex].compareTo(otherComparisonValue);
}

private void mergeComparisonValues(ComparisonColumns other) {
// TODO(egalpin): This method currently may have side-effects on _values. Depending on the result of compareTo,
// entities from {@param other} may be merged into _values. This really should not be done implicitly as part
// of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
// {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
// implementations of {@link BasePartitionUpsertMetadataManager}.
for (int i = 0; i < _values.length; i++) {
if (i != _comparableIndex) {
_values[i] = other._values[i];
}
}
return 0;
}

@Override
Expand All @@ -75,34 +79,11 @@ public int compareTo(ComparisonColumns other) {
return compareToSealed(other);
}

// _comparisonColumns should have at most one non-null comparison value for newly ingested data. If not, it is
// the user's responsibility. There is no attempt to guarantee behavior in the case where there are multiple
// non-null values
int comparisonResult;

Comparable comparisonValue = _values[_comparableIndex];
Comparable otherComparisonValue = other.getValues()[_comparableIndex];

if (otherComparisonValue == null) {
// Keep this record because the existing record has no value for the same comparison column, therefore the
// (lack of) existing value could not possibly cause the new value to be rejected.
comparisonResult = 1;
} else {
comparisonResult = comparisonValue.compareTo(otherComparisonValue);
}

int comparisonResult = compareToIndex(other, _comparableIndex);
if (comparisonResult >= 0) {
// TODO(egalpin): This method currently may have side-effects on _values. Depending on the comparison result,
// entities from {@param other} may be merged into _values. This really should not be done implicitly as part
// of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
// {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
// implementations of {@link BasePartitionUpsertMetadataManager}.
for (int i = 0; i < _values.length; i++) {
if (i != _comparableIndex) {
_values[i] = other._values[i];
}
}
mergeComparisonValues(other);
}

return comparisonResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,17 +69,8 @@ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
if (!_primaryKeyColumns.contains(column)) {
if (!previousRecord.isNullValue(column)) {
if (newRecord.isNullValue(column)) {
// Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of using
// multiple comparison columns. We never apply a merge function to it, rather we just take any/all non-null
// comparison column values from the previous record, and the sole non-null comparison column value from
// the new record.
newRecord.putValue(column, previousRecord.getValue(column));
if (!_comparisonColumns.contains(column)) {
// Despite wanting to overwrite the values to comparison columns from prior records, we want to
// preserve for _this_ record which comparison column was non-null. Doing so will allow us to
// re-evaluate the same comparisons when reading a segment and during steady-state stream ingestion
newRecord.removeNullValueField(column);
}
newRecord.removeNullValueField(column);
} else if (!_comparisonColumns.contains(column)) {
PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger);
newRecord.putValue(column,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,7 @@ public Comparable getComparisonValue(int docId) {

for (int i = 0; i < _comparisonColumnReaders.length; i++) {
PinotSegmentColumnReader columnReader = _comparisonColumnReaders[i];
Comparable comparisonValue = null;
if (!columnReader.isNull(docId)) {
comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
}
Comparable comparisonValue = (Comparable) UpsertUtils.getValue(columnReader, docId);
comparisonColumns[i] = comparisonValue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ private void testUpsertIngestion(UpsertConfig upsertConfig)
// Confirm that both comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205397L, _mutableSegmentImpl.getValue(2, "secondsSinceEpoch"));
Assert.assertEquals(1567205395L, _mutableSegmentImpl.getValue(2, "otherComparisonColumn"));
Assert.assertTrue(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2));
// Confirm that both comparison columns are marked as not null
Assert.assertFalse(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(2));
Assert.assertFalse(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(2));

// bb
Assert.assertFalse(bitmap.contains(4));
Expand All @@ -148,6 +150,8 @@ private void testUpsertIngestion(UpsertConfig upsertConfig)
// Confirm that comparison column values have made it into the persisted upserted doc
Assert.assertEquals(1567205396L, _mutableSegmentImpl.getValue(5, "secondsSinceEpoch"));
Assert.assertEquals(Long.MIN_VALUE, _mutableSegmentImpl.getValue(5, "otherComparisonColumn"));
// Confirm that only the "secondsSinceEpoch" comparison column is not marked as null
Assert.assertFalse(_mutableSegmentImpl.getDataSource("secondsSinceEpoch").getNullValueVector().isNull(5));
Assert.assertTrue(_mutableSegmentImpl.getDataSource("otherComparisonColumn").getNullValueVector().isNull(5));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@


public class ComparisonColumnsTest {
private void nullFill(Comparable[]... comparables) {
private void minIntFill(Comparable[]... comparables) {
for (Comparable[] comps : comparables) {
Arrays.fill(comps, null);
Arrays.fill(comps, Integer.MIN_VALUE);
}
}

Expand All @@ -44,31 +44,31 @@ public void testRealtimeComparison() {
Assert.assertEquals(comparisonResult, -1);

// persist same col with equal value
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 2;
persistedComparables[0] = 2;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Assert.assertEquals(comparisonResult, 0);

// persist same col with larger value
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 2;
persistedComparables[0] = 1;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Assert.assertEquals(comparisonResult, 1);
Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, null});
Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, Integer.MIN_VALUE, Integer.MIN_VALUE});

// persist doc with col which was previously null, even though its value is smaller than the previous non-null col
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
toBeIngested = new ComparisonColumns(newComparables, newComparables.length - 1);
newComparables[newComparables.length - 1] = 1;
persistedComparables[0] = 2;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Assert.assertEquals(comparisonResult, 1);
Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, null, 1});
Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{2, Integer.MIN_VALUE, 1});

// persist new doc where existing doc has multiple non-null comparison values
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
toBeIngested = new ComparisonColumns(newComparables, 1);
newComparables[1] = 2;
Arrays.fill(persistedComparables, 1);
Expand All @@ -77,7 +77,7 @@ public void testRealtimeComparison() {
Assert.assertEquals(toBeIngested.getValues(), new Comparable[]{1, 2, 1});

// reject new doc where existing doc has multiple non-null comparison values
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[1] = 0;
Arrays.fill(persistedComparables, 1);
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Expand Down Expand Up @@ -106,7 +106,7 @@ public void testSealedComparison() {
Assert.assertEquals(comparisonResult, -1);

// persist same col with equal value
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 2;
persistedComparables[0] = 2;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Expand All @@ -115,7 +115,7 @@ public void testSealedComparison() {
Assert.assertEquals(toBeIngested.getValues(), newComparables);

// persist same col with larger value
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 2;
persistedComparables[0] = 1;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Expand All @@ -124,15 +124,15 @@ public void testSealedComparison() {
// reject doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has 2
// null columns. The presence of null columns in one of the docs implies that it must have come before the doc
// with non-null columns.
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[1] = 1;
persistedComparables[0] = 1;
persistedComparables[2] = 1;
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Assert.assertEquals(comparisonResult, -1);

// persist doc where existing doc has more than one, but not all, non-null comparison values, but _this_ doc has
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 1;
newComparables[2] = 2;
persistedComparables[0] = 1;
Expand All @@ -142,7 +142,7 @@ public void testSealedComparison() {

// persist doc with non-null value where existing doc had null value in same column previously (but multiple
// non-null in other columns)
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[0] = 1;
newComparables[1] = 1;
newComparables[2] = 1;
Expand All @@ -154,14 +154,14 @@ public void testSealedComparison() {
// reject doc where existing doc has all non-null comparison values, but _this_ doc has 2 null values.
// The presence of null columns in one of the docs implies that it must have come before the doc with non-null
// columns.
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
newComparables[1] = 1;
Arrays.fill(persistedComparables, 1);
comparisonResult = toBeIngested.compareTo(alreadyPersisted);
Assert.assertEquals(comparisonResult, -1);

// Persist doc where existing doc has all non-null comparison values, but _this_ doc has a larger value.
nullFill(newComparables, persistedComparables);
minIntFill(newComparables, persistedComparables);
Arrays.fill(newComparables, 1);
Arrays.fill(persistedComparables, 1);
newComparables[1] = 2;
Expand Down