Skip to content

Commit 4fecdbd

Browse files
committed
[FAB-7581] Enabling couchdb index creation
This CR introduces 1) 'ChaincodeLifecycleEventListener' - this needs to be implemented by couchdb implementation in order to get a chance to create indexes upon chaincode install or deploy. This function will be invoked for a chain + chaincode combination only if the latest chaincode event (i.e., deploy on a chain or install on peer) makes the state such that the chaincode is deployed on the chain and is installed on the peer. 2) Chaincode event management module - this exposes two functions that are to be invoked from chaincode deploy and chaincode install path respectively. Internally, it synchronizes the channel creation, chaincode install, and chaincode deploy so that we do not miss events for index creation 3) Functions for getting the status of a chaincode (deployed/installed). Further, if installed - extract the statedb artifacts from the chaincode package. These functions are used by the module listed in (2) above 4) statelistener functionality - This allows for registering an arbitrary listener for state changes over a namesapce. This is used for listening the changes over 'lscc' namespace that indicate the deployment of one or more chaincodes in a block 5) Enabling couchdb index creation upon chaincode deploy using the all above 6) In a separate CR, one function (namely, 'HandleChaincodeInstall') needs to be invoked from chaincode install code path so that index creation is enabled during chaincode install as well Change-Id: I928f6ffd10f7985cbef63a70f76da920a4ca4c56 Signed-off-by: manish <manish.sethi@gmail.com>
1 parent 14b4b0a commit 4fecdbd

File tree

17 files changed

+726
-19
lines changed

17 files changed

+726
-19
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package ccprovider
8+
9+
import (
10+
"bytes"
11+
"fmt"
12+
13+
"github.com/golang/protobuf/proto"
14+
"github.com/hyperledger/fabric/core/common/sysccprovider"
15+
)
16+
17+
// IsChaincodeDeployed returns true if the chaincode with given name and version is deployed
18+
func IsChaincodeDeployed(chainid, ccName, ccVersion string, ccHash []byte) (bool, error) {
19+
sccprovider := sysccprovider.GetSystemChaincodeProvider()
20+
qe, err := sccprovider.GetQueryExecutorForLedger(chainid)
21+
if err != nil {
22+
return false, fmt.Errorf("Could not retrieve QueryExecutor for channel %s, error %s", chainid, err)
23+
}
24+
defer qe.Done()
25+
26+
chaincodeDataBytes, err := qe.GetState("lscc", ccName)
27+
if err != nil {
28+
return false, fmt.Errorf("Could not retrieve state for chaincode %s on channel %s, error %s", ccName, chainid, err)
29+
}
30+
31+
if chaincodeDataBytes == nil {
32+
return false, nil
33+
}
34+
35+
chaincodeData := &ChaincodeData{}
36+
err = proto.Unmarshal(chaincodeDataBytes, chaincodeData)
37+
if err != nil {
38+
return false, fmt.Errorf("Unmarshalling ChaincodeQueryResponse failed, error %s", err)
39+
}
40+
return chaincodeData.CCVersion() == ccVersion && bytes.Equal(chaincodeData.Hash(), ccHash), nil
41+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package ccprovider
8+
9+
import (
10+
"archive/tar"
11+
"bytes"
12+
"compress/gzip"
13+
"io"
14+
"strings"
15+
)
16+
17+
const (
18+
ccPackageStatedbDir = "META-INF/statedb/"
19+
)
20+
21+
// ExtractStatedbArtifactsAsTarbytes extracts the statedb artifacts from the code package tar and create a statedb artifact tar.
22+
// The state db artifacts are expected to contain state db specific artifacts such as index specification in the case of couchdb.
23+
// This function is intented to be used during chaincode instantiate/upgrade so that statedb artifacts can be created.
24+
func ExtractStatedbArtifactsAsTarbytes(ccname, ccversion string) (installed bool, statedbArtifactsTar []byte, err error) {
25+
ccpackage, err := GetChaincodeFromFS(ccname, ccversion)
26+
if err != nil {
27+
// TODO for now, we assume that an error indicates that the chaincode is not installed on the peer.
28+
// However, we need a way to differentiate between the 'not installed' and a general error so that on general error,
29+
// we can abort the chaincode instantiate/upgrade/install operation.
30+
ccproviderLogger.Info("Error while loading installation package for ccname=%s, ccversion=%s. Err=%s", ccname, ccversion, err)
31+
return false, nil, nil
32+
}
33+
34+
cds := ccpackage.GetDepSpec()
35+
is := bytes.NewReader(cds.CodePackage)
36+
gr, err := gzip.NewReader(is)
37+
if err != nil {
38+
ccproviderLogger.Errorf("Failure opening codepackage gzip stream: %s", err)
39+
return true, nil, err
40+
}
41+
tr := tar.NewReader(gr)
42+
statedbTarBuffer := bytes.NewBuffer(nil)
43+
tw := tar.NewWriter(statedbTarBuffer)
44+
45+
// For each file in the code package tar,
46+
// add it to the statedb artifact tar if it has "statedb" in the path
47+
for {
48+
header, err := tr.Next()
49+
if err == io.EOF {
50+
// We only get here if there are no more entries to scan
51+
break
52+
}
53+
54+
if err != nil {
55+
return true, nil, err
56+
}
57+
ccproviderLogger.Debugf("header.Name = %s", header.Name)
58+
if !strings.HasPrefix(header.Name, ccPackageStatedbDir) {
59+
continue
60+
}
61+
if err = tw.WriteHeader(header); err != nil {
62+
ccproviderLogger.Error("Error adding header to statedb tar:", err, header.Name)
63+
return true, nil, err
64+
}
65+
if _, err := io.Copy(tw, tr); err != nil {
66+
ccproviderLogger.Error("Error copying file to statedb tar:", err, header.Name)
67+
return true, nil, err
68+
}
69+
ccproviderLogger.Debug("Wrote file to statedb tar:", header.Name)
70+
}
71+
if err = tw.Close(); err != nil {
72+
return true, nil, err
73+
}
74+
ccproviderLogger.Debug("Created statedb artifact tar")
75+
return true, statedbTarBuffer.Bytes(), nil
76+
}

core/ledger/cceventmgmt/defs.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cceventmgmt
8+
9+
import (
10+
"fmt"
11+
12+
"github.com/hyperledger/fabric/core/common/ccprovider"
13+
)
14+
15+
// ChaincodeDefinition captures the info about chaincode
16+
type ChaincodeDefinition struct {
17+
Name string
18+
Hash []byte
19+
Version string
20+
}
21+
22+
func (cdef *ChaincodeDefinition) String() string {
23+
return fmt.Sprintf("Name=%s, Version=%s, Hash=%#v", cdef.Name, cdef.Version, cdef.Hash)
24+
}
25+
26+
// ChaincodeLifecycleEventListener interface enables ledger components (mainly, intended for statedb)
27+
// to be able to listen to chaincode lifecycle events. 'dbArtifactsTar' represents db specific artifacts
28+
// (such as index specs) packaged in a tar
29+
type ChaincodeLifecycleEventListener interface {
30+
// HandleChaincodeDeploy is expected to creates all the necessary statedb structures (such as indexes)
31+
HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error
32+
}
33+
34+
// ChaincodeInfoProvider interface enables event mgr to retrieve chaincode info for a given chaincode
35+
type ChaincodeInfoProvider interface {
36+
// IsChaincodeDeployed returns true if the given chaincode is deployed on the given channel
37+
IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error)
38+
// RetrieveChaincodeArtifacts checks if the given chaincode is installed on the peer and if yes,
39+
// it extracts the state db specific artifacts from the chaincode package tarball
40+
RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error)
41+
}
42+
43+
type chaincodeInfoProviderImpl struct {
44+
}
45+
46+
// IsChaincodeDeployed implements function in the interface ChaincodeInfoProvider
47+
func (p *chaincodeInfoProviderImpl) IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error) {
48+
return ccprovider.IsChaincodeDeployed(chainid, chaincodeDefinition.Name, chaincodeDefinition.Version, chaincodeDefinition.Hash)
49+
}
50+
51+
// RetrieveChaincodeArtifacts implements function in the interface ChaincodeInfoProvider
52+
func (p *chaincodeInfoProviderImpl) RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error) {
53+
return ccprovider.ExtractStatedbArtifactsAsTarbytes(chaincodeDefinition.Name, chaincodeDefinition.Version)
54+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cceventmgmt
8+
9+
import (
10+
"fmt"
11+
12+
"github.com/golang/protobuf/proto"
13+
"github.com/hyperledger/fabric/core/common/ccprovider"
14+
"github.com/hyperledger/fabric/core/ledger"
15+
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
16+
)
17+
18+
// KVLedgerLSCCStateListener listens for state changes on 'lscc' namespace
19+
type KVLedgerLSCCStateListener struct {
20+
}
21+
22+
// HandleStateUpdates iterates over key-values being written in the 'lscc' namespace (which indicates deployment of a chaincode)
23+
// and invokes `HandleChaincodeDeploy` function on chaincode event manager (which in turn is responsible for creation of statedb
24+
// artifacts for the chaincode statedata)
25+
func (listener *KVLedgerLSCCStateListener) HandleStateUpdates(channelName string, stateUpdates ledger.StateUpdates) error {
26+
kvWrites := stateUpdates.([]*kvrwset.KVWrite)
27+
logger.Debugf("HandleStateUpdates() - channelName=%s, stateUpdates=%#v", channelName, kvWrites)
28+
chaincodeDefs := []*ChaincodeDefinition{}
29+
for _, kvWrite := range kvWrites {
30+
if kvWrite.IsDelete {
31+
continue
32+
}
33+
chaincodeData := &ccprovider.ChaincodeData{}
34+
if err := proto.Unmarshal(kvWrite.Value, chaincodeData); err != nil {
35+
return fmt.Errorf("Unmarshalling ChaincodeQueryResponse failed, error %s", err)
36+
}
37+
chaincodeDefs = append(chaincodeDefs, &ChaincodeDefinition{Name: chaincodeData.CCName(), Version: chaincodeData.CCVersion(), Hash: chaincodeData.Hash()})
38+
}
39+
return GetMgr().HandleChaincodeDeploy(channelName, chaincodeDefs)
40+
}

core/ledger/cceventmgmt/mgmt_test.go

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package cceventmgmt
8+
9+
import (
10+
"os"
11+
"testing"
12+
13+
"github.com/golang/protobuf/proto"
14+
"github.com/hyperledger/fabric/common/flogging"
15+
"github.com/hyperledger/fabric/core/common/ccprovider"
16+
"github.com/hyperledger/fabric/protos/ledger/rwset/kvrwset"
17+
"github.com/stretchr/testify/assert"
18+
)
19+
20+
func TestMain(m *testing.M) {
21+
flogging.SetModuleLevel("eventmgmt", "debug")
22+
os.Exit(m.Run())
23+
}
24+
func TestCCEventMgmt(t *testing.T) {
25+
cc1Def := &ChaincodeDefinition{Name: "cc1", Version: "v1", Hash: []byte("cc1")}
26+
cc1DBArtifactsTar := []byte("cc1DBArtifacts")
27+
28+
cc2Def := &ChaincodeDefinition{Name: "cc2", Version: "v1", Hash: []byte("cc2")}
29+
cc2DBArtifactsTar := []byte("cc2DBArtifacts")
30+
31+
cc3Def := &ChaincodeDefinition{Name: "cc3", Version: "v1", Hash: []byte("cc3")}
32+
cc3DBArtifactsTar := []byte("cc3DBArtifacts")
33+
34+
// cc1 is deployed and installed. cc2 is deployed but not installed. cc3 is not deployed but installed
35+
mockProvider := newMockProvider()
36+
mockProvider.setChaincodeInstalled(cc1Def, cc1DBArtifactsTar)
37+
mockProvider.setChaincodeDeployed("channel1", cc1Def)
38+
mockProvider.setChaincodeDeployed("channel1", cc2Def)
39+
mockProvider.setChaincodeInstalled(cc3Def, cc3DBArtifactsTar)
40+
setEventMgrForTest(newMgr(mockProvider))
41+
defer clearEventMgrForTest()
42+
43+
handler1, handler2 := &mockHandler{}, &mockHandler{}
44+
eventMgr := GetMgr()
45+
assert.NotNil(t, eventMgr)
46+
eventMgr.Register("channel1", handler1)
47+
eventMgr.Register("channel2", handler2)
48+
49+
cc2ExpectedEvent := &mockEvent{cc2Def, cc2DBArtifactsTar}
50+
cc3ExpectedEvent := &mockEvent{cc3Def, cc3DBArtifactsTar}
51+
52+
// Deploy cc3 on chain1 - only handler1 should recieve event because cc3 is being deployed only on chain1
53+
eventMgr.HandleChaincodeDeploy("channel1", []*ChaincodeDefinition{cc3Def})
54+
assert.Contains(t, handler1.eventsRecieved, cc3ExpectedEvent)
55+
assert.NotContains(t, handler2.eventsRecieved, cc3ExpectedEvent)
56+
57+
// Deploy cc3 on chain2 as well and this time handler2 should also recieve event
58+
eventMgr.HandleChaincodeDeploy("channel2", []*ChaincodeDefinition{cc3Def})
59+
assert.Contains(t, handler2.eventsRecieved, cc3ExpectedEvent)
60+
61+
// Install CC2 - only handler1 should receive event because cc2 is deployed only on chain1 and not on chain2
62+
eventMgr.HandleChaincodeInstall(cc2Def, cc2DBArtifactsTar)
63+
assert.Contains(t, handler1.eventsRecieved, cc2ExpectedEvent)
64+
assert.NotContains(t, handler2.eventsRecieved, cc2ExpectedEvent)
65+
}
66+
67+
func TestLSCCListener(t *testing.T) {
68+
channelName := "testChannel"
69+
cc1Def := &ChaincodeDefinition{Name: "testChaincode", Version: "v1", Hash: []byte("hash_testChaincode")}
70+
cc1DBArtifactsTar := []byte("cc1DBArtifacts")
71+
// cc1 is installed but not deployed
72+
mockProvider := newMockProvider()
73+
mockProvider.setChaincodeInstalled(cc1Def, cc1DBArtifactsTar)
74+
setEventMgrForTest(newMgr(mockProvider))
75+
defer clearEventMgrForTest()
76+
handler1 := &mockHandler{}
77+
GetMgr().Register(channelName, handler1)
78+
lsccStateListener := &KVLedgerLSCCStateListener{}
79+
80+
sampleChaincodeData := &ccprovider.ChaincodeData{Name: cc1Def.Name, Version: cc1Def.Version, Id: cc1Def.Hash}
81+
sampleChaincodeDataBytes, err := proto.Marshal(sampleChaincodeData)
82+
assert.NoError(t, err, "")
83+
lsccStateListener.HandleStateUpdates(channelName, []*kvrwset.KVWrite{
84+
&kvrwset.KVWrite{Key: cc1Def.Name, Value: sampleChaincodeDataBytes},
85+
})
86+
assert.Contains(t, handler1.eventsRecieved, &mockEvent{cc1Def, cc1DBArtifactsTar})
87+
}
88+
89+
type mockProvider struct {
90+
chaincodesDeployed map[[3]string]bool
91+
chaincodesInstalled map[[2]string][]byte
92+
}
93+
94+
type mockHandler struct {
95+
eventsRecieved []*mockEvent
96+
}
97+
98+
type mockEvent struct {
99+
def *ChaincodeDefinition
100+
dbArtifactsTar []byte
101+
}
102+
103+
func (l *mockHandler) HandleChaincodeDeploy(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) error {
104+
l.eventsRecieved = append(l.eventsRecieved, &mockEvent{def: chaincodeDefinition, dbArtifactsTar: dbArtifactsTar})
105+
return nil
106+
}
107+
108+
func newMockProvider() *mockProvider {
109+
return &mockProvider{
110+
make(map[[3]string]bool),
111+
make(map[[2]string][]byte),
112+
}
113+
}
114+
115+
func (p *mockProvider) setChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) {
116+
p.chaincodesDeployed[[3]string{chainid, chaincodeDefinition.Name, chaincodeDefinition.Version}] = true
117+
}
118+
119+
func (p *mockProvider) setChaincodeInstalled(chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) {
120+
p.chaincodesInstalled[[2]string{chaincodeDefinition.Name, chaincodeDefinition.Version}] = dbArtifactsTar
121+
}
122+
123+
func (p *mockProvider) setChaincodeDeployAndInstalled(chainid string, chaincodeDefinition *ChaincodeDefinition, dbArtifactsTar []byte) {
124+
p.setChaincodeDeployed(chainid, chaincodeDefinition)
125+
p.setChaincodeInstalled(chaincodeDefinition, dbArtifactsTar)
126+
}
127+
128+
func (p *mockProvider) IsChaincodeDeployed(chainid string, chaincodeDefinition *ChaincodeDefinition) (bool, error) {
129+
return p.chaincodesDeployed[[3]string{chainid, chaincodeDefinition.Name, chaincodeDefinition.Version}], nil
130+
}
131+
132+
func (p *mockProvider) RetrieveChaincodeArtifacts(chaincodeDefinition *ChaincodeDefinition) (installed bool, dbArtifactsTar []byte, err error) {
133+
dbArtifactsTar, ok := p.chaincodesInstalled[[2]string{chaincodeDefinition.Name, chaincodeDefinition.Version}]
134+
if !ok {
135+
return false, nil, nil
136+
}
137+
return true, dbArtifactsTar, nil
138+
}
139+
140+
func setEventMgrForTest(eventMgr *Mgr) {
141+
mgr = eventMgr
142+
}
143+
144+
func clearEventMgrForTest() {
145+
mgr = nil
146+
}

0 commit comments

Comments
 (0)