Skip to content

Commit c39d69b

Browse files
committed
[FAB-7273] Update deliver to facilitate usage on peer
This CR updates the deliver functionality to facilitate its usage on a peer as well as an orderer. This required: - modifying the signal logic for when a new block is available due to the difference in addition of blocks to the ledger between the orderer and the peer. The signal logic is now handled using the iterator itself, which signals when it finds a new block - adding a policy variable to the deliver handler to ensure the peer and orderer each can control access to deliver Change-Id: Iebb6c25a8c5ac32d65f909eb0519f26bfde0dc31 Signed-off-by: Will Lahti <wtlahti@us.ibm.com>
1 parent 0dfe4f3 commit c39d69b

File tree

6 files changed

+82
-95
lines changed

6 files changed

+82
-95
lines changed

common/deliver/deliver.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,13 +65,15 @@ type Support interface {
6565
}
6666

6767
type deliverServer struct {
68-
sm SupportManager
68+
sm SupportManager
69+
policyName string
6970
}
7071

7172
// NewHandlerImpl creates an implementation of the Handler interface
72-
func NewHandlerImpl(sm SupportManager) Handler {
73+
func NewHandlerImpl(sm SupportManager, policyName string) Handler {
7374
return &deliverServer{
74-
sm: sm,
75+
sm: sm,
76+
policyName: policyName,
7577
}
7678
}
7779

@@ -137,7 +139,7 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
137139

138140
lastConfigSequence := chain.Sequence()
139141

140-
sf := NewSigFilter(policies.ChannelReaders, chain)
142+
sf := NewSigFilter(ds.policyName, chain)
141143
if err := sf.Apply(envelope); err != nil {
142144
logger.Warningf("[channel: %s] Received unauthorized deliver request from %s: %s", chdr.ChannelId, addr, err)
143145
return sendStatusReply(srv, cb.Status_FORBIDDEN)
@@ -173,21 +175,21 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
173175
}
174176

175177
for {
176-
if seekInfo.Behavior == ab.SeekInfo_BLOCK_UNTIL_READY {
177-
select {
178-
case <-erroredChan:
179-
logger.Warningf("[channel: %s] Aborting deliver for request because of consenter error", chdr.ChannelId, addr)
180-
return sendStatusReply(srv, cb.Status_SERVICE_UNAVAILABLE)
181-
case <-cursor.ReadyChan():
182-
}
183-
} else {
184-
select {
185-
case <-cursor.ReadyChan():
186-
default:
178+
if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
179+
if number > chain.Reader().Height()-1 {
187180
return sendStatusReply(srv, cb.Status_NOT_FOUND)
188181
}
189182
}
190183

184+
block, status := nextBlock(cursor, erroredChan)
185+
if status != cb.Status_SUCCESS {
186+
cursor.Close()
187+
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
188+
return sendStatusReply(srv, status)
189+
}
190+
// increment block number to support FAIL_IF_NOT_READY deliver behavior
191+
number++
192+
191193
currentConfigSequence := chain.Sequence()
192194
if currentConfigSequence > lastConfigSequence {
193195
lastConfigSequence = currentConfigSequence
@@ -197,12 +199,6 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
197199
}
198200
}
199201

200-
block, status := cursor.Next()
201-
if status != cb.Status_SUCCESS {
202-
logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
203-
return sendStatusReply(srv, status)
204-
}
205-
206202
logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
207203

208204
if err := sendBlockReply(srv, block); err != nil {
@@ -226,6 +222,22 @@ func (ds *deliverServer) deliverBlocks(srv ab.AtomicBroadcast_DeliverServer, env
226222

227223
}
228224

225+
func nextBlock(cursor blockledger.Iterator, cancel <-chan struct{}) (block *cb.Block, status cb.Status) {
226+
done := make(chan struct{})
227+
go func() {
228+
defer close(done)
229+
block, status = cursor.Next()
230+
}()
231+
232+
select {
233+
case <-done:
234+
return
235+
case <-cancel:
236+
logger.Warningf("Aborting deliver for request because of background error")
237+
return nil, cb.Status_SERVICE_UNAVAILABLE
238+
}
239+
}
240+
229241
func sendStatusReply(srv ab.AtomicBroadcast_DeliverServer, status cb.Status) error {
230242
return srv.Send(&ab.DeliverResponse{
231243
Type: &ab.DeliverResponse_Status{Status: status},

common/deliver/deliver_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ var genesisBlock = cb.NewBlock(0, nil)
4141

4242
var systemChainID = "systemChain"
4343

44+
var policyName = policies.ChannelReaders
45+
4446
const ledgerSize = 10
4547

4648
func init() {
@@ -156,7 +158,7 @@ func initializeDeliverHandler() Handler {
156158
l.Append(blockledger.CreateNextBlock(l, []*cb.Envelope{{Payload: []byte(fmt.Sprintf("%d", i))}}))
157159
}
158160

159-
return NewHandlerImpl(mm)
161+
return NewHandlerImpl(mm, policyName)
160162
}
161163

162164
func newMockMultichainManager() *mockSupportManager {
@@ -288,7 +290,7 @@ func TestUnauthorizedSeek(t *testing.T) {
288290

289291
m := newMockD()
290292
defer close(m.recvChan)
291-
ds := NewHandlerImpl(mm)
293+
ds := NewHandlerImpl(mm, policyName)
292294

293295
go ds.Handle(m)
294296

@@ -313,7 +315,7 @@ func TestRevokedAuthorizationSeek(t *testing.T) {
313315

314316
m := newMockD()
315317
defer close(m.recvChan)
316-
ds := NewHandlerImpl(mm)
318+
ds := NewHandlerImpl(mm, policyName)
317319

318320
go ds.Handle(m)
319321

@@ -396,7 +398,7 @@ func TestBlockingSeek(t *testing.T) {
396398

397399
m := newMockD()
398400
defer close(m.recvChan)
399-
ds := NewHandlerImpl(mm)
401+
ds := NewHandlerImpl(mm, policyName)
400402

401403
go ds.Handle(m)
402404

@@ -450,7 +452,7 @@ func TestErroredSeek(t *testing.T) {
450452

451453
m := newMockD()
452454
defer close(m.recvChan)
453-
ds := NewHandlerImpl(mm)
455+
ds := NewHandlerImpl(mm, policyName)
454456

455457
go ds.Handle(m)
456458

@@ -474,7 +476,7 @@ func TestErroredBlockingSeek(t *testing.T) {
474476

475477
m := newMockD()
476478
defer close(m.recvChan)
477-
ds := NewHandlerImpl(mm)
479+
ds := NewHandlerImpl(mm, policyName)
478480

479481
go ds.Handle(m)
480482

@@ -499,7 +501,7 @@ func TestErroredBlockingSeek(t *testing.T) {
499501

500502
func TestSGracefulShutdown(t *testing.T) {
501503
m := newMockD()
502-
ds := NewHandlerImpl(nil)
504+
ds := NewHandlerImpl(nil, policyName)
503505

504506
close(m.recvChan)
505507
assert.NoError(t, ds.Handle(m), "Expected no error for hangup")
@@ -527,7 +529,7 @@ func TestReversedSeqSeek(t *testing.T) {
527529
}
528530

529531
func TestBadStreamRecv(t *testing.T) {
530-
bh := NewHandlerImpl(nil)
532+
bh := NewHandlerImpl(nil, policyName)
531533
assert.Error(t, bh.Handle(&erroneousRecvMockD{}), "Should catch unexpected stream error")
532534
}
533535

@@ -616,7 +618,7 @@ func TestChainNotFound(t *testing.T) {
616618
m := newMockD()
617619
defer close(m.recvChan)
618620

619-
ds := NewHandlerImpl(mm)
621+
ds := NewHandlerImpl(mm, policyName)
620622
go ds.Handle(m)
621623

622624
m.recvChan <- makeSeek(systemChainID, &ab.SeekInfo{Start: seekNewest, Stop: seekNewest, Behavior: ab.SeekInfo_BLOCK_UNTIL_READY})

common/ledger/blockledger/file/factory.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (flf *fileLedgerFactory) GetOrCreate(chainID string) (blockledger.ReadWrite
4646
if err != nil {
4747
return nil, err
4848
}
49-
ledger = &fileLedger{blockStore: blockStore, signal: make(chan struct{})}
49+
ledger = NewFileLedger(blockStore)
5050
flf.ledgers[key] = ledger
5151
return ledger, nil
5252
}

common/ledger/blockledger/file/impl.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package fileledger
1919
import (
2020
"github.com/hyperledger/fabric/common/flogging"
2121
"github.com/hyperledger/fabric/common/ledger"
22-
"github.com/hyperledger/fabric/common/ledger/blkstorage"
2322
"github.com/hyperledger/fabric/common/ledger/blockledger"
2423
cb "github.com/hyperledger/fabric/protos/common"
2524
ab "github.com/hyperledger/fabric/protos/orderer"
@@ -40,31 +39,38 @@ func init() {
4039
}
4140

4241
// FileLedger is a struct used to interact with a node's ledger
43-
type fileLedger struct {
44-
blockStore blkstorage.BlockStore
42+
type FileLedger struct {
43+
blockStore FileLedgerBlockStore
4544
signal chan struct{}
4645
}
4746

47+
// FileLedgerBlockStore defines the interface to interact with deliver when using a
48+
// file ledger
49+
type FileLedgerBlockStore interface {
50+
AddBlock(block *cb.Block) error
51+
GetBlockchainInfo() (*cb.BlockchainInfo, error)
52+
RetrieveBlocks(startBlockNumber uint64) (ledger.ResultsIterator, error)
53+
}
54+
55+
// NewFileLedger creates a new FileLedger for interaction with the ledger
56+
func NewFileLedger(blockStore FileLedgerBlockStore) *FileLedger {
57+
return &FileLedger{blockStore: blockStore, signal: make(chan struct{})}
58+
}
59+
4860
type fileLedgerIterator struct {
49-
ledger *fileLedger
61+
ledger *FileLedger
5062
blockNumber uint64
5163
commonIterator ledger.ResultsIterator
5264
}
5365

5466
// Next blocks until there is a new block available, or returns an error if the
5567
// next block is no longer retrievable
5668
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
57-
for {
58-
if i.blockNumber < i.ledger.Height() {
59-
result, err := i.commonIterator.Next()
60-
if err != nil {
61-
return nil, cb.Status_SERVICE_UNAVAILABLE
62-
}
63-
i.blockNumber++
64-
return result.(*cb.Block), cb.Status_SUCCESS
65-
}
66-
<-i.ledger.signal
69+
result, err := i.commonIterator.Next()
70+
if err != nil {
71+
return nil, cb.Status_SERVICE_UNAVAILABLE
6772
}
73+
return result.(*cb.Block), cb.Status_SUCCESS
6874
}
6975

7076
// ReadyChan supplies a channel which will block until Next will not block
@@ -83,7 +89,7 @@ func (i *fileLedgerIterator) Close() {
8389

8490
// Iterator returns an Iterator, as specified by an ab.SeekInfo message, and its
8591
// starting block number
86-
func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
92+
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
8793
var startingBlockNumber uint64
8894
switch start := startPosition.Type.(type) {
8995
case *ab.SeekPosition_Oldest:
@@ -114,7 +120,7 @@ func (fl *fileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iter
114120
}
115121

116122
// Height returns the number of blocks on the ledger
117-
func (fl *fileLedger) Height() uint64 {
123+
func (fl *FileLedger) Height() uint64 {
118124
info, err := fl.blockStore.GetBlockchainInfo()
119125
if err != nil {
120126
logger.Panic(err)
@@ -123,7 +129,7 @@ func (fl *fileLedger) Height() uint64 {
123129
}
124130

125131
// Append a new block to the ledger
126-
func (fl *fileLedger) Append(block *cb.Block) error {
132+
func (fl *FileLedger) Append(block *cb.Block) error {
127133
err := fl.blockStore.AddBlock(block)
128134
if err == nil {
129135
close(fl.signal)

0 commit comments

Comments
 (0)