From 768e22ab7e9369f1eba086aabf61cca5474710da Mon Sep 17 00:00:00 2001 From: rickr Date: Mon, 11 Mar 2019 16:36:10 -0400 Subject: [PATCH] FABJ-420 Switch to CompletableFutures Change-Id: Ia8dd4080e9f7ca255a6ff3fd3a2d295755db8aac Signed-off-by: rickr --- pom.xml | 21 +++++++--- .../fabric/sdk/EndorserClient.java | 33 +++++++++++---- .../java/org/hyperledger/fabric/sdk/Peer.java | 42 +++++++++++-------- .../hyperledger/fabric/sdk/ChannelTest.java | 18 ++++---- .../org/hyperledger/fabric/sdk/PeerTest.java | 11 +++-- 5 files changed, 84 insertions(+), 41 deletions(-) diff --git a/pom.xml b/pom.xml index e0924c77..8db58cc5 100644 --- a/pom.xml +++ b/pom.xml @@ -28,10 +28,10 @@ fabric-sdk-java-1.0 - 1.18.0 - 3.6.1 - 1.60 - 4.5.6 + 1.19.0 + 3.7.0 + 1.61 + 4.5.7 3.0.1 true 8.1.7.v20160121 @@ -178,6 +178,17 @@ test + + com.spotify + futures-extra + 4.1.3 + + + com.google.api + api-common + 1.7.0 + provided + @@ -205,7 +216,7 @@ org.yaml snakeyaml - 1.23 + 1.24 diff --git a/src/main/java/org/hyperledger/fabric/sdk/EndorserClient.java b/src/main/java/org/hyperledger/fabric/sdk/EndorserClient.java index 2167ef24..1919d3c8 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/EndorserClient.java +++ b/src/main/java/org/hyperledger/fabric/sdk/EndorserClient.java @@ -14,9 +14,11 @@ package org.hyperledger.fabric.sdk; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; -import com.google.common.util.concurrent.ListenableFuture; +import com.spotify.futures.CompletableFuturesExtra; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.commons.logging.Log; @@ -110,18 +112,35 @@ synchronized void shutdown(boolean force) { } } - public ListenableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) throws PeerException { + public CompletableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) { if (shutdown) { - throw new PeerException("Shutdown " + toString()); + CompletableFuture ret = new CompletableFuture<>(); + ret.completeExceptionally(new PeerException("Shutdown " + toString())); + return ret; } - return futureStub.processProposal(proposal); + + CompletableFuture future = CompletableFuturesExtra.toCompletableFuture(futureStub.processProposal(proposal)); + + return future.exceptionally(throwable -> { + throw new CompletionException(format("%s %s", toString, throwable.getMessage()), throwable); + }); + + // return CompletableFuturesExtra.toCompletableFuture(futureStub.processProposal(proposal)); + // return futureStub.processProposal(proposal); } - public ListenableFuture sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) throws PeerException { + public CompletableFuture sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) { if (shutdown) { - throw new PeerException("Shutdown " + toString()); + CompletableFuture ret = new CompletableFuture<>(); + ret.completeExceptionally(new PeerException("Shutdown " + toString())); + return ret; } - return discoveryFutureStub.discover(signedRequest); + + CompletableFuture future = CompletableFuturesExtra.toCompletableFuture(discoveryFutureStub.discover(signedRequest)); + return future.exceptionally(throwable -> { + throw new CompletionException(format("%s %s", toString, throwable.getMessage()), throwable); + }); + } boolean isChannelActive() { diff --git a/src/main/java/org/hyperledger/fabric/sdk/Peer.java b/src/main/java/org/hyperledger/fabric/sdk/Peer.java index 9e97b9e4..5da7c28e 100644 --- a/src/main/java/org/hyperledger/fabric/sdk/Peer.java +++ b/src/main/java/org/hyperledger/fabric/sdk/Peer.java @@ -20,10 +20,11 @@ import java.util.EnumSet; import java.util.Objects; import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; -import com.google.common.util.concurrent.ListenableFuture; import io.netty.util.internal.StringUtil; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -220,9 +221,14 @@ public int hashCode() { return Objects.hash(name, url); } - ListenableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) - throws PeerException, InvalidArgumentException { - checkSendProposal(proposal); + CompletableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) { + try { + checkSendProposal(proposal); + } catch (Exception e) { + CompletableFuture future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } if (IS_DEBUG_LEVEL) { logger.debug(format("peer.sendProposalAsync %s", toString())); @@ -230,12 +236,14 @@ ListenableFuture sendProposalAsync(Fabr EndorserClient localEndorserClient = getEndorserClient(); - try { - return localEndorserClient.sendProposalAsync(proposal); - } catch (Throwable t) { + return localEndorserClient.sendProposalAsync(proposal).exceptionally(throwable -> { removeEndorserClient(true); - throw t; - } + if (throwable instanceof CompletionException) { + throw (CompletionException) throwable; + } + throw new CompletionException(throwable); + }); + } private synchronized EndorserClient getEndorserClient() { @@ -274,19 +282,18 @@ private synchronized void removeEndorserClient(boolean force) { } } - ListenableFuture sendDiscoveryRequestAsync(Protocol.SignedRequest discoveryRequest) - throws PeerException, InvalidArgumentException { - + CompletableFuture sendDiscoveryRequestAsync(Protocol.SignedRequest discoveryRequest) { logger.debug(format("peer.sendDiscoveryRequstAsync %s", toString())); EndorserClient localEndorserClient = getEndorserClient(); - try { - return localEndorserClient.sendDiscoveryRequestAsync(discoveryRequest); - } catch (Throwable t) { + return localEndorserClient.sendDiscoveryRequestAsync(discoveryRequest).exceptionally(throwable -> { removeEndorserClient(true); - throw t; - } + if (throwable instanceof CompletionException) { + throw (CompletionException) throwable; + } + throw new CompletionException(throwable); + }); } synchronized byte[] getClientTLSCertificateDigest() { @@ -630,6 +637,7 @@ public enum PeerRole { public String getPropertyName() { return propertyName; } + } String getEndpoint() { diff --git a/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java b/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java index 5c69f636..4ad3e846 100644 --- a/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java +++ b/src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java @@ -30,8 +30,6 @@ import java.util.Properties; import java.util.concurrent.CompletableFuture; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.protobuf.ByteString; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -791,8 +789,9 @@ public void testQueryInstalledChaincodesERROR() throws Exception { final Channel channel = createRunningChannel(null); Peer peer = channel.getPeers().iterator().next(); - final SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new Error("Error bad bad bad")); + final CompletableFuture settableFuture = new CompletableFuture<>(); + // settableFuture.setException(new Error("Error bad bad bad")); + settableFuture.completeExceptionally(new Error("Error bad bad bad")); setField(peer, "endorserClent", new MockEndorserClient(settableFuture)); hfclient.queryChannels(peer); @@ -808,8 +807,9 @@ public void testQueryInstalledChaincodesStatusRuntimeException() throws Exceptio final Channel channel = createRunningChannel(null); Peer peer = channel.getPeers().iterator().next(); - final SettableFuture settableFuture = SettableFuture.create(); - settableFuture.setException(new StatusRuntimeException(Status.ABORTED)); + final CompletableFuture settableFuture = new CompletableFuture<>(); + settableFuture.completeExceptionally(new StatusRuntimeException(Status.ABORTED)); + setField(peer, "endorserClent", new MockEndorserClient(settableFuture)); hfclient.queryChannels(peer); @@ -1016,7 +1016,7 @@ public void testProposalBuilderWithMetaInfEmpty() throws Exception { class MockEndorserClient extends EndorserClient { final Throwable throwThis; - private final ListenableFuture returnedFuture; + private final CompletableFuture returnedFuture; MockEndorserClient(Throwable throwThis) { super("blahchannlname", "blahpeerName", "blahURL", new Endpoint("grpc://loclhost:99", null).getChannelBuilder()); @@ -1027,14 +1027,14 @@ class MockEndorserClient extends EndorserClient { this.returnedFuture = null; } - MockEndorserClient(ListenableFuture returnedFuture) { + MockEndorserClient(CompletableFuture returnedFuture) { super("blahchannlname", "blahpeerName", "blahURL", new Endpoint("grpc://loclhost:99", null).getChannelBuilder()); this.throwThis = null; this.returnedFuture = returnedFuture; } @Override - public ListenableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) throws PeerException { + public CompletableFuture sendProposalAsync(FabricProposal.SignedProposal proposal) { if (throwThis != null) { getUnsafe().throwException(throwThis); } diff --git a/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java b/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java index 3d33613d..a6efb57c 100644 --- a/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java +++ b/src/test/java/org/hyperledger/fabric/sdk/PeerTest.java @@ -14,6 +14,10 @@ package org.hyperledger.fabric.sdk; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.hyperledger.fabric.protos.peer.FabricProposalResponse; import org.hyperledger.fabric.sdk.exception.InvalidArgumentException; import org.hyperledger.fabric.sdk.exception.PeerException; import org.junit.Assert; @@ -66,9 +70,10 @@ public void testSetEmptyName() throws InvalidArgumentException { Assert.fail("expected set empty name to throw exception."); } - @Test (expected = PeerException.class) - public void testSendAsyncNullProposal() throws PeerException, InvalidArgumentException { - peer.sendProposalAsync(null); + @Test (expected = Exception.class) + public void testSendAsyncNullProposal() throws PeerException, InvalidArgumentException, ExecutionException, InterruptedException { + Future future = peer.sendProposalAsync(null); + future.get(); } @Test (expected = InvalidArgumentException.class)