diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index eab2054939048..3a16c4068a03e 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -168,21 +168,24 @@ public CompletableFuture storePut(String path, byte[] data, Optional long now = System.currentTimeMillis(); + CompletableFuture future = new CompletableFuture<>(); if (hasVersion && expectedVersion == -1) { Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral)); Value existingValue = map.putIfAbsent(path, newValue); if (existingValue != null) { - return FutureUtils.exception(new BadVersionException("")); + execute(() -> future.completeExceptionally(new BadVersionException("")), future); } else { receivedNotification(new Notification(NotificationType.Created, path)); notifyParentChildrenChanged(path); - return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true)); + String finalPath = path; + execute(() -> future.complete(new Stat(finalPath, 0, now, now, newValue.isEphemeral(), + true)), future); } } else { Value existingValue = map.get(path); long existingVersion = existingValue != null ? existingValue.version : -1; if (hasVersion && expectedVersion != existingVersion) { - return FutureUtils.exception(new BadVersionException("")); + execute(() -> future.completeExceptionally(new BadVersionException("")), future); } else { long newVersion = existingValue != null ? existingValue.version + 1 : 0; long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now; @@ -196,12 +199,13 @@ public CompletableFuture storePut(String path, byte[] data, Optional if (type == NotificationType.Created) { notifyParentChildrenChanged(path); } - return FutureUtils - .value(new Stat(path, newValue.version, newValue.createdTimestamp, - newValue.modifiedTimestamp, - false, true)); + String finalPath = path; + execute(() -> future.complete(new Stat(finalPath, newValue.version, newValue.createdTimestamp, + newValue.modifiedTimestamp, + false, true)), future); } } + return future; } } @@ -211,18 +215,20 @@ public CompletableFuture storeDelete(String path, Optional optExpect return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); } synchronized (map) { + CompletableFuture future = new CompletableFuture<>(); Value value = map.get(path); if (value == null) { - return FutureUtils.exception(new NotFoundException("")); + execute(() -> future.completeExceptionally(new NotFoundException("")), future); } else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) { - return FutureUtils.exception(new BadVersionException("")); + execute(() -> future.completeExceptionally(new BadVersionException("")), future); } else { map.remove(path); receivedNotification(new Notification(NotificationType.Deleted, path)); notifyParentChildrenChanged(path); - return FutureUtils.value(null); + execute(() -> future.complete(null), future); } + return future; } } }