Skip to content

Commit e2f285c

Browse files
committed
[FAB-5568] Add filtered block event
This CR adds a new FilteredBlock event, which is currently sent whenever a Block event is sent. This FilteredBlock contains minimal information about the block and the transactions it contains. Change-Id: Ia416184451f9073bd855b3a4f5811bd3332b9928 Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent a945767 commit e2f285c

File tree

10 files changed

+279
-90
lines changed

10 files changed

+279
-90
lines changed

core/committer/committer_impl.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,17 @@ func (lc *LedgerCommitter) GetPvtDataAndBlockByNum(seqNum uint64) (*ledger.Block
125125

126126
// postCommit publish event or handle other tasks once block committed to the ledger
127127
func (lc *LedgerCommitter) postCommit(block *common.Block) {
128-
// send block event *after* the block has been committed
129-
if err := producer.SendProducerBlockEvent(block); err != nil {
130-
logger.Errorf("Error publishing block %d, because: %v", block.Header.Number, err)
128+
// create/send block events *after* the block has been committed
129+
bevent, fbevent, channelID, err := producer.CreateBlockEvents(block)
130+
if err != nil {
131+
logger.Errorf("Channel [%s] Error processing block events for block number [%d]: %s", channelID, block.Header.Number, err)
132+
} else {
133+
if err := producer.Send(bevent); err != nil {
134+
logger.Errorf("Channel [%s] Error sending block event for block number [%d]: %s", channelID, block.Header.Number, err)
135+
}
136+
if err := producer.Send(fbevent); err != nil {
137+
logger.Errorf("Channel [%s] Error sending filtered block event for block number [%d]: %s", channelID, block.Header.Number, err)
138+
}
131139
}
132140
}
133141

core/scc/cscc/configure.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,13 @@ func joinChain(chainID string, block *common.Block) pb.Response {
194194

195195
peer.InitChain(chainID)
196196

197-
if err := producer.SendProducerBlockEvent(block); err != nil {
198-
cnflogger.Errorf("Error sending block event %s", err)
197+
bevent, _, _, err := producer.CreateBlockEvents(block)
198+
if err != nil {
199+
cnflogger.Errorf("Error processing block events for block number [%d]: %s", block.Header.Number, err)
200+
} else {
201+
if err := producer.Send(bevent); err != nil {
202+
cnflogger.Errorf("Channel [%s] Error sending block event for block number [%d]: %s", chainID, block.Header.Number, err)
203+
}
199204
}
200205

201206
return shim.Success(nil)

events/producer/eventhelper.go

Lines changed: 50 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -19,57 +19,75 @@ package producer
1919
import (
2020
"fmt"
2121

22+
"github.com/hyperledger/fabric/core/ledger/util"
2223
"github.com/hyperledger/fabric/protos/common"
2324
pb "github.com/hyperledger/fabric/protos/peer"
2425
"github.com/hyperledger/fabric/protos/utils"
2526
)
2627

27-
// SendProducerBlockEvent sends block event to clients
28-
func SendProducerBlockEvent(block *common.Block) error {
28+
// CreateBlockEvents creates block events for a block. It removes the RW set
29+
// and creates a block event and a filtered block event. Sending the events
30+
// is the responsibility of the code that calls this function.
31+
func CreateBlockEvents(block *common.Block) (bevent *pb.Event, fbevent *pb.Event, channelID string, err error) {
2932
logger.Debugf("Entry")
3033
defer logger.Debugf("Exit")
31-
bevent := &common.Block{}
32-
bevent.Header = block.Header
33-
bevent.Metadata = block.Metadata
34-
bevent.Data = &common.BlockData{}
35-
var channelId string
36-
for _, d := range block.Data.Data {
34+
35+
blockForEvent := &common.Block{}
36+
filteredBlockForEvent := &pb.FilteredBlock{}
37+
filteredTxArray := []*pb.FilteredTransaction{}
38+
blockForEvent.Header = block.Header
39+
blockForEvent.Metadata = block.Metadata
40+
blockForEvent.Data = &common.BlockData{}
41+
txsFltr := util.TxValidationFlags(block.Metadata.Metadata[common.BlockMetadataIndex_TRANSACTIONS_FILTER])
42+
43+
for txIndex, d := range block.Data.Data {
3744
ebytes := d
3845
if ebytes != nil {
3946
if env, err := utils.GetEnvelopeFromBlock(ebytes); err != nil {
40-
logger.Errorf("error getting tx from block(%s)\n", err)
47+
logger.Errorf("error getting tx from block: %s", err)
4148
} else if env != nil {
4249
// get the payload from the envelope
4350
payload, err := utils.GetPayload(env)
4451
if err != nil {
45-
return fmt.Errorf("could not extract payload from envelope, err %s", err)
52+
return nil, nil, "", fmt.Errorf("could not extract payload from envelope: %s", err)
4653
}
4754

4855
chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
4956
if err != nil {
50-
return err
57+
return nil, nil, "", err
5158
}
52-
channelId = chdr.ChannelId
59+
channelID = chdr.ChannelId
5360

5461
if common.HeaderType(chdr.Type) == common.HeaderType_ENDORSER_TRANSACTION {
55-
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelId, block.Header.Number, chdr.TxId)
62+
logger.Debugf("Channel [%s]: Block event for block number [%d] contains transaction id: %s", channelID, block.Header.Number, chdr.TxId)
5663
tx, err := utils.GetTransaction(payload.Data)
5764
if err != nil {
58-
return fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
65+
return nil, nil, "", fmt.Errorf("error unmarshalling transaction payload for block event: %s", err)
5966
}
6067
chaincodeActionPayload, err := utils.GetChaincodeActionPayload(tx.Actions[0].Payload)
6168
if err != nil {
62-
return fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
69+
return nil, nil, "", fmt.Errorf("error unmarshalling transaction action payload for block event: %s", err)
6370
}
6471
propRespPayload, err := utils.GetProposalResponsePayload(chaincodeActionPayload.Action.ProposalResponsePayload)
6572
if err != nil {
66-
return fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
73+
return nil, nil, "", fmt.Errorf("error unmarshalling proposal response payload for block event: %s", err)
6774
}
6875
//ENDORSER_ACTION, ProposalResponsePayload.Extension field contains ChaincodeAction
6976
caPayload, err := utils.GetChaincodeAction(propRespPayload.Extension)
7077
if err != nil {
71-
return fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
78+
return nil, nil, "", fmt.Errorf("error unmarshalling chaincode action for block event: %s", err)
79+
}
80+
81+
ccEvent, err := utils.GetChaincodeEvents(caPayload.Events)
82+
filteredTx := &pb.FilteredTransaction{Txid: chdr.TxId, TxValidationCode: txsFltr.Flag(txIndex)}
83+
84+
if ccEvent != nil {
85+
filteredCcEvent := ccEvent
86+
// nil out ccevent payload
87+
filteredCcEvent.Payload = nil
88+
filteredTx.CcEvent = filteredCcEvent
7289
}
90+
filteredTxArray = append(filteredTxArray, filteredTx)
7391
// Drop read write set from transaction before sending block event
7492
// Performance issue with chaincode deploy txs and causes nodejs grpc
7593
// to hit max message size bug
@@ -78,40 +96,46 @@ func SendProducerBlockEvent(block *common.Block) error {
7896
caPayload.Results = nil
7997
chaincodeActionPayload.Action.ProposalResponsePayload, err = utils.GetBytesProposalResponsePayload(propRespPayload.ProposalHash, caPayload.Response, caPayload.Results, caPayload.Events, caPayload.ChaincodeId)
8098
if err != nil {
81-
return fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
99+
return nil, nil, "", fmt.Errorf("error marshalling tx proposal payload for block event: %s", err)
82100
}
83101
tx.Actions[0].Payload, err = utils.GetBytesChaincodeActionPayload(chaincodeActionPayload)
84102
if err != nil {
85-
return fmt.Errorf("error marshalling tx action payload for block event: %s", err)
103+
return nil, nil, "", fmt.Errorf("error marshalling tx action payload for block event: %s", err)
86104
}
87105
payload.Data, err = utils.GetBytesTransaction(tx)
88106
if err != nil {
89-
return fmt.Errorf("error marshalling payload for block event: %s", err)
107+
return nil, nil, "", fmt.Errorf("error marshalling payload for block event: %s", err)
90108
}
91109
env.Payload, err = utils.GetBytesPayload(payload)
92110
if err != nil {
93-
return fmt.Errorf("error marshalling tx envelope for block event: %s", err)
111+
return nil, nil, "", fmt.Errorf("error marshalling tx envelope for block event: %s", err)
94112
}
95113
ebytes, err = utils.GetBytesEnvelope(env)
96114
if err != nil {
97-
return fmt.Errorf("cannot marshal transaction %s", err)
115+
return nil, nil, "", fmt.Errorf("cannot marshal transaction %s", err)
98116
}
99117
}
100118
}
101119
}
102-
bevent.Data.Data = append(bevent.Data.Data, ebytes)
120+
blockForEvent.Data.Data = append(blockForEvent.Data.Data, ebytes)
103121
}
122+
filteredBlockForEvent.ChannelId = channelID
123+
filteredBlockForEvent.Number = block.Header.Number
124+
filteredBlockForEvent.FilteredTx = filteredTxArray
104125

105-
logger.Infof("Channel [%s]: Sending event for block number [%d]", channelId, block.Header.Number)
106-
107-
return Send(CreateBlockEvent(bevent))
126+
return CreateBlockEvent(blockForEvent), CreateFilteredBlockEvent(filteredBlockForEvent), channelID, nil
108127
}
109128

110129
//CreateBlockEvent creates a Event from a Block
111130
func CreateBlockEvent(te *common.Block) *pb.Event {
112131
return &pb.Event{Event: &pb.Event_Block{Block: te}}
113132
}
114133

134+
//CreateFilteredBlockEvent creates a Event from a FilteredBlock
135+
func CreateFilteredBlockEvent(te *pb.FilteredBlock) *pb.Event {
136+
return &pb.Event{Event: &pb.Event_FilteredBlock{FilteredBlock: te}}
137+
}
138+
115139
//CreateChaincodeEvent creates a Event from a ChaincodeEvent
116140
func CreateChaincodeEvent(te *pb.ChaincodeEvent) *pb.Event {
117141
return &pb.Event{Event: &pb.Event_ChaincodeEvent{ChaincodeEvent: te}}

events/producer/events.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,8 @@ func AddEventType(eventType pb.EventType) error {
268268
switch eventType {
269269
case pb.EventType_BLOCK:
270270
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
271+
case pb.EventType_FILTEREDBLOCK:
272+
gEventProcessor.eventConsumers[eventType] = &genericHandlerList{handlers: make(map[*handler]bool)}
271273
case pb.EventType_CHAINCODE:
272274
gEventProcessor.eventConsumers[eventType] = &chaincodeHandlerList{handlers: make(map[string]map[string]map[*handler]bool)}
273275
case pb.EventType_REJECTION:

events/producer/handler.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ func getInterestKey(interest pb.Interest) string {
5151
switch interest.EventType {
5252
case pb.EventType_BLOCK:
5353
key = "/" + strconv.Itoa(int(pb.EventType_BLOCK))
54+
case pb.EventType_FILTEREDBLOCK:
55+
key = "/" + strconv.Itoa(int(pb.EventType_FILTEREDBLOCK))
5456
case pb.EventType_REJECTION:
5557
key = "/" + strconv.Itoa(int(pb.EventType_REJECTION))
5658
case pb.EventType_CHAINCODE:

events/producer/producer_test.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ var ehServer *EventsServer
6363
func (a *Adapter) GetInterestedEvents() ([]*ehpb.Interest, error) {
6464
return []*ehpb.Interest{
6565
&ehpb.Interest{EventType: ehpb.EventType_BLOCK},
66+
&ehpb.Interest{EventType: ehpb.EventType_FILTEREDBLOCK},
6667
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event1"}}},
6768
&ehpb.Interest{EventType: ehpb.EventType_CHAINCODE, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event2"}}},
6869
&ehpb.Interest{EventType: ehpb.EventType_REGISTER, RegInfo: &ehpb.Interest_ChaincodeRegInfo{ChaincodeRegInfo: &ehpb.ChaincodeReg{ChaincodeId: "0xffffffff", EventName: "event3"}}},
@@ -81,7 +82,7 @@ func (a *Adapter) updateCountNotify() {
8182

8283
func (a *Adapter) Recv(msg *ehpb.Event) (bool, error) {
8384
switch x := msg.Event.(type) {
84-
case *ehpb.Event_Block, *ehpb.Event_ChaincodeEvent, *ehpb.Event_Register, *ehpb.Event_Unregister:
85+
case *ehpb.Event_Block, *ehpb.Event_ChaincodeEvent, *ehpb.Event_Register, *ehpb.Event_Unregister, *ehpb.Event_FilteredBlock:
8586
a.updateCountNotify()
8687
case nil:
8788
// The field is not set.
@@ -213,9 +214,19 @@ func TestReceiveAnyMessage(t *testing.T) {
213214

214215
adapter.count = 1
215216
block := testutil.ConstructTestBlock(t, 1, 10, 100)
216-
if err = SendProducerBlockEvent(block); err != nil {
217+
218+
bevent, fbevent, _, err := CreateBlockEvents(block)
219+
if err != nil {
217220
t.Fail()
218-
t.Logf("Error sending message %s", err)
221+
t.Logf("Error processing block for events %s", err)
222+
}
223+
if err = Send(bevent); err != nil {
224+
t.Fail()
225+
t.Logf("Error sending block event: %s", err)
226+
}
227+
if err = Send(fbevent); err != nil {
228+
t.Fail()
229+
t.Logf("Error sending filtered block event: %s", err)
219230
}
220231

221232
emsg := createTestChaincodeEvent("0xffffffff", "event2")
@@ -224,8 +235,8 @@ func TestReceiveAnyMessage(t *testing.T) {
224235
t.Logf("Error sending message %s", err)
225236
}
226237

227-
//receive 2 messages - a block and a chaincode event
228-
for i := 0; i < 2; i++ {
238+
//receive 3 messages - a block, a filtered block, and a chaincode event
239+
for i := 0; i < 3; i++ {
229240
select {
230241
case <-adapter.notfy:
231242
case <-time.After(5 * time.Second):

events/producer/register_internal_events.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ func getMessageType(e *pb.Event) pb.EventType {
4242
return pb.EventType_CHAINCODE
4343
case *pb.Event_Rejection:
4444
return pb.EventType_REJECTION
45+
case *pb.Event_FilteredBlock:
46+
return pb.EventType_FILTEREDBLOCK
4547
default:
4648
return -1
4749
}
@@ -53,4 +55,5 @@ func addInternalEventTypes() {
5355
AddEventType(pb.EventType_CHAINCODE)
5456
AddEventType(pb.EventType_REJECTION)
5557
AddEventType(pb.EventType_REGISTER)
58+
AddEventType(pb.EventType_FILTEREDBLOCK)
5659
}

protos/peer/admin.pb.go

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)