Skip to content

Commit

Permalink
[improve][broker][PIP-384] Decouple Bookkeeper client from ManagedLed…
Browse files Browse the repository at this point in the history
…gerStorage and enable multiple ManagedLedgerFactory instances (#23313)
  • Loading branch information
lhotari authored Oct 8, 2024
1 parent 5451921 commit 731ec83
Show file tree
Hide file tree
Showing 50 changed files with 701 additions and 318 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public class ManagedLedgerConfig {
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;

@Getter
@Setter
private String storageClassName;
@Getter
@Setter
private String shadowSourceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.netty.channel.EventLoopGroup;
import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -39,16 +41,18 @@
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.commons.configuration.Configuration;
import org.apache.pulsar.broker.stats.prometheus.metrics.PrometheusMetricsProvider;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerClientFactory implements ManagedLedgerStorage {

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerClientFactory.class);

private static final String DEFAULT_STORAGE_CLASS_NAME = "bookkeeper";
private BookkeeperManagedLedgerStorageClass defaultStorageClass;
private ManagedLedgerFactory managedLedgerFactory;
private BookKeeper defaultBkClient;
private final AsyncCache<EnsemblePlacementPolicyConfig, BookKeeper>
Expand Down Expand Up @@ -119,20 +123,50 @@ public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadata
defaultBkClient.close();
throw e;
}

defaultStorageClass = new BookkeeperManagedLedgerStorageClass() {
@Override
public String getName() {
return DEFAULT_STORAGE_CLASS_NAME;
}

@Override
public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
}

@Override
public StatsProvider getStatsProvider() {
return statsProvider;
}

@Override
public BookKeeper getBookKeeperClient() {
return defaultBkClient;
}
};
}

public ManagedLedgerFactory getManagedLedgerFactory() {
return managedLedgerFactory;
@Override
public Collection<ManagedLedgerStorageClass> getStorageClasses() {
return List.of(getDefaultStorageClass());
}

public BookKeeper getBookKeeperClient() {
return defaultBkClient;
@Override
public Optional<ManagedLedgerStorageClass> getManagedLedgerStorageClass(String name) {
if (name == null || DEFAULT_STORAGE_CLASS_NAME.equals(name)) {
return Optional.of(getDefaultStorageClass());
} else {
return Optional.empty();
}
}

public StatsProvider getStatsProvider() {
return statsProvider;
@Override
public ManagedLedgerStorageClass getDefaultStorageClass() {
return defaultStorageClass;
}


@VisibleForTesting
public Map<EnsemblePlacementPolicyConfig, BookKeeper> getBkEnsemblePolicyToBookKeeperMap() {
return bkEnsemblePolicyToBkClientMap.synchronous().asMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,9 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.stats.prometheus.PulsarPrometheusMetricsServlet;
import org.apache.pulsar.broker.storage.BookkeeperManagedLedgerStorageClass;
import org.apache.pulsar.broker.storage.ManagedLedgerStorage;
import org.apache.pulsar.broker.storage.ManagedLedgerStorageClass;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
Expand Down Expand Up @@ -210,7 +212,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private static final int DEFAULT_MONOTONIC_CLOCK_GRANULARITY_MILLIS = 8;
private final ServiceConfiguration config;
private NamespaceService nsService = null;
private ManagedLedgerStorage managedLedgerClientFactory = null;
private ManagedLedgerStorage managedLedgerStorage = null;
private LeaderElectionService leaderElectionService = null;
private BrokerService brokerService = null;
private WebService webService = null;
Expand Down Expand Up @@ -606,13 +608,13 @@ public CompletableFuture<Void> closeAsync() {
this.brokerService = null;
}

if (this.managedLedgerClientFactory != null) {
if (this.managedLedgerStorage != null) {
try {
this.managedLedgerClientFactory.close();
this.managedLedgerStorage.close();
} catch (Exception e) {
LOG.warn("ManagedLedgerClientFactory closing failed {}", e.getMessage());
}
this.managedLedgerClientFactory = null;
this.managedLedgerStorage = null;
}

if (bkClientFactory != null) {
Expand Down Expand Up @@ -899,7 +901,7 @@ public void start() throws PulsarServerException {
// Now we are ready to start services
this.bkClientFactory = newBookKeeperClientFactory();

managedLedgerClientFactory = newManagedLedgerClientFactory();
managedLedgerStorage = newManagedLedgerStorage();

this.brokerService = newBrokerService(this);

Expand Down Expand Up @@ -1122,7 +1124,7 @@ protected OrderedExecutor newOrderedExecutor() {
}

@VisibleForTesting
protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
protected ManagedLedgerStorage newManagedLedgerStorage() throws Exception {
return ManagedLedgerStorage.create(
config, localMetadataStore,
bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry()
Expand Down Expand Up @@ -1348,7 +1350,7 @@ private synchronized void startLoadBalancerTasks() {
long resourceQuotaUpdateInterval = TimeUnit.MINUTES
.toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes());
loadSheddingTask = new LoadSheddingTask(loadManager, loadManagerExecutor,
config, getManagedLedgerFactory());
config, getDefaultManagedLedgerFactory());
loadSheddingTask.start();
loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate(
new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval,
Expand Down Expand Up @@ -1535,11 +1537,17 @@ public WorkerService getWorkerService() throws UnsupportedOperationException {
}

public BookKeeper getBookKeeperClient() {
return getManagedLedgerClientFactory().getBookKeeperClient();
ManagedLedgerStorageClass defaultStorageClass = getManagedLedgerStorage().getDefaultStorageClass();
if (defaultStorageClass instanceof BookkeeperManagedLedgerStorageClass bkStorageClass) {
return bkStorageClass.getBookKeeperClient();
} else {
// TODO: Refactor code to support other than default bookkeeper based storage class
throw new UnsupportedOperationException("BookKeeper client is not available");
}
}

public ManagedLedgerFactory getManagedLedgerFactory() {
return getManagedLedgerClientFactory().getManagedLedgerFactory();
public ManagedLedgerFactory getDefaultManagedLedgerFactory() {
return getManagedLedgerStorage().getDefaultStorageClass().getManagedLedgerFactory();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,9 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());

return pulsarService.getBrokerService().getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(
v -> transactionMetadataStoreProvider.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
v -> transactionMetadataStoreProvider.openStore(tcId,
pulsarService.getManagedLedgerStorage().getManagedLedgerStorageClass(v.getStorageClassName())
.get().getManagedLedgerFactory(), v,
timeoutTracker, recoverTracker,
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(), txnLogBufferedWriterConfig,
brokerClientSharedTimer));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2540,7 +2540,7 @@ protected void internalScanOffloadedLedgers(OffloaderObjectsScannerUtils.Scanner
String localClusterName = pulsar().getConfiguration().getClusterName();

OffloaderObjectsScannerUtils.scanOffloadedLedgers(managedLedgerOffloader,
localClusterName, pulsar().getManagedLedgerFactory(), sink);
localClusterName, pulsar().getDefaultManagedLedgerFactory(), sink);

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1405,19 +1405,27 @@ protected void internalGetManagedLedgerInfoForNonPartitionedTopic(AsyncResponse
validateTopicOperationAsync(topicName, TopicOperation.GET_STATS)
.thenAccept(__ -> {
String managedLedger = topicName.getPersistenceNamingEncoding();
pulsar().getManagedLedgerFactory()
.asyncGetManagedLedgerInfo(managedLedger, new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
objectWriter().writeValue(output, info);
pulsar().getBrokerService().getManagedLedgerFactoryForTopic(topicName)
.thenAccept(managedLedgerFactory -> {
managedLedgerFactory.asyncGetManagedLedgerInfo(managedLedger,
new ManagedLedgerInfoCallback() {
@Override
public void getInfoComplete(ManagedLedgerInfo info, Object ctx) {
asyncResponse.resume((StreamingOutput) output -> {
objectWriter().writeValue(output, info);
});
}

@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
})
.exceptionally(ex -> {
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}
@Override
public void getInfoFailed(ManagedLedgerException exception, Object ctx) {
asyncResponse.resume(exception);
}
}, null);
}).exceptionally(ex -> {
log.error("[{}] Failed to get managed info for {}", clientAppId(), topicName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
Expand Down Expand Up @@ -3174,7 +3182,9 @@ protected CompletableFuture<PersistentOfflineTopicStats> internalGetBacklogAsync
try {
PersistentOfflineTopicStats estimateOfflineTopicStats =
offlineTopicBacklog.estimateUnloadedTopicBacklog(
pulsar().getManagedLedgerFactory(),
pulsar().getBrokerService()
.getManagedLedgerFactoryForTopic(topicName,
config.getStorageClassName()),
topicName);
pulsar().getBrokerService()
.cacheOfflineTopicStats(topicName, estimateOfflineTopicStats);
Expand Down
Loading

0 comments on commit 731ec83

Please sign in to comment.