Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1377,14 +1377,15 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.setAggregateMetrics(indexingConfig.isAggregateMetrics())
.setIngestionAggregationConfigs(IngestionConfigUtils.getAggregationConfigs(tableConfig))
.setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setConsumerDir(consumerDir).setUpsertConfig(tableConfig.getUpsertConfig())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());

// Create message decoder
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(), _schema);
Set<String> fieldsToRead = IngestionUtils.getFieldsForRecordExtractor(_tableConfig.getIngestionConfig(),
_tableConfig.getUpsertConfig(), _schema);
StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, fieldsToRead);
_streamDataDecoder = new StreamDataDecoderImpl(streamMessageDecoder);
_clientId = streamTopic + "-" + _partitionGroupId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfigProperties;
Expand Down Expand Up @@ -215,6 +216,13 @@ protected void overrideServerConf(PinotConfiguration configuration) {
}
}

@Override
protected IngestionConfig getIngestionConfig() {
  // Enable continue-on-error so ingestion proceeds instead of failing on a bad record.
  IngestionConfig config = new IngestionConfig();
  config.setContinueOnError(true);
  return config;
}

@Override
protected void createSegmentsAndUpload(List<File> avroFiles, Schema schema, TableConfig tableConfig)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerMeter;
Expand Down Expand Up @@ -162,6 +163,7 @@ public class MutableSegmentImpl implements MutableSegment {
private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders;

private final UpsertConfig.Mode _upsertMode;
private final String _upsertDeleteKey;
private final String _upsertComparisonColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
Expand Down Expand Up @@ -399,7 +401,9 @@ public boolean isMutableSegment() {
}

// init upsert-related data structure
_upsertMode = config.getUpsertMode();
_upsertMode = (config.getUpsertConfig() == null || config.getUpsertConfig().getMode() == null)
? UpsertConfig.Mode.NONE : config.getUpsertConfig().getMode();
_upsertDeleteKey = config.getUpsertConfig() == null ? null : config.getUpsertConfig().getDeleteRecordKey();
_partitionDedupMetadataManager = config.getPartitionDedupMetadataManager();

if (isUpsertEnabled()) {
Expand Down Expand Up @@ -508,6 +512,12 @@ public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata)
}

if (isUpsertEnabled()) {

if (StringUtils.isNotEmpty(_upsertDeleteKey) && row.getValue(_upsertDeleteKey) != null) {
_partitionUpsertMetadataManager.removeRecord(recordInfo);
return true;
}

GenericRow updatedRow = _partitionUpsertMetadataManager.updateRecord(row, recordInfo);
updateDictionary(updatedRow);
addNewRow(numDocsIndexed, updatedRow);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public class RealtimeSegmentConfig {
private final int _partitionId;
private final boolean _aggregateMetrics;
private final boolean _nullHandlingEnabled;
private final UpsertConfig.Mode _upsertMode;
private final UpsertConfig _upsertConfig;
private final String _upsertComparisonColumn;
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
Expand All @@ -77,7 +77,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
Map<String, H3IndexConfig> h3IndexConfigs, SegmentZKMetadata segmentZKMetadata, boolean offHeap,
PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn,
PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled,
String consumerDir, UpsertConfig.Mode upsertMode, String upsertComparisonColumn,
String consumerDir, UpsertConfig upsertConfig, String upsertComparisonColumn,
PartitionUpsertMetadataManager partitionUpsertMetadataManager,
PartitionDedupMetadataManager partitionDedupMetadataManager, List<FieldConfig> fieldConfigList,
List<AggregationConfig> ingestionAggregationConfigs) {
Expand Down Expand Up @@ -105,7 +105,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri
_aggregateMetrics = aggregateMetrics;
_nullHandlingEnabled = nullHandlingEnabled;
_consumerDir = consumerDir;
_upsertMode = upsertMode != null ? upsertMode : UpsertConfig.Mode.NONE;
_upsertConfig = upsertConfig;
_upsertComparisonColumn = upsertComparisonColumn;
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
Expand Down Expand Up @@ -214,8 +214,8 @@ public String getConsumerDir() {
return _consumerDir;
}

public UpsertConfig.Mode getUpsertMode() {
return _upsertMode;
/**
 * Returns the upsert config for this realtime segment, or {@code null} when upsert is not configured
 * (the builder leaves it unset and the constructor does not substitute a default).
 */
public UpsertConfig getUpsertConfig() {
  return _upsertConfig;
}

public boolean isDedupEnabled() {
Expand Down Expand Up @@ -267,7 +267,7 @@ public static class Builder {
private boolean _aggregateMetrics = false;
private boolean _nullHandlingEnabled = false;
private String _consumerDir;
private UpsertConfig.Mode _upsertMode;
private UpsertConfig _upsertConfig;
private String _upsertComparisonColumn;
private PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private PartitionDedupMetadataManager _partitionDedupMetadataManager;
Expand Down Expand Up @@ -405,8 +405,8 @@ public Builder setConsumerDir(String consumerDir) {
return this;
}

public Builder setUpsertMode(UpsertConfig.Mode upsertMode) {
_upsertMode = upsertMode;
/**
 * Sets the table-level upsert config to use for this realtime segment. May be {@code null} when
 * upsert is not enabled for the table.
 */
public Builder setUpsertConfig(UpsertConfig upsertConfig) {
  _upsertConfig = upsertConfig;
  return this;
}

Expand Down Expand Up @@ -440,7 +440,7 @@ public RealtimeSegmentConfig build() {
_capacity, _avgNumMultiValues, _noDictionaryColumns, _varLengthDictionaryColumns, _invertedIndexColumns,
_textIndexColumns, _fstIndexColumns, _jsonIndexConfigs, _h3IndexConfigs, _segmentZKMetadata, _offHeap,
_memoryManager, _statsHistory, _partitionColumn, _partitionFunction, _partitionId, _aggregateMetrics,
_nullHandlingEnabled, _consumerDir, _upsertMode, _upsertComparisonColumn, _partitionUpsertMetadataManager,
_nullHandlingEnabled, _consumerDir, _upsertConfig, _upsertComparisonColumn, _partitionUpsertMetadataManager,
_partitionDedupMetadataManager, _fieldConfigList, _ingestionAggregationConfigs);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,35 @@ public void addRecord(MutableSegment segment, RecordInfo recordInfo) {
_primaryKeyToRecordLocationMap.size());
}

/**
 * Removes the upsert metadata for the record identified by the primary key in {@code recordInfo}:
 * when the delete's comparison value is greater than or equal to the currently tracked value, the
 * current doc is invalidated and the primary key is dropped from the metadata map; otherwise the
 * delete is ignored as out-of-order.
 */
//TODO: Doesn't handle the edge case when an older addRecord event arrives after removeRecord
@Override
public void removeRecord(RecordInfo recordInfo) {
  // compute() resolves the delete atomically against the current record location for this key;
  // returning null from the remapping function removes the mapping.
  _primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction),
      (primaryKey, currentRecordLocation) -> {
        if (currentRecordLocation != null) {
          // Existing primary key

          // Mark doc as invalid when the new comparison value is greater than or equal to the current value.
          // Ignore the delete request otherwise
          if (recordInfo.getComparisonValue().compareTo(currentRecordLocation.getComparisonValue()) >= 0) {
            IndexSegment currentSegment = currentRecordLocation.getSegment();
            int currentDocId = currentRecordLocation.getDocId();
            // Invalidate the current doc, then drop the key from the map (null removes the mapping)
            Objects.requireNonNull(currentSegment.getValidDocIds()).remove(currentDocId);
            return null;
          } else {
            // Out-of-order delete (older comparison value): keep the current record location
            return currentRecordLocation;
          }
        } else {
          // No metadata tracked for this primary key: nothing to delete (mapping stays absent)
          _logger.warn("Cannot find upsert metadata for primary key: {}", recordInfo.getPrimaryKey().toString());
          return null;
        }
      });

  // Update metrics
  _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
      _primaryKeyToRecordLocationMap.size());
}

@Override
public GenericRow updateRecord(GenericRow record, RecordInfo recordInfo) {
// Directly return the record when partial-upsert is not enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public interface PartitionUpsertMetadataManager extends Closeable {
*/
void addRecord(MutableSegment segment, RecordInfo recordInfo);

/**
 * Removes the upsert metadata for the record identified by the primary key in the given
 * {@link RecordInfo}.
 */
void removeRecord(RecordInfo recordInfo);

/**
* Replaces the upsert metadata for the old segment with the new immutable segment.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.segment.spi.creator.name.SimpleSegmentNameGenerator;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
Expand Down Expand Up @@ -306,14 +307,21 @@ private static void registerPinotFS(String fileURIScheme, String fsClass, PinotC
* 2. The ingestion config in the table config. The ingestion config (e.g. filter, complexType) can have fields which
* are not in the schema.
*/
public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) {
public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig,
    @Nullable UpsertConfig upsertConfig, Schema schema) {
  // Gather every field the extractor must read — from the ingestion config, the schema, and the
  // upsert config — then expand the set for complex-type handling.
  Set<String> fields = new HashSet<>();
  extractFieldsFromIngestionConfig(ingestionConfig, fields);
  extractFieldsFromSchema(schema, fields);
  extractFieldsFromUpsertConfig(upsertConfig, fields);
  return getFieldsToReadWithComplexType(fields, ingestionConfig);
}

/**
 * Overload kept for callers that do not supply an {@link UpsertConfig}.
 *
 * @deprecated Use {@link #getFieldsForRecordExtractor(IngestionConfig, UpsertConfig, Schema)} and pass the
 *             table's upsert config (or {@code null}) explicitly. This overload will be removed.
 */
@Deprecated
public static Set<String> getFieldsForRecordExtractor(@Nullable IngestionConfig ingestionConfig, Schema schema) {
  return getFieldsForRecordExtractor(ingestionConfig, null, schema);
}

/**
* Extracts the root-level names from the fields, to support the complex-type handling. For example,
* a field a.b.c will return the top-level name a.
Expand Down Expand Up @@ -351,6 +359,15 @@ private static void extractFieldsFromSchema(Schema schema, Set<String> fields) {
}
}

// Adds the upsert delete-record key (when configured) to the set of fields the record extractor reads.
private static void extractFieldsFromUpsertConfig(@Nullable UpsertConfig upsertConfig, Set<String> fields) {
  if (upsertConfig != null && StringUtils.isNotEmpty(upsertConfig.getDeleteRecordKey())) {
    fields.add(upsertConfig.getDeleteRecordKey());
  }
}

/**
* Extracts the fields needed by a RecordExtractor from given {@link IngestionConfig}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
when(statsHistory.getEstimatedCardinality(anyString())).thenReturn(200);
when(statsHistory.getEstimatedAvgColSize(anyString())).thenReturn(32);

UpsertConfig.Mode upsertMode = upsertConfig == null ? UpsertConfig.Mode.NONE : upsertConfig.getMode();
String comparisonColumn = upsertConfig == null ? null : upsertConfig.getComparisonColumn();
RealtimeSegmentConfig realtimeSegmentConfig =
new RealtimeSegmentConfig.Builder().setTableNameWithType(TABLE_NAME_WITH_TYPE).setSegmentName(SEGMENT_NAME)
Expand All @@ -104,8 +103,8 @@ public static MutableSegmentImpl createMutableSegmentImpl(Schema schema, Set<Str
.setVarLengthDictionaryColumns(varLengthDictionaryColumns).setInvertedIndexColumns(invertedIndexColumns)
.setSegmentZKMetadata(new SegmentZKMetadata(SEGMENT_NAME))
.setMemoryManager(new DirectMemoryManager(SEGMENT_NAME)).setStatsHistory(statsHistory)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled).setUpsertMode(upsertMode)
.setUpsertComparisonColumn(comparisonColumn)
.setAggregateMetrics(aggregateMetrics).setNullHandlingEnabled(nullHandlingEnabled)
.setUpsertConfig(upsertConfig).setUpsertComparisonColumn(comparisonColumn)
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setIngestionAggregationConfigs(aggregationConfigs).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,13 @@ public void testAddRecord() {
verifyAddRecord(HashFunction.MURMUR3);
}

@Test
public void testRemoveRecord() {
  // Exercise removeRecord() under each supported primary-key hash function.
  for (HashFunction hashFunction : new HashFunction[]{HashFunction.NONE, HashFunction.MD5, HashFunction.MURMUR3}) {
    verifyRemoveRecord(hashFunction);
  }
}

private void verifyAddRecord(HashFunction hashFunction) {
ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
Expand Down Expand Up @@ -341,6 +348,89 @@ private void verifyAddRecord(HashFunction hashFunction) {
assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 3});
}


// Verifies removeRecord() for the given hash function: a delete whose comparison value is >= the current
// one invalidates the doc and drops the primary key; older deletes are ignored; a deleted key can be
// re-added afterwards. State comments use the format "pk -> {docId, comparisonValue}" grouped by segment.
private void verifyRemoveRecord(HashFunction hashFunction) {
  ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
      new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, Collections.singletonList("pk"),
          "timeCol", hashFunction, null, false, mock(ServerMetrics.class));
  Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;

  // Add the first segment
  // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
  int numRecords = 3;
  int[] primaryKeys = new int[]{0, 1, 2};
  int[] timestamps = new int[]{100, 120, 100};
  ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
  ImmutableSegmentImpl segment1 = mockImmutableSegment(1, validDocIds1, getPrimaryKeyList(numRecords, primaryKeys));
  upsertMetadataManager.addSegment(segment1, validDocIds1,
      getRecordInfoList(numRecords, primaryKeys, timestamps).iterator());

  // Update records from the second segment
  ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
  MutableSegment segment2 = mockMutableSegment(1, validDocIds2);
  upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(100)));

  // segment1: 0 -> {0, 100}, 1 -> {1, 120}, 2 -> {2, 100}
  // segment2: 3 -> {0, 100}
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0});

  upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(2), 1, new IntWrapper(120)));
  // segment1: 0 -> {0, 100}, 1 -> {1, 120}
  // segment2: 2 -> {1, 120}, 3 -> {0, 100}
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

  upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
  // Out-of-order add for pk=1 (comparison value 100 < current 120) is ignored; state is unchanged:
  // segment1: 0 -> {0, 100}, 1 -> {1, 120}
  // segment2: 2 -> {1, 120}, 3 -> {0, 100}
  assertEquals(recordLocationMap.size(), 4);
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 1, segment1, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

  upsertMetadataManager.removeRecord(new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(120)));
  // Delete for pk=1 with comparison value 120 >= current 120: doc 1 in segment1 is invalidated and the
  // key is dropped from the map:
  // segment1: 0 -> {0, 100}
  // segment2: 2 -> {1, 120}, 3 -> {0, 100}
  assertEquals(recordLocationMap.size(), 3);
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
  checkRecordLocation(recordLocationMap, 3, segment2, 0, 100, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 1});

  upsertMetadataManager.removeRecord(new RecordInfo(makePrimaryKey(3), 0, new IntWrapper(120)));
  // Delete for pk=3 with comparison value 120 >= current 100: doc 0 in segment2 is invalidated and the
  // key is dropped from the map:
  // segment1: 0 -> {0, 100}
  // segment2: 2 -> {1, 120}
  assertEquals(recordLocationMap.size(), 2);
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1});

  upsertMetadataManager.addRecord(segment2, new RecordInfo(makePrimaryKey(1), 2, new IntWrapper(100)));
  // pk=1 was deleted above, so this add re-creates its metadata entry:
  // segment1: 0 -> {0, 100}
  // segment2: 1 -> {2, 100}, 2 -> {1, 120}
  assertEquals(recordLocationMap.size(), 3);
  checkRecordLocation(recordLocationMap, 0, segment1, 0, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 1, segment2, 2, 100, hashFunction);
  checkRecordLocation(recordLocationMap, 2, segment2, 1, 120, hashFunction);
  assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0});
  assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{1, 2});
}

@Test
public void testHashPrimaryKey() {
PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});
Expand Down
Loading