Skip to content

Commit

Permalink
Merge "[FAB-1298] Remove queueing from broadcast"
Browse files Browse the repository at this point in the history
  • Loading branch information
JonathanLevi authored and Gerrit Code Review committed Dec 21, 2016
2 parents d2371b9 + 2d24b31 commit 5adaef7
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 143 deletions.
94 changes: 26 additions & 68 deletions orderer/common/broadcast/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,65 +58,19 @@ type Support interface {
}

type handlerImpl struct {
queueSize int
sm SupportManager
sm SupportManager
}

// NewHandlerImpl constructs a new implementation of the Handler interface
func NewHandlerImpl(sm SupportManager, queueSize int) Handler {
func NewHandlerImpl(sm SupportManager) Handler {
return &handlerImpl{
queueSize: queueSize,
sm: sm,
sm: sm,
}
}

// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
b := newBroadcaster(bh)
defer close(b.queue)
go b.drainQueue()
return b.queueEnvelopes(srv)
}

type msgAndSupport struct {
msg *cb.Envelope
support Support
}

type broadcaster struct {
bs *handlerImpl
queue chan *msgAndSupport
exitChan chan struct{}
}

func newBroadcaster(bs *handlerImpl) *broadcaster {
b := &broadcaster{
bs: bs,
queue: make(chan *msgAndSupport, bs.queueSize),
exitChan: make(chan struct{}),
}
return b
}

func (b *broadcaster) drainQueue() {
defer close(b.exitChan)
for msgAndSupport := range b.queue {
if !msgAndSupport.support.Enqueue(msgAndSupport.msg) {
logger.Debugf("Consenter instructed us to shut down")
return
}
}
logger.Debugf("Exiting because the queue channel closed")
}

func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) error {

for {
select {
case <-b.exitChan:
return nil
default:
}
msg, err := srv.Recv()
if err != nil {
return err
Expand All @@ -129,32 +83,36 @@ func (b *broadcaster) queueEnvelopes(srv ab.AtomicBroadcast_BroadcastServer) err
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

support, ok := b.bs.sm.GetChain(payload.Header.ChainHeader.ChainID)
support, ok := bh.sm.GetChain(payload.Header.ChainHeader.ChainID)
if !ok {
// Chain not found, maybe create one?
if payload.Header.ChainHeader.Type != int32(cb.HeaderType_CONFIGURATION_TRANSACTION) {
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
} else {
logger.Debugf("Proposing new chain")
err = srv.Send(&ab.BroadcastResponse{Status: b.bs.sm.ProposeChain(msg)})
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_NOT_FOUND})
}
} else {
// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)

if filterErr != nil {
logger.Debugf("Rejecting broadcast message")
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
} else {
select {
case b.queue <- &msgAndSupport{msg: msg, support: support}:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
default:
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

logger.Debugf("Proposing new chain")
err = srv.Send(&ab.BroadcastResponse{Status: bh.sm.ProposeChain(msg)})
if err != nil {
return err
}
continue
}

// Normal transaction for existing chain
_, filterErr := support.Filters().Apply(msg)

if filterErr != nil {
logger.Debugf("Rejecting broadcast message")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
}

if !support.Enqueue(msg) {
logger.Debugf("Consenter instructed us to shut down")
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
}

err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})

if err != nil {
return err
}
Expand Down
125 changes: 51 additions & 74 deletions orderer/common/broadcast/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package broadcast
import (
"fmt"
"testing"
"time"

"github.com/hyperledger/fabric/orderer/common/filter"
cb "github.com/hyperledger/fabric/protos/common"
Expand Down Expand Up @@ -73,21 +74,13 @@ func (mm *mockSupportManager) ProposeChain(configTx *cb.Envelope) cb.Status {
filter.EmptyRejectRule,
filter.AcceptRule,
}),
queue: make(chan *cb.Envelope),
}
return cb.Status_SUCCESS
}

func (mm *mockSupportManager) halt() {
for _, chain := range mm.chains {
chain.halt()
}
}

type mockSupport struct {
filters *filter.RuleSet
queue chan *cb.Envelope
done bool
filters *filter.RuleSet
rejectEnqueue bool
}

func (ms *mockSupport) Filters() *filter.RuleSet {
Expand All @@ -96,16 +89,7 @@ func (ms *mockSupport) Filters() *filter.RuleSet {

// Enqueue sends a message for ordering
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
ms.queue <- env
return !ms.done
}

func (ms *mockSupport) halt() {
ms.done = true
select {
case <-ms.queue:
default:
}
return !ms.rejectEnqueue
}

func makeConfigMessage(chainID string) *cb.Envelope {
Expand Down Expand Up @@ -137,29 +121,31 @@ func makeMessage(chainID string, data []byte) *cb.Envelope {
}
}

func getMultichainManager() *mockSupportManager {
func getMockSupportManager() (*mockSupportManager, *mockSupport) {
filters := filter.NewRuleSet([]filter.Rule{
filter.EmptyRejectRule,
filter.AcceptRule,
})
mm := &mockSupportManager{
chains: make(map[string]*mockSupport),
}
mm.chains[string(systemChain)] = &mockSupport{
mSysChain := &mockSupport{
filters: filters,
queue: make(chan *cb.Envelope),
}
return mm
mm.chains[string(systemChain)] = mSysChain
return mm, mSysChain
}

func TestQueueOverflow(t *testing.T) {
mm := getMultichainManager()
defer mm.halt()
bh := NewHandlerImpl(mm, 2)
func TestEnqueueFailure(t *testing.T) {
mm, mSysChain := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
b := newBroadcaster(bh.(*handlerImpl))
go b.queueEnvelopes(m)
done := make(chan struct{})
go func() {
bh.Handle(m)
close(done)
}()

for i := 0; i < 2; i++ {
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
Expand All @@ -169,86 +155,77 @@ func TestQueueOverflow(t *testing.T) {
}
}

mSysChain.rejectEnqueue = true
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
reply := <-m.sendChan
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}

}

func TestMultiQueueOverflow(t *testing.T) {
mm := getMultichainManager()
defer mm.halt()
bh := NewHandlerImpl(mm, 2)
ms := []*mockB{newMockB(), newMockB(), newMockB()}

for _, m := range ms {
defer close(m.recvChan)
b := newBroadcaster(bh.(*handlerImpl))
go b.queueEnvelopes(m)
}

for _, m := range ms {
for i := 0; i < 2; i++ {
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully queued the message")
}
}
}

for _, m := range ms {
m.recvChan <- makeMessage(systemChain, []byte("Some bytes"))
reply := <-m.sendChan
if reply.Status != cb.Status_SERVICE_UNAVAILABLE {
t.Fatalf("Should not have successfully queued the message")
}
select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("Should have terminated the stream")
}
}

func TestEmptyEnvelope(t *testing.T) {
mm := getMultichainManager()
defer mm.halt()
bh := NewHandlerImpl(mm, 2)
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
done := make(chan struct{})
go func() {
bh.Handle(m)
close(done)
}()

m.recvChan <- &cb.Envelope{}
reply := <-m.sendChan
if reply.Status != cb.Status_BAD_REQUEST {
t.Fatalf("Should have rejected the null message")
}

select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("Should have terminated the stream")
}
}

func TestBadChainID(t *testing.T) {
mm := getMultichainManager()
defer mm.halt()
bh := NewHandlerImpl(mm, 2)
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
done := make(chan struct{})
go func() {
bh.Handle(m)
close(done)
}()

m.recvChan <- makeMessage("Wrong chain", []byte("Some bytes"))
reply := <-m.sendChan
if reply.Status != cb.Status_NOT_FOUND {
t.Fatalf("Should have rejected message to a chain which does not exist")
}

select {
case <-done:
case <-time.After(time.Second):
t.Fatalf("Should have terminated the stream")
}
}

func TestNewChainID(t *testing.T) {
mm := getMultichainManager()
defer mm.halt()
bh := NewHandlerImpl(mm, 2)
mm, _ := getMockSupportManager()
bh := NewHandlerImpl(mm)
m := newMockB()
defer close(m.recvChan)
go bh.Handle(m)
newChainID := "New Chain"

m.recvChan <- makeConfigMessage("New chain")
m.recvChan <- makeConfigMessage(newChainID)
reply := <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have created a new chain, got %d", reply.Status)
Expand All @@ -258,9 +235,9 @@ func TestNewChainID(t *testing.T) {
t.Fatalf("Should have created a new chain")
}

m.recvChan <- makeMessage("New chain", []byte("Some bytes"))
m.recvChan <- makeMessage(newChainID, []byte("Some bytes"))
reply = <-m.sendChan
if reply.Status != cb.Status_SUCCESS {
t.Fatalf("Should have successfully sent message to new chain")
t.Fatalf("Should have successfully sent message to new chain, got %v", reply)
}
}
2 changes: 1 addition & 1 deletion orderer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func NewServer(ml multichain.Manager, queueSize, maxWindowSize int) ab.AtomicBro
logger.Infof("Starting orderer")

s := &server{
bh: broadcast.NewHandlerImpl(broadcastSupport{ml}, queueSize),
dh: deliver.NewHandlerImpl(deliverSupport{ml}),
bh: broadcast.NewHandlerImpl(broadcastSupport{ml}),
}
return s
}
Expand Down

0 comments on commit 5adaef7

Please sign in to comment.