Skip to content

Commit

Permalink
FAB-4423 Threading issues usercontext on client.
Browse files Browse the repository at this point in the history
Change-Id: I39a6132d5f189e157a41997d725139d5d6b39fd6
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Jun 6, 2017
1 parent 00e6c80 commit 99f321b
Show file tree
Hide file tree
Showing 15 changed files with 322 additions and 128 deletions.
4 changes: 3 additions & 1 deletion checkstyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@
<!-- See http://checkstyle.sf.net/config_naming.html -->
<module name="ConstantName">
<property name="format"
value="^log(ger)?|[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$"/>
value="^log(ger)?|config|[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$"/>
<property
name="applyToPrivate" value="false"/>
</module>
<module name="LocalFinalVariableName"/>
<module name="LocalVariableName"/>
Expand Down
200 changes: 125 additions & 75 deletions src/main/java/org/hyperledger/fabric/sdk/Channel.java

Large diffs are not rendered by default.

21 changes: 13 additions & 8 deletions src/main/java/org/hyperledger/fabric/sdk/HFClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public CryptoSuite getCryptoSuite() {
}

public void setCryptoSuite(CryptoSuite cryptoSuite) throws CryptoException, InvalidArgumentException {
if (this.cryptoSuite != null) {
throw new InvalidArgumentException("CryptoSuite may only be set once.");

}
this.cryptoSuite = cryptoSuite;
this.cryptoSuite.init();
}
Expand Down Expand Up @@ -111,11 +115,11 @@ public Channel newChannel(String name) throws InvalidArgumentException {
/**
* Create a new channel
*
* @param name The channel's name
* @param orderer Orderer to create the channel with.
* @param name The channel's name
* @param orderer Orderer to create the channel with.
* @param channelConfiguration Channel configuration data.
* @param channelConfigurationSignatures byte arrays containing ConfigSignature's proto serialized.
* See {@link Channel#getChannelConfigurationSignature} on how to create
* See {@link Channel#getChannelConfigurationSignature} on how to create
* @return a new channel.
* @throws TransactionException
* @throws InvalidArgumentException
Expand Down Expand Up @@ -195,7 +199,7 @@ public Channel getChannel(String name) {
* @return InstallProposalRequest
*/
public InstallProposalRequest newInstallProposalRequest() {
return new InstallProposalRequest();
return new InstallProposalRequest(userContext);
}

/**
Expand All @@ -205,11 +209,11 @@ public InstallProposalRequest newInstallProposalRequest() {
*/

public InstantiateProposalRequest newInstantiationProposalRequest() {
return new InstantiateProposalRequest();
return new InstantiateProposalRequest(userContext);
}

public UpgradeProposalRequest newUpgradeProposalRequest() {
return new UpgradeProposalRequest();
return new UpgradeProposalRequest(userContext);
}

/**
Expand All @@ -219,7 +223,7 @@ public UpgradeProposalRequest newUpgradeProposalRequest() {
*/

public TransactionProposalRequest newTransactionProposalRequest() {
return TransactionProposalRequest.newInstance();
return TransactionProposalRequest.newInstance(userContext);
}

/**
Expand All @@ -229,7 +233,7 @@ public TransactionProposalRequest newTransactionProposalRequest() {
*/

public QueryByChaincodeRequest newQueryProposalRequest() {
return QueryByChaincodeRequest.newInstance();
return QueryByChaincodeRequest.newInstance(userContext);
}

/**
Expand Down Expand Up @@ -468,6 +472,7 @@ public byte[] getChannelConfigurationSignature(ChannelConfiguration channelConfi
public Collection<ProposalResponse> sendInstallProposal(InstallProposalRequest installProposalRequest, Collection<Peer> peers)
throws ProposalException, InvalidArgumentException {

installProposalRequest.setSubmitted();
Channel systemChannel = Channel.newSystemChannel(this);

return systemChannel.sendInstallProposal(installProposalRequest, peers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -27,6 +27,10 @@ public class InstallProposalRequest extends TransactionRequest {
private File chaincodeSourceLocation = null;
private InputStream chaincodeInputStream = null;

InstallProposalRequest(User userContext) {
super(userContext);
}

public InputStream getChaincodeInputStream() {
return chaincodeInputStream;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public class InstantiateProposalRequest extends TransactionRequest {

InstantiateProposalRequest(User userContext) {
super(userContext);
}

/**
* Transient data added to the proposal that is not added to the ledger.
*
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/hyperledger/fabric/sdk/Orderer.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void setChannel(Channel channel) throws InvalidArgumentException {
throw new InvalidArgumentException("setChannel Channel can not be null");
}

if (null != this.channel) {
if (null != this.channel && this.channel != channel) {
throw new InvalidArgumentException(format("Can not add orderer %s to channel %s because it already belongs to channel %s.",
name, channel.getName(), this.channel.getName()));
}
Expand Down Expand Up @@ -125,7 +125,7 @@ Ab.BroadcastResponse sendTransaction(Common.Envelope transaction) throws Excepti
OrdererClient localOrdererClient = ordererClient;

if (localOrdererClient == null || !localOrdererClient.isChannelActive()) {
localOrdererClient = ordererClient = new OrdererClient(new Endpoint(url, properties).getChannelBuilder());
localOrdererClient = ordererClient = new OrdererClient(this, new Endpoint(url, properties).getChannelBuilder());
}

try {
Expand Down Expand Up @@ -160,7 +160,7 @@ DeliverResponse[] sendDeliver(Common.Envelope transaction) throws TransactionExc

logger.debug(format("Order.sendDeliver name: %s, url: %s", name, url));
if (localOrdererClient == null || !localOrdererClient.isChannelActive()) {
ordererClient =localOrdererClient = new OrdererClient(new Endpoint(url, properties).getChannelBuilder());
ordererClient = localOrdererClient = new OrdererClient(this, new Endpoint(url, properties).getChannelBuilder());
}

try {
Expand Down
28 changes: 20 additions & 8 deletions src/main/java/org/hyperledger/fabric/sdk/OrdererClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,21 +30,28 @@
import org.hyperledger.fabric.protos.orderer.AtomicBroadcastGrpc;
import org.hyperledger.fabric.sdk.exception.TransactionException;

import static java.lang.String.format;
import static org.hyperledger.fabric.protos.orderer.Ab.DeliverResponse.TypeCase.STATUS;

/**
* Sample client code that makes gRPC calls to the server.
*/
class OrdererClient {
private final String channelName;
boolean shutdown = false;
private static final Log logger = LogFactory.getLog(OrdererClient.class);
private ManagedChannel managedChannel;
private final String name;
private final String url;

/**
* Construct client for accessing Orderer server using the existing managedChannel.
*/
OrdererClient(ManagedChannelBuilder<?> channelBuilder) {
OrdererClient(Orderer orderer, ManagedChannelBuilder<?> channelBuilder) {
managedChannel = channelBuilder.build();
name = orderer.getName();
url = orderer.getUrl();
channelName = orderer.getChannel().getName();
}

synchronized void shutdown(boolean force) {
Expand Down Expand Up @@ -105,6 +112,10 @@ public void onNext(Ab.BroadcastResponse resp) {

@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
throwable[0] = t;
finishLatch.countDown();
}
Expand All @@ -122,7 +133,7 @@ public void onCompleted() {
//nso.onCompleted();

try {
if(!finishLatch.await(2, TimeUnit.MINUTES)){
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ste = new TransactionException("Send transactions failed. Reason: timeout");
logger.error("sendTransaction error " + ste.getMessage(), ste);
throw ste;
Expand Down Expand Up @@ -187,7 +198,8 @@ public void onNext(DeliverResponse resp) {
@Override
public void onError(Throwable t) {
if (!shutdown) {
logger.error("broadcast error " + t);
logger.error(format("Received error on channel %s, orderer %s, url %s, %s",
channelName, name, url, t.getMessage()), t);
}
throwableList.add(t);
finishLatch.countDown();
Expand All @@ -205,10 +217,10 @@ public void onCompleted() {
//nso.onCompleted();

try {
if(!finishLatch.await(2, TimeUnit.MINUTES)){
if (!finishLatch.await(2, TimeUnit.MINUTES)) {
TransactionException ex = new TransactionException("sendDeliver time exceeded for orderer");
logger.error(ex.getMessage(),ex);
throw ex;
logger.error(ex.getMessage(), ex);
throw ex;
}
logger.trace("Done waiting for reply!");

Expand All @@ -226,8 +238,8 @@ public void onCompleted() {
return retList.toArray(new DeliverResponse[retList.size()]);
}

boolean isChannelActive(){
boolean isChannelActive() {
ManagedChannel lchannel = managedChannel;
return lchannel != null && !lchannel.isShutdown() && ! lchannel.isTerminated();
return lchannel != null && !lchannel.isShutdown() && !lchannel.isTerminated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;

public class QueryByChaincodeRequest extends TransactionRequest {
private QueryByChaincodeRequest() {
private QueryByChaincodeRequest(User userContext) {
super(userContext);
}

public static QueryByChaincodeRequest newInstance() {
return new QueryByChaincodeRequest();
public static QueryByChaincodeRequest newInstance(User userContext) {
return new QueryByChaincodeRequest(userContext);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/hyperledger/fabric/sdk/QuerySCCRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public class QuerySCCRequest extends TransactionRequest {
public static final String GETTRANSACTIONBYID = "GetTransactionByID";
public static final String GETBLOCKBYTXID = "GetBlockByTxID";

public QuerySCCRequest(User userContext) {
super(userContext);
}

@Override
public ChaincodeID getChaincodeID() {
return new ChaincodeID(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@
import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;

public class TransactionProposalRequest extends TransactionRequest {
private TransactionProposalRequest() {

TransactionProposalRequest(User userContext) {
super(userContext);
}

public static TransactionProposalRequest newInstance() {
return new TransactionProposalRequest();
public static TransactionProposalRequest newInstance(User userContext) {
return new TransactionProposalRequest(userContext);

}

Expand Down
50 changes: 48 additions & 2 deletions src/main/java/org/hyperledger/fabric/sdk/TransactionRequest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 DTCC, Fujitsu Australia Software Technology - All Rights Reserved.
* Copyright 2016, 2017 DTCC, Fujitsu Australia Software Technology, IBM - All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,12 +18,16 @@
import java.util.Arrays;
import java.util.Map;

import org.hyperledger.fabric.sdk.exception.InvalidArgumentException;
import org.hyperledger.fabric.sdk.helper.Config;

/**
* A base transaction request common for InstallProposalRequest,trRequest, and QueryRequest.
*/
public class TransactionRequest {
private User userContext;

boolean submitted = false;

private final Config config = Config.getConfig();

Expand Down Expand Up @@ -53,6 +57,26 @@ public class TransactionRequest {

protected Map<String, byte[]> transientMap;

/**
* The user context to use on this request.
*
*
* @return User context that is used for signing
*/
User getUserContext() {
return userContext;
}

/**
* Set the user context for this request. This context will override the user context set
* on {@link HFClient#setUserContext(User)}
*
* @param userContext The user context for this request used for signing.
*/
public void setUserContext(User userContext) {
this.userContext = userContext;
}

/**
* Transient data added to the proposal that is not added to the ledger.
*
Expand All @@ -71,7 +95,7 @@ public Map<String, byte[]> getTransientMap() {
* This implementation returns {@code false}.
*
* @return {@code true} if an empty channel ID should be used; otherwise
* {@code false}.
* {@code false}.
*/
public boolean noChannelID() {
return false;
Expand Down Expand Up @@ -237,4 +261,26 @@ public void setProposalWaitTime(long proposalWaitTime) {
this.proposalWaitTime = proposalWaitTime;
}

/**
* If this request has been submitted already.
*
* @return true if the already submitted.
*/

public boolean isSubmitted() {
return submitted;
}

void setSubmitted() throws InvalidArgumentException {
if (submitted) {
// Has already been submitted.
throw new InvalidArgumentException("Request has been already submitted and can not be reused.");
}
this.submitted = true;
}

protected TransactionRequest(User userContext) {
this.userContext = userContext;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
*/
public class UpgradeProposalRequest extends TransactionRequest {

UpgradeProposalRequest(User userContext) {
super(userContext);
}

/**
* Transient data added to the proposal that is not added to the ledger.
*
Expand Down
6 changes: 3 additions & 3 deletions src/main/proto/orderer/configuration.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ message ConsensusType {
message BatchSize {
// Simply specified as number of messages for now, in the future
// we may want to allow this to be specified by size in bytes
uint32 maxMessageCount = 1;
uint32 max_message_count = 1;
// The byte count of the serialized messages in a batch cannot
// exceed this value.
uint32 absoluteMaxBytes = 2;
uint32 absolute_max_bytes = 2;
// The byte count of the serialized messages in a batch should not
// exceed this value.
uint32 preferredMaxBytes = 3;
uint32 preferred_max_bytes = 3;
}

message BatchTimeout {
Expand Down
Loading

0 comments on commit 99f321b

Please sign in to comment.