Skip to content

Commit

Permalink
FAB-1783 - use installed CC from file system
Browse files Browse the repository at this point in the history
https://jira.hyperledger.org/browse/FAB-1783. In particular,
"newdeploy.pptx" there.

Move off per channel "on-ledger" deploy model to "on-peer" install
model. This affects version, launch and per channel ledger instatiation.

Now that chaincode is not on ledger but is on-peer, there is no need
to create a container per instance of the chaincode deployed on
different channels. These are the tightly related changes
   . user has to specify version (see, version option in CLI commands)
   . load off file system and not on ledger (see ccprovider.go)
   . chaincode launch separated from chaincode instantiation (see chaincode_support.go)
   . chaincode "init" is no longer tied to deploy (install) (see handler.go)

Note 1 -a minor side effect is that the chaincode container is not killed after instantiate
like it was after the old deploy. It need not be as the chaincode is not bound to a
channel.

Note 2 - the "--peer-chaincodedev=true" has to be treated differently for the
new model. A JIRA https://jira.hyperledger.org/browse/FAB-2128 has been opened
for this

Change-Id: I34b417d119c16bf296130f8740cddd5af9a8b582
Signed-off-by: Srinivasan Muralidharan <muralisr@us.ibm.com>
  • Loading branch information
Srinivasan Muralidharan committed Feb 8, 2017
1 parent 32ae559 commit ab4b7f7
Show file tree
Hide file tree
Showing 22 changed files with 381 additions and 453 deletions.
83 changes: 27 additions & 56 deletions core/chaincode/chaincode_support.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package chaincode
import (
"fmt"
"io"
"path/filepath"
"strconv"
"sync"
"time"
Expand Down Expand Up @@ -105,6 +106,8 @@ func (chaincodeSupport *ChaincodeSupport) chaincodeHasBeenLaunched(chaincode str

// NewChaincodeSupport creates a new ChaincodeSupport instance
func NewChaincodeSupport(getPeerEndpoint func() (*pb.PeerEndpoint, error), userrunsCC bool, ccstartuptimeout time.Duration) *ChaincodeSupport {
ccprovider.SetChaincodesPath(viper.GetString("peer.fileSystemPath") + string(filepath.Separator) + "chaincodes")

pnid := viper.GetString("peer.networkId")
pid := viper.GetString("peer.id")

Expand Down Expand Up @@ -265,8 +268,8 @@ func (chaincodeSupport *ChaincodeSupport) deregisterHandler(chaincodehandler *Ha
return nil
}

// Based on state of chaincode send either init or ready to move to ready state
func (chaincodeSupport *ChaincodeSupport) sendInitOrReady(context context.Context, cccid *ccprovider.CCContext, initArgs [][]byte, timeout time.Duration) error {
// send ready to move to ready state
func (chaincodeSupport *ChaincodeSupport) sendReady(context context.Context, cccid *ccprovider.CCContext, timeout time.Duration) error {
canName := cccid.GetCanonicalName()
chaincodeSupport.runningChaincodes.Lock()
//if its in the map, there must be a connected stream...nothing to do
Expand All @@ -281,7 +284,7 @@ func (chaincodeSupport *ChaincodeSupport) sendInitOrReady(context context.Contex

var notfy chan *pb.ChaincodeMessage
var err error
if notfy, err = chrte.handler.initOrReady(context, cccid.ChainID, cccid.TxID, cccid.Proposal, initArgs); err != nil {
if notfy, err = chrte.handler.ready(context, cccid.ChainID, cccid.TxID, cccid.Proposal); err != nil {
return fmt.Errorf("Error sending %s: %s", pb.ChaincodeMessage_INIT, err)
}
if notfy != nil {
Expand Down Expand Up @@ -394,7 +397,7 @@ func (chaincodeSupport *ChaincodeSupport) launchAndWaitForRegister(ctxt context.

vmtype, _ := chaincodeSupport.getVMType(cds)

sir := container.StartImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, ChainID: cccid.ChainID, Version: cccid.Version}, Builder: builder, Args: args, Env: env}
sir := container.StartImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, Version: cccid.Version}, Builder: builder, Args: args, Env: env}

ipcCtxt := context.WithValue(ctxt, ccintf.GetCCHandlerKey(), chaincodeSupport)

Expand Down Expand Up @@ -437,7 +440,7 @@ func (chaincodeSupport *ChaincodeSupport) Stop(context context.Context, cccid *c
}

//stop the chaincode
sir := container.StopImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, ChainID: cccid.ChainID, Version: cccid.Version}, Timeout: 0}
sir := container.StopImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, Version: cccid.Version}, Timeout: 0}
// The line below is left for debugging. It replaces the line above to keep
// the chaincode container around to give you a chance to get data
//sir := container.StopImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, ChainID: cccid.ChainID, Version: cccid.Version}, Timeout: 0, Dontremove: true}
Expand Down Expand Up @@ -470,7 +473,6 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
var cID *pb.ChaincodeID
var cMsg *pb.ChaincodeInput
var cLang pb.ChaincodeSpec_Type
var initargs [][]byte

var cds *pb.ChaincodeDeploymentSpec
var ci *pb.ChaincodeInvocationSpec
Expand All @@ -483,7 +485,6 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
cID = cds.ChaincodeSpec.ChaincodeID
cMsg = cds.ChaincodeSpec.Input
cLang = cds.ChaincodeSpec.Type
initargs = cMsg.Args
} else {
cID = ci.ChaincodeSpec.ChaincodeID
cMsg = ci.ChaincodeSpec.Input
Expand Down Expand Up @@ -548,6 +549,20 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid

//launch container if it is a System container or not in dev mode
if (!chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) && (chrte == nil || chrte.handler == nil) {
//whether we deploying, upgrading or launching a chaincode we now have a
//deployment package. If lauching, we got it from LCCC and has gone through
//ccprovider.GetChaincodeFromFS
if cds.CodePackage == nil {
//no code bytes for these situations
if !(chaincodeSupport.userRunsCC || cds.ExecEnv == pb.ChaincodeDeploymentSpec_SYSTEM) {
_, cdsfs, err := ccprovider.GetChaincodeFromFS(cID.Name, cID.Version)
if err != nil {
return cID, cMsg, err
}
cds.CodePackage = cdsfs.CodePackage
chaincodeLogger.Debugf("launchAndWaitForRegister fetched %d from file system", len(cds.CodePackage), err)
}
}

builder := func() (io.Reader, error) { return platforms.GenerateDockerBuild(cds) }
err = chaincodeSupport.launchAndWaitForRegister(context, cccid, cds, cLang, builder)
Expand All @@ -558,8 +573,8 @@ func (chaincodeSupport *ChaincodeSupport) Launch(context context.Context, cccid
}

if err == nil {
//send init (if (args)) and wait for ready state
err = chaincodeSupport.sendInitOrReady(context, cccid, initargs, chaincodeSupport.ccStartupTimeout)
//launch will set the chaincode in Ready state
err = chaincodeSupport.sendReady(context, cccid, chaincodeSupport.ccStartupTimeout)
if err != nil {
chaincodeLogger.Errorf("sending init failed(%s)", err)
err = fmt.Errorf("Failed to init chaincode(%s)", err)
Expand All @@ -585,50 +600,6 @@ func (chaincodeSupport *ChaincodeSupport) getVMType(cds *pb.ChaincodeDeploymentS
return container.DOCKER, nil
}

// Deploy deploys the chaincode if not in development mode where user is running the chaincode.
func (chaincodeSupport *ChaincodeSupport) Deploy(context context.Context, cccid *ccprovider.CCContext, cds *pb.ChaincodeDeploymentSpec) (*pb.ChaincodeDeploymentSpec, error) {
cLang := cds.ChaincodeSpec.Type
canName := cccid.GetCanonicalName()

if chaincodeSupport.userRunsCC {
chaincodeLogger.Debug("user runs chaincode, not deploying chaincode")
return nil, nil
}

chaincodeSupport.runningChaincodes.Lock()
//if its in the map, there must be a connected stream...and we are trying to build the code ?!
if _, ok := chaincodeSupport.chaincodeHasBeenLaunched(canName); ok {
chaincodeLogger.Debugf("deploy ?!! there's a chaincode with that name running: %s", canName)
chaincodeSupport.runningChaincodes.Unlock()
return cds, fmt.Errorf("deploy attempted but a chaincode with same name running %s", canName)
}
chaincodeSupport.runningChaincodes.Unlock()

args, envs, err := chaincodeSupport.getArgsAndEnv(cccid, cLang)
if err != nil {
return cds, fmt.Errorf("error getting args for chaincode %s", err)
}

targz, err := platforms.GenerateDockerBuild(cds)
if err != nil {
return cds, fmt.Errorf("error converting CodePackage to Docker: %s", err)
}

cir := &container.CreateImageReq{CCID: ccintf.CCID{ChaincodeSpec: cds.ChaincodeSpec, NetworkID: chaincodeSupport.peerNetworkID, PeerID: chaincodeSupport.peerID, ChainID: cccid.ChainID, Version: cccid.Version}, Args: args, Reader: targz, Env: envs}

vmtype, _ := chaincodeSupport.getVMType(cds)

chaincodeLogger.Debugf("deploying chaincode %s(networkid:%s,peerid:%s)", canName, chaincodeSupport.peerNetworkID, chaincodeSupport.peerID)

//create image and create container
resp, err2 := container.VMCProcess(context, vmtype, cir)
if err2 != nil || (resp != nil && resp.(container.VMCResp).Err != nil) {
err = fmt.Errorf("Error creating image: %s", err2)
}

return cds, err
}

// HandleChaincodeStream implements ccintf.HandleChaincodeStream for all vms to call with appropriate stream
func (chaincodeSupport *ChaincodeSupport) HandleChaincodeStream(ctxt context.Context, stream ccintf.ChaincodeStream) error {
return HandleChaincodeStream(chaincodeSupport, ctxt, stream)
Expand All @@ -639,14 +610,14 @@ func (chaincodeSupport *ChaincodeSupport) Register(stream pb.ChaincodeSupport_Re
return chaincodeSupport.HandleChaincodeStream(stream.Context(), stream)
}

// createTransactionMessage creates a transaction message.
func createTransactionMessage(txid string, cMsg *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
// createCCMessage creates a transaction message.
func createCCMessage(typ pb.ChaincodeMessage_Type, txid string, cMsg *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
payload, err := proto.Marshal(cMsg)
if err != nil {
fmt.Printf(err.Error())
return nil, err
}
return &pb.ChaincodeMessage{Type: pb.ChaincodeMessage_TRANSACTION, Payload: payload, Txid: txid}, nil
return &pb.ChaincodeMessage{Type: typ, Payload: payload, Txid: txid}, nil
}

// Execute executes a transaction and waits for it to complete until a timeout value.
Expand Down
2 changes: 1 addition & 1 deletion core/chaincode/concurrency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestExecuteConcurrentInvokes(t *testing.T) {

url := "github.com/hyperledger/fabric/examples/ccchecker/chaincodes/newkeyperinvoke"

chaincodeID := &pb.ChaincodeID{Name: "nkpi", Path: url}
chaincodeID := &pb.ChaincodeID{Name: "nkpi", Path: url, Version: "0"}

args := util.ToChaincodeArgs("init", "")

Expand Down
106 changes: 48 additions & 58 deletions core/chaincode/exectransaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,79 +35,69 @@ func Execute(ctxt context.Context, cccid *ccprovider.CCContext, spec interface{}
var err error
var cds *pb.ChaincodeDeploymentSpec
var ci *pb.ChaincodeInvocationSpec

//init will call the Init method of a on a chain
cctyp := pb.ChaincodeMessage_INIT
if cds, _ = spec.(*pb.ChaincodeDeploymentSpec); cds == nil {
if ci, _ = spec.(*pb.ChaincodeInvocationSpec); ci == nil {
panic("Execute should be called with deployment or invocation spec")
}
cctyp = pb.ChaincodeMessage_TRANSACTION
}

if cds != nil {
_, err := theChaincodeSupport.Deploy(ctxt, cccid, cds)
if err != nil {
return nil, nil, fmt.Errorf("Failed to deploy chaincode spec(%s)", err)
}
cID, cMsg, err := theChaincodeSupport.Launch(ctxt, cccid, spec)
if err != nil {
return nil, nil, fmt.Errorf("%s", err)
}

_, _, err = theChaincodeSupport.Launch(ctxt, cccid, cds)
if err != nil {
return nil, nil, fmt.Errorf("%s", err)
}
} else {
//will launch if necessary (and wait for ready)
cID, cMsg, err := theChaincodeSupport.Launch(ctxt, cccid, ci)
if err != nil {
return nil, nil, fmt.Errorf("Failed to launch chaincode spec(%s)", err)
}
//this should work because it worked above...
chaincode := cID.Name

//this should work because it worked above...
chaincode := cID.Name
if err != nil {
return nil, nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
}

if err != nil {
return nil, nil, fmt.Errorf("Failed to stablish stream to container %s", chaincode)
}
// TODO: Need to comment next line and uncomment call to getTimeout, when transaction blocks are being created
timeout := time.Duration(30000) * time.Millisecond

// TODO: Need to comment next line and uncomment call to getTimeout, when transaction blocks are being created
timeout := time.Duration(30000) * time.Millisecond
if err != nil {
return nil, nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
}

if err != nil {
return nil, nil, fmt.Errorf("Failed to retrieve chaincode spec(%s)", err)
}
var ccMsg *pb.ChaincodeMessage
ccMsg, err = createCCMessage(cctyp, cccid.TxID, cMsg)
if err != nil {
return nil, nil, fmt.Errorf("Failed to transaction message(%s)", err)
}

var ccMsg *pb.ChaincodeMessage
ccMsg, err = createTransactionMessage(cccid.TxID, cMsg)
if err != nil {
return nil, nil, fmt.Errorf("Failed to transaction message(%s)", err)
}
resp, err := theChaincodeSupport.Execute(ctxt, cccid, ccMsg, timeout)
if err != nil {
// Rollback transaction
return nil, nil, fmt.Errorf("Failed to execute transaction (%s)", err)
} else if resp == nil {
// Rollback transaction
return nil, nil, fmt.Errorf("Failed to receive a response for (%s)", cccid.TxID)
}
res := &pb.Response{}
unmarshalErr := proto.Unmarshal(resp.Payload, res)
if unmarshalErr != nil {
return nil, nil, fmt.Errorf("Failed to unmarshal response for (%s): %s", cccid.TxID, unmarshalErr)
}

resp, err := theChaincodeSupport.Execute(ctxt, cccid, ccMsg, timeout)
if err != nil {
// Rollback transaction
return nil, nil, fmt.Errorf("Failed to execute transaction (%s)", err)
} else if resp == nil {
// Rollback transaction
return nil, nil, fmt.Errorf("Failed to receive a response for (%s)", cccid.TxID)
}
res := &pb.Response{}
unmarshalErr := proto.Unmarshal(resp.Payload, res)
if unmarshalErr != nil {
return nil, nil, fmt.Errorf("Failed to unmarshal response for (%s): %s", cccid.TxID, unmarshalErr)
} else {
if resp.ChaincodeEvent != nil {
resp.ChaincodeEvent.ChaincodeID = cccid.Name
resp.ChaincodeEvent.TxID = cccid.TxID
}

if resp.Type == pb.ChaincodeMessage_COMPLETED {
// Success
return res, resp.ChaincodeEvent, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR {
// Rollback transaction
return nil, resp.ChaincodeEvent, fmt.Errorf("Transaction returned with failure: %s", string(resp.Payload))
}
return res, nil, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", cccid.TxID, resp.Type)
}
if resp.ChaincodeEvent != nil {
resp.ChaincodeEvent.ChaincodeID = cccid.Name
resp.ChaincodeEvent.TxID = cccid.TxID
}

if resp.Type == pb.ChaincodeMessage_COMPLETED {
// Success
return res, resp.ChaincodeEvent, nil
} else if resp.Type == pb.ChaincodeMessage_ERROR {
// Rollback transaction
return nil, resp.ChaincodeEvent, fmt.Errorf("Transaction returned with failure: %s", string(resp.Payload))
}
return &pb.Response{Status: shim.OK, Payload: nil}, nil, err

return res, nil, fmt.Errorf("receive a response for (%s) but in invalid state(%d)", cccid.TxID, resp.Type)
}

// ExecuteWithErrorFilter is similar to Execute, but filters error contained in chaincode response and returns Payload of response only.
Expand Down
Loading

0 comments on commit ab4b7f7

Please sign in to comment.