Skip to content

Commit f81a257

Browse files
Technoboy-manas-ctds
authored and committed
[fix][meta] Use getChildrenFromStore to read children data to avoid lost data (apache#24665)
(cherry picked from commit 90a70db) (cherry picked from commit e4431bc)
1 parent 8e30498 commit f81a257

File tree

10 files changed

+42
-14
lines changed

10 files changed

+42
-14
lines changed

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/api/MetadataStore.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,20 @@ default CompletableFuture<Void> sync(String path) {
7777
*/
7878
CompletableFuture<List<String>> getChildren(String path);
7979

80+
81+
/**
82+
* Return all the nodes (lexicographically sorted) that are children to the specific path.
83+
*
84+
* If the path itself does not exist, it will return an empty list.
85+
*
86+
* This method is similar to {@link #getChildren(String)}, but it attempts to read directly from
87+
* the underlying store.
88+
*
89+
* @param path
90+
* the path of the key to get from the store
91+
* @return a future to track the async request
92+
*/
93+
CompletableFuture<List<String>> getChildrenFromStore(String path);
8094
/**
8195
* Read whether a specific path exists.
8296
*

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/AbstractHierarchicalLedgerManager.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,8 @@ void asyncProcessLevelNodes(
6868
final AsyncCallback.VoidCallback finalCb, final Object context,
6969
final int successRc, final int failureRc) {
7070

71-
store.getChildren(path)
71+
store.sync(path)
72+
.thenCompose(__ -> store.getChildrenFromStore(path))
7273
.thenAccept(levelNodes -> {
7374
if (levelNodes.isEmpty()) {
7475
finalCb.processResult(successRc, null, context);
@@ -162,7 +163,7 @@ long getLedgerId(String... levelNodes) throws IOException {
162163
* Process ledgers in a single zk node.
163164
*
164165
* <p>
165-
* for each ledger found in this zk node, processor#process(ledgerId) will be triggerred
166+
* for each ledger found in this zk node, processor#process(ledgerId) will be triggered
166167
* to process a specific ledger. after all ledgers has been processed, the finalCb will
167168
* be called with provided context object. The RC passed to finalCb is decided by :
168169
* <ul>
@@ -188,7 +189,8 @@ protected void asyncProcessLedgersInSingleNode(
188189
final String path, final BookkeeperInternalCallbacks.Processor<Long> processor,
189190
final AsyncCallback.VoidCallback finalCb, final Object ctx,
190191
final int successRc, final int failureRc) {
191-
store.getChildren(path)
192+
store.sync(path)
193+
.thenCompose(__ -> store.getChildrenFromStore(path))
192194
.thenAccept(ledgerNodes -> {
193195
Set<Long> activeLedgers = HierarchicalLedgerUtils.ledgerListToSet(ledgerNodes,
194196
ledgerRootPath, path);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LegacyHierarchicalLedgerRangeIterator.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public LegacyHierarchicalLedgerRangeIterator(MetadataStore store, String ledgers
6969
* Iterate next level1 znode.
7070
*
7171
* @return false if have visited all level1 nodes
72-
* @throws InterruptedException/KeeperException if error occurs reading zookeeper children
72+
* @throws InterruptedException/ExecutionException/TimeoutException if error occurs reading zookeeper children
7373
*/
7474
private boolean nextL1Node() throws ExecutionException, InterruptedException, TimeoutException {
7575
l2NodesIter = null;
@@ -83,7 +83,9 @@ private boolean nextL1Node() throws ExecutionException, InterruptedException, Ti
8383
if (!isLedgerParentNode(curL1Nodes)) {
8484
continue;
8585
}
86-
List<String> l2Nodes = store.getChildren(ledgersRoot + "/" + curL1Nodes)
86+
String path = ledgersRoot + "/" + curL1Nodes;
87+
List<String> l2Nodes = store.sync(path)
88+
.thenCompose(__ -> store.getChildrenFromStore(path))
8789
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
8890
l2NodesIter = l2Nodes.iterator();
8991
if (!l2NodesIter.hasNext()) {
@@ -99,7 +101,8 @@ private synchronized void preload() throws IOException {
99101
boolean hasMoreElements = false;
100102
try {
101103
if (l1NodesIter == null) {
102-
List<String> l1Nodes = store.getChildren(ledgersRoot)
104+
List<String> l1Nodes = store.sync(ledgersRoot)
105+
.thenCompose(__ -> store.getChildrenFromStore(ledgersRoot))
103106
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
104107
l1NodesIter = l1Nodes.iterator();
105108
hasMoreElements = nextL1Node();
@@ -162,7 +165,8 @@ LedgerManager.LedgerRange getLedgerRangeByLevel(final String level1, final Strin
162165
String nodePath = nodeBuilder.toString();
163166
List<String> ledgerNodes = null;
164167
try {
165-
ledgerNodes = store.getChildren(nodePath).get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
168+
ledgerNodes = store.sync(nodePath).thenCompose(__ -> store.getChildrenFromStore(nodePath))
169+
.get(BLOCKING_CALL_TIMEOUT, MILLISECONDS);
166170
} catch (ExecutionException | TimeoutException e) {
167171
throw new IOException("Error when get child nodes from zk", e);
168172
} catch (InterruptedException e) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/LongHierarchicalLedgerRangeIterator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class LongHierarchicalLedgerRangeIterator implements LedgerManager.LedgerRangeIt
5959
*/
6060
List<String> getChildrenAt(String path) throws IOException {
6161
try {
62-
return store.getChildren(path)
62+
return store.sync(path).thenCompose(__ -> store.getChildrenFromStore(path))
6363
.get(AbstractMetadataDriver.BLOCKING_CALL_TIMEOUT, TimeUnit.MILLISECONDS);
6464
} catch (ExecutionException | TimeoutException e) {
6565
if (log.isDebugEnabled()) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/AbstractMetadataStore.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,6 @@ public abstract class AbstractMetadataStore implements MetadataStoreExtended, Co
8787

8888
protected final AtomicBoolean isClosed = new AtomicBoolean(false);
8989

90-
protected abstract CompletableFuture<List<String>> getChildrenFromStore(String path);
91-
9290
protected abstract CompletableFuture<Boolean> existsFromStore(String path);
9391

9492
protected AbstractMetadataStore(String metadataStoreName, OpenTelemetry openTelemetry) {

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/FaultInjectionMetadataStore.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,16 @@ public CompletableFuture<List<String>> getChildren(String path) {
9494
return store.getChildren(path);
9595
}
9696

97+
@Override
98+
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
99+
Optional<MetadataStoreException> ex = programmedFailure(OperationType.GET_CHILDREN, path);
100+
if (ex.isPresent()) {
101+
return FutureUtil.failedFuture(ex.get());
102+
}
103+
104+
return store.getChildrenFromStore(path);
105+
}
106+
97107
@Override
98108
public CompletableFuture<Boolean> exists(String path) {
99109
Optional<MetadataStoreException> ex = programmedFailure(OperationType.EXISTS, path);

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/RocksdbMetadataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -404,7 +404,7 @@ public CompletableFuture<Optional<GetResult>> storeGet(String path) {
404404
}
405405

406406
@Override
407-
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
407+
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
408408
if (log.isDebugEnabled()) {
409409
log.debug("getChildrenFromStore.path={},instanceId={}", path, instanceId);
410410
}

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/batching/AbstractBatchedMetadataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public final CompletableFuture<Optional<GetResult>> storeGet(String path) {
144144
}
145145

146146
@Override
147-
protected final CompletableFuture<List<String>> getChildrenFromStore(String path) {
147+
public final CompletableFuture<List<String>> getChildrenFromStore(String path) {
148148
OpGetChildren op = new OpGetChildren(path);
149149
enqueue(readOps, op);
150150
return op.getFuture();

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/oxia/OxiaMetadataStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ Stat convertStat(String path, Version version) {
147147
}
148148

149149
@Override
150-
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
150+
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
151151
var pathWithSlash = path + "/";
152152

153153
return client

pulsar-metadata/src/test/java/org/apache/pulsar/metadata/impl/MetadataStoreFactoryImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ protected MyMetadataStore() {
100100
}
101101

102102
@Override
103-
protected CompletableFuture<List<String>> getChildrenFromStore(String path) {
103+
public CompletableFuture<List<String>> getChildrenFromStore(String path) {
104104
return null;
105105
}
106106

0 commit comments

Comments
 (0)