Skip to content

Commit

Permalink
lock pool.
Browse files Browse the repository at this point in the history
  • Loading branch information
Hǎiliàng Wáng committed Sep 17, 2015
1 parent 0ab28c7 commit b0a7442
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 115 deletions.
12 changes: 5 additions & 7 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,20 @@ func New(config *Config) *B {
}
}

func (b *B) Connected() bool {
b.mu.Lock()
defer b.mu.Unlock()
return b.conn != nil
func (b *B) Addr() string {
return b.config.Addr
}

func (b *B) Connect() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.conn != nil {
panic("already connected")
return nil
}
var err error
b.conn, err = net.Dial("tcp", b.config.Addr)
if err != nil {
b.closed = true
return err
}
go b.sendLoop()
Expand Down Expand Up @@ -91,12 +90,11 @@ func (b *B) Do(req *proto.Request, resp proto.ResponseMessage) error {

func (b *B) sendJob(job *brokerJob) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
b.mu.Unlock()
return ErrBrokerClosed
}
b.sendChan <- job
b.mu.Unlock()
return nil
}

Expand Down
149 changes: 53 additions & 96 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package client

import (
"errors"
"fmt"
"sync"

"h12.me/kafka/broker"
Expand All @@ -23,26 +22,24 @@ type Config struct {
}

type C struct {
brokers map[int32]*broker.B
topicPartitions map[string][]int32
topicPartitionLeader map[topicPartition]*broker.B
groupCoordinator map[string]*broker.B
config *Config
mu sync.Mutex
}

type topicPartition struct {
topic string
partition int32
config *Config
topics *topicPartitions
pool *brokerPool
mu sync.Mutex
}

func New(config *Config) (*C, error) {
c := &C{
brokers: make(map[int32]*broker.B),
config: config,
topicPartitions: make(map[string][]int32),
topicPartitionLeader: make(map[topicPartition]*broker.B),
groupCoordinator: make(map[string]*broker.B),
config: config,
topics: newTopicPartitions(),
pool: newBrokerPool(func(addr string) *broker.B {
cfg := config.BrokerConfig
cfg.Addr = addr
return broker.New(&cfg)
}),
}
for _, addr := range config.Brokers {
c.pool.AddAddr(addr)
}
return c, nil
}
Expand All @@ -57,62 +54,50 @@ func (c *C) NewRequest(req proto.RequestMessage) *proto.Request {
}

func (c *C) Partitions(topic string) ([]int32, error) {
if partitions, ok := c.topicPartitions[topic]; ok {
partitions := c.topics.getPartitions(topic)
if len(partitions) > 0 {
return partitions, nil
}
if err := c.updateFromTopicMetadata(topic); err != nil {
return nil, err
}
if partitions, ok := c.topicPartitions[topic]; ok {
partitions = c.topics.getPartitions(topic)
if len(partitions) > 0 {
return partitions, nil
}
return nil, ErrTopicNotFound
}

func (c *C) Coordinator(topic, consumerGroup string) (*broker.B, error) {
if coord, ok := c.groupCoordinator[consumerGroup]; ok {
if coord, err := c.pool.GetCoordinator(consumerGroup); err == nil {
return coord, nil
}
if err := c.updateFromConsumerMetadata(topic, consumerGroup); err != nil {
return nil, err
}
if coord, ok := c.groupCoordinator[consumerGroup]; ok {
return coord, nil
}
return nil, ErrCoordNotFound
return c.pool.GetCoordinator(consumerGroup)
}

func (c *C) Leader(topic string, partition int32) (*broker.B, error) {
key := topicPartition{topic, partition}
if leader, ok := c.topicPartitionLeader[key]; ok {
if leader, err := c.pool.GetLeader(topic, partition); err == nil {
return leader, nil
}
if err := c.updateFromTopicMetadata(topic); err != nil {
return nil, err
}
if leader, ok := c.topicPartitionLeader[key]; ok {
return leader, nil
}
return nil, ErrLeaderNotFound
return c.pool.GetLeader(topic, partition)
}

func (c *C) updateFromConsumerMetadata(topic, consumerGroup string) error {
m, err := c.getConsumerMetadata(consumerGroup)
if err != nil {
return err
}
if broker, ok := c.brokers[m.CoordinatorID]; ok {
c.groupCoordinator[consumerGroup] = broker
return nil
}
if err := c.updateFromTopicMetadata(topic); err != nil {
return err
}
if broker, ok := c.brokers[m.CoordinatorID]; ok {
c.groupCoordinator[consumerGroup] = broker
return nil
if m.ErrorCode != 0 {
return ErrCoordNotFound
}
return ErrCoordNotFound
c.pool.SetCoordinator(consumerGroup, m.CoordinatorID, m.CoordinatorHost, m.CoordinatorPort)
return nil
}

func (c *C) updateFromTopicMetadata(topic string) error {
Expand All @@ -122,87 +107,59 @@ func (c *C) updateFromTopicMetadata(topic string) error {
}
for i := range m.Brokers {
b := &m.Brokers[i]
if _, ok := c.brokers[b.NodeID]; !ok {
cfg := c.config.BrokerConfig
cfg.Addr = fmt.Sprintf("%s:%d", b.Host, b.Port)
broker := broker.New(&cfg)
if err := broker.Connect(); err == nil {
c.brokers[b.NodeID] = broker
} else {
// TODO: log
}
}
c.pool.Add(b.NodeID, b.Host, b.Port)
}
for i := range m.TopicMetadatas {
t := &m.TopicMetadatas[i]
if t.TopicName == topic {
partitions := make([]int32, len(t.PartitionMetadatas))
for i := range t.PartitionMetadatas {
partition := t.PartitionMetadatas[i].PartitionID
partitions[i] = partition
if broker, ok := c.brokers[t.PartitionMetadatas[i].Leader]; ok {
c.topicPartitionLeader[topicPartition{topic, partition}] = broker
partition := &t.PartitionMetadatas[i]
partitions[i] = partition.PartitionID
if err := c.pool.SetLeader(topic, partition.PartitionID, partition.Leader); err != nil {
return ErrLeaderNotFound
}
}
c.topicPartitions[topic] = partitions
c.topics.addPartitions(topic, partitions)
return nil
}
}
return ErrTopicNotFound
}

func (c *C) getAnyBroker() (broker *broker.B, needClosing bool, err error) {
for _, b := range c.brokers {
return b, false, nil
}
broker, err = c.getBootstrapBroker()
if err != nil {
return nil, false, err
}
return broker, true, nil
}

func (c *C) getTopicMetadata(topic string) (*proto.TopicMetadataResponse, error) {
broker, needClosing, err := c.getAnyBroker()
if err != nil {
return nil, err
}
if needClosing {
defer broker.Close()
}
var err error
req := c.NewRequest(&proto.TopicMetadataRequest{topic})
resp := &proto.TopicMetadataResponse{}
if err := broker.Do(req, resp); err != nil {
brokers, err := c.pool.Brokers()
if err != nil {
return nil, err
}
return resp, nil
for _, broker := range brokers {
broker.Connect()
err = broker.Do(req, resp)
if err == nil {
return resp, nil
}
}
return nil, err
}

func (c *C) getConsumerMetadata(consumerGroup string) (*proto.ConsumerMetadataResponse, error) {
broker, needClosing, err := c.getAnyBroker()
if err != nil {
return nil, err
}
if needClosing {
defer broker.Close()
}
var err error
creq := proto.ConsumerMetadataRequest(consumerGroup)
req := c.NewRequest(&creq)
resp := proto.ConsumerMetadataResponse{}
if err := broker.Do(req, &resp); err != nil {
resp := &proto.ConsumerMetadataResponse{}
brokers, err := c.pool.Brokers()
if err != nil {
return nil, err
}
return &resp, nil
}

func (c *C) getBootstrapBroker() (*broker.B, error) {
for _, addr := range c.config.Brokers {
cfg := c.config.BrokerConfig
cfg.Addr = addr
broker := broker.New(&cfg)
if err := broker.Connect(); err == nil {
return broker, nil
for _, broker := range brokers {
broker.Connect()
err = broker.Do(req, resp)
if err == nil {
return resp, nil
}
}
return nil, ErrNoBrokerFound
return nil, err
}
Loading

0 comments on commit b0a7442

Please sign in to comment.