diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java index 54120732b0b38..63efba8f7246d 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/LocalMemoryMetadataStore.java @@ -161,24 +161,21 @@ public CompletableFuture storePut(String path, byte[] data, Optional long now = System.currentTimeMillis(); - CompletableFuture future = new CompletableFuture<>(); if (hasVersion && expectedVersion == -1) { Value newValue = new Value(0, data, now, now, options.contains(CreateOption.Ephemeral)); Value existingValue = map.putIfAbsent(path, newValue); if (existingValue != null) { - execute(() -> future.completeExceptionally(new BadVersionException("")), future); + return FutureUtils.exception(new BadVersionException("")); } else { receivedNotification(new Notification(NotificationType.Created, path)); notifyParentChildrenChanged(path); - String finalPath = path; - execute(() -> future.complete(new Stat(finalPath, 0, now, now, newValue.isEphemeral(), - true)), future); + return FutureUtils.value(new Stat(path, 0, now, now, newValue.isEphemeral(), true)); } } else { Value existingValue = map.get(path); long existingVersion = existingValue != null ? existingValue.version : -1; if (hasVersion && expectedVersion != existingVersion) { - execute(() -> future.completeExceptionally(new BadVersionException("")), future); + return FutureUtils.exception(new BadVersionException("")); } else { long newVersion = existingValue != null ? existingValue.version + 1 : 0; long createdTimestamp = existingValue != null ? existingValue.createdTimestamp : now; @@ -192,13 +189,12 @@ public CompletableFuture storePut(String path, byte[] data, Optional if (type == NotificationType.Created) { notifyParentChildrenChanged(path); } - String finalPath = path; - execute(() -> future.complete(new Stat(finalPath, newValue.version, newValue.createdTimestamp, - newValue.modifiedTimestamp, - false, true)), future); + return FutureUtils + .value(new Stat(path, newValue.version, newValue.createdTimestamp, + newValue.modifiedTimestamp, + false, true)); } } - return future; } } @@ -208,20 +204,18 @@ public CompletableFuture storeDelete(String path, Optional optExpect return FutureUtil.failedFuture(new MetadataStoreException.InvalidPathException(path)); } synchronized (map) { - CompletableFuture future = new CompletableFuture<>(); Value value = map.get(path); if (value == null) { - execute(() -> future.completeExceptionally(new NotFoundException("")), future); + return FutureUtils.exception(new NotFoundException("")); } else if (optExpectedVersion.isPresent() && optExpectedVersion.get() != value.version) { - execute(() -> future.completeExceptionally(new BadVersionException("")), future); + return FutureUtils.exception(new BadVersionException("")); } else { map.remove(path); receivedNotification(new Notification(NotificationType.Deleted, path)); notifyParentChildrenChanged(path); - execute(() -> future.complete(null), future); + return FutureUtils.value(null); } - return future; } } }