changing the dedup store to become pluggable#10639
Conversation
| public void compact() { | ||
| } | ||
|
|
||
| private static final class ByteArray { |
There was a problem hiding this comment.
We need this because java.lang.Array picks gets it's hashCode() & equals() method from java.lang.Object which does not work for HashMap keys.
https://docs.oracle.com/javase/8/docs/api/java/lang/reflect/Array.html
There was a problem hiding this comment.
There was a problem hiding this comment.
Will make the change.
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #10639 +/- ##
============================================
- Coverage 70.35% 70.27% -0.09%
+ Complexity 6464 6433 -31
============================================
Files 2103 2114 +11
Lines 112769 114072 +1303
Branches 16981 17227 +246
============================================
+ Hits 79341 80160 +819
- Misses 27877 28306 +429
- Partials 5551 5606 +55
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
|
||
| private final Map<ByteArray, byte[]> _map; | ||
|
|
||
| public ConcurrentHashMapKeyValueStore(byte[] id) { |
There was a problem hiding this comment.
For concurrent hashmap this is unused, I will remove it.
| _hashFunction = hashFunction; | ||
| _dedupConfig = dedupConfig; | ||
| try { | ||
| byte[] id = (tableNameWithType + "@" + partitionId).getBytes(); |
There was a problem hiding this comment.
why do we need table name in the key?
There was a problem hiding this comment.
We don't need it for concurrent hashmap. But for persistent stores, this is needed to identify which table-partition combination we are storing/querying for.
| if (pk instanceof ByteArray) { | ||
| return ((ByteArray) pk).getBytes(); | ||
| } | ||
| throw new RuntimeException("Invalid primary key: " + pk); |
There was a problem hiding this comment.
the msg can better be "unsupported pk class"
There was a problem hiding this comment.
Will make the change.
| } | ||
|
|
||
| public String getKeyStore() { | ||
| return _keyStore; |
There was a problem hiding this comment.
for @JsonProperty, if a default value has not been unspecified it is taken to be empty string
There was a problem hiding this comment.
right, can you add a comment on the default impl of this store?
There was a problem hiding this comment.
Will make the change.
|
|
||
| void putBatch(List<Pair<byte[], byte[]>> keyValues); | ||
|
|
||
| long getKeyCount(); |
There was a problem hiding this comment.
does this need to be long?
There was a problem hiding this comment.
At 1K/sec this will get 1B keys in 12 days and int only goes till 2B.
|
LGTM. @Jackie-Jiang can you also take a look? |
| import org.apache.commons.lang3.tuple.Pair; | ||
|
|
||
| public interface LocalKeyValueStore { | ||
| byte[] get(byte[] key); |
There was a problem hiding this comment.
Can we use Generics here to allow the ConcurrentHashMap kv-store to skip serialization?
When we stick with in-memory hash-map we wouldn't want to add the overhead of serialization during ingestion.
There was a problem hiding this comment.
If we try generics, the code will have to know that serialization is not needed so, it will be coupled to the ConcurrentHashMapKeyValueStore implementation. This is undesirable. The serialization that we are using is simple enough to not add significant overhead.
| import org.testng.annotations.BeforeMethod; | ||
| import org.testng.annotations.Test; | ||
|
|
||
| public class ConcurrentHashMapKeyValueStoreTest { |
There was a problem hiding this comment.
Code format seems incorrect. (we use 2 spaces instead of 4 for indent)
There was a problem hiding this comment.
Will fix. Unsure why checkstyle + CI didn't catch this.
| _primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null; | ||
| if (!present) { | ||
| byte[] keyBytes = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction())); | ||
| if (Objects.isNull(_keyValueStore.get(keyBytes))) { |
There was a problem hiding this comment.
This is a non-atomic operation (contrast with putIfAbsent).
There was a problem hiding this comment.
Will fix this by bringing back the putIfAbsent.
| public void put(byte[] key, byte[] value) { | ||
| _map.put(new ByteArray(key), value); | ||
| } | ||
| public byte[] putIfAbsent(byte[] key, byte[] value) { |
There was a problem hiding this comment.
Missing space between methods.
| void delete(byte[] key); | ||
|
|
||
| void put(byte[] key, byte[] value); | ||
| byte[] putIfAbsent(byte[] key, byte[] value); |
There was a problem hiding this comment.
Missing space between methods.
| import java.util.List; | ||
| import org.apache.commons.lang3.tuple.Pair; | ||
|
|
||
| public interface LocalKeyValueStore { |
There was a problem hiding this comment.
Let's also add doc comments. Particularly for compact.
Jackie-Jiang
left a comment
There was a problem hiding this comment.
Making it pluggable at KV store level will add extra overhead to the default implementation because it will force us to serialize everything. Instead, we can make it pluggable at metadata manager level so that there is no performance penalty to the default implementation. You may take a look at #9186 of how we made upsert metadata manager pluggable. This PR should be very similar to that
@Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well. |
Jackie-Jiang
left a comment
There was a problem hiding this comment.
@Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.
It is not that trivial comparing to other operations when updating a record. Adding a record to KV store could be much cheaper than serializing a primary key, meaning this change could cause several times performance degradation. Also, certain operation requires reference comparison, and should be handled differently if reference comparison is not possible
| } | ||
| }); | ||
| byte[] pkBytes = serializePrimaryKey(pk); | ||
| if (Objects.deepEquals(_keyValueStore.get(pkBytes), segmentBytes)) { |
There was a problem hiding this comment.
This can potentially cause race condition when the segment is added again during the segment removal, and that's why we perform the reference check before
@raghavgautam maybe we can have some quick benchmark to see how much overhead it adds, and it can be helpful to decide which layer we shall do the abstraction |
|
@Jackie-Jiang I have addressed your comment. Can you take another look ? |
|
@Jackie-Jiang I made a pass on the most recent change, and it looks good to me. Feel free to review this post-merge when you are back. |
This patch is the first patch for memory problem discussed in #10571
It makes the LocalKeyValue store to be pluggable. Currently, it is hard-coded to be ConcurrentHashMap. This will make it easy to plugin different implementations.