Skip to content

Commit

Permalink
[FAB-4438] Fix race condition in mock WriteBlock
Browse files Browse the repository at this point in the history
WriteBlockVal could get overwritten before the listener on the Batches
channel had a chance to retrieve it. This changeset fixes this by
replacing the Batches channel with a Blocks channel.

Change-Id: Idc8899a4317a1c2dc7f5b6affae1c42f24095117
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Jun 8, 2017
1 parent 2590cce commit 1a721b1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 42 deletions.
30 changes: 14 additions & 16 deletions orderer/kafka/chain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func TestProcessLoopRegularError(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -477,7 +477,7 @@ func TestProcessLoopRegularQueueEnvelope(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestProcessLoopRegularCutBlock(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand All @@ -546,7 +546,7 @@ func TestProcessLoopRegularCutBlock(t *testing.T) {
go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good
mockSupport.BlockCutterVal.Block <- struct{}{} // Let the `mockblockcutter.Ordered` call return
logger.Debugf("Mock blockcutter's Ordered call has returned")
<-mockSupport.Batches // Let the `mockConsenterSupport.WriteBlock` proceed
<-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed
logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once
close(exitChan) // Identical to chain.Halt()
logger.Debug("exitChan closed")
Expand Down Expand Up @@ -590,7 +590,7 @@ func TestProcessLoopRegularCutTwoBlocks(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand All @@ -614,15 +614,13 @@ func TestProcessLoopRegularCutTwoBlocks(t *testing.T) {
logger.Debugf("Mock blockcutter's Ordered call has returned for the second time")

select {
case <-mockSupport.Batches: // Let the `mockConsenterSupport.WriteBlock` proceed
block1 = mockSupport.WriteBlockVal
case block1 = <-mockSupport.Blocks: // Let the `mockConsenterSupport.WriteBlock` proceed
case <-time.After(hitBranch):
logger.Fatalf("Did not receive a block from the blockcutter as expected")
}

select {
case <-mockSupport.Batches:
block2 = mockSupport.WriteBlockVal
case block2 = <-mockSupport.Blocks:
case <-time.After(hitBranch):
logger.Fatalf("Did not receive a block from the blockcutter as expected")
}
Expand Down Expand Up @@ -677,7 +675,7 @@ func TestProcessLoopRegularAndSendTimeToCutRegular(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -746,7 +744,7 @@ func TestProcessLoopRegularAndSendTimeToCutError(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -801,7 +799,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageRegular(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand All @@ -818,7 +816,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageRegular(t *testing.T) {
mockSupport.BlockCutterVal.Ordered(newMockEnvelope("fooMessage"))

go func() { // Note: Unlike the CONNECT test case, the following does NOT introduce a race condition, so we're good
<-mockSupport.Batches // Let the `mockConsenterSupport.WriteBlock` proceed
<-mockSupport.Blocks // Let the `mockConsenterSupport.WriteBlock` proceed
logger.Debug("Closing exitChan to exit the infinite for loop") // We are guaranteed to hit the exitChan branch after hitting the REGULAR branch at least once
close(exitChan) // Identical to chain.Halt()
logger.Debug("exitChan closed")
Expand Down Expand Up @@ -858,7 +856,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageZeroBatch(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -902,7 +900,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageLargerThanExpected(t *testing.T)
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down Expand Up @@ -948,7 +946,7 @@ func TestProcessLoopTimeToCutFromReceivedMessageStale(t *testing.T) {
assert.NoError(t, err, "Expected no error when setting up the mock partition consumer")

mockSupport := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope), // WriteBlock will post here
Blocks: make(chan *cb.Block), // WriteBlock will post here
BlockCutterVal: mockblockcutter.NewReceiver(),
ChainIDVal: mockChannel.topic(),
HeightVal: lastCutBlockNumber, // Incremented during the WriteBlock call
Expand Down
19 changes: 5 additions & 14 deletions orderer/mocks/multichain/multichain.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ type ConsenterSupport struct {
// BlockCutterVal is the value returned by BlockCutter()
BlockCutterVal *mockblockcutter.Receiver

// Batches is the channel which WriteBlock writes data to
Batches chan []*cb.Envelope
// Blocks is the channel where WriteBlock writes the most recently created block
Blocks chan *cb.Block

// ChainIDVal is the value returned by ChainID()
ChainIDVal string
Expand All @@ -50,9 +50,6 @@ type ConsenterSupport struct {

// NextBlockVal stores the block created by the most recent CreateNextBlock() call
NextBlockVal *cb.Block

// WriteBlockVal stores the block created by the most recent WriteBlock() call
WriteBlockVal *cb.Block
}

// BlockCutter returns BlockCutterVal
Expand All @@ -77,20 +74,14 @@ func (mcs *ConsenterSupport) CreateNextBlock(data []*cb.Envelope) *cb.Block {
return block
}

// WriteBlock writes data to the Batches channel
// WriteBlock writes data to the Blocks channel
// Note that _committers is ignored by this mock implementation
func (mcs *ConsenterSupport) WriteBlock(block *cb.Block, _committers []filter.Committer, encodedMetadataValue []byte) *cb.Block {
logger.Debugf("mockWriter: attempting to write batch")
umtxs := make([]*cb.Envelope, len(block.Data.Data))
for i := range block.Data.Data {
umtxs[i] = utils.UnmarshalEnvelopeOrPanic(block.Data.Data[i])
}
mcs.HeightVal++
if encodedMetadataValue != nil {
block.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
mcs.WriteBlockVal = block
mcs.Batches <- umtxs
mcs.HeightVal++
mcs.Blocks <- block
return block
}

Expand Down
24 changes: 12 additions & 12 deletions orderer/solo/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func goWithWait(target func()) *waitableGo {
func TestEmptyBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
Blocks: make(chan *cb.Block),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
}
Expand All @@ -69,7 +69,7 @@ func TestEmptyBatch(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)
bs.Halt()
select {
case <-support.Batches:
case <-support.Blocks:
t.Fatalf("Expected no invocations of Append")
case <-wg.done:
}
Expand All @@ -78,7 +78,7 @@ func TestEmptyBatch(t *testing.T) {
func TestBatchTimer(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1ms")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
Blocks: make(chan *cb.Block),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
}
Expand All @@ -90,21 +90,21 @@ func TestBatchTimer(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)

select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Expected a block to be cut because of batch timer expiration but did not")
}

syncQueueMessage(testMessage, bs, support.BlockCutterVal)
select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Did not create the second batch, indicating that the timer was not appopriately reset")
}

bs.Halt()
select {
case <-support.Batches:
case <-support.Blocks:
t.Fatalf("Expected no invocations of Append")
case <-wg.done:
}
Expand All @@ -113,7 +113,7 @@ func TestBatchTimer(t *testing.T) {
func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1h")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
Blocks: make(chan *cb.Block),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
}
Expand All @@ -128,7 +128,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)

select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Expected a block to be cut because the batch was filled, but did not")
}
Expand All @@ -140,7 +140,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)

select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Did not create the second batch, indicating that the old timer was still running")
}
Expand All @@ -156,7 +156,7 @@ func TestBatchTimerHaltOnFilledBatch(t *testing.T) {
func TestConfigStyleMultiBatch(t *testing.T) {
batchTimeout, _ := time.ParseDuration("1h")
support := &mockmultichain.ConsenterSupport{
Batches: make(chan []*cb.Envelope),
Blocks: make(chan *cb.Block),
BlockCutterVal: mockblockcutter.NewReceiver(),
SharedConfigVal: &mockconfig.Orderer{BatchTimeoutVal: batchTimeout},
}
Expand All @@ -170,13 +170,13 @@ func TestConfigStyleMultiBatch(t *testing.T) {
syncQueueMessage(testMessage, bs, support.BlockCutterVal)

select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Expected two blocks to be cut but never got the first")
}

select {
case <-support.Batches:
case <-support.Blocks:
case <-time.After(time.Second):
t.Fatalf("Expected the config type tx to create two blocks, but only go the first")
}
Expand Down

0 comments on commit 1a721b1

Please sign in to comment.