Skip to content

Commit

Permalink
Add BroadcastIncompleteBatch test to Kafka orderer
Browse files Browse the repository at this point in the history
Test that a batch is cut even if the batch size requirement is not met.

Change-Id: Ic02cea7bd835a85a833af21cab45626677f00817
Signed-off-by: Kostas Christidis <kostas@christidis.io>
  • Loading branch information
kchristidis committed Oct 29, 2016
1 parent b4473da commit 383f34d
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
55 changes: 53 additions & 2 deletions orderer/kafka/broadcast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,58 @@ func TestBroadcastBatch(t *testing.T) {
}
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Should have received the initialization block by now")
t.Fatal("Should have received a block by now")
}
}
}

func TestBroadcastIncompleteBatch(t *testing.T) {
if testConf.General.BatchSize <= 1 {
t.Skip("Skipping test as it requires a batchsize > 1")
}

messageCount := int(testConf.General.BatchSize) - 1

disk := make(chan []byte)

mb := mockNewBroadcaster(t, testConf, oldestOffset, disk)
defer testClose(t, mb)

mbs := newMockBroadcastStream(t)
go func() {
if err := mb.Broadcast(mbs); err != nil {
t.Fatal("Broadcast error:", err)
}
}()

<-disk // We tested the checkpoint block in a previous test, so we can ignore it now

// Pump less than batchSize messages into the system
go func() {
for i := 0; i < messageCount; i++ {
mbs.incoming <- &ab.BroadcastMessage{Data: []byte("message " + strconv.Itoa(i))}
}
}()

// Ignore the broadcast replies as they have been tested elsewhere
for i := 0; i < messageCount; i++ {
<-mbs.outgoing
}

for {
select {
case in := <-disk:
block := new(ab.Block)
err := proto.Unmarshal(in, block)
if err != nil {
t.Fatal("Expected a block on the broker's disk")
}
if len(block.Messages) != messageCount {
t.Fatalf("Expected block to have %d messages instead of %d", messageCount, len(block.Messages))
}
return
case <-time.After(testConf.General.BatchTimeout + timePadding):
t.Fatal("Should have received a block by now")
}
}
}
Expand Down Expand Up @@ -177,7 +228,7 @@ func TestBroadcastBatchAndQuitEarly(t *testing.T) {
}
return
case <-time.After(500 * time.Millisecond):
t.Fatal("Should have received the initialization block by now")
t.Fatal("Should have received a block by now")
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions orderer/kafka/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ var (
oldestOffset = int64(100) // The oldest block available on the broker
newestOffset = int64(1100) // The offset that will be assigned to the next block
middleOffset = (oldestOffset + newestOffset - 1) / 2 // Just an offset in the middle

// Amount of time to wait for block processing when doing time-based tests
// We generally want this value to be as small as possible so as to make tests execute faster
// But this may have to be bumped up in slower machines
timePadding = 200 * time.Millisecond
)

var testConf = &config.TopLevel{
Expand Down

0 comments on commit 383f34d

Please sign in to comment.