Skip to content

Commit 3e5c3e4

Browse files
author
Jason Yellick
committed
[FAB-5266] Replace Enqueue with Order/Configure
The current consenter interface only allows for one sort of message ingress. All messages are received via 'Enqueue', and treated identically. In order for the consenter to be able to differentiate and handle config vs non-config messages differently, the consenter needs two diferent ingress points for messages. This CR replaces the Enqueue method with two methods: Order and Configure. For the time being, these methods behave exactly as Enqueue, but will be leveraged in future CRs. Change-Id: I3701e5e3c0de4833a455c49acebbad70c6ed763c Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent ed9517e commit 3e5c3e4

File tree

11 files changed

+114
-42
lines changed

11 files changed

+114
-42
lines changed

orderer/common/broadcast/broadcast.go

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,13 @@ type SupportManager interface {
5353

5454
// Support provides the backing resources needed to support broadcast on a chain
5555
type Support interface {
56-
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
57-
Enqueue(env *cb.Envelope) bool
56+
// Order accepts a message or returns an error indicating the cause of failure
57+
// It ultimately passes through to the consensus.Chain interface
58+
Order(env *cb.Envelope, configSeq uint64) error
59+
60+
// Configure accepts a reconfiguration or returns an error indicating the cause of failure
61+
// It ultimately passes through to the consensus.Chain interface
62+
Configure(configUpdateMsg *cb.Envelope, config *cb.Envelope, configSeq uint64) error
5863

5964
// Filters returns the set of broadcast filters for this chain
6065
Filters() *filter.RuleSet
@@ -102,6 +107,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
102107
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
103108
}
104109

110+
isConfig := false
111+
configUpdateMsg := msg
105112
if chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE) {
106113
logger.Debugf("Preprocessing CONFIG_UPDATE")
107114
msg, err = bh.sm.Process(msg)
@@ -126,6 +133,8 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
126133
logger.Criticalf("Generated bad transaction after CONFIG_UPDATE processing (empty channel ID)")
127134
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_INTERNAL_SERVER_ERROR})
128135
}
136+
137+
isConfig = true
129138
}
130139

131140
support, ok := bh.sm.GetChain(chdr.ChannelId)
@@ -144,7 +153,14 @@ func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
144153
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST})
145154
}
146155

147-
if !support.Enqueue(msg) {
156+
// XXX temporary hack to mesh interface definitions, will remove.
157+
if isConfig {
158+
err = support.Configure(configUpdateMsg, msg, 0)
159+
} else {
160+
err = support.Order(msg, 0)
161+
}
162+
163+
if err != nil {
148164
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE})
149165
}
150166

orderer/common/broadcast/broadcast_test.go

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -127,9 +127,17 @@ func (ms *mockSupport) Filters() *filter.RuleSet {
127127
return ms.filters
128128
}
129129

130-
// Enqueue sends a message for ordering
131-
func (ms *mockSupport) Enqueue(env *cb.Envelope) bool {
132-
return !ms.rejectEnqueue
130+
// Order sends a message for ordering
131+
func (ms *mockSupport) Order(env *cb.Envelope, configSeq uint64) error {
132+
if ms.rejectEnqueue {
133+
return fmt.Errorf("Reject")
134+
}
135+
return nil
136+
}
137+
138+
// Configure sends a reconfiguration message for ordering
139+
func (ms *mockSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
140+
return ms.Order(config, configSeq)
133141
}
134142

135143
func makeConfigMessage(chainID string) *cb.Envelope {
@@ -264,9 +272,9 @@ func TestGoodConfigUpdate(t *testing.T) {
264272
m := newMockB()
265273
defer close(m.recvChan)
266274
go bh.Handle(m)
267-
newChannelId := "New Chain"
275+
newChannelID := "New Chain"
268276

269-
m.recvChan <- makeConfigMessage(newChannelId)
277+
m.recvChan <- makeConfigMessage(newChannelID)
270278
reply := <-m.sendChan
271279
assert.Equal(t, cb.Status_SUCCESS, reply.Status, "Should have allowed a good CONFIG_UPDATE")
272280
}
@@ -301,9 +309,9 @@ func TestRejected(t *testing.T) {
301309
defer close(m.recvChan)
302310
go bh.Handle(m)
303311

304-
newChannelId := "New Chain"
312+
newChannelID := "New Chain"
305313

306-
m.recvChan <- makeConfigMessage(newChannelId)
314+
m.recvChan <- makeConfigMessage(newChannelID)
307315
reply := <-m.sendChan
308316
assert.Equal(t, cb.Status_BAD_REQUEST, reply.Status, "Should have rejected CONFIG_UPDATE")
309317
}

orderer/common/multichannel/chainsupport.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,14 @@ func (cs *ChainSupport) Reader() ledger.Reader {
197197
return cs.ledger
198198
}
199199

200-
// Enqueue takes a message and sends it to the consenter for ordering.
201-
func (cs *ChainSupport) Enqueue(env *cb.Envelope) bool {
202-
return cs.chain.Enqueue(env)
200+
// Order passes through to the Consenter implementation.
201+
func (cs *ChainSupport) Order(env *cb.Envelope, configSeq uint64) error {
202+
return cs.chain.Order(env, configSeq)
203+
}
204+
205+
// Configure passes through to the Consenter implementation.
206+
func (cs *ChainSupport) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
207+
return cs.chain.Configure(configUpdate, config, configSeq)
203208
}
204209

205210
// Errored returns whether the backing consenter has errored

orderer/common/multichannel/manager_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func TestManagerImpl(t *testing.T) {
178178
}
179179

180180
for _, message := range messages {
181-
chainSupport.Enqueue(message)
181+
chainSupport.Order(message, 0)
182182
}
183183

184184
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
@@ -489,7 +489,7 @@ func TestNewChain(t *testing.T) {
489489
chainSupport, ok := manager.GetChain(manager.SystemChannelID())
490490
assert.True(t, ok, "Could not find system channel")
491491

492-
chainSupport.Enqueue(wrapped)
492+
chainSupport.Configure(wrapped, wrapped, 0)
493493
func() {
494494
it, _ := rl.Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 1}}})
495495
defer it.Close()
@@ -521,7 +521,7 @@ func TestNewChain(t *testing.T) {
521521
}
522522

523523
for _, message := range messages {
524-
chainSupport.Enqueue(message)
524+
chainSupport.Order(message, 0)
525525
}
526526

527527
it, _ := chainSupport.Reader().Iterator(&ab.SeekPosition{Type: &ab.SeekPosition_Specified{Specified: &ab.SeekSpecified{Number: 0}}})

orderer/common/multichannel/util_test.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,14 @@ func (mch *mockChain) Errored() <-chan struct{} {
5454
return nil
5555
}
5656

57-
func (mch *mockChain) Enqueue(env *cb.Envelope) bool {
57+
func (mch *mockChain) Order(env *cb.Envelope, configSeq uint64) error {
5858
mch.queue <- env
59-
return true
59+
return nil
60+
}
61+
62+
func (mch *mockChain) Configure(configUpdate, config *cb.Envelope, configSeq uint64) error {
63+
mch.queue <- config
64+
return nil
6065
}
6166

6267
func (mch *mockChain) Start() {

orderer/consensus/consensus.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,27 @@ type Consenter interface {
3232
// 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
3333
// 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
3434
type Chain interface {
35-
// Enqueue accepts a message and returns true on acceptance, or false on failure.
36-
Enqueue(env *cb.Envelope) bool
35+
// NOTE: The solo/kafka consenters have not been updated to perform the revalidation
36+
// checks conditionally. For now, Order/Configure are essentially Enqueue as before.
37+
// This does not cause data inconsistency, but it wastes cycles and will be required
38+
// to properly support the ConfigUpdate concept once introduced
39+
40+
// Order accepts a message which has been processed at a given configSeq.
41+
// If the configSeq advances, it is the responsibility of the consenter
42+
// to revalidate and potentially discard the message
43+
// The consenter may return an error, indicating the message was not accepted
44+
Order(env *cb.Envelope, configSeq uint64) error
45+
46+
// Configure accepts a message which reconfigures the channel and will
47+
// trigger an update to the configSeq if committed. The configuration must have
48+
// been triggered by a ConfigUpdate message, which is included. If the config
49+
// sequence advances, it is the responsibility of the consenter to recompute the
50+
// resulting config, discarding the message if the reconfiguration is no longer
51+
// valid. While a configure message is in flight, the consenter should lock
52+
// and block additional calls to Order/Configure, any messages received will
53+
// need to be revalidated before ordering.
54+
// The consenter may return an error, indicating the message was not accepted
55+
Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error
3756

3857
// Errored returns a channel which will close when an error has occurred.
3958
// This is especially useful for the Deliver client, who must terminate waiting

orderer/consensus/kafka/chain.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,21 @@ func (chain *chainImpl) Halt() {
112112
}
113113
}
114114

115-
// Enqueue accepts a message and returns true on acceptance, or false otheriwse.
116115
// Implements the consensus.Chain interface. Called by Broadcast().
117-
func (chain *chainImpl) Enqueue(env *cb.Envelope) bool {
116+
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
117+
if !chain.enqueue(env) {
118+
return fmt.Errorf("Could not enqueue")
119+
}
120+
return nil
121+
}
122+
123+
// Implements the consensus.Chain interface. Called by Broadcast().
124+
func (chain *chainImpl) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
125+
return chain.Order(config, configSeq)
126+
}
127+
128+
// enqueue accepts a message and returns true on acceptance, or false otheriwse.
129+
func (chain *chainImpl) enqueue(env *cb.Envelope) bool {
118130
logger.Debugf("[channel: %s] Enqueueing envelope...", chain.support.ChainID())
119131
select {
120132
case <-chain.startChan: // The Start phase has completed

orderer/consensus/kafka/chain_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func TestChain(t *testing.T) {
195195
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
196196
})
197197

198-
t.Run("EnqueueIfNotStarted", func(t *testing.T) {
198+
t.Run("enqueueIfNotStarted", func(t *testing.T) {
199199
mockChannel, mockBroker, mockSupport := newMocks(t)
200200
defer func() { mockBroker.Close() }()
201201
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
@@ -216,7 +216,7 @@ func TestChain(t *testing.T) {
216216
SetMessage(mockChannel.topic(), mockChannel.partition(), newestOffset, message),
217217
})
218218

219-
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
219+
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
220220
})
221221

222222
t.Run("StartWithConsumerForChannelError", func(t *testing.T) {
@@ -247,7 +247,7 @@ func TestChain(t *testing.T) {
247247
assert.Panics(t, func() { startThread(chain) }, "Expected the Start() call to panic")
248248
})
249249

250-
t.Run("EnqueueProper", func(t *testing.T) {
250+
t.Run("enqueueProper", func(t *testing.T) {
251251
mockChannel, mockBroker, mockSupport := newMocks(t)
252252
defer func() { mockBroker.Close() }()
253253
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
@@ -273,14 +273,14 @@ func TestChain(t *testing.T) {
273273
t.Fatal("startChan should have been closed by now")
274274
}
275275

276-
// Enqueue should have access to the post path, and its ProduceRequest
276+
// enqueue should have access to the post path, and its ProduceRequest
277277
// should go by without error
278-
assert.True(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return true")
278+
assert.True(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return true")
279279

280280
chain.Halt()
281281
})
282282

283-
t.Run("EnqueueIfHalted", func(t *testing.T) {
283+
t.Run("enqueueIfHalted", func(t *testing.T) {
284284
mockChannel, mockBroker, mockSupport := newMocks(t)
285285
defer func() { mockBroker.Close() }()
286286
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
@@ -308,10 +308,10 @@ func TestChain(t *testing.T) {
308308
chain.Halt()
309309

310310
// haltChan should close access to the post path
311-
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
311+
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
312312
})
313313

314-
t.Run("EnqueueError", func(t *testing.T) {
314+
t.Run("enqueueError", func(t *testing.T) {
315315
mockChannel, mockBroker, mockSupport := newMocks(t)
316316
defer func() { mockBroker.Close() }()
317317
chain, _ := newChain(mockConsenter, mockSupport, newestOffset-1)
@@ -345,7 +345,7 @@ func TestChain(t *testing.T) {
345345
SetError(mockChannel.topic(), mockChannel.partition(), sarama.ErrNotLeaderForPartition),
346346
})
347347

348-
assert.False(t, chain.Enqueue(newMockEnvelope("fooMessage")), "Expected Enqueue call to return false")
348+
assert.False(t, chain.enqueue(newMockEnvelope("fooMessage")), "Expected enqueue call to return false")
349349
})
350350
}
351351

orderer/consensus/kafka/consenter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func setupTestLogging(logLevel string, verbose bool) {
165165

166166
// Taken from orderer/solo/consensus_test.go
167167
func syncQueueMessage(message *cb.Envelope, chain *chainImpl, mockBlockcutter *mockblockcutter.Receiver) {
168-
chain.Enqueue(message)
168+
chain.enqueue(message)
169169
mockBlockcutter.Block <- struct{}{} // We'll move past this line (and the function will return) only when the mock blockcutter is about to return
170170
}
171171

orderer/consensus/solo/consensus.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package solo
1818

1919
import (
20+
"fmt"
2021
"time"
2122

2223
"github.com/hyperledger/fabric/orderer/common/msgprocessor"
@@ -37,7 +38,7 @@ type chain struct {
3738

3839
// New creates a new consenter for the solo consensus scheme.
3940
// The solo consensus scheme is very simple, and allows only one consenter for a given chain (this process).
40-
// It accepts messages being delivered via Enqueue, orders them, and then uses the blockcutter to form the messages
41+
// It accepts messages being delivered via Order/Configure, orders them, and then uses the blockcutter to form the messages
4142
// into blocks before writing to the given ledger
4243
func New() consensus.Consenter {
4344
return &consenter{}
@@ -68,16 +69,22 @@ func (ch *chain) Halt() {
6869
}
6970
}
7071

71-
// Enqueue accepts a message and returns true on acceptance, or false on shutdown
72-
func (ch *chain) Enqueue(env *cb.Envelope) bool {
72+
// Order accepts normal messages for ordering
73+
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
7374
select {
7475
case ch.sendChan <- env:
75-
return true
76+
return nil
7677
case <-ch.exitChan:
77-
return false
78+
return fmt.Errorf("Exiting")
7879
}
7980
}
8081

82+
// Order accepts normal messages for ordering
83+
func (ch *chain) Configure(configUpdate *cb.Envelope, config *cb.Envelope, configSeq uint64) error {
84+
// TODO, handle this specially
85+
return ch.Order(config, configSeq)
86+
}
87+
8188
// Errored only closes on exit
8289
func (ch *chain) Errored() <-chan struct{} {
8390
return ch.exitChan

0 commit comments

Comments
 (0)