Skip to content

changing the dedup store to become pluggable#10639

Merged
yupeng9 merged 12 commits intoapache:masterfrom
raghavgautam:pluggable-dedup
May 9, 2023
Merged

changing the dedup store to become pluggable#10639
yupeng9 merged 12 commits intoapache:masterfrom
raghavgautam:pluggable-dedup

Conversation

@raghavgautam
Copy link
Contributor

This patch is the first patch for memory problem discussed in #10571

It makes the LocalKeyValue store to be pluggable. Currently, it is hard-coded to be ConcurrentHashMap. This will make it easy to plugin different implementations.

public void compact() {
}

private static final class ByteArray {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this because java.lang.Array picks gets it's hashCode() & equals() method from java.lang.Object which does not work for HashMap keys.
https://docs.oracle.com/javase/8/docs/api/java/lang/reflect/Array.html

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make the change.

@codecov-commenter
Copy link

codecov-commenter commented Apr 18, 2023

Codecov Report

❌ Patch coverage is 67.05882% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 70.27%. Comparing base (f6c6d14) to head (724aec9).
⚠️ Report is 4035 commits behind head on master.

Files with missing lines Patch % Lines
...up/ConcurrentMapPartitionDedupMetadataManager.java 66.00% 17 Missing ⚠️
.../local/dedup/TableDedupMetadataManagerFactory.java 47.05% 7 Missing and 2 partials ⚠️
...ent/local/dedup/BaseTableDedupMetadataManager.java 81.81% 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #10639      +/-   ##
============================================
- Coverage     70.35%   70.27%   -0.09%     
+ Complexity     6464     6433      -31     
============================================
  Files          2103     2114      +11     
  Lines        112769   114072    +1303     
  Branches      16981    17227     +246     
============================================
+ Hits          79341    80160     +819     
- Misses        27877    28306     +429     
- Partials       5551     5606      +55     
Flag Coverage Δ
integration1 24.30% <0.00%> (-0.17%) ⬇️
integration2 24.03% <1.17%> (-0.22%) ⬇️
unittests1 67.82% <65.88%> (-0.01%) ⬇️
unittests2 13.81% <0.00%> (-0.07%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.


private final Map<ByteArray, byte[]> _map;

public ConcurrentHashMapKeyValueStore(byte[] id) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not use the input?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For concurrent hashmap this is unused, I will remove it.

_hashFunction = hashFunction;
_dedupConfig = dedupConfig;
try {
byte[] id = (tableNameWithType + "@" + partitionId).getBytes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need table name in the key?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need it for concurrent hashmap. But for persistent stores, this is needed to identify which table-partition combination we are storing/querying for.

if (pk instanceof ByteArray) {
return ((ByteArray) pk).getBytes();
}
throw new RuntimeException("Invalid primary key: " + pk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the msg can better be "unsupported pk class"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make the change.

}

public String getKeyStore() {
return _keyStore;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this has a default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for @JsonProperty, if a default value has not been unspecified it is taken to be empty string

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right, can you add a comment on the default impl of this store?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will make the change.


void putBatch(List<Pair<byte[], byte[]>> keyValues);

long getKeyCount();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this need to be long?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At 1K/sec this will get 1B keys in 12 days and int only goes till 2B.

@yupeng9
Copy link
Contributor

yupeng9 commented Apr 20, 2023

LGTM. @Jackie-Jiang can you also take a look?

import org.apache.commons.lang3.tuple.Pair;

public interface LocalKeyValueStore {
byte[] get(byte[] key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Generics here to allow the ConcurrentHashMap kv-store to skip serialization?

When we stick with in-memory hash-map we wouldn't want to add the overhead of serialization during ingestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we try generics, the code will have to know that serialization is not needed so, it will be coupled to the ConcurrentHashMapKeyValueStore implementation. This is undesirable. The serialization that we are using is simple enough to not add significant overhead.

import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

public class ConcurrentHashMapKeyValueStoreTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code format seems incorrect. (we use 2 spaces instead of 4 for indent)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix. Unsure why checkstyle + CI didn't catch this.

_primaryKeyToSegmentMap.putIfAbsent(HashUtils.hashPrimaryKey(pk, _hashFunction), indexSegment) != null;
if (!present) {
byte[] keyBytes = serializePrimaryKey(HashUtils.hashPrimaryKey(pk, _dedupConfig.getHashFunction()));
if (Objects.isNull(_keyValueStore.get(keyBytes))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a non-atomic operation (contrast with putIfAbsent).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix this by bringing back the putIfAbsent.

public void put(byte[] key, byte[] value) {
_map.put(new ByteArray(key), value);
}
public byte[] putIfAbsent(byte[] key, byte[] value) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

void delete(byte[] key);

void put(byte[] key, byte[] value);
byte[] putIfAbsent(byte[] key, byte[] value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space between methods.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed.

import java.util.List;
import org.apache.commons.lang3.tuple.Pair;

public interface LocalKeyValueStore {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also add doc comments. Particularly for compact.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making it pluggable at KV store level will add extra overhead to the default implementation because it will force us to serialize everything. Instead, we can make it pluggable at metadata manager level so that there is no performance penalty to the default implementation. You may take a look at #9186 of how we made upsert metadata manager pluggable. This PR should be very similar to that

@raghavgautam
Copy link
Contributor Author

Making it pluggable at KV store level will add extra overhead to the default implementation because it will force us to serialize everything. Instead, we can make it pluggable at metadata manager level so that there is no performance penalty to the default implementation. You may take a look at #9186 of how we made upsert metadata manager pluggable. This PR should be very similar to that

@Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.

Copy link
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.

It is not that trivial comparing to other operations when updating a record. Adding a record to KV store could be much cheaper than serializing a primary key, meaning this change could cause several times performance degradation. Also, certain operation requires reference comparison, and should be handled differently if reference comparison is not possible

}
});
byte[] pkBytes = serializePrimaryKey(pk);
if (Objects.deepEquals(_keyValueStore.get(pkBytes), segmentBytes)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can potentially cause race condition when the segment is added again during the segment removal, and that's why we perform the reference check before

@yupeng9
Copy link
Contributor

yupeng9 commented May 1, 2023

@Jackie-Jiang The serialization that we are using is simple enough to not add significant overhead. It will probably cost 100 or so CPU cycles per record. Moreover, this overhead happens only during ingestion time and does not affect query performance. Keeping the LocalKeyValueStore pluggable will make it reusable in other places as well.

It is not that trivial comparing to other operations when updating a record. Adding a record to KV store could be much cheaper than serializing a primary key, meaning this change could cause several times performance degradation. Also, certain operation requires reference comparison, and should be handled differently if reference comparison is not possible

@raghavgautam maybe we can have some quick benchmark to see how much overhead it adds, and it can be helpful to decide which layer we shall do the abstraction

@raghavgautam
Copy link
Contributor Author

@Jackie-Jiang I have addressed your comment. Can you take another look ?

@raghavgautam raghavgautam requested a review from Jackie-Jiang May 3, 2023 22:35
@yupeng9
Copy link
Contributor

yupeng9 commented May 9, 2023

@Jackie-Jiang I made a pass on the most recent change, and it looks good to me. Feel free to review this post-merge when you are back.

@yupeng9 yupeng9 merged commit 53469c0 into apache:master May 9, 2023
@raghavgautam raghavgautam deleted the pluggable-dedup branch May 10, 2023 00:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants