Skip to content
This repository was archived by the owner on Apr 1, 2024. It is now read-only.

Commit 3b3c713

Browse files
authored
[fix][broker] Fix broker not starting when both transactions and the Extensible Load Manager are enabled (apache#22139)
1 parent f33a3f4 commit 3b3c713

File tree

9 files changed

+353
-125
lines changed

9 files changed

+353
-125
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.commons.lang3.tuple.MutablePair;
5959
import org.apache.pulsar.broker.ServiceConfiguration;
6060
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
61+
import org.apache.pulsar.broker.loadbalance.extensions.ExtensibleLoadManagerImpl;
6162
import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData;
6263
import org.apache.pulsar.broker.service.AbstractSubscription;
6364
import org.apache.pulsar.broker.service.AnalyzeBacklogResult;
@@ -157,7 +158,8 @@ public PersistentSubscription(PersistentTopic topic, String subscriptionName, Ma
157158
this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties)
158159
? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties);
159160
if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled()
160-
&& !isEventSystemTopic(TopicName.get(topicName))) {
161+
&& !isEventSystemTopic(TopicName.get(topicName))
162+
&& !ExtensibleLoadManagerImpl.isInternalTopic(topicName)) {
161163
this.pendingAckHandle = new PendingAckHandleImpl(this);
162164
} else {
163165
this.pendingAckHandle = new PendingAckHandleDisabled();

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,7 +350,8 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS
350350
TopicName topicName = TopicName.get(topic);
351351
if (brokerService.getPulsar().getConfiguration().isTransactionCoordinatorEnabled()
352352
&& !isEventSystemTopic(topicName)
353-
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) {
353+
&& !NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())
354+
&& !ExtensibleLoadManagerImpl.isInternalTopic(topic)) {
354355
this.transactionBuffer = brokerService.getPulsar()
355356
.getTransactionBufferProvider().newTransactionBuffer(this);
356357
} else {
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions;
20+
21+
import static org.mockito.Mockito.reset;
22+
import static org.mockito.Mockito.spy;
23+
import com.google.common.collect.Sets;
24+
import java.util.concurrent.CompletableFuture;
25+
import org.apache.commons.lang3.reflect.FieldUtils;
26+
import org.apache.commons.lang3.tuple.Pair;
27+
import org.apache.pulsar.broker.PulsarService;
28+
import org.apache.pulsar.broker.ServiceConfiguration;
29+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
30+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannelImpl;
31+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.TransferShedder;
32+
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
33+
import org.apache.pulsar.client.admin.PulsarAdminException;
34+
import org.apache.pulsar.client.impl.LookupService;
35+
import org.apache.pulsar.common.naming.NamespaceBundle;
36+
import org.apache.pulsar.common.naming.SystemTopicNames;
37+
import org.apache.pulsar.common.naming.TopicName;
38+
import org.apache.pulsar.common.policies.data.ClusterData;
39+
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
40+
import org.apache.pulsar.common.policies.data.TopicType;
41+
import org.testng.annotations.AfterClass;
42+
import org.testng.annotations.BeforeClass;
43+
import org.testng.annotations.BeforeMethod;
44+
45+
public abstract class ExtensibleLoadManagerImplBaseTest extends MockedPulsarServiceBaseTest {
46+
47+
protected PulsarService pulsar1;
48+
protected PulsarService pulsar2;
49+
50+
protected PulsarTestContext additionalPulsarTestContext;
51+
52+
protected ExtensibleLoadManagerImpl primaryLoadManager;
53+
54+
protected ExtensibleLoadManagerImpl secondaryLoadManager;
55+
56+
protected ServiceUnitStateChannelImpl channel1;
57+
protected ServiceUnitStateChannelImpl channel2;
58+
59+
protected final String defaultTestNamespace;
60+
61+
protected LookupService lookupService;
62+
63+
protected ExtensibleLoadManagerImplBaseTest(String defaultTestNamespace) {
64+
this.defaultTestNamespace = defaultTestNamespace;
65+
}
66+
67+
protected ServiceConfiguration initConfig(ServiceConfiguration conf) {
68+
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
69+
// stuck when doing unload.
70+
conf.setLoadBalancerInFlightServiceUnitStateWaitingTimeInMillis(5 * 1000);
71+
conf.setLoadBalancerServiceUnitStateMonitorIntervalInSeconds(1);
72+
conf.setForceDeleteNamespaceAllowed(true);
73+
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
74+
conf.setAllowAutoTopicCreation(true);
75+
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
76+
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
77+
conf.setLoadBalancerSheddingEnabled(false);
78+
conf.setLoadBalancerDebugModeEnabled(true);
79+
conf.setTopicLevelPoliciesEnabled(true);
80+
return conf;
81+
}
82+
83+
@Override
84+
@BeforeClass(alwaysRun = true)
85+
protected void setup() throws Exception {
86+
initConfig(conf);
87+
super.internalSetup(conf);
88+
pulsar1 = pulsar;
89+
var conf2 = initConfig(getDefaultConf());
90+
additionalPulsarTestContext = createAdditionalPulsarTestContext(conf2);
91+
pulsar2 = additionalPulsarTestContext.getPulsarService();
92+
93+
setPrimaryLoadManager();
94+
setSecondaryLoadManager();
95+
96+
admin.clusters().createCluster(this.conf.getClusterName(),
97+
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
98+
admin.tenants().createTenant("public",
99+
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
100+
Sets.newHashSet(this.conf.getClusterName())));
101+
admin.namespaces().createNamespace("public/default");
102+
admin.namespaces().setNamespaceReplicationClusters("public/default",
103+
Sets.newHashSet(this.conf.getClusterName()));
104+
105+
admin.namespaces().createNamespace(defaultTestNamespace, 128);
106+
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
107+
Sets.newHashSet(this.conf.getClusterName()));
108+
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
109+
}
110+
111+
@Override
112+
@AfterClass(alwaysRun = true)
113+
protected void cleanup() throws Exception {
114+
this.additionalPulsarTestContext.close();
115+
super.internalCleanup();
116+
}
117+
118+
@BeforeMethod(alwaysRun = true)
119+
protected void initializeState() throws PulsarAdminException, IllegalAccessException {
120+
admin.namespaces().unload(defaultTestNamespace);
121+
reset(primaryLoadManager, secondaryLoadManager);
122+
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
123+
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
124+
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
125+
}
126+
127+
protected void setPrimaryLoadManager() throws IllegalAccessException {
128+
ExtensibleLoadManagerWrapper wrapper =
129+
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
130+
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
131+
FieldUtils.readField(wrapper, "loadManager", true));
132+
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
133+
channel1 = (ServiceUnitStateChannelImpl)
134+
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
135+
}
136+
137+
private void setSecondaryLoadManager() throws IllegalAccessException {
138+
ExtensibleLoadManagerWrapper wrapper =
139+
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
140+
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
141+
FieldUtils.readField(wrapper, "loadManager", true));
142+
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
143+
channel2 = (ServiceUnitStateChannelImpl)
144+
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
145+
}
146+
147+
protected CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
148+
return pulsar.getNamespaceService().getBundleAsync(topic);
149+
}
150+
151+
protected Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
152+
throws Exception {
153+
TopicName changeEventsTopicName =
154+
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
155+
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
156+
int i = 0;
157+
while(true) {
158+
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
159+
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
160+
if (!bundle.equals(changeEventsBundle)) {
161+
return Pair.of(topicName, bundle);
162+
}
163+
i++;
164+
}
165+
}
166+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

Lines changed: 3 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@
8484
import org.apache.commons.lang3.tuple.Pair;
8585
import org.apache.pulsar.broker.PulsarService;
8686
import org.apache.pulsar.broker.ServiceConfiguration;
87-
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
8887
import org.apache.pulsar.broker.loadbalance.BrokerFilterException;
8988
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
9089
import org.apache.pulsar.broker.loadbalance.LeaderElectionService;
@@ -110,7 +109,6 @@
110109
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
111110
import org.apache.pulsar.broker.namespace.NamespaceService;
112111
import org.apache.pulsar.broker.service.BrokerServiceException;
113-
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
114112
import org.apache.pulsar.client.admin.PulsarAdminException;
115113
import org.apache.pulsar.client.api.Consumer;
116114
import org.apache.pulsar.client.api.Message;
@@ -124,25 +122,18 @@
124122
import org.apache.pulsar.common.naming.NamespaceBundle;
125123
import org.apache.pulsar.common.naming.NamespaceName;
126124
import org.apache.pulsar.common.naming.ServiceUnitId;
127-
import org.apache.pulsar.common.naming.SystemTopicNames;
128125
import org.apache.pulsar.common.naming.TopicDomain;
129126
import org.apache.pulsar.common.naming.TopicName;
130127
import org.apache.pulsar.common.naming.TopicVersion;
131128
import org.apache.pulsar.common.policies.data.BrokerAssignment;
132129
import org.apache.pulsar.common.policies.data.BundlesData;
133-
import org.apache.pulsar.common.policies.data.ClusterData;
134130
import org.apache.pulsar.common.policies.data.NamespaceOwnershipStatus;
135-
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
136-
import org.apache.pulsar.common.policies.data.TopicType;
137131
import org.apache.pulsar.common.stats.Metrics;
138132
import org.apache.pulsar.common.util.FutureUtil;
139133
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
140134
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
141135
import org.awaitility.Awaitility;
142136
import org.testng.AssertJUnit;
143-
import org.testng.annotations.AfterClass;
144-
import org.testng.annotations.BeforeClass;
145-
import org.testng.annotations.BeforeMethod;
146137
import org.testng.annotations.DataProvider;
147138
import org.testng.annotations.Test;
148139

@@ -152,81 +143,10 @@
152143
@Slf4j
153144
@Test(groups = "broker")
154145
@SuppressWarnings("unchecked")
155-
public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
146+
public class ExtensibleLoadManagerImplTest extends ExtensibleLoadManagerImplBaseTest {
156147

157-
private PulsarService pulsar1;
158-
private PulsarService pulsar2;
159-
160-
private PulsarTestContext additionalPulsarTestContext;
161-
162-
private ExtensibleLoadManagerImpl primaryLoadManager;
163-
164-
private ExtensibleLoadManagerImpl secondaryLoadManager;
165-
166-
private ServiceUnitStateChannelImpl channel1;
167-
private ServiceUnitStateChannelImpl channel2;
168-
169-
private final String defaultTestNamespace = "public/test";
170-
171-
private LookupService lookupService;
172-
173-
private static void initConfig(ServiceConfiguration conf){
174-
conf.setForceDeleteNamespaceAllowed(true);
175-
conf.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
176-
conf.setAllowAutoTopicCreation(true);
177-
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
178-
conf.setLoadBalancerLoadSheddingStrategy(TransferShedder.class.getName());
179-
conf.setLoadBalancerSheddingEnabled(false);
180-
conf.setLoadBalancerDebugModeEnabled(true);
181-
conf.setTopicLevelPoliciesEnabled(true);
182-
}
183-
184-
@BeforeClass
185-
@Override
186-
public void setup() throws Exception {
187-
// Set the inflight state waiting time and ownership monitor delay time to 5 seconds to avoid
188-
// stuck when doing unload.
189-
initConfig(conf);
190-
super.internalSetup(conf);
191-
pulsar1 = pulsar;
192-
ServiceConfiguration defaultConf = getDefaultConf();
193-
initConfig(defaultConf);
194-
additionalPulsarTestContext = createAdditionalPulsarTestContext(defaultConf);
195-
pulsar2 = additionalPulsarTestContext.getPulsarService();
196-
197-
setPrimaryLoadManager();
198-
199-
setSecondaryLoadManager();
200-
201-
admin.clusters().createCluster(this.conf.getClusterName(),
202-
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
203-
admin.tenants().createTenant("public",
204-
new TenantInfoImpl(Sets.newHashSet("appid1", "appid2"),
205-
Sets.newHashSet(this.conf.getClusterName())));
206-
admin.namespaces().createNamespace("public/default");
207-
admin.namespaces().setNamespaceReplicationClusters("public/default",
208-
Sets.newHashSet(this.conf.getClusterName()));
209-
210-
admin.namespaces().createNamespace(defaultTestNamespace, 128);
211-
admin.namespaces().setNamespaceReplicationClusters(defaultTestNamespace,
212-
Sets.newHashSet(this.conf.getClusterName()));
213-
lookupService = (LookupService) FieldUtils.readDeclaredField(pulsarClient, "lookup", true);
214-
}
215-
216-
@Override
217-
@AfterClass(alwaysRun = true)
218-
protected void cleanup() throws Exception {
219-
this.additionalPulsarTestContext.close();
220-
super.internalCleanup();
221-
}
222-
223-
@BeforeMethod(alwaysRun = true)
224-
protected void initializeState() throws PulsarAdminException, IllegalAccessException {
225-
admin.namespaces().unload(defaultTestNamespace);
226-
reset(primaryLoadManager, secondaryLoadManager);
227-
FieldUtils.writeDeclaredField(pulsarClient, "lookup", lookupService, true);
228-
pulsar1.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
229-
pulsar2.getConfig().setLoadBalancerMultiPhaseBundleUnload(true);
148+
public ExtensibleLoadManagerImplTest() {
149+
super("public/test");
230150
}
231151

232152
@Test
@@ -1678,43 +1598,4 @@ public String name() {
16781598

16791599
}
16801600

1681-
private void setPrimaryLoadManager() throws IllegalAccessException {
1682-
ExtensibleLoadManagerWrapper wrapper =
1683-
(ExtensibleLoadManagerWrapper) pulsar1.getLoadManager().get();
1684-
primaryLoadManager = spy((ExtensibleLoadManagerImpl)
1685-
FieldUtils.readField(wrapper, "loadManager", true));
1686-
FieldUtils.writeField(wrapper, "loadManager", primaryLoadManager, true);
1687-
channel1 = (ServiceUnitStateChannelImpl)
1688-
FieldUtils.readField(primaryLoadManager, "serviceUnitStateChannel", true);
1689-
}
1690-
1691-
private void setSecondaryLoadManager() throws IllegalAccessException {
1692-
ExtensibleLoadManagerWrapper wrapper =
1693-
(ExtensibleLoadManagerWrapper) pulsar2.getLoadManager().get();
1694-
secondaryLoadManager = spy((ExtensibleLoadManagerImpl)
1695-
FieldUtils.readField(wrapper, "loadManager", true));
1696-
FieldUtils.writeField(wrapper, "loadManager", secondaryLoadManager, true);
1697-
channel2 = (ServiceUnitStateChannelImpl)
1698-
FieldUtils.readField(secondaryLoadManager, "serviceUnitStateChannel", true);
1699-
}
1700-
1701-
private CompletableFuture<NamespaceBundle> getBundleAsync(PulsarService pulsar, TopicName topic) {
1702-
return pulsar.getNamespaceService().getBundleAsync(topic);
1703-
}
1704-
1705-
private Pair<TopicName, NamespaceBundle> getBundleIsNotOwnByChangeEventTopic(String topicNamePrefix)
1706-
throws Exception {
1707-
TopicName changeEventsTopicName =
1708-
TopicName.get(defaultTestNamespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME);
1709-
NamespaceBundle changeEventsBundle = getBundleAsync(pulsar1, changeEventsTopicName).get();
1710-
int i = 0;
1711-
while(true) {
1712-
TopicName topicName = TopicName.get(defaultTestNamespace + "/" + topicNamePrefix + "-" + i);
1713-
NamespaceBundle bundle = getBundleAsync(pulsar1, topicName).get();
1714-
if (!bundle.equals(changeEventsBundle)) {
1715-
return Pair.of(topicName, bundle);
1716-
}
1717-
i++;
1718-
}
1719-
}
17201601
}

0 commit comments

Comments
 (0)