Skip to content

Commit

Permalink
generate an good test case for snappy produce request
Browse files Browse the repository at this point in the history
  • Loading branch information
Hǎiliàng Wáng committed Jan 15, 2016
1 parent b019d65 commit 2748827
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
68 changes: 45 additions & 23 deletions broker/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

"h12.me/realtest/kafka"
"h12.me/wipro"
//"h12.me/wipro"
)

func TestTopicMetadata(t *testing.T) {
Expand Down Expand Up @@ -70,43 +70,65 @@ func TestProduceSnappy(t *testing.T) {
}
partitionCount := 2
partition := int32(1)
topic, err := k.NewRandomTopic(partitionCount)
topic := "topic1"
err = k.NewTopic(topic, partitionCount)
if err != nil {
t.Fatal(err)
}
defer k.DeleteTopic(topic)
leaderAddr := getLeader(t, k, topic, partition)
b := New(DefaultConfig().WithAddr(leaderAddr))
defer b.Close()
var w wipro.Writer
ms := MessageSet{
{SizedMessage: SizedMessage{CRCMessage: CRCMessage{Message: Message{
Attributes: 0,
Key: nil,
Value: []byte("hello"),
}}}},
}
ms.Marshal(&w)
compressedValue := encodeSnappy(w.B)
fmt.Println(w.B)
resp, err := b.Produce(topic, partition, MessageSet{
{
SizedMessage: SizedMessage{CRCMessage: CRCMessage{Message: Message{
Attributes: 2,

conn, err := net.Dial("tcp", leaderAddr)
if err != nil {
t.Fatal(err)
}
_, err = conn.Write([]byte{0x00, 0x00, 0x00, 0xab, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0x00, 0x03, 0x61, 0x62, 0x63, 0x00, 0x01, 0x00, 0x00, 0x03, 0xe8, 0x00, 0x00, 0x00, 0x01, 0x00, 0x06, 0x74, 0x6f, 0x70, 0x69, 0x63, 0x31, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2d, 0x34, 0x5c, 0xc8, 0x81, 0x00, 0x02, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x1f, 0x24, 0x00, 0x00, 0x19, 0x01, 0x60, 0x18, 0x8a, 0x13, 0x5f, 0x20, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x0a, 0x53, 0x6e, 0x61, 0x70, 0x70, 0x79, 0x20, 0x31, 0x20, 0x30, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x3b, 0x0c, 0x6f, 0xa9, 0x33, 0x00, 0x02, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x2d, 0x48, 0x00, 0x00, 0x19, 0x01, 0x60, 0x18, 0x88, 0x55, 0xe1, 0x79, 0x00, 0x00, 0xff, 0xff, 0xff, 0xff, 0x00, 0x00, 0x00, 0x0a, 0x53, 0x6e, 0x61, 0x70, 0x70, 0x79, 0x20, 0x32, 0x20, 0x30, 0x19, 0x23, 0x14, 0x00, 0x18, 0xff, 0x52, 0xd1, 0xef, 0x4a, 0x24, 0x00, 0x00, 0x31})
if err != nil {
t.Fatal(err)
}

//b := New(DefaultConfig().WithAddr(leaderAddr))
//defer b.Close()
/*
var w wipro.Writer
ms := MessageSet{
{SizedMessage: SizedMessage{CRCMessage: CRCMessage{Message: Message{
Attributes: 0,
Key: nil,
Value: compressedValue,
Value: []byte("hello"),
}}}},
})
if err != nil {
}
ms.Marshal(&w)
compressedValue := encodeSnappy(w.B)
fmt.Println(w.B)
resp, err := b.Produce(topic, partition, MessageSet{
{
SizedMessage: SizedMessage{CRCMessage: CRCMessage{Message: Message{
Attributes: 2,
Key: nil,
Value: compressedValue,
}}}},
})
*/
resp := &ProduceResponse{}
r := &Response{
ResponseMessage: resp,
}
if err := r.Receive(conn); err != nil {
t.Fatal(err)
}
if len(*resp) != 1 || len((*resp)[0].OffsetInPartitions) != 1 {
t.Fatal("expect 1 resp")
}

for _, topic := range *resp {
for _, p := range topic.OffsetInPartitions {
if p.ErrorCode.HasError() {
t.Fatal(p.ErrorCode)
}
}
}
t.Log(*resp)
}

func TestGroupCoordinator(t *testing.T) {
Expand Down Expand Up @@ -259,7 +281,7 @@ func sendReceive(t *testing.T, conn net.Conn, req *Request, resp *Response) {
t.Fatal(t)
}
if err := resp.Receive(conn); err != nil {
t.Fatal(resp)
t.Fatal(err)
}
if resp.CorrelationID != req.CorrelationID {
t.Fatalf("correlation id: expect %d but got %d", req.CorrelationID, resp.CorrelationID)
Expand Down
5 changes: 3 additions & 2 deletions spec/python/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,6 @@
produce = ProduceRequestPayload("topic1", 1, messages=messages)

req = KafkaProtocol.encode_produce_request("abc", 2, payloads=[produce], acks=1, timeout=1000)
print len(req)
print "[]byte{" +''.join( [ "0x%02x, " % ord( x ) for x in req ] ).strip() + "}"
print req
#print len(req)
#print "[]byte{" +''.join( [ "0x%02x, " % ord( x ) for x in req ] ).strip() + "}"

0 comments on commit 2748827

Please sign in to comment.