Skip to content

Commit

Permalink
[fix][broker] Fix returns wrong webServiceUrl when both webServicePor…
Browse files Browse the repository at this point in the history
…t and webServicePortTls are set (apache#21842)
  • Loading branch information
Technoboy- authored Jan 3, 2024
1 parent ed59967 commit e10d318
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public Optional<ResourceUnit> getLeastLoaded(final ServiceUnitId serviceUnit) {
private String getBrokerWebServiceUrl(String broker) {
LocalBrokerData localData = (loadManager).getBrokerLocalData(broker);
if (localData != null) {
return localData.getWebServiceUrl() != null ? localData.getWebServiceUrl()
: localData.getWebServiceUrlTls();
return localData.getWebServiceUrlTls() != null ? localData.getWebServiceUrlTls()
: localData.getWebServiceUrl();
}
return String.format("http://%s", broker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.apache.pulsar.broker.loadbalance.LoadBalancerTestingUtils;
import org.apache.pulsar.broker.loadbalance.LoadData;
import org.apache.pulsar.broker.loadbalance.LoadManager;
import org.apache.pulsar.broker.loadbalance.ResourceUnit;
import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared.BrokerTopicLoadingPredicate;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
Expand Down Expand Up @@ -120,8 +121,12 @@ public class ModularLoadManagerImplTest {
private PulsarService pulsar3;

private String primaryHost;

private String primaryTlsHost;
private String secondaryHost;

private String secondaryTlsHost;

private NamespaceBundleFactory nsFactory;

private ModularLoadManagerImpl primaryLoadManager;
Expand Down Expand Up @@ -167,16 +172,19 @@ void setup() throws Exception {
config1.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config1.setClusterName("use");
config1.setWebServicePort(Optional.of(0));
config1.setWebServicePortTls(Optional.of(0));
config1.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());

config1.setAdvertisedAddress("localhost");
config1.setBrokerShutdownTimeoutMs(0L);
config1.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config1.setBrokerServicePort(Optional.of(0));
config1.setBrokerServicePortTls(Optional.of(0));
pulsar1 = new PulsarService(config1);
pulsar1.start();

primaryHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTP().get());
primaryTlsHost = String.format("%s:%d", "localhost", pulsar1.getListenPortHTTPS().get());
url1 = new URL(pulsar1.getWebServiceAddress());
admin1 = PulsarAdmin.builder().serviceHttpUrl(url1.toString()).build();

Expand All @@ -186,11 +194,13 @@ void setup() throws Exception {
config2.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config2.setClusterName("use");
config2.setWebServicePort(Optional.of(0));
config2.setWebServicePortTls(Optional.of(0));
config2.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config2.setAdvertisedAddress("localhost");
config2.setBrokerShutdownTimeoutMs(0L);
config2.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config2.setBrokerServicePort(Optional.of(0));
config2.setBrokerServicePortTls(Optional.of(0));
pulsar2 = new PulsarService(config2);
pulsar2.start();

Expand All @@ -199,14 +209,17 @@ void setup() throws Exception {
config.setLoadBalancerLoadSheddingStrategy("org.apache.pulsar.broker.loadbalance.impl.OverloadShedder");
config.setClusterName("use");
config.setWebServicePort(Optional.of(0));
config.setWebServicePortTls(Optional.of(0));
config.setMetadataStoreUrl("zk:127.0.0.1:" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setBrokerShutdownTimeoutMs(0L);
config.setLoadBalancerOverrideBrokerNicSpeedGbps(Optional.of(1.0d));
config.setBrokerServicePort(Optional.of(0));
config.setBrokerServicePortTls(Optional.of(0));
pulsar3 = new PulsarService(config);

secondaryHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTP().get());
secondaryTlsHost = String.format("%s:%d", "localhost", pulsar2.getListenPortHTTPS().get());
url2 = new URL(pulsar2.getWebServiceAddress());
admin2 = PulsarAdmin.builder().serviceHttpUrl(url2.toString()).build();

Expand Down Expand Up @@ -431,9 +444,9 @@ public void testLoadShedding() throws Exception {
pulsar1.getConfiguration().setLoadBalancerEnabled(true);
final LoadData loadData = (LoadData) getField(primaryLoadManagerSpy, "loadData");
final Map<String, BrokerData> brokerDataMap = loadData.getBrokerData();
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryHost));
final BrokerData brokerDataSpy1 = spy(brokerDataMap.get(primaryTlsHost));
when(brokerDataSpy1.getLocalData()).thenReturn(localBrokerData);
brokerDataMap.put(primaryHost, brokerDataSpy1);
brokerDataMap.put(primaryTlsHost, brokerDataSpy1);
// Need to update all the bundle data for the shredder to see the spy.
primaryLoadManagerSpy.handleDataNotification(new Notification(NotificationType.Created, LoadManager.LOADBALANCE_BROKERS_ROOT + "/broker:8080"));

Expand All @@ -451,21 +464,21 @@ public void testLoadShedding() throws Exception {
verify(namespacesSpy1, Mockito.times(1))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(2));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);

primaryLoadManagerSpy.doLoadShedding();
// Now less expensive bundle will be unloaded (normally other bundle would move off and nothing would be
// unloaded, but this is not the case due to the spy's behavior).
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(bundleReference.get(), mockBundleName(1));
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);

primaryLoadManagerSpy.doLoadShedding();
// Now both are in grace period: neither should be unloaded.
verify(namespacesSpy1, Mockito.times(2))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());
assertEquals(selectedBrokerRef.get().get(), secondaryHost);
assertEquals(selectedBrokerRef.get().get(), secondaryTlsHost);

// Test bundle transfer to same broker

Expand All @@ -478,7 +491,7 @@ public void testLoadShedding() throws Exception {
loadData.getRecentlyUnloadedBundles().clear();
primaryLoadManagerSpy.doLoadShedding();
// The bundle shouldn't be unloaded because the broker is the same.
verify(namespacesSpy1, Mockito.times(3))
verify(namespacesSpy1, Mockito.times(4))
.unloadNamespaceBundle(Mockito.anyString(), Mockito.anyString(), Mockito.anyString());

}
Expand Down Expand Up @@ -705,7 +718,7 @@ public void testLoadSheddingWithNamespaceIsolationPolicies() throws Exception {
admin1.namespaces().createNamespace(namespace);

@Cleanup
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getSafeWebServiceAddress()).build();
PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsar1.getWebServiceAddress()).build();
Producer<byte[]> producer = pulsarClient.newProducer().topic("persistent://" + namespace + "/my-topic1")
.create();
ModularLoadManagerImpl loadManager = (ModularLoadManagerImpl) ((ModularLoadManagerWrapper) pulsar1
Expand Down Expand Up @@ -896,6 +909,10 @@ public void testRemoveNonExistBundleData()
String topicToFindBundle = topicName + 0;
NamespaceBundle bundleWillBeSplit = pulsar1.getNamespaceService().getBundle(TopicName.get(topicToFindBundle));

final Optional<ResourceUnit> leastLoaded = loadManagerWrapper.getLeastLoaded(bundleWillBeSplit);
assertFalse(leastLoaded.isEmpty());
assertTrue(leastLoaded.get().getResourceId().startsWith("https"));

String bundleDataPath = BUNDLE_DATA_BASE_PATH + "/" + tenant + "/" + namespace;
CompletableFuture<List<String>> children = bundlesCache.getChildren(bundleDataPath);
List<String> bundles = children.join();
Expand Down

0 comments on commit e10d318

Please sign in to comment.