@@ -18,6 +18,7 @@ import (
18
18
"github.com/coreos/etcd/raft"
19
19
"github.com/coreos/etcd/raft/raftpb"
20
20
"github.com/golang/protobuf/proto"
21
+ "github.com/hyperledger/fabric/common/configtx"
21
22
"github.com/hyperledger/fabric/common/flogging"
22
23
"github.com/hyperledger/fabric/orderer/common/cluster"
23
24
"github.com/hyperledger/fabric/orderer/consensus"
@@ -129,8 +130,53 @@ func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
129
130
130
131
// Configure submits config type transactions for ordering.
131
132
func (c * Chain ) Configure (env * common.Envelope , configSeq uint64 ) error {
132
- c .logger .Panicf ("Configure not implemented yet" )
133
- return nil
133
+ if err := c .checkConfigUpdateValidity (env ); err != nil {
134
+ return err
135
+ }
136
+ return c .Submit (& orderer.SubmitRequest {LastValidationSeq : configSeq , Content : env }, 0 )
137
+ }
138
+
139
+ // validate the config update for being of Type A or Type B as described in the design doc.
140
+ func (c * Chain ) checkConfigUpdateValidity (ctx * common.Envelope ) error {
141
+ var err error
142
+ payload , err := utils .UnmarshalPayload (ctx .Payload )
143
+ if err != nil {
144
+ return err
145
+ }
146
+ chdr , err := utils .UnmarshalChannelHeader (payload .Header .ChannelHeader )
147
+ if err != nil {
148
+ return err
149
+ }
150
+
151
+ switch chdr .Type {
152
+ case int32 (common .HeaderType_ORDERER_TRANSACTION ):
153
+ return errors .Errorf ("channel creation requests not supported yet" )
154
+ case int32 (common .HeaderType_CONFIG ):
155
+ configEnv , err := configtx .UnmarshalConfigEnvelope (payload .Data )
156
+ if err != nil {
157
+ return err
158
+ }
159
+ configUpdateEnv , err := utils .EnvelopeToConfigUpdate (configEnv .LastUpdate )
160
+ if err != nil {
161
+ return err
162
+ }
163
+ configUpdate , err := configtx .UnmarshalConfigUpdate (configUpdateEnv .ConfigUpdate )
164
+ if err != nil {
165
+ return err
166
+ }
167
+
168
+ // ignoring the read set for now
169
+ // check only if the ConsensusType is updated in the write set
170
+ if ordererConfigGroup , ok := configUpdate .WriteSet .Groups ["Orderer" ]; ok {
171
+ if _ , ok := ordererConfigGroup .Values ["ConsensusType" ]; ok {
172
+ return errors .Errorf ("updates to ConsensusType not supported currently" )
173
+ }
174
+ }
175
+ return nil
176
+
177
+ default :
178
+ return errors .Errorf ("config transaction has unknown header type" )
179
+ }
134
180
}
135
181
136
182
// WaitReady is currently a no-op.
@@ -209,29 +255,19 @@ func (c *Chain) serveRequest() {
209
255
}
210
256
211
257
for {
212
- seq := c .support .Sequence ()
213
258
214
259
select {
215
260
case msg := <- c .submitC :
216
- if c .isConfig (msg .Content ) {
217
- c .logger .Panicf ("Processing config envelope is not implemented yet" )
218
- }
219
-
220
- if msg .LastValidationSeq < seq {
221
- if _ , err := c .support .ProcessNormalMsg (msg .Content ); err != nil {
222
- c .logger .Warningf ("Discarding bad normal message: %s" , err )
223
- continue
224
- }
261
+ batches , pending , err := c .ordered (msg )
262
+ if err != nil {
263
+ c .logger .Errorf ("Failed to order message: %s" , err )
225
264
}
226
-
227
- batches , _ := c .support .BlockCutter ().Ordered (msg .Content )
228
- if len (batches ) == 0 {
229
- start ()
230
- continue
265
+ if pending {
266
+ start () // no-op if timer is already started
267
+ } else {
268
+ stop ()
231
269
}
232
270
233
- stop ()
234
-
235
271
if err := c .commitBatches (batches ... ); err != nil {
236
272
c .logger .Errorf ("Failed to commit block: %s" , err )
237
273
}
@@ -257,6 +293,42 @@ func (c *Chain) serveRequest() {
257
293
}
258
294
}
259
295
296
+ // Orders the envelope in the `msg` content. SubmitRequest.
297
+ // Returns
298
+ // -- batches [][]*common.Envelope; the batches cut,
299
+ // -- pending bool; if there are envelopes pending to be ordered,
300
+ // -- err error; the error encountered, if any.
301
+ // It takes care of config messages as well as the revalidation of messages if the config sequence has advanced.
302
+ func (c * Chain ) ordered (msg * orderer.SubmitRequest ) (batches [][]* common.Envelope , pending bool , err error ) {
303
+ seq := c .support .Sequence ()
304
+
305
+ if c .isConfig (msg .Content ) {
306
+ // ConfigMsg
307
+ if msg .LastValidationSeq < seq {
308
+ msg .Content , _ , err = c .support .ProcessConfigMsg (msg .Content )
309
+ if err != nil {
310
+ return nil , true , errors .Errorf ("bad config message: %s" , err )
311
+ }
312
+ }
313
+ batch := c .support .BlockCutter ().Cut ()
314
+ batches = [][]* common.Envelope {}
315
+ if len (batch ) != 0 {
316
+ batches = append (batches , batch )
317
+ }
318
+ batches = append (batches , []* common.Envelope {msg .Content })
319
+ return batches , false , nil
320
+ }
321
+ // it is a normal message
322
+ if msg .LastValidationSeq < seq {
323
+ if _ , err := c .support .ProcessNormalMsg (msg .Content ); err != nil {
324
+ return nil , true , errors .Errorf ("bad normal message: %s" , err )
325
+ }
326
+ }
327
+ batches , pending = c .support .BlockCutter ().Ordered (msg .Content )
328
+ return batches , pending , nil
329
+
330
+ }
331
+
260
332
func (c * Chain ) commitBatches (batches ... []* common.Envelope ) error {
261
333
for _ , batch := range batches {
262
334
b := c .support .CreateNextBlock (batch )
@@ -268,9 +340,10 @@ func (c *Chain) commitBatches(batches ...[]*common.Envelope) error {
268
340
select {
269
341
case block := <- c .commitC :
270
342
if utils .IsConfigBlock (block ) {
271
- c .logger .Panicf ("Config block is not supported yet" )
343
+ c .support .WriteConfigBlock (block , nil )
344
+ } else {
345
+ c .support .WriteBlock (block , nil )
272
346
}
273
- c .support .WriteBlock (block , nil )
274
347
275
348
case <- c .doneC :
276
349
return nil
0 commit comments