Skip to content

Commit

Permalink
refactor & add offset retention.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hǎiliàng Wáng committed Sep 17, 2015
1 parent 41a830d commit 58f6fac
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 49 deletions.
8 changes: 6 additions & 2 deletions TODO.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ Design principle
* Stateless
* fail fast
- timeout (SetDealine)
- release resources
* fault tolerance:
- automatic handling of broken connections
- leader down
- try again for broken connection (maxBadConnRetries=2)
- recover without restart
- broken connections
- leader down
- retry
- partition expand
- graceful shutdown
7 changes: 3 additions & 4 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ var (

type Config struct {
Addr string
SendQueueLen int
RecvQueueLen int
Timeout time.Duration
}
Expand All @@ -42,7 +41,7 @@ func New(config *Config) (*B, error) {
b := &B{
config: config,
conn: conn,
sendChan: make(chan *brokerJob, config.SendQueueLen),
sendChan: make(chan *brokerJob),
recvChan: make(chan *brokerJob, config.RecvQueueLen),
}
go b.sendLoop()
Expand All @@ -54,12 +53,12 @@ func (b *B) Close() {
b.conn.Close()
}

func (b *B) Do(req *proto.Request, resp *proto.Response) error {
func (b *B) Do(req *proto.Request, resp proto.ResponseMessage) error {
req.CorrelationID = atomic.AddInt32(&b.cid, 1)
errChan := make(chan error)
b.sendChan <- &brokerJob{
req: req,
resp: resp,
resp: &proto.Response{ResponseMessage: resp},
errChan: errChan,
}
return <-errChan
Expand Down
22 changes: 7 additions & 15 deletions broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ const (
func TestMeta(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand All @@ -32,19 +31,16 @@ func TestMeta(t *testing.T) {
"test",
},
}
resp := &proto.Response{
ResponseMessage: &proto.TopicMetadataResponse{},
}
resp := &proto.TopicMetadataResponse{}
if err := broker.Do(req, resp); err != nil {
t.Fatal(err)
}
fmt.Println(toJSON(resp.ResponseMessage))
fmt.Println(toJSON(resp))
}

func TestConsumeAll(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand Down Expand Up @@ -75,7 +71,7 @@ func TestConsumeAll(t *testing.T) {
},
}
resp := proto.FetchResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
t.Fatal(err)
}
fmt.Println(toJSON(resp))
Expand All @@ -91,7 +87,6 @@ func TestConsumeAll(t *testing.T) {
func TestOffsetCommit(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand Down Expand Up @@ -124,7 +119,7 @@ func TestOffsetCommit(t *testing.T) {
},
}
resp := proto.OffsetCommitResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
t.Fatal(t)
}
fmt.Println(toJSON(resp))
Expand All @@ -133,7 +128,6 @@ func TestOffsetCommit(t *testing.T) {
func TestOffsetFetch(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand All @@ -156,7 +150,7 @@ func TestOffsetFetch(t *testing.T) {
},
}
resp := proto.OffsetFetchResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
t.Fatal(err)
}
fmt.Println(toJSON(resp))
Expand All @@ -165,7 +159,6 @@ func TestOffsetFetch(t *testing.T) {
func TestConsumerMeta(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand All @@ -181,7 +174,7 @@ func TestConsumerMeta(t *testing.T) {
RequestMessage: &creq,
}
resp := proto.ConsumerMetadataResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
t.Fatal(err)
}
fmt.Println(toJSON(resp))
Expand All @@ -190,7 +183,6 @@ func TestConsumerMeta(t *testing.T) {
func TestProduce(t *testing.T) {
broker, err := New(&Config{
Addr: kafkaAddr,
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
})
Expand Down Expand Up @@ -228,7 +220,7 @@ func TestProduce(t *testing.T) {
},
}
resp := proto.ProduceResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
t.Fatal(err)
}
fmt.Println(toJSON(resp))
Expand Down
18 changes: 4 additions & 14 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,14 +170,9 @@ func (c *C) getTopicMetadata(topic string) (*proto.TopicMetadataResponse, error)
if needClosing {
defer broker.Close()
}
req := &proto.Request{
APIKey: proto.TopicMetadataRequestType,
APIVersion: 0,
ClientID: c.config.ClientID,
RequestMessage: &proto.TopicMetadataRequest{topic},
}
req := c.NewRequest(&proto.TopicMetadataRequest{topic})
resp := &proto.TopicMetadataResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: resp}); err != nil {
if err := broker.Do(req, resp); err != nil {
return nil, err
}
return resp, nil
Expand All @@ -192,14 +187,9 @@ func (c *C) getConsumerMetadata(consumerGroup string) (*proto.ConsumerMetadataRe
defer broker.Close()
}
creq := proto.ConsumerMetadataRequest(consumerGroup)
req := &proto.Request{
APIKey: proto.ConsumerMetadataRequestType,
APIVersion: 0,
ClientID: c.config.ClientID,
RequestMessage: &creq,
}
req := c.NewRequest(&creq)
resp := proto.ConsumerMetadataResponse{}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
return nil, err
}
return &resp, nil
Expand Down
17 changes: 9 additions & 8 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ var (
)

type Config struct {
Client client.Config
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Client client.Config
MaxWaitTime int32
MinBytes int32
MaxBytes int32
OffsetRetention time.Duration
}

type C struct {
Expand Down Expand Up @@ -52,7 +53,7 @@ func (c *C) Offset(topic string, partition int32, consumerGroup string) (int64,
if err != nil {
return 0, err
}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
return 0, err
}
for i := range resp {
Expand Down Expand Up @@ -93,7 +94,7 @@ func (c *C) Consume(topic string, partition int32, offset int64) (values [][]byt
if err != nil {
return nil, err
}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
return nil, err
}
for i := range resp {
Expand Down Expand Up @@ -133,7 +134,7 @@ func (c *C) Commit(topic string, partition int32, consumerGroup string, offset i
{
Partition: partition,
Offset: offset,
TimeStamp: time.Now().Unix(),
TimeStamp: time.Now().Add(c.config.OffsetRetention).Unix(),
},
},
},
Expand All @@ -144,7 +145,7 @@ func (c *C) Commit(topic string, partition int32, consumerGroup string, offset i
if err != nil {
return err
}
if err := broker.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := broker.Do(req, &resp); err != nil {
return err
}
for i := range resp {
Expand Down
8 changes: 4 additions & 4 deletions consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestGetOffset(t *testing.T) {
if err != nil {
t.Fatal(err)
}
fmt.Println("offset: ", offset)
fmt.Println("get offset: ", offset)
}

func TestConsumeAll(t *testing.T) {
Expand Down Expand Up @@ -50,14 +50,14 @@ func getConsumer(t *testing.T) *C {
"docker:32793",
},
BrokerConfig: broker.Config{
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
},
ClientID: "abc",
},
MinBytes: 0,
MaxBytes: 9999,
MinBytes: 0,
MaxBytes: 9999,
OffsetRetention: 7 * 24 * time.Hour,
})
if err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (p *P) Produce(topic string, key, value []byte) error {
},
})
resp := proto.ProduceResponse{}
if err := leader.Do(req, &proto.Response{ResponseMessage: &resp}); err != nil {
if err := leader.Do(req, &resp); err != nil {
return err
}
for i := range resp {
Expand Down
1 change: 0 additions & 1 deletion producer/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ func TestProducer(t *testing.T) {
"docker:32793",
},
BrokerConfig: broker.Config{
SendQueueLen: 10,
RecvQueueLen: 10,
Timeout: time.Second,
},
Expand Down

0 comments on commit 58f6fac

Please sign in to comment.