Skip to content

Commit

Permalink
FABJ-420 Switch to CompletableFutures
Browse files Browse the repository at this point in the history
Change-Id: Ia8dd4080e9f7ca255a6ff3fd3a2d295755db8aac
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Mar 14, 2019
1 parent 8bdcbe9 commit 768e22a
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 41 deletions.
21 changes: 16 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@
<tag>fabric-sdk-java-1.0</tag>
</scm>
<properties>
<grpc.version>1.18.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.6.1</protobuf.version>
<bouncycastle.version>1.60</bouncycastle.version>
<httpclient.version>4.5.6</httpclient.version>
<grpc.version>1.19.0</grpc.version><!-- CURRENT_GRPC_VERSION -->
<protobuf.version>3.7.0</protobuf.version>
<bouncycastle.version>1.61</bouncycastle.version>
<httpclient.version>4.5.7</httpclient.version>
<javadoc.version>3.0.1</javadoc.version>
<skipITs>true</skipITs>
<alpn-boot-version>8.1.7.v20160121</alpn-boot-version>
Expand Down Expand Up @@ -178,6 +178,17 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.spotify</groupId>
<artifactId>futures-extra</artifactId>
<version>4.1.3</version>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
<version>1.7.0</version>
<scope>provided</scope><!-- so only used for compilation to work, not needed at runtime -->
</dependency>

<!-- https://mvnrepository.com/artifact/org.glassfish/javax.json -->
<dependency>
Expand Down Expand Up @@ -205,7 +216,7 @@
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
<version>1.23</version>
<version>1.24</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.miracl.milagro.amcl/milagro-crypto-java -->
Expand Down
33 changes: 26 additions & 7 deletions src/main/java/org/hyperledger/fabric/sdk/EndorserClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@

package org.hyperledger.fabric.sdk;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ListenableFuture;
import com.spotify.futures.CompletableFuturesExtra;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.logging.Log;
Expand Down Expand Up @@ -110,18 +112,35 @@ synchronized void shutdown(boolean force) {
}
}

public ListenableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal) throws PeerException {
public CompletableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal) {
if (shutdown) {
throw new PeerException("Shutdown " + toString());
CompletableFuture<FabricProposalResponse.ProposalResponse> ret = new CompletableFuture<>();
ret.completeExceptionally(new PeerException("Shutdown " + toString()));
return ret;
}
return futureStub.processProposal(proposal);

CompletableFuture<FabricProposalResponse.ProposalResponse> future = CompletableFuturesExtra.toCompletableFuture(futureStub.processProposal(proposal));

return future.exceptionally(throwable -> {
throw new CompletionException(format("%s %s", toString, throwable.getMessage()), throwable);
});

// return CompletableFuturesExtra.toCompletableFuture(futureStub.processProposal(proposal));
// return futureStub.processProposal(proposal);
}

public ListenableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) throws PeerException {
public CompletableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest signedRequest) {
if (shutdown) {
throw new PeerException("Shutdown " + toString());
CompletableFuture<Protocol.Response> ret = new CompletableFuture<>();
ret.completeExceptionally(new PeerException("Shutdown " + toString()));
return ret;
}
return discoveryFutureStub.discover(signedRequest);

CompletableFuture<Protocol.Response> future = CompletableFuturesExtra.toCompletableFuture(discoveryFutureStub.discover(signedRequest));
return future.exceptionally(throwable -> {
throw new CompletionException(format("%s %s", toString, throwable.getMessage()), throwable);
});

}

boolean isChannelActive() {
Expand Down
42 changes: 25 additions & 17 deletions src/main/java/org/hyperledger/fabric/sdk/Peer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import java.util.EnumSet;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.util.concurrent.ListenableFuture;
import io.netty.util.internal.StringUtil;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -220,22 +221,29 @@ public int hashCode() {
return Objects.hash(name, url);
}

ListenableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal)
throws PeerException, InvalidArgumentException {
checkSendProposal(proposal);
CompletableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal) {
try {
checkSendProposal(proposal);
} catch (Exception e) {
CompletableFuture<FabricProposalResponse.ProposalResponse> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}

if (IS_DEBUG_LEVEL) {
logger.debug(format("peer.sendProposalAsync %s", toString()));
}

EndorserClient localEndorserClient = getEndorserClient();

try {
return localEndorserClient.sendProposalAsync(proposal);
} catch (Throwable t) {
return localEndorserClient.sendProposalAsync(proposal).exceptionally(throwable -> {
removeEndorserClient(true);
throw t;
}
if (throwable instanceof CompletionException) {
throw (CompletionException) throwable;
}
throw new CompletionException(throwable);
});

}

private synchronized EndorserClient getEndorserClient() {
Expand Down Expand Up @@ -274,19 +282,18 @@ private synchronized void removeEndorserClient(boolean force) {
}
}

ListenableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest discoveryRequest)
throws PeerException, InvalidArgumentException {

CompletableFuture<Protocol.Response> sendDiscoveryRequestAsync(Protocol.SignedRequest discoveryRequest) {
logger.debug(format("peer.sendDiscoveryRequstAsync %s", toString()));

EndorserClient localEndorserClient = getEndorserClient();

try {
return localEndorserClient.sendDiscoveryRequestAsync(discoveryRequest);
} catch (Throwable t) {
return localEndorserClient.sendDiscoveryRequestAsync(discoveryRequest).exceptionally(throwable -> {
removeEndorserClient(true);
throw t;
}
if (throwable instanceof CompletionException) {
throw (CompletionException) throwable;
}
throw new CompletionException(throwable);
});
}

synchronized byte[] getClientTLSCertificateDigest() {
Expand Down Expand Up @@ -630,6 +637,7 @@ public enum PeerRole {
public String getPropertyName() {
return propertyName;
}

}

String getEndpoint() {
Expand Down
18 changes: 9 additions & 9 deletions src/test/java/org/hyperledger/fabric/sdk/ChannelTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.util.Properties;
import java.util.concurrent.CompletableFuture;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -791,8 +789,9 @@ public void testQueryInstalledChaincodesERROR() throws Exception {
final Channel channel = createRunningChannel(null);
Peer peer = channel.getPeers().iterator().next();

final SettableFuture<FabricProposalResponse.ProposalResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new Error("Error bad bad bad"));
final CompletableFuture<FabricProposalResponse.ProposalResponse> settableFuture = new CompletableFuture<>();
// settableFuture.setException(new Error("Error bad bad bad"));
settableFuture.completeExceptionally(new Error("Error bad bad bad"));
setField(peer, "endorserClent", new MockEndorserClient(settableFuture));

hfclient.queryChannels(peer);
Expand All @@ -808,8 +807,9 @@ public void testQueryInstalledChaincodesStatusRuntimeException() throws Exceptio
final Channel channel = createRunningChannel(null);
Peer peer = channel.getPeers().iterator().next();

final SettableFuture<FabricProposalResponse.ProposalResponse> settableFuture = SettableFuture.create();
settableFuture.setException(new StatusRuntimeException(Status.ABORTED));
final CompletableFuture<FabricProposalResponse.ProposalResponse> settableFuture = new CompletableFuture<>();
settableFuture.completeExceptionally(new StatusRuntimeException(Status.ABORTED));

setField(peer, "endorserClent", new MockEndorserClient(settableFuture));

hfclient.queryChannels(peer);
Expand Down Expand Up @@ -1016,7 +1016,7 @@ public void testProposalBuilderWithMetaInfEmpty() throws Exception {

class MockEndorserClient extends EndorserClient {
final Throwable throwThis;
private final ListenableFuture<FabricProposalResponse.ProposalResponse> returnedFuture;
private final CompletableFuture<FabricProposalResponse.ProposalResponse> returnedFuture;

MockEndorserClient(Throwable throwThis) {
super("blahchannlname", "blahpeerName", "blahURL", new Endpoint("grpc://loclhost:99", null).getChannelBuilder());
Expand All @@ -1027,14 +1027,14 @@ class MockEndorserClient extends EndorserClient {
this.returnedFuture = null;
}

MockEndorserClient(ListenableFuture<FabricProposalResponse.ProposalResponse> returnedFuture) {
MockEndorserClient(CompletableFuture<FabricProposalResponse.ProposalResponse> returnedFuture) {
super("blahchannlname", "blahpeerName", "blahURL", new Endpoint("grpc://loclhost:99", null).getChannelBuilder());
this.throwThis = null;
this.returnedFuture = returnedFuture;
}

@Override
public ListenableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal) throws PeerException {
public CompletableFuture<FabricProposalResponse.ProposalResponse> sendProposalAsync(FabricProposal.SignedProposal proposal) {
if (throwThis != null) {
getUnsafe().throwException(throwThis);
}
Expand Down
11 changes: 8 additions & 3 deletions src/test/java/org/hyperledger/fabric/sdk/PeerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

package org.hyperledger.fabric.sdk;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.hyperledger.fabric.protos.peer.FabricProposalResponse;
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
import org.hyperledger.fabric.sdk.exception.PeerException;
import org.junit.Assert;
Expand Down Expand Up @@ -66,9 +70,10 @@ public void testSetEmptyName() throws InvalidArgumentException {
Assert.fail("expected set empty name to throw exception.");
}

@Test (expected = PeerException.class)
public void testSendAsyncNullProposal() throws PeerException, InvalidArgumentException {
peer.sendProposalAsync(null);
@Test (expected = Exception.class)
public void testSendAsyncNullProposal() throws PeerException, InvalidArgumentException, ExecutionException, InterruptedException {
Future<FabricProposalResponse.ProposalResponse> future = peer.sendProposalAsync(null);
future.get();
}

@Test (expected = InvalidArgumentException.class)
Expand Down

0 comments on commit 768e22a

Please sign in to comment.