Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -954,6 +954,8 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128

# The number of threads used for serializing and deserializing data to and from the metadata store
metadataStoreSerDesThreads=1

### --- Authentication --- ###

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ metadataStoreBatchingMaxOperations=1000
# Maximum size of a batch
metadataStoreBatchingMaxSizeKb=128

# The number of threads used for serializing and deserializing data to and from the metadata store
metadataStoreSerDesThreads=1

### --- TLS --- ###
# Deprecated - Use webServicePortTls and brokerServicePortTls instead
tlsEnabled=false
Expand Down
26 changes: 5 additions & 21 deletions pip/pip-453.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ Additionally, some code paths execute the compute intensive tasks in the metadat

# High Level Design

Create 3 set of threads:
Create 4 sets of threads:
- `<name>-event`: the original metadata store thread, which is now only responsible to handle notifications. This executor won't be a `ScheduledExecutorService` anymore.
- `<name>-scheduler`: a single thread, which is used to schedule tasks like flushing and retrying failed operations.
- `<name>-batch-flusher`: a single thread, which is used to schedule the flushing task at a fixed rate. It won't be created if `metadataStoreBatchingEnabled` is false.
- `<name>-worker`: a fixed thread pool shared by all `MetadataCache` instances to execute compute intensive tasks like serialization and deserialization. The same path will be handled by the same thread to keep the processing order on the same path.

Expand All @@ -53,25 +54,6 @@ The only concern is that introducing a new thread to execute callbacks allows wa
metadataStore.get(path).thenApply(__ -> metadataStore.get(otherPath).join());
```

Other tasks like the retry on failure is executed in JVM's common `ForkJoinPool` by `CompletableFuture` APIs. For example:

```diff
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKMetadataStore.java
@@ -245,9 +245,8 @@ public class ZKMetadataStore extends AbstractBatchedMetadataStore
countsByType, totalSize, opsForLog);

// Retry with the individual operations
- executor.schedule(() -> {
- ops.forEach(o -> batchOperation(Collections.singletonList(o)));
- }, 100, TimeUnit.MILLISECONDS);
+ CompletableFuture.delayedExecutor(100, TimeUnit.MILLISECONDS).execute(() ->
+ ops.forEach(o -> batchOperation(Collections.singletonList(o))));
} else {
MetadataStoreException e = getException(code, path);
ops.forEach(o -> o.getFuture().completeExceptionally(e));
```

# Detailed Design

## Public-facing Changes
Expand All @@ -85,9 +67,11 @@ Add a configurations to specify the number of worker threads for `MetadataCache`
category = CATEGORY_SERVER,
doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
)
private int metadataStoreSerDesThreads = Runtime.getRuntime().availableProcessors();
private int metadataStoreSerDesThreads = 1;
```

Use 1 as the default value since the serialization and deserialization tasks are not frequent. This separate thread pool is mainly added to avoid blocking the metadata store callback thread.

### Metrics

The `pulsar_batch_metadata_store_executor_queue_size` metric will be removed because the `<name>-batch-flusher` thread won't execute other tasks except for flushing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.client.RackawareEnsemblePlacementPolicy;
Expand Down Expand Up @@ -57,6 +58,8 @@ public class IsolatedBookieEnsemblePlacementPolicy extends RackawareEnsemblePlac
// the secondary group.
private ImmutablePair<Set<String>, Set<String>> defaultIsolationGroups;

@Getter
@VisibleForTesting
private MetadataCache<BookiesRackConfiguration> bookieMappingCache;

private static final String PULSAR_SYSTEM_TOPIC_ISOLATION_GROUP = "*";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,12 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private boolean metadataStoreAllowReadOnlyOperations;

@FieldContext(
category = CATEGORY_SERVER,
doc = "The number of threads used for serializing and deserializing data to and from the metadata store"
)
private int metadataStoreSerDesThreads = 1;

@Deprecated
@FieldContext(
category = CATEGORY_SERVER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import io.netty.util.HashedWheelTimer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -288,8 +291,7 @@ public void testBasic() throws Exception {
secondaryBookieGroup.put(BOOKIE4, BookieInfo.builder().rack("rack0").build());
bookieMapping.put("group2", secondaryBookieGroup);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));

ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
null).getResult();
Expand Down Expand Up @@ -340,8 +342,7 @@ public void testNoBookieInfo() throws Exception {
+ "\": {\"rack\": \"rack0\", \"hostname\": \"bookie3.example.com\"}, \"" + BOOKIE4
+ "\": {\"rack\": \"rack2\", \"hostname\": \"bookie4.example.com\"}}}";

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(StandardCharsets.UTF_8),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, data.getBytes(StandardCharsets.UTF_8));

List<BookieId> ensemble = isolationPolicy.newEnsemble(2, 2, 2, Collections.emptyMap(),
new HashSet<>()).getResult();
Expand Down Expand Up @@ -399,8 +400,7 @@ public void testBookieInfoChange() throws Exception {
bookieMapping.put("group1", mainBookieGroup);
bookieMapping.put("group2", secondaryBookieGroup);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));

ensemble = isolationPolicy.newEnsemble(3, 3, 3, Collections.emptyMap(),
new HashSet<>()).getResult();
Expand Down Expand Up @@ -784,8 +784,7 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
bookieMapping.put(isolationGroup2, group2);
bookieMapping.put(isolationGroup3, group3);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));

groups.setLeft(Sets.newHashSet(isolationGroup1, isolationGroup3));
groups.setRight(Sets.newHashSet(""));
Expand All @@ -808,8 +807,7 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));

groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
Expand All @@ -831,12 +829,24 @@ public void testGetExcludedBookiesWithIsolationGroups() throws Exception {
bookieMapping.put(isolationGroup1, group1);
bookieMapping.put(isolationGroup2, group2);

store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, jsonMapper.writeValueAsBytes(bookieMapping),
Optional.empty()).join();
updateBookieInfo(isolationPolicy, jsonMapper.writeValueAsBytes(bookieMapping));

groups.setLeft(Sets.newHashSet(isolationGroup1));
groups.setRight(Sets.newHashSet(isolationGroup2));
blacklist = isolationPolicy.getExcludedBookiesWithIsolationGroups(2, groups);
assertTrue(blacklist.isEmpty());
}

/**
 * Writes the given serialized bookie mapping to the metadata store and blocks until the
 * policy's cache reflects the change.
 *
 * <p>The policy reads the bookie info asynchronously before each query or update; after a put
 * into the metadata store, the cache is only refreshed once the change notification has been
 * processed, so we wait until the cached entry differs from the one observed before the write.
 */
private void updateBookieInfo(IsolatedBookieEnsemblePlacementPolicy isolationPolicy, byte[] bookieInfo) {
    final var mappingCache = isolationPolicy.getBookieMappingCache();
    // A null cache means the policy was never initialized
    assertNotNull(mappingCache);

    final var path = BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH;
    final var entryBeforeWrite = mappingCache.getIfCached(path);
    store.put(path, bookieInfo, Optional.empty()).join();
    Awaitility.await().atMost(Duration.ofSeconds(1))
            .untilAsserted(() -> assertNotEquals(mappingCache.getIfCached(path), entryBeforeWrite));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,7 @@ public MetadataStore createConfigurationMetadataStore(PulsarMetadataEventSynchro
.synchronizer(synchronizer)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}

Expand Down Expand Up @@ -1328,6 +1329,7 @@ public MetadataStoreExtended createLocalMetadataStore(PulsarMetadataEventSynchro
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.openTelemetry(openTelemetry)
.nodeSizeStats(new DefaultMetadataNodeSizeStats())
.numSerDesThreads(config.getMetadataStoreSerDesThreads())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.testng.AssertJUnit.assertSame;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
Expand All @@ -38,6 +43,10 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.metadata.api.MetadataCacheConfig;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -339,4 +348,60 @@ public void testShutdownViaAdminApi() throws Exception {
assertTrue(e instanceof PulsarClientException.TimeoutException);
}
}

/**
 * Verifies that the configured number of SerDes worker threads is actually used by both the
 * local and the configuration metadata store, and that each path is always handled by the
 * same worker thread.
 */
@Test
public void testMetadataSerDesThreads() throws Exception {
    final var expectedThreads = 5;
    final var config = new ServiceConfiguration();
    config.setMetadataStoreSerDesThreads(expectedThreads);
    config.setClusterName("test");
    config.setMetadataStoreUrl("memory:local");
    config.setConfigurationMetadataStoreUrl("memory:local");

    @Cleanup final var pulsar = new PulsarService(config);
    pulsar.start();

    final BiConsumer<MetadataStore, String> verifier = (store, prefix) -> {
        final var serDes = new CustomMetadataSerDes();
        final var cache = store.getMetadataCache(prefix, serDes, MetadataCacheConfig.builder().build());
        // Keep creating entries until all worker threads have been observed (bounded at 100 tries)
        var attempt = 0;
        while (attempt < 100 && serDes.threadNameToSerializedPaths.size() < expectedThreads) {
            final var path = prefix + attempt;
            cache.create(path, "value-" + attempt).join();
            assertEquals(cache.get(path).join().orElseThrow(), "value-" + attempt);
            assertEquals(cache.readModifyUpdate(path, s -> s + "-updated").join(),
                    "value-" + attempt + "-updated");
            // A path's serialization and deserialization must land on the same worker thread
            assertEquals(serDes.threadNameToSerializedPaths, serDes.threadNameToDeserializedPaths);
            attempt++;
        }
        log.info("SerDes thread mapping: {}", serDes.threadNameToSerializedPaths);
        assertEquals(serDes.threadNameToSerializedPaths.keySet().size(), expectedThreads);
        // No path may ever be handled by more than one thread
        final var allPaths = serDes.threadNameToSerializedPaths.values().stream()
                .flatMap(Set::stream).sorted().toList();
        assertEquals(allPaths.stream().distinct().toList(), allPaths);
    };

    verifier.accept(pulsar.getLocalMetadataStore(), "/test-local/");
    verifier.accept(pulsar.getConfigurationMetadataStore(), "/test-config/");
}

/**
 * A {@code MetadataSerde} for {@code String} values that records, per thread, which paths were
 * serialized and deserialized on it. Used to verify that SerDes work is spread across the
 * configured worker threads and that each path is pinned to a single thread.
 */
private static class CustomMetadataSerDes implements MetadataSerde<String> {

    // thread name -> paths serialized on that thread
    final Map<String, Set<String>> threadNameToSerializedPaths = new ConcurrentHashMap<>();
    // thread name -> paths deserialized on that thread
    final Map<String, Set<String>> threadNameToDeserializedPaths = new ConcurrentHashMap<>();

    @Override
    public byte[] serialize(String path, String value) throws IOException {
        threadNameToSerializedPaths.computeIfAbsent(Thread.currentThread().getName(),
                __ -> ConcurrentHashMap.newKeySet()).add(path);
        // Use an explicit charset: the platform default could differ between environments
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String path, byte[] data, Stat stat) throws IOException {
        threadNameToDeserializedPaths.computeIfAbsent(Thread.currentThread().getName(),
                __ -> ConcurrentHashMap.newKeySet()).add(path);
        return new String(data, StandardCharsets.UTF_8);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import java.util.concurrent.ExecutorService;
import lombok.Cleanup;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.testcontext.NonClosingProxyHandler;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.impl.stats.BatchMetadataStoreStats;
import org.apache.pulsar.metadata.impl.stats.MetadataStoreStats;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand All @@ -53,14 +51,6 @@ protected void setup() throws Exception {
var newStats = new MetadataStoreStats(
localMetadataStoreName, pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "metadataStoreStats", newStats, true);

var currentBatchedStats = (BatchMetadataStoreStats) FieldUtils.readField(localMetadataStore,
"batchMetadataStoreStats", true);
currentBatchedStats.close();
var currentExecutor = (ExecutorService) FieldUtils.readField(currentBatchedStats, "executor", true);
var newBatchedStats = new BatchMetadataStoreStats(localMetadataStoreName, currentExecutor,
pulsar.getOpenTelemetry().getOpenTelemetryService().getOpenTelemetry());
FieldUtils.writeField(localMetadataStore, "batchMetadataStoreStats", newBatchedStats, true);
}

@AfterMethod(alwaysRun = true)
Expand Down Expand Up @@ -89,7 +79,5 @@ public void testMetadataStoreStats() throws Exception {
var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
assertMetricLongSumValue(metrics, MetadataStoreStats.METADATA_STORE_PUT_BYTES_COUNTER_METRIC_NAME,
attributes, value -> assertThat(value).isPositive());
assertMetricLongSumValue(metrics, BatchMetadataStoreStats.EXECUTOR_QUEUE_SIZE_METRIC_NAME, attributes,
value -> assertThat(value).isPositive());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,7 @@ public class MetadataStoreConfig {
* The estimator to estimate the payload length of metadata node, which used to limit the batch size requested.
*/
private MetadataNodeSizeStats nodeSizeStats;

@Builder.Default
private final int numSerDesThreads = 1;
}
Loading
Loading