rename from common to model
Hǎiliàng Wáng committed Jan 29, 2016
1 parent 3aa54df commit 0471839
Showing 11 changed files with 61 additions and 64 deletions.
10 changes: 5 additions & 5 deletions broker/broker.go
@@ -7,7 +7,7 @@ import (
"sync/atomic"
"time"

"h12.me/kafka/common"
"h12.me/kafka/model"
)

var (
@@ -25,20 +25,20 @@ type B struct {
}

type brokerJob struct {
- req common.Request
- resp common.Response
+ req model.Request
+ resp model.Response
errChan chan error
}

- func New(addr string) common.Broker {
+ func New(addr string) model.Broker {
return &B{
Addr: addr,
Timeout: 30 * time.Second,
QueueLen: 1000,
}
}

- func (b *B) Do(req common.Request, resp common.Response) error {
+ func (b *B) Do(req model.Request, resp model.Response) error {
req.SetID(atomic.AddInt32(&b.cid, 1))
errChan := make(chan error)
if err := b.sendJob(&brokerJob{
10 changes: 5 additions & 5 deletions cluster/cluster.go
@@ -5,8 +5,8 @@ import (
"fmt"
"sync"

"h12.me/kafka/common"
"h12.me/kafka/log"
"h12.me/kafka/model"
"h12.me/kafka/proto"
)

@@ -22,10 +22,10 @@ type (
pool *brokerPool
mu sync.Mutex
}
- NewBrokerFunc func(addr string) common.Broker
+ NewBrokerFunc func(addr string) model.Broker
)

- func New(newBroker NewBrokerFunc, brokers []string) common.Cluster {
+ func New(newBroker NewBrokerFunc, brokers []string) model.Cluster {
c := &C{
topics: newTopicPartitions(),
pool: newBrokerPool(newBroker),
@@ -51,7 +51,7 @@ func (c *C) Partitions(topic string) ([]int32, error) {
return nil, fmt.Errorf("topic %s not found", topic)
}

- func (c *C) Coordinator(group string) (common.Broker, error) {
+ func (c *C) Coordinator(group string) (model.Broker, error) {
if coord, err := c.pool.GetCoordinator(group); err == nil {
return coord, nil
}
@@ -61,7 +61,7 @@ func (c *C) Coordinator(group string) (common.Broker, error) {
return c.pool.GetCoordinator(group)
}

- func (c *C) Leader(topic string, partition int32) (common.Broker, error) {
+ func (c *C) Leader(topic string, partition int32) (model.Broker, error) {
if leader, err := c.pool.GetLeader(topic, partition); err == nil {
return leader, nil
}
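A minimal wiring sketch (not part of this commit), assuming only the signatures visible in the hunks above: after the rename, broker.New returns a model.Broker and therefore satisfies cluster.NewBrokerFunc, so it can be passed straight to cluster.New. The broker address is a placeholder.

package main

import (
    "h12.me/kafka/broker"
    "h12.me/kafka/cluster"
)

func main() {
    // cluster.New expects a NewBrokerFunc, i.e. func(addr string) model.Broker,
    // which broker.New matches after the rename; the address is a placeholder.
    c := cluster.New(broker.New, []string{"localhost:9092"})
    _ = c // c is a model.Cluster
}
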
34 changes: 17 additions & 17 deletions cluster/pool.go
@@ -3,16 +3,16 @@ package cluster
import (
"sync"

"h12.me/kafka/common"
"h12.me/kafka/model"
"h12.me/kafka/log"
)

type brokerPool struct {
- addrBroker map[string]common.Broker
+ addrBroker map[string]model.Broker
idAddr map[int32]string
- topicPartitionLeader map[topicPartition]common.Broker
- groupCoordinator map[string]common.Broker
- newBroker func(string) common.Broker
+ topicPartitionLeader map[topicPartition]model.Broker
+ groupCoordinator map[string]model.Broker
+ newBroker func(string) model.Broker
mu sync.Mutex
}

@@ -21,30 +21,30 @@ type topicPartition struct {
partition int32
}

- func newBrokerPool(newBroker func(string) common.Broker) *brokerPool {
+ func newBrokerPool(newBroker func(string) model.Broker) *brokerPool {
return &brokerPool{
- addrBroker: make(map[string]common.Broker),
+ addrBroker: make(map[string]model.Broker),
idAddr: make(map[int32]string),
- topicPartitionLeader: make(map[topicPartition]common.Broker),
- groupCoordinator: make(map[string]common.Broker),
+ topicPartitionLeader: make(map[topicPartition]model.Broker),
+ groupCoordinator: make(map[string]model.Broker),
newBroker: newBroker,
}
}

- func (p *brokerPool) Brokers() (map[string]common.Broker, error) {
+ func (p *brokerPool) Brokers() (map[string]model.Broker, error) {
if len(p.addrBroker) > 0 {
return p.addrBroker, nil
}
return nil, ErrNoBrokerFound
}

- func (p *brokerPool) AddAddr(addr string) common.Broker {
+ func (p *brokerPool) AddAddr(addr string) model.Broker {
p.mu.Lock()
defer p.mu.Unlock()
return p.addAddr(addr)
}

- func (p *brokerPool) addAddr(addr string) common.Broker {
+ func (p *brokerPool) addAddr(addr string) model.Broker {
if broker, ok := p.addrBroker[addr]; ok {
return broker
}
@@ -53,12 +53,12 @@ func (p *brokerPool) addAddr(addr string) common.Broker {
return broker
}

- func (p *brokerPool) add(brokerID int32, addr string) common.Broker {
+ func (p *brokerPool) add(brokerID int32, addr string) model.Broker {
p.idAddr[brokerID] = addr
return p.addAddr(addr)
}

- func (p *brokerPool) Add(brokerID int32, addr string) common.Broker {
+ func (p *brokerPool) Add(brokerID int32, addr string) model.Broker {
p.mu.Lock()
defer p.mu.Unlock()
return p.add(brokerID, addr)
@@ -75,7 +75,7 @@ func (p *brokerPool) SetLeader(topic string, partition int32, brokerID int32) er
return nil
}

- func (p *brokerPool) GetLeader(topic string, partition int32) (common.Broker, error) {
+ func (p *brokerPool) GetLeader(topic string, partition int32) (model.Broker, error) {
p.mu.Lock()
defer p.mu.Unlock()
broker, ok := p.topicPartitionLeader[topicPartition{topic, partition}]
@@ -104,7 +104,7 @@ func (p *brokerPool) SetCoordinator(consumerGroup string, brokerID int32, addr s
p.groupCoordinator[consumerGroup] = broker
}

- func (p *brokerPool) GetCoordinator(consumerGroup string) (common.Broker, error) {
+ func (p *brokerPool) GetCoordinator(consumerGroup string) (model.Broker, error) {
p.mu.Lock()
defer p.mu.Unlock()
broker, ok := p.groupCoordinator[consumerGroup]
@@ -114,7 +114,7 @@ func (p *brokerPool) GetCoordinator(consumerGroup string) (common.Broker, error)
return broker, nil
}

- func (p *brokerPool) find(brokerID int32) (common.Broker, error) {
+ func (p *brokerPool) find(brokerID int32) (model.Broker, error) {
if addr, ok := p.idAddr[brokerID]; ok {
if broker, ok := p.addrBroker[addr]; ok {
return broker, nil
6 changes: 3 additions & 3 deletions consumer/consumer.go
@@ -4,7 +4,7 @@ import (
"errors"
"time"

"h12.me/kafka/common"
"h12.me/kafka/model"
"h12.me/kafka/proto"
)

@@ -24,10 +24,10 @@ type C struct {
MinBytes int
MaxBytes int
OffsetRetention time.Duration
- cluster common.Cluster
+ cluster model.Cluster
}

- func New(cluster common.Cluster) *C {
+ func New(cluster model.Cluster) *C {
return &C{
cluster: cluster,
MaxWaitTime: 100 * time.Millisecond,
2 changes: 1 addition & 1 deletion common/type.go → model/type.go
@@ -1,4 +1,4 @@
- package common
+ package model

import (
"io"
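The diff above only shows the package clause of the renamed file. As a rough guide to what moved, here is a hypothetical reconstruction of the interfaces model/type.go appears to declare, inferred purely from call sites elsewhere in this commit (Broker.Do, Request.SetID, Response.ID, Cluster.Leader and Cluster.Coordinator). The real file also imports "io", so the actual interfaces very likely carry serialization methods that cannot be recovered from this diff.

package model

// Request is sent to a broker; broker.B.Do stamps it with a correlation ID.
type Request interface {
    SetID(id int32)
}

// Response is matched back to its request by ID (see proto.Response.ID).
type Response interface {
    ID() int32
}

// Broker performs one request/response round trip (see broker.B.Do).
type Broker interface {
    Do(req Request, resp Response) error
}

// Cluster resolves which broker to talk to (see the call sites in proto/api.go).
type Cluster interface {
    Leader(topic string, partition int32) (Broker, error)
    Coordinator(group string) (Broker, error)
}
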
6 changes: 3 additions & 3 deletions producer/producer.go
@@ -5,7 +5,7 @@ import (
"fmt"
"time"

"h12.me/kafka/common"
"h12.me/kafka/model"
"h12.me/kafka/proto"
)

@@ -18,11 +18,11 @@ type P struct {
LeaderRecoveryTime time.Duration
RequiredAcks int16
AckTimeout time.Duration
- cluster common.Cluster
+ cluster model.Cluster
topicPartitioner *topicPartitioner
}

- func New(cluster common.Cluster) *P {
+ func New(cluster model.Cluster) *P {
return &P{
cluster: cluster,
topicPartitioner: newTopicPartitioner(),
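For completeness, a small sketch showing that both high-level clients are now constructed from the same model.Cluster interface (formerly common.Cluster); only the New signatures shown in this diff are assumed.

package example

import (
    "h12.me/kafka/consumer"
    "h12.me/kafka/model"
    "h12.me/kafka/producer"
)

// newClients wires a consumer and a producer to one shared cluster value,
// e.g. the one built with cluster.New in the sketch after cluster/cluster.go.
func newClients(c model.Cluster) (*consumer.C, *producer.P) {
    return consumer.New(c), producer.New(c)
}
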
28 changes: 14 additions & 14 deletions proto/api.go
@@ -4,7 +4,7 @@ import (
"fmt"
"time"

"h12.me/kafka/common"
"h12.me/kafka/model"
)

var (
@@ -14,7 +14,7 @@ var (

type client struct {
id string
- doer common.Broker
+ doer model.Broker
}

func (r *Response) ID() int32 { return r.CorrelationID }
@@ -37,7 +37,7 @@ const clientID = "h12.me/kafka"

type Metadata string

- func (m Metadata) Fetch(b common.Broker) (*TopicMetadataResponse, error) {
+ func (m Metadata) Fetch(b model.Broker) (*TopicMetadataResponse, error) {
topic := string(m)
req := TopicMetadataRequest([]string{topic})
resp := TopicMetadataResponse{}
@@ -63,7 +63,7 @@ func (m Metadata) Fetch(b common.Broker) (*TopicMetadataResponse, error) {

type GroupCoordinator string

- func (group GroupCoordinator) Fetch(b common.Broker) (*Broker, error) {
+ func (group GroupCoordinator) Fetch(b model.Broker) (*Broker, error) {
req := GroupCoordinatorRequest(group)
resp := GroupCoordinatorResponse{}
if err := (client{clientID, b}).Do(&req, &resp); err != nil {
@@ -83,7 +83,7 @@ type Payload struct {
AckTimeout time.Duration
}

- func (p *Payload) Produce(c common.Cluster) error {
+ func (p *Payload) Produce(c model.Cluster) error {
leader, err := c.Leader(p.Topic, p.Partition)
if err != nil {
return err
@@ -97,7 +97,7 @@ func (p *Payload) Produce(c common.Cluster) error {
return nil
}

- func (p *Payload) DoProduce(b common.Broker) error {
+ func (p *Payload) DoProduce(b model.Broker) error {
req := ProduceRequest{
RequiredAcks: p.RequiredAcks,
Timeout: int32(p.AckTimeout / time.Millisecond),
@@ -146,7 +146,7 @@ type Messages struct {
MaxWaitTime time.Duration
}

- func (m *Messages) Consume(c common.Cluster) (MessageSet, error) {
+ func (m *Messages) Consume(c model.Cluster) (MessageSet, error) {
leader, err := c.Leader(m.Topic, m.Partition)
if err != nil {
return nil, err
@@ -161,7 +161,7 @@ func (m *Messages) Consume(c common.Cluster) (MessageSet, error) {
return ms, nil
}

- func (fr *Messages) DoConsume(c common.Broker) (messages MessageSet, err error) {
+ func (fr *Messages) DoConsume(c model.Broker) (messages MessageSet, err error) {
req := FetchRequest{
ReplicaID: -1,
MaxWaitTime: int32(fr.MaxWaitTime / time.Millisecond),
@@ -228,7 +228,7 @@ type Offset struct {
Retention time.Duration
}

- func (o *Offset) Commit(c common.Cluster) error {
+ func (o *Offset) Commit(c model.Cluster) error {
coord, err := c.Coordinator(o.Group)
if err != nil {
return err
@@ -242,7 +242,7 @@ func (o *Offset) Commit(c common.Cluster) error {
return nil
}

- func (commit *Offset) DoCommit(b common.Broker) error {
+ func (commit *Offset) DoCommit(b model.Broker) error {
req := OffsetCommitRequestV1{
ConsumerGroupID: commit.Group,
OffsetCommitInTopicV1s: []OffsetCommitInTopicV1{
@@ -280,7 +280,7 @@ func (commit *Offset) DoCommit(b common.Broker) error {
return fmt.Errorf("fail to commit offset: %v", commit)
}

- func (o *Offset) Fetch(c common.Cluster) (int64, error) {
+ func (o *Offset) Fetch(c model.Cluster) (int64, error) {
coord, err := c.Coordinator(o.Group)
if err != nil {
return -1, err
@@ -295,7 +295,7 @@ func (o *Offset) Fetch(c common.Cluster) (int64, error) {
return offset, nil
}

- func (o *Offset) DoFetch(b common.Broker) (int64, error) {
+ func (o *Offset) DoFetch(b model.Broker) (int64, error) {
req := OffsetFetchRequestV1{
ConsumerGroup: o.Group,
PartitionInTopics: []PartitionInTopic{
@@ -331,7 +331,7 @@ type OffsetByTime struct {
Time time.Time
}

- func (o *OffsetByTime) Fetch(c common.Cluster) (int64, error) {
+ func (o *OffsetByTime) Fetch(c model.Cluster) (int64, error) {
leader, err := c.Leader(o.Topic, o.Partition)
if err != nil {
return -1, err
@@ -346,7 +346,7 @@ func (o *OffsetByTime) Fetch(c common.Cluster) (int64, error) {
return offset, nil
}

- func (o *OffsetByTime) DoFetch(b common.Broker) (int64, error) {
+ func (o *OffsetByTime) DoFetch(b model.Broker) (int64, error) {
var milliSec int64
switch o.Time {
case Latest:
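A hedged example of a proto-level call after the rename, assuming only what this diff shows (Metadata.Fetch takes a model.Broker and broker.New returns one) and that the package names match their directories; the address and topic name are placeholders.

package main

import (
    "fmt"

    "h12.me/kafka/broker"
    "h12.me/kafka/proto"
)

func main() {
    b := broker.New("localhost:9092") // model.Broker
    // proto.Metadata is a string type; Fetch returns *proto.TopicMetadataResponse.
    resp, err := proto.Metadata("my-topic").Fetch(b)
    if err != nil {
        fmt.Println("metadata fetch failed:", err)
        return
    }
    fmt.Printf("%+v\n", resp)
}
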
8 changes: 4 additions & 4 deletions proto/offset.go
@@ -4,12 +4,12 @@ import (
"fmt"
"time"

"h12.me/kafka/common"
"h12.me/kafka/model"
)

type GetTimeFunc func([]byte) (time.Time, error)

- func (o *OffsetByTime) Search(cl common.Cluster, getTime GetTimeFunc) (int64, error) {
+ func (o *OffsetByTime) Search(cl model.Cluster, getTime GetTimeFunc) (int64, error) {
earliest, err := (&OffsetByTime{
Topic: o.Topic,
Partition: o.Partition,
@@ -61,7 +61,7 @@ func (o *OffsetByTime) Search(cl common.Cluster, getTime GetTimeFunc) (int64, er
return o.searchOffsetBefore(cl, earliest, mid, latest, getter)
}

- func (o *OffsetByTime) searchOffsetBefore(cl common.Cluster, min, mid, max int64, getter timeGetter) (int64, error) {
+ func (o *OffsetByTime) searchOffsetBefore(cl model.Cluster, min, mid, max int64, getter timeGetter) (int64, error) {
const maxJitter = 1000 // time may be interleaved in a small range
midTime, err := getter.get(mid)
if err != nil {
@@ -100,7 +100,7 @@ func (o *OffsetByTime) searchOffsetBefore(cl common.Cluster, min, mid, max int64

type timeGetter struct {
Messages
- cl common.Cluster
+ cl model.Cluster
getTime GetTimeFunc
}
