Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
RESIZE_TIME_MS("milliseconds", false),
// Upsert metrics
UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false);
UPSERT_PRIMARY_KEYS_COUNT("upsertPrimaryKeysCount", false),
// Dedup metrics
DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false);

private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
REALTIME_PARTITION_MISMATCH("mismatch", false),
REALTIME_DEDUP_DROPPED("rows", false),
ROWS_WITH_ERRORS("rows", false),
LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import org.apache.commons.collections.MapUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.QueryConfig;
Expand Down Expand Up @@ -131,6 +132,12 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)
upsertConfig = JsonUtils.stringToObject(upsertConfigString, UpsertConfig.class);
}

DedupConfig dedupConfig = null;
String dedupConfigString = simpleFields.get(TableConfig.DEDUP_CONFIG_KEY);
if (dedupConfigString != null) {
dedupConfig = JsonUtils.stringToObject(dedupConfigString, DedupConfig.class);
}

IngestionConfig ingestionConfig = null;
String ingestionConfigString = simpleFields.get(TableConfig.INGESTION_CONFIG_KEY);
if (ingestionConfigString != null) {
Expand All @@ -153,7 +160,7 @@ public static TableConfig fromZNRecord(ZNRecord znRecord)

return new TableConfig(tableName, tableType, validationConfig, tenantConfig, indexingConfig, customConfig,
quotaConfig, taskConfig, routingConfig, queryConfig, instanceAssignmentConfigMap, fieldConfigList, upsertConfig,
ingestionConfig, tierConfigList, isDimTable, tunerConfigList);
dedupConfig, ingestionConfig, tierConfigList, isDimTable, tunerConfigList);
}

public static ZNRecord toZNRecord(TableConfig tableConfig)
Expand Down Expand Up @@ -200,6 +207,10 @@ public static ZNRecord toZNRecord(TableConfig tableConfig)
if (upsertConfig != null) {
simpleFields.put(TableConfig.UPSERT_CONFIG_KEY, JsonUtils.objectToString(upsertConfig));
}
DedupConfig dedupConfig = tableConfig.getDedupConfig();
if (dedupConfig != null) {
simpleFields.put(TableConfig.DEDUP_CONFIG_KEY, JsonUtils.objectToString(dedupConfig));
}
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
if (ingestionConfig != null) {
simpleFields.put(TableConfig.INGESTION_CONFIG_KEY, JsonUtils.objectToString(ingestionConfig));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import java.util.Map;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.spi.config.table.CompletionConfig;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
Expand Down Expand Up @@ -248,15 +250,22 @@ public void testSerDe()
}
{
// with upsert config
UpsertConfig upsertConfig =
new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", UpsertConfig.HashFunction.NONE);
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL, null, null, "comparison", HashFunction.NONE);

TableConfig tableConfig = tableConfigBuilder.setUpsertConfig(upsertConfig).build();

// Serialize then de-serialize
checkTableConfigWithUpsertConfig(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
checkTableConfigWithUpsertConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// with dedup config
DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5);
TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
// Serialize then de-serialize
checkTableConfigWithDedupConfig(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
checkTableConfigWithDedupConfig(TableConfigUtils.fromZNRecord(TableConfigUtils.toZNRecord(tableConfig)));
}
{
// with SegmentsValidationAndRetentionConfig
TableConfig tableConfig = tableConfigBuilder.setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL).build();
Expand Down Expand Up @@ -542,4 +551,12 @@ private void checkTableConfigWithUpsertConfig(TableConfig tableConfig) {

assertEquals(upsertConfig.getMode(), UpsertConfig.Mode.FULL);
}

/**
 * Verifies that a round-tripped (serialized then de-serialized) table config still carries the
 * dedup config: it must be present, enabled, and use the MD5 hash function set by the test.
 */
private void checkTableConfigWithDedupConfig(TableConfig tableConfig) {
  DedupConfig config = tableConfig.getDedupConfig();
  assertNotNull(config);
  assertTrue(config.isDedupEnabled());
  assertEquals(config.getHashFunction(), HashFunction.MD5);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.segment.local.realtime.converter.RealtimeSegmentConverter;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
Expand Down Expand Up @@ -1210,7 +1211,8 @@ public void stop()
public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig,
RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig,
Schema schema, LLCSegmentName llcSegmentName, Semaphore partitionGroupConsumerSemaphore,
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
ServerMetrics serverMetrics, @Nullable PartitionUpsertMetadataManager partitionUpsertMetadataManager,
@Nullable PartitionDedupMetadataManager partitionDedupMetadataManager) {
_segBuildSemaphore = realtimeTableDataManager.getSegmentBuildSemaphore();
_segmentZKMetadata = segmentZKMetadata;
_tableConfig = tableConfig;
Expand Down Expand Up @@ -1325,7 +1327,7 @@ public LLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableCo
.setNullHandlingEnabled(_nullHandlingEnabled)
.setConsumerDir(consumerDir).setUpsertMode(tableConfig.getUpsertMode())
.setPartitionUpsertMetadataManager(partitionUpsertMetadataManager)
.setHashFunction(tableConfig.getHashFunction())
.setPartitionDedupMetadataManager(partitionDedupMetadataManager)
.setUpsertComparisonColumn(tableConfig.getUpsertComparisonColumn())
.setFieldConfigList(tableConfig.getFieldConfigList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
import org.apache.pinot.core.data.manager.BaseTableDataManager;
import org.apache.pinot.core.util.PeerServerSegmentFinder;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.dedup.TableDedupMetadataManager;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
Expand All @@ -60,9 +62,12 @@
import org.apache.pinot.segment.local.upsert.PartialUpsertHandler;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
import org.apache.pinot.segment.local.upsert.TableUpsertMetadataManager;
import org.apache.pinot.segment.local.utils.RecordInfo;
import org.apache.pinot.segment.local.utils.SchemaUtils;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
Expand Down Expand Up @@ -112,6 +117,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {

private UpsertConfig.Mode _upsertMode;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
private TableDedupMetadataManager _tableDedupMetadataManager;
private List<String> _primaryKeyColumns;
private String _upsertComparisonColumn;

Expand Down Expand Up @@ -156,6 +162,17 @@ protected void doInit() {
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType);
_upsertMode = tableConfig.getUpsertMode();
if (tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled()) {
Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType);
Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType);
_primaryKeyColumns = schema.getPrimaryKeyColumns();
DedupConfig dedupConfig = tableConfig.getDedupConfig();
HashFunction dedupHashFunction = dedupConfig.getHashFunction();
_tableDedupMetadataManager =
new TableDedupMetadataManager(_helixManager, _tableNameWithType, _primaryKeyColumns, _serverMetrics,
dedupHashFunction);
}

if (isUpsertEnabled()) {
UpsertConfig upsertConfig = tableConfig.getUpsertConfig();
assert upsertConfig != null;
Expand All @@ -172,7 +189,7 @@ protected void doInit() {
upsertConfig.getPartialUpsertStrategies(), upsertConfig.getDefaultPartialUpsertStrategy(),
comparisonColumn);
}
UpsertConfig.HashFunction hashFunction = upsertConfig.getHashFunction();
HashFunction hashFunction = upsertConfig.getHashFunction();
_tableUpsertMetadataManager =
new TableUpsertMetadataManager(_tableNameWithType, _serverMetrics, partialUpsertHandler, hashFunction);
_primaryKeyColumns = schema.getPrimaryKeyColumns();
Expand Down Expand Up @@ -244,6 +261,10 @@ public String getConsumerDir() {
return consumerDir.getAbsolutePath();
}

/**
 * Returns {@code true} if dedup is enabled for this table, i.e. a {@code TableDedupMetadataManager}
 * was created during init (which happens when the table config has a dedup config with dedup
 * enabled).
 */
public boolean isDedupEnabled() {
return _tableDedupMetadataManager != null;
}

/** Returns {@code true} if upsert is enabled for this table (upsert mode is not {@code NONE}). */
public boolean isUpsertEnabled() {
return _upsertMode != UpsertConfig.Mode.NONE;
}
Expand Down Expand Up @@ -336,9 +357,13 @@ public void addSegment(String segmentName, TableConfig tableConfig, IndexLoading
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
_tableUpsertMetadataManager != null ? _tableUpsertMetadataManager.getOrCreatePartitionManager(
partitionGroupId) : null;
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ? _tableDedupMetadataManager
.getOrCreatePartitionManager(partitionGroupId) : null;
segmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager);
indexLoadingConfig, schema, llcSegmentName, semaphore, _serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager);
} else {
InstanceZKMetadata instanceZKMetadata = ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, instanceZKMetadata, this,
Expand All @@ -355,9 +380,27 @@ public void addSegment(ImmutableSegment immutableSegment) {
if (isUpsertEnabled()) {
handleUpsert((ImmutableSegmentImpl) immutableSegment);
}

if (isDedupEnabled()) {
buildDedupMeta((ImmutableSegmentImpl) immutableSegment);
}
super.addSegment(immutableSegment);
}

/**
 * Registers a newly added immutable segment with the partition-level dedup metadata manager so
 * that the primary keys it contains are tracked for deduplication, and enables dedup on the
 * segment itself.
 *
 * @param immutableSegment the segment to register; must belong to a dedup-enabled table
 * @throws NullPointerException if the partition group id cannot be resolved for the segment
 */
private void buildDedupMeta(ImmutableSegmentImpl immutableSegment) {
  // TODO(saurabh) refactor commons code with handleUpsert
  String segmentName = immutableSegment.getSegmentName();
  // Resolve the stream partition this segment belongs to; the first primary-key column is used
  // as the partition column hint
  Integer partitionGroupId = SegmentUtils
      .getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, _primaryKeyColumns.get(0));
  // Use Guava's template overload instead of an eager String.format() so the error message is
  // only built when the check actually fails
  Preconditions.checkNotNull(partitionGroupId,
      "PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
      _tableNameWithType);
  PartitionDedupMetadataManager partitionDedupMetadataManager =
      _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
  immutableSegment.enableDedup(partitionDedupMetadataManager);
  partitionDedupMetadataManager.addSegment(immutableSegment);
}

private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
String segmentName = immutableSegment.getSegmentName();
Integer partitionGroupId = SegmentUtils
Expand All @@ -378,8 +421,8 @@ private void handleUpsert(ImmutableSegmentImpl immutableSegment) {
.put(_upsertComparisonColumn, new PinotSegmentColumnReader(immutableSegment, _upsertComparisonColumn));
int numTotalDocs = immutableSegment.getSegmentMetadata().getTotalDocs();
int numPrimaryKeyColumns = _primaryKeyColumns.size();
Iterator<PartitionUpsertMetadataManager.RecordInfo> recordInfoIterator =
new Iterator<PartitionUpsertMetadataManager.RecordInfo>() {
Iterator<RecordInfo> recordInfoIterator =
new Iterator<RecordInfo>() {
private int _docId = 0;

@Override
Expand All @@ -388,7 +431,7 @@ public boolean hasNext() {
}

@Override
public PartitionUpsertMetadataManager.RecordInfo next() {
public RecordInfo next() {
Object[] values = new Object[numPrimaryKeyColumns];
for (int i = 0; i < numPrimaryKeyColumns; i++) {
Object value = columnToReaderMap.get(_primaryKeyColumns.get(i)).getValue(_docId);
Expand All @@ -401,7 +444,7 @@ public PartitionUpsertMetadataManager.RecordInfo next() {
Object upsertComparisonValue = columnToReaderMap.get(_upsertComparisonColumn).getValue(_docId);
Preconditions.checkState(upsertComparisonValue instanceof Comparable,
"Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
return new PartitionUpsertMetadataManager.RecordInfo(primaryKey, _docId++,
return new RecordInfo(primaryKey, _docId++,
(Comparable) upsertComparisonValue);
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ public FakeLLRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, Tab
throws Exception {
super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir,
new IndexLoadingConfig(makeInstanceDataManagerConfig(), tableConfig), schema, llcSegmentName,
semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null);
semaphoreMap.get(llcSegmentName.getPartitionGroupId()), serverMetrics, null, null);
_state = LLRealtimeSegmentDataManager.class.getDeclaredField("_state");
_state.setAccessible(true);
_shouldStop = LLRealtimeSegmentDataManager.class.getDeclaredField("_shouldStop");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
Expand Down Expand Up @@ -125,7 +125,7 @@ public void loadSegment()
_upsertIndexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.heap);
((ImmutableSegmentImpl) _upsertIndexSegment).enableUpsert(
new PartitionUpsertMetadataManager("testTable_REALTIME", 0, serverMetrics, null,
UpsertConfig.HashFunction.NONE), new ThreadSafeMutableRoaringBitmap());
HashFunction.NONE), new ThreadSafeMutableRoaringBitmap());
}

@AfterClass
Expand Down
Loading