Codecov Report
@@             Coverage Diff              @@
##            master    #8708       +/-   ##
=============================================
- Coverage      69.81%   14.13%   -55.69%
+ Complexity      4622      168     -4454
=============================================
  Files           1735     1695       -40
  Lines          91320    89448     -1872
  Branches       13644    13440      -204
=============================================
- Hits           63759    12642    -51117
- Misses         23144    75868    +52724
+ Partials        4417      938     -3479
Flags with carried forward coverage won't be shown.
Force-pushed from f684376 to 3d42ad5
Jackie-Jiang
left a comment
Good job extracting several common properties from upsert and dedup
Suggest modeling it as a util class (TableStateUtils) and have one static method public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType). The _allSegmentsLoaded can still be tracked within the metadata manager. We don't want this util class to track the loaded flag, instead it should always re-calculate the state.
_allSegmentsLoaded will need to be present in both the upsert and dedup metadata classes separately. Here it's in just one instance of this class. Is that okay?
The reason I suggest modeling this class as a util and not tracking _allSegmentsLoaded within it is that we may reuse this util method for other features, and we don't want to couple this "check once, then always true" semantic into the util method/class
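To make the distinction concrete, here is a minimal sketch of the stateless utility being suggested. In Pinot the inputs would come from HelixManager (ideal state vs. current state); to keep the sketch self-contained they are modeled as plain maps from segment name to state, and the "ONLINE" literal is an assumption for illustration.

```java
import java.util.Map;

// Hypothetical sketch of the suggested TableStateUtils: it always re-computes the
// state from its inputs and keeps no "loaded" flag of its own. Callers (e.g. the
// metadata managers) may cache the result themselves.
final class TableStateUtils {
  private TableStateUtils() {
  }

  public static boolean isAllSegmentsLoaded(Map<String, String> idealStates,
      Map<String, String> currentStates) {
    for (Map.Entry<String, String> entry : idealStates.entrySet()) {
      // Only segments expected to be ONLINE need to be checked
      if (!"ONLINE".equals(entry.getValue())) {
        continue;
      }
      // A segment missing from the current state, or not yet ONLINE, means not loaded
      if (!"ONLINE".equals(currentStates.get(entry.getKey()))) {
        return false;
      }
    }
    return true;
  }
}
```

Because the method is pure with respect to its inputs, the "check once then always true" behavior stays entirely in the caller.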
(minor) Let's rename it to TableStateUtils
IMO it is okay to increase the value since we are just tracking the row count fed into index(). We should use another metric to track the rows ignored because of the dedup
(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment
Ack. I think with the metric added, this is no longer needed
This flag is redundant. It is implicit in the presence of partitionDedupMetadataManager
This flag is redundant, and is implicit in the presence of _tableDedupMetadataManager
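A minimal sketch of the suggestion: derive the "enabled" state from the manager reference instead of storing a separate boolean. The class and field names follow the review thread; the body is illustrative, not Pinot's actual code.

```java
// Hypothetical sketch: the presence of the dedup metadata manager doubles as the
// enabled flag, so no redundant boolean field can drift out of sync with it.
class DedupFlagSketch {
  static class TableDedupMetadataManager {
  }

  private final TableDedupMetadataManager _tableDedupMetadataManager;

  DedupFlagSketch(TableDedupMetadataManager manager) {
    _tableDedupMetadataManager = manager;
  }

  boolean isDedupEnabled() {
    // Derived state: non-null manager means dedup is enabled
    return _tableDedupMetadataManager != null;
  }
}
```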
- .format("PartitionGroupId is not available for segment: '%s' (upsert-enabled table: %s)", segmentName,
+ .format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
We should remove this method. The hash function can come from both upsert config and dedup config
(minor) We don't usually put final for local variables or parameters
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
Non-null config doesn't mean it is enabled
- if (tableConfig.getUpsertConfig() != null) {
+ if (tableConfig.getUpsertMode() != UpsertConfig.Mode.NONE) {
Non-null dedup config doesn't mean it is enabled. We should either remove the dedupEnabled field and treat a non-null dedup config as dedup-enabled, or check the flag.
The idea here is: if the config JSON doesn't have the dedupeConfig field, there is no need to run the validation
Understood. We should also skip the validation when DedupConfig is available but dedup is not enabled
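The guard both reviewers converge on can be sketched as a single predicate: validation runs only when the config is present and enabled. DedupConfig and shouldValidateDedup are minimal stand-ins here, not Pinot's actual classes.

```java
// Illustrative validation guard: skip dedup validation both when DedupConfig is
// absent and when it is present but disabled.
class DedupValidationSketch {
  static class DedupConfig {
    private final boolean _dedupEnabled;

    DedupConfig(boolean dedupEnabled) {
      _dedupEnabled = dedupEnabled;
    }

    boolean isDedupEnabled() {
      return _dedupEnabled;
    }
  }

  static boolean shouldValidateDedup(DedupConfig dedupConfig) {
    // Non-null alone is not enough; the enabled flag must also be set
    return dedupConfig != null && dedupConfig.isDedupEnabled();
  }
}
```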
Force-pushed from d41aa14 to f594261
Jackie-Jiang
left a comment
LGTM with some non-blocking comments. Good job!
(minor) Since we already track the dropped records, we can remove this TODO and consider changing it to a comment
private final HashFunction _hashFunction;
private boolean _allSegmentsLoaded;

// TODO(saurabh) : We can replace this with a concurrent Set
(minor) Remove this TODO
// TODO(saurabh) : We can replace this with a concurrent Set
@VisibleForTesting
final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
- final ConcurrentHashMap<Object, IndexSegment> _primaryKeySet = new ConcurrentHashMap<>();
+ final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();
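The core of the dedup check under discussion fits in one atomic map operation. This is a hedged, self-contained sketch: IndexSegment is stubbed, the key parameter stands in for the hashed primary key, and the method name follows the diff above rather than being Pinot's exact implementation.

```java
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the per-partition dedup check: a single atomic putIfAbsent decides
// whether the primary key was seen before, avoiding a separate get-then-put race.
class PartitionDedupSketch {
  interface IndexSegment {
  }

  final ConcurrentHashMap<Object, IndexSegment> _primaryKeyToSegmentMap = new ConcurrentHashMap<>();

  /** Returns true (drop the record) when the primary key already exists. */
  public boolean checkRecordPresentOrUpdate(Object primaryKeyHash, IndexSegment segment) {
    // putIfAbsent returns the previous mapping, or null if the key was new
    return _primaryKeyToSegmentMap.putIfAbsent(primaryKeyHash, segment) != null;
  }
}
```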
}

public boolean checkRecordPresentOrUpdate(RecordInfo recordInfo, IndexSegment indexSegment) {
  if (!_allSegmentsLoaded) {
Let's move the if check into the waitTillAllSegmentsLoaded() for thread safety. It is single threaded now, but in case that changes
Could you help me understand the thread safety concerns with this? I don't see any, single threaded or multi threaded.
In fact, moving this if check inside waitTillAllSegmentsLoaded() would lead to unnecessary serialization even when all segments have already been loaded. Even in a single threaded env, that's a heavy lock acquisition cost when _allSegmentsLoaded is already true.
To the point where I think we should reduce the critical section here https://github.com/apache/pinot/blob/master/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java#L74; once _allSegmentsLoaded has been set to true, there is no need to enter a synchronized block.
Do let me know your thoughts
I misused the word thread-safety. I was suggesting adding an extra if check in the synchronized block to avoid potential unnecessary checks when multiple threads invoke waitTillAllSegmentsLoaded().
Good point on reducing the critical section in PartialUpsertHandler. We should first check the flag, then enter the critical section
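The pattern the two reviewers converge on is the classic double-checked flag: test a volatile flag on the fast path, and re-check it inside the synchronized block. The class and method names below mirror the discussion, but the body is an illustrative sketch, not Pinot's actual PartialUpsertHandler.

```java
// Sketch of "check the flag first, then enter the critical section": once the
// volatile flag is set, callers never touch the monitor again.
class SegmentLoadGate {
  private volatile boolean _allSegmentsLoaded;

  public boolean isLoaded() {
    return _allSegmentsLoaded;
  }

  public void ensureAllSegmentsLoaded() {
    if (_allSegmentsLoaded) {
      // Fast path: no lock acquisition after the flag is set
      return;
    }
    synchronized (this) {
      if (_allSegmentsLoaded) {
        // Another thread completed the wait while we were blocked on the monitor
        return;
      }
      // ... wait until all segments are reported loaded (elided in this sketch) ...
      _allSegmentsLoaded = true;
    }
  }
}
```

The volatile modifier is what makes the unsynchronized first read safe: a writer publishing true happens-before any later read of the flag.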
}

if (isDedupEnabled() && _partitionDedupMetadataManager.checkRecordPresentOrUpdate(recordInfo, this)) {
  _logger.info("Dropped row {} since its primary key already exists", row);
Don't log anything here, it can flood the log
if (_serverMetrics != null) {
  _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_DEDUP_DROPPED, 1);
}
return numDocsIndexed < _capacity;
(minor)
- return numDocsIndexed < _capacity;
+ return true;
(minor) Let's rename it to TableStateUtils
    "description" : "second",
    "secondsSinceEpoch": 1567205392
  }
]
\ No newline at end of file
  }
},
"primaryKeyColumns": ["event_id"]
}
\ No newline at end of file
}

@VisibleForTesting
public static Iterator<RecordInfo> getRecordInfoIterator(IndexSegment segment, List<String> primaryKeyColumns) {
Suggest returning an iterator of PrimaryKey. For dedup, we don't need the docId and comparisonValue information from the RecordInfo. Similar for the checkRecordPresentOrUpdate() which can just take the PrimaryKey object. This is not a blocker, so maybe put a TODO and address it later
Ack. Didn't see any big impact of changing the method signature to accept PK, hence made that change too.
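The change acknowledged above can be sketched as an iterator that yields only primary keys, with no docId or comparison value. ColumnReader is a hypothetical stand-in for Pinot's column access, and a primary key is modeled as the list of its column values to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch of iterating primary keys directly instead of full RecordInfo objects:
// dedup only needs the key, so nothing else is materialized per document.
class PrimaryKeyIteratorSketch {
  interface ColumnReader {
    Object getValue(int docId);
  }

  public static Iterator<List<Object>> primaryKeyIterator(List<ColumnReader> pkColumns, int numDocs) {
    return new Iterator<List<Object>>() {
      private int _docId = 0;

      @Override
      public boolean hasNext() {
        return _docId < numDocs;
      }

      @Override
      public List<Object> next() {
        // Assemble the key from the primary-key columns for the current docId
        List<Object> key = new ArrayList<>(pkColumns.size());
        for (ColumnReader reader : pkColumns) {
          key.add(reader.getValue(_docId));
        }
        _docId++;
        return key;
      }
    };
  }
}
```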
Force-pushed from 693659f to b56bfe2
@saurabhd336 Please add documentation for this in
This PR adds support for enabling deduplication for a realtime table, via a top-level table config. At a high level, primaryKey (as defined in the table schema) hashes are stored in in-memory data structures, and each incoming row is validated against them. Duplicate rows are dropped. The expectation while using this feature is for the stream to be partitioned by the primary key, strictReplicaGroup routing to be enabled, and the configured stream consumer type to be lowLevel. These requirements are therefore mandated via the tableConfig API's input validations.
Design doc: https://docs.google.com/document/d/17sOSRQ1slff30z7jDc0ec5qKwv0xSfPkDjpMOY07POQ/edit?usp=sharing
How to use
https://docs.pinot.apache.org/basics/data-import/dedup