Skip to content

Commit

Permalink
Fix failed tests
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Sep 19, 2024
1 parent 157a38e commit ab53dca
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1086,26 +1086,26 @@ public void testPerProducerStats() throws Exception {
List<Metric> cm = (List<Metric>) metrics.get("pulsar_producer_msg_rate_in");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(0).tags.get("producer_name"), "producer2");
assertEquals(cm.get(0).tags.get("producer_id"), "1");
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("producer_name"), "producer1");
assertEquals(cm.get(0).tags.get("producer_id"), "0");

assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("producer_name"), "producer1");
assertEquals(cm.get(1).tags.get("producer_id"), "0");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(1).tags.get("producer_name"), "producer2");
assertEquals(cm.get(1).tags.get("producer_id"), "1");

cm = (List<Metric>) metrics.get("pulsar_producer_msg_throughput_in");
assertEquals(cm.size(), 2);
assertEquals(cm.get(0).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(0).tags.get("producer_name"), "producer2");
assertEquals(cm.get(0).tags.get("producer_id"), "1");
assertEquals(cm.get(0).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(0).tags.get("producer_name"), "producer1");
assertEquals(cm.get(0).tags.get("producer_id"), "0");

assertEquals(cm.get(1).tags.get("namespace"), "my-property/use/my-ns");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic1");
assertEquals(cm.get(1).tags.get("producer_name"), "producer1");
assertEquals(cm.get(1).tags.get("producer_id"), "0");
assertEquals(cm.get(1).tags.get("topic"), "persistent://my-property/use/my-ns/my-topic2");
assertEquals(cm.get(1).tags.get("producer_name"), "producer2");
assertEquals(cm.get(1).tags.get("producer_id"), "1");

p1.close();
p2.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -201,20 +200,14 @@ private void recoverTest(String testTopic) throws Exception {

Awaitility.await().until(() -> {
for (int i = 0; i < getPulsarServiceList().size(); i++) {
Field field = BrokerService.class.getDeclaredField("topics");
field.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
final var topics = getPulsarServiceList().get(i).getBrokerService().getTopics();
CompletableFuture<Optional<Topic>> completableFuture = topics.get("persistent://" + testTopic);
if (completableFuture != null) {
Optional<Topic> topic = completableFuture.get();
if (topic.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) topic.get();
field = PersistentTopic.class.getDeclaredField("transactionBuffer");
field.setAccessible(true);
TopicTransactionBuffer topicTransactionBuffer =
(TopicTransactionBuffer) field.get(persistentTopic);
(TopicTransactionBuffer) persistentTopic.getTransactionBuffer();
if (topicTransactionBuffer.checkIfReady()) {
return true;
} else {
Expand Down Expand Up @@ -455,17 +448,13 @@ private void testTopicTransactionBufferDeleteAbort(Boolean enableSnapshotSegment
assertTrue(((MessageIdImpl) messageId2).getLedgerId() != ((MessageIdImpl) messageId1).getLedgerId());
boolean exist = false;
for (int i = 0; i < getPulsarServiceList().size(); i++) {
Field field = BrokerService.class.getDeclaredField("topics");
field.setAccessible(true);
ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics =
(ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>>) field
.get(getPulsarServiceList().get(i).getBrokerService());
final var topics = getPulsarServiceList().get(i).getBrokerService().getTopics();
CompletableFuture<Optional<Topic>> completableFuture = topics.get("persistent://" + ABORT_DELETE);
if (completableFuture != null) {
Optional<Topic> topic = completableFuture.get();
if (topic.isPresent()) {
PersistentTopic persistentTopic = (PersistentTopic) topic.get();
field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
var field = ManagedLedgerImpl.class.getDeclaredField("ledgers");
field.setAccessible(true);
NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers
= (NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>) field.get(persistentTopic.getManagedLedger());
Expand Down

0 comments on commit ab53dca

Please sign in to comment.