Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] Allow the content be empty when deserialize in MetadataCache #23131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String

try {
GetResult res = optRes.get();
if (res.getValue().length == 0) {
return FutureUtils.value(Optional.of(new CacheGetResult<>(null, res.getStat())));
}
Comment on lines +111 to +113
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There would be less code duplication if this change would be handled in a way where obj is set to null when res.getValue().length == 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for example T obj = res.getValue().length > 0 ? serde.deserialize(path, res.getValue(), res.getStat()) : null;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BookieServiceInfo handled the case of res.getValue.length==0 in deserialize

if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
}

That's why the test TestAutoRecoveryAlongWithBookieServers#testAutoRecoveryAlongWithBookieServers failed.

Maybe we need to deserialize first and add the res.getValue.length==0 check in the exception section?

Copy link
Member Author

@zymap zymap Aug 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hangc0276 Thank you! I realized I was wrong to do this fix here because the byte[0] may be a magic value for the SerDe implementation. It should be handled in the SerDe classes.

T obj = serde.deserialize(path, res.getValue(), res.getStat());
return FutureUtils
.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
Expand Down Expand Up @@ -152,10 +155,14 @@ public CompletableFuture<T> readModifyUpdateOrCreate(String path, Function<Optio
try {
// Use clone and CAS zk to ensure thread safety
clone = serde.deserialize(path, serde.serialize(path, entry.getValue()), entry.getStat());
if (clone == null) {
currentValue = Optional.empty();
} else {
currentValue = Optional.of(clone);
}
} catch (IOException e) {
return FutureUtils.exception(e);
}
currentValue = Optional.of(clone);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of making the change above, Optional.of could be changed to Optional.ofNullable to achieve the same outcome.

expectedVersion = entry.getStat().getVersion();
} else {
currentValue = Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
package org.apache.pulsar.metadata;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotSame;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
Expand Down Expand Up @@ -445,6 +447,26 @@ public void testCloneInReadModifyUpdateOrCreate(String provider, Supplier<String

}

@Test(dataProvider = "impl")
public void testReadModifyUpdateOrCreateWithEmptyValue(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

String path = "/testReadModifyUpdateOrCreateWithEmptyValue";
store.put(path, new byte[0], Optional.of(-1L)).get();

MetadataCache<Policies> objCache = store.getMetadataCache(Policies.class);

Optional<Policies> policies = objCache.get(path).get();
assertFalse(policies.isPresent());
Policies policies1 = objCache.readModifyUpdateOrCreate(path, (rp) -> {
Policies p = rp.orElse(new Policies());
p.max_unacked_messages_per_consumer = 100;
return p;
}).get();
assertEquals(policies1.max_unacked_messages_per_consumer.intValue(), 100);
}

@Test(dataProvider = "impl")
public void readModifyUpdate(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
Expand Down Expand Up @@ -475,6 +497,36 @@ public void readModifyUpdate(String provider, Supplier<String> urlSupplier) thro
}
}

@Test(dataProvider = "impl")
public void testReadModifyUpdateWithEmptyValue(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());

String path = "/testReadModifyUpdateWithEmptyValue";
store.put(path, new byte[0], Optional.of(-1L)).get();

MetadataCache<Policies> objCache = store.getMetadataCache(Policies.class);

Optional<Policies> policies = objCache.get(path).get();
assertFalse(policies.isPresent());
Policies policies1 = objCache.readModifyUpdate(path, (rp) -> {
if (rp != null) {
rp.max_unacked_messages_per_consumer = 100;
}
return rp;
}).get();
assertNull(policies1);

Policies policies2 = objCache.readModifyUpdate(path, (rp) -> {
if (rp == null) {
rp = new Policies();
}
rp.max_unacked_messages_per_consumer = 100;
return rp;
}).get();
assertEquals(policies2.max_unacked_messages_per_consumer.intValue(), 100);
}

/**
* This test validates that metadata-cache can handle BadVersion failure if other cache/metadata-source updates the
* data with different version.
Expand Down
Loading