@@ -19,57 +19,75 @@ package producer
19
19
import (
20
20
"fmt"
21
21
22
+ "github.com/hyperledger/fabric/core/ledger/util"
22
23
"github.com/hyperledger/fabric/protos/common"
23
24
pb "github.com/hyperledger/fabric/protos/peer"
24
25
"github.com/hyperledger/fabric/protos/utils"
25
26
)
26
27
27
- // SendProducerBlockEvent sends block event to clients
28
- func SendProducerBlockEvent (block * common.Block ) error {
28
+ // CreateBlockEvents creates block events for a block. It removes the RW set
29
+ // and creates a block event and a filtered block event. Sending the events
30
+ // is the responsibility of the code that calls this function.
31
+ func CreateBlockEvents (block * common.Block ) (bevent * pb.Event , fbevent * pb.Event , channelID string , err error ) {
29
32
logger .Debugf ("Entry" )
30
33
defer logger .Debugf ("Exit" )
31
- bevent := & common.Block {}
32
- bevent .Header = block .Header
33
- bevent .Metadata = block .Metadata
34
- bevent .Data = & common.BlockData {}
35
- var channelId string
36
- for _ , d := range block .Data .Data {
34
+
35
+ blockForEvent := & common.Block {}
36
+ filteredBlockForEvent := & pb.FilteredBlock {}
37
+ filteredTxArray := []* pb.FilteredTransaction {}
38
+ blockForEvent .Header = block .Header
39
+ blockForEvent .Metadata = block .Metadata
40
+ blockForEvent .Data = & common.BlockData {}
41
+ txsFltr := util .TxValidationFlags (block .Metadata .Metadata [common .BlockMetadataIndex_TRANSACTIONS_FILTER ])
42
+
43
+ for txIndex , d := range block .Data .Data {
37
44
ebytes := d
38
45
if ebytes != nil {
39
46
if env , err := utils .GetEnvelopeFromBlock (ebytes ); err != nil {
40
- logger .Errorf ("error getting tx from block(%s) \n " , err )
47
+ logger .Errorf ("error getting tx from block: %s " , err )
41
48
} else if env != nil {
42
49
// get the payload from the envelope
43
50
payload , err := utils .GetPayload (env )
44
51
if err != nil {
45
- return fmt .Errorf ("could not extract payload from envelope, err %s" , err )
52
+ return nil , nil , "" , fmt .Errorf ("could not extract payload from envelope: %s" , err )
46
53
}
47
54
48
55
chdr , err := utils .UnmarshalChannelHeader (payload .Header .ChannelHeader )
49
56
if err != nil {
50
- return err
57
+ return nil , nil , "" , err
51
58
}
52
- channelId = chdr .ChannelId
59
+ channelID = chdr .ChannelId
53
60
54
61
if common .HeaderType (chdr .Type ) == common .HeaderType_ENDORSER_TRANSACTION {
55
- logger .Debugf ("Channel [%s]: Block event for block number [%d] contains transaction id: %s" , channelId , block .Header .Number , chdr .TxId )
62
+ logger .Debugf ("Channel [%s]: Block event for block number [%d] contains transaction id: %s" , channelID , block .Header .Number , chdr .TxId )
56
63
tx , err := utils .GetTransaction (payload .Data )
57
64
if err != nil {
58
- return fmt .Errorf ("error unmarshalling transaction payload for block event: %s" , err )
65
+ return nil , nil , "" , fmt .Errorf ("error unmarshalling transaction payload for block event: %s" , err )
59
66
}
60
67
chaincodeActionPayload , err := utils .GetChaincodeActionPayload (tx .Actions [0 ].Payload )
61
68
if err != nil {
62
- return fmt .Errorf ("error unmarshalling transaction action payload for block event: %s" , err )
69
+ return nil , nil , "" , fmt .Errorf ("error unmarshalling transaction action payload for block event: %s" , err )
63
70
}
64
71
propRespPayload , err := utils .GetProposalResponsePayload (chaincodeActionPayload .Action .ProposalResponsePayload )
65
72
if err != nil {
66
- return fmt .Errorf ("error unmarshalling proposal response payload for block event: %s" , err )
73
+ return nil , nil , "" , fmt .Errorf ("error unmarshalling proposal response payload for block event: %s" , err )
67
74
}
68
75
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
69
76
caPayload , err := utils .GetChaincodeAction (propRespPayload .Extension )
70
77
if err != nil {
71
- return fmt .Errorf ("error unmarshalling chaincode action for block event: %s" , err )
78
+ return nil , nil , "" , fmt .Errorf ("error unmarshalling chaincode action for block event: %s" , err )
79
+ }
80
+
81
+ ccEvent , err := utils .GetChaincodeEvents (caPayload .Events )
82
+ filteredTx := & pb.FilteredTransaction {Txid : chdr .TxId , TxValidationCode : txsFltr .Flag (txIndex )}
83
+
84
+ if ccEvent != nil {
85
+ filteredCcEvent := ccEvent
86
+ // nil out ccevent payload
87
+ filteredCcEvent .Payload = nil
88
+ filteredTx .CcEvent = filteredCcEvent
72
89
}
90
+ filteredTxArray = append (filteredTxArray , filteredTx )
73
91
// Drop read write set from transaction before sending block event
74
92
// Performance issue with chaincode deploy txs and causes nodejs grpc
75
93
// to hit max message size bug
@@ -78,40 +96,46 @@ func SendProducerBlockEvent(block *common.Block) error {
78
96
caPayload .Results = nil
79
97
chaincodeActionPayload .Action .ProposalResponsePayload , err = utils .GetBytesProposalResponsePayload (propRespPayload .ProposalHash , caPayload .Response , caPayload .Results , caPayload .Events , caPayload .ChaincodeId )
80
98
if err != nil {
81
- return fmt .Errorf ("error marshalling tx proposal payload for block event: %s" , err )
99
+ return nil , nil , "" , fmt .Errorf ("error marshalling tx proposal payload for block event: %s" , err )
82
100
}
83
101
tx .Actions [0 ].Payload , err = utils .GetBytesChaincodeActionPayload (chaincodeActionPayload )
84
102
if err != nil {
85
- return fmt .Errorf ("error marshalling tx action payload for block event: %s" , err )
103
+ return nil , nil , "" , fmt .Errorf ("error marshalling tx action payload for block event: %s" , err )
86
104
}
87
105
payload .Data , err = utils .GetBytesTransaction (tx )
88
106
if err != nil {
89
- return fmt .Errorf ("error marshalling payload for block event: %s" , err )
107
+ return nil , nil , "" , fmt .Errorf ("error marshalling payload for block event: %s" , err )
90
108
}
91
109
env .Payload , err = utils .GetBytesPayload (payload )
92
110
if err != nil {
93
- return fmt .Errorf ("error marshalling tx envelope for block event: %s" , err )
111
+ return nil , nil , "" , fmt .Errorf ("error marshalling tx envelope for block event: %s" , err )
94
112
}
95
113
ebytes , err = utils .GetBytesEnvelope (env )
96
114
if err != nil {
97
- return fmt .Errorf ("cannot marshal transaction %s" , err )
115
+ return nil , nil , "" , fmt .Errorf ("cannot marshal transaction %s" , err )
98
116
}
99
117
}
100
118
}
101
119
}
102
- bevent .Data .Data = append (bevent .Data .Data , ebytes )
120
+ blockForEvent .Data .Data = append (blockForEvent .Data .Data , ebytes )
103
121
}
122
+ filteredBlockForEvent .ChannelId = channelID
123
+ filteredBlockForEvent .Number = block .Header .Number
124
+ filteredBlockForEvent .FilteredTx = filteredTxArray
104
125
105
- logger .Infof ("Channel [%s]: Sending event for block number [%d]" , channelId , block .Header .Number )
106
-
107
- return Send (CreateBlockEvent (bevent ))
126
+ return CreateBlockEvent (blockForEvent ), CreateFilteredBlockEvent (filteredBlockForEvent ), channelID , nil
108
127
}
109
128
110
129
//CreateBlockEvent creates a Event from a Block
111
130
func CreateBlockEvent (te * common.Block ) * pb.Event {
112
131
return & pb.Event {Event : & pb.Event_Block {Block : te }}
113
132
}
114
133
134
+ //CreateFilteredBlockEvent creates a Event from a FilteredBlock
135
+ func CreateFilteredBlockEvent (te * pb.FilteredBlock ) * pb.Event {
136
+ return & pb.Event {Event : & pb.Event_FilteredBlock {FilteredBlock : te }}
137
+ }
138
+
115
139
//CreateChaincodeEvent creates a Event from a ChaincodeEvent
116
140
func CreateChaincodeEvent (te * pb.ChaincodeEvent ) * pb.Event {
117
141
return & pb.Event {Event : & pb.Event_ChaincodeEvent {ChaincodeEvent : te }}
0 commit comments