Skip to content

Commit

Permalink
[improve][admin,broker] Add option to unloadNamespaceBundle with bund…
Browse files Browse the repository at this point in the history
…le Affinity broker url (#18663)

Co-authored-by: Vineeth <vineeth.polamreddy@verizonmedia.com>
  • Loading branch information
vineeth1995 and Vineeth authored Dec 31, 2022
1 parent 36cff70 commit e194c01
Show file tree
Hide file tree
Showing 17 changed files with 249 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -56,6 +58,8 @@
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.authorization.AuthorizationService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
Expand Down Expand Up @@ -878,6 +882,53 @@ protected BookieAffinityGroupData internalGetBookieAffinityGroup() {
}
}

private void validateLeaderBroker() {
if (!this.isLeaderBroker()) {
LeaderBroker leaderBroker = pulsar().getLeaderElectionService().getCurrentLeader().get();
String leaderBrokerUrl = leaderBroker.getServiceUrl();
CompletableFuture<LookupResult> result = pulsar().getNamespaceService()
.createLookupResult(leaderBrokerUrl, false, null);
try {
LookupResult lookupResult = result.get(2L, TimeUnit.SECONDS);
String redirectUrl = isRequestHttps() ? lookupResult.getLookupData().getHttpUrlTls()
: lookupResult.getLookupData().getHttpUrl();
if (redirectUrl == null) {
log.error("Redirected broker's service url is not configured");
throw new RestException(Response.Status.PRECONDITION_FAILED,
"Redirected broker's service url is not configured.");
}
URL url = new URL(redirectUrl);
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(url.getHost())
.port(url.getPort())
.replaceQueryParam("authoritative",
false).build();

// Redirect
if (log.isDebugEnabled()) {
log.debug("Redirecting the request call to leader - {}", redirect);
}
throw new WebApplicationException(Response.temporaryRedirect(redirect).build());
} catch (MalformedURLException exception) {
log.error("The leader broker url is malformed - {}", leaderBrokerUrl);
throw new RestException(exception);
} catch (ExecutionException | InterruptedException exception) {
log.error("Leader broker not found - {}", leaderBrokerUrl);
throw new RestException(exception.getCause());
} catch (TimeoutException exception) {
log.error("Leader broker not found within timeout - {}", leaderBrokerUrl);
throw new RestException(exception);
}
}
}

public void setNamespaceBundleAffinity (String bundleRange, String destinationBroker) {
if (StringUtils.isBlank(destinationBroker)) {
return;
}
validateLeaderBroker();
pulsar().getLoadManager().get().setNamespaceBundleAffinity(bundleRange, destinationBroker);
}

public CompletableFuture<Void> internalUnloadNamespaceBundleAsync(String bundleRange, boolean authoritative) {
return validateSuperUserAccessAsync()
.thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse, @PathP
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(property, cluster, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,8 +813,10 @@ public void unloadNamespace(@Suspended final AsyncResponse asyncResponse,
public void unloadNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant, @PathParam("namespace") String namespace,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@QueryParam("destinationBroker") String destinationBroker) {
validateNamespaceName(tenant, namespace);
setNamespaceBundleAffinity(bundleRange, destinationBroker);
internalUnloadNamespaceBundleAsync(bundleRange, authoritative)
.thenAccept(__ -> {
log.info("[{}] Successfully unloaded namespace bundle {}", clientAppId(), bundleRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ default void writeLoadReportOnZookeeper(boolean force) throws Exception {

CompletableFuture<Set<String>> getAvailableBrokersAsync();

String setNamespaceBundleAffinity(String bundle, String broker);

void stop() throws PulsarServerException;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,4 +140,6 @@ default void writeBrokerDataOnZooKeeper(boolean force) {
* @return bundle data
*/
BundleData getBundleDataOrDefault(String bundle);

String setNamespaceBundleAffinity(String bundle, String broker);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.impl.PulsarResourceDescription;
Expand All @@ -43,11 +46,13 @@ public class NoopLoadManager implements LoadManager {
private String lookupServiceAddress;
private ResourceUnit localResourceUnit;
private LockManager<LocalBrokerData> lockManager;
private Map<String, String> bundleBrokerAffinityMap;

@Override
public void initialize(PulsarService pulsar) {
this.pulsar = pulsar;
this.lockManager = pulsar.getCoordinationService().getLockManager(LocalBrokerData.class);
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

@Override
Expand Down Expand Up @@ -142,4 +147,12 @@ public void stop() throws PulsarServerException {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ public class ModularLoadManagerImpl implements ModularLoadManager {

private final Lock lock = new ReentrantLock();
private Set<String> knownBrokers = ConcurrentHashMap.newKeySet();
private Map<String, String> bundleBrokerAffinityMap;

/**
* Initializes fields which do not depend on PulsarService. initialize(PulsarService) should subsequently be called.
Expand All @@ -215,7 +216,7 @@ public ModularLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
new ExecutorProvider.ExtendedThreadFactory("pulsar-modular-load-manager"));
this.brokerToFailureDomainMap = new HashMap<>();

this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
this.brokerTopicLoadingPredicate = new BrokerTopicLoadingPredicate() {
@Override
public boolean isEnablePersistentTopics(String brokerUrl) {
Expand Down Expand Up @@ -1212,4 +1213,13 @@ public List<Metrics> getLoadBalancingMetrics() {

return metricsCollection;
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LoadManager;
Expand Down Expand Up @@ -65,13 +66,13 @@ public LoadManagerReport generateLoadReport() {

@Override
public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
String bundleRange = LoadManagerShared.getBundleRangeFromBundleName(serviceUnit.toString());
String affinityBroker = loadManager.setNamespaceBundleAffinity(bundleRange, null);
if (!StringUtils.isBlank(affinityBroker)) {
return Optional.of(buildBrokerResourceUnit(affinityBroker));
}
Optional<String> leastLoadedBroker = loadManager.selectBrokerForAssignment(serviceUnit);
return leastLoadedBroker.map(s -> {
String webServiceUrl = getBrokerWebServiceUrl(s);
String brokerZnodeName = getBrokerZnodeName(s, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
});
return leastLoadedBroker.map(this::buildBrokerResourceUnit);
}

private String getBrokerWebServiceUrl(String broker) {
Expand Down Expand Up @@ -146,4 +147,16 @@ public Set<String> getAvailableBrokers() throws Exception {
public CompletableFuture<Set<String>> getAvailableBrokersAsync() {
return loadManager.getAvailableBrokersAsync();
}

private SimpleResourceUnit buildBrokerResourceUnit (String broker) {
String webServiceUrl = getBrokerWebServiceUrl(broker);
String brokerZnodeName = getBrokerZnodeName(broker, webServiceUrl);
return new SimpleResourceUnit(webServiceUrl,
new PulsarResourceDescription(), Map.of(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME, brokerZnodeName));
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
return loadManager.setNamespaceBundleAffinity(bundle, broker);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
Expand Down Expand Up @@ -186,6 +188,8 @@ public class SimpleLoadManagerImpl implements LoadManager, Consumer<Notification

private volatile Future<?> updateRankingHandle;

private Map<String, String> bundleBrokerAffinityMap;

// Perform initializations which may be done without a PulsarService.
public SimpleLoadManagerImpl() {
scheduler = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -251,6 +255,7 @@ public Long load(String key) throws Exception {
}
});
this.pulsar = pulsar;
this.bundleBrokerAffinityMap = new ConcurrentHashMap<>();
}

public SimpleLoadManagerImpl(PulsarService pulsar) {
Expand Down Expand Up @@ -1443,6 +1448,15 @@ public void doNamespaceBundleSplit() throws Exception {
}
}

@Override
public String setNamespaceBundleAffinity(String bundle, String broker) {
if (StringUtils.isBlank(broker)) {
return this.bundleBrokerAffinityMap.remove(bundle);
}
broker = broker.replaceFirst("http[s]?://", "");
return this.bundleBrokerAffinityMap.put(bundle, broker);
}

@Override
public void stop() throws PulsarServerException {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,7 @@ private void searchForCandidateBroker(NamespaceBundle bundle,
}
}

protected CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
public CompletableFuture<LookupResult> createLookupResult(String candidateBroker, boolean authoritativeRedirect,
final String advertisedListenerName) {

CompletableFuture<LookupResult> lookupFuture = new CompletableFuture<>();
Expand Down Expand Up @@ -692,6 +692,7 @@ private Optional<Pair<String, String>> getLeastLoadedFromLoadManager(ServiceUnit
String lookupAddress = leastLoadedBroker.get().getResourceId();
String advertisedAddr = (String) leastLoadedBroker.get()
.getProperty(ResourceUnit.PROPERTY_KEY_BROKER_ZNODE_NAME);

if (LOG.isDebugEnabled()) {
LOG.debug("{} : redirecting to the least loaded broker, lookup address={}",
pulsar.getSafeWebServiceAddress(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,8 +710,8 @@ public CompletableFuture<Void> validateBundleOwnershipAsync(NamespaceBundle bund
// Replace the host and port of the current request and redirect
URI redirect = UriBuilder.fromUri(uri.getRequestUri()).host(webUrl.get().getHost())
.port(webUrl.get().getPort()).replaceQueryParam("authoritative",
newAuthoritative).build();

newAuthoritative).replaceQueryParam("destinationBroker",
null).build();
log.debug("{} is not a service unit owned", bundle);
// Redirect
log.debug("Redirecting the rest call to {}", redirect);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ public void testNamespacesApiRedirects() throws Exception {
doReturn(uri).when(uriInfo).getRequestUri();

namespaces.unloadNamespaceBundle(response, this.testTenant, this.testOtherCluster,
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false);
this.testLocalNamespaces.get(2).getLocalName(), "0x00000000_0xffffffff", false, null);
captor = ArgumentCaptor.forClass(WebApplicationException.class);
verify(response, timeout(5000).atLeast(1)).resume(captor.capture());
assertEquals(captor.getValue().getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
Expand Down Expand Up @@ -1053,7 +1053,7 @@ public void testUnloadNamespaceWithBundles() throws Exception {
doReturn(CompletableFuture.completedFuture(null)).when(nsSvc).unloadNamespaceBundle(testBundle);
AsyncResponse response = mock(AsyncResponse.class);
namespaces.unloadNamespaceBundle(response, testTenant, testLocalCluster, bundledNsLocal, "0x00000000_0x80000000",
false);
false, null);
verify(response, timeout(5000).times(1)).resume(any(RestException.class));

// cleanup
Expand Down
Loading

0 comments on commit e194c01

Please sign in to comment.