diff --git a/broker/broker.go b/broker/broker.go index d334c7b..2e0252c 100644 --- a/broker/broker.go +++ b/broker/broker.go @@ -7,7 +7,7 @@ import ( "sync/atomic" "time" - "h12.me/kafka/common" + "h12.me/kafka/model" ) var ( @@ -25,12 +25,12 @@ type B struct { } type brokerJob struct { - req common.Request - resp common.Response + req model.Request + resp model.Response errChan chan error } -func New(addr string) common.Broker { +func New(addr string) model.Broker { return &B{ Addr: addr, Timeout: 30 * time.Second, @@ -38,7 +38,7 @@ func New(addr string) common.Broker { } } -func (b *B) Do(req common.Request, resp common.Response) error { +func (b *B) Do(req model.Request, resp model.Response) error { req.SetID(atomic.AddInt32(&b.cid, 1)) errChan := make(chan error) if err := b.sendJob(&brokerJob{ diff --git a/cluster/cluster.go b/cluster/cluster.go index 0329f0e..1ff501e 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -5,8 +5,8 @@ import ( "fmt" "sync" - "h12.me/kafka/common" "h12.me/kafka/log" + "h12.me/kafka/model" "h12.me/kafka/proto" ) @@ -22,10 +22,10 @@ type ( pool *brokerPool mu sync.Mutex } - NewBrokerFunc func(addr string) common.Broker + NewBrokerFunc func(addr string) model.Broker ) -func New(newBroker NewBrokerFunc, brokers []string) common.Cluster { +func New(newBroker NewBrokerFunc, brokers []string) model.Cluster { c := &C{ topics: newTopicPartitions(), pool: newBrokerPool(newBroker), @@ -51,7 +51,7 @@ func (c *C) Partitions(topic string) ([]int32, error) { return nil, fmt.Errorf("topic %s not found", topic) } -func (c *C) Coordinator(group string) (common.Broker, error) { +func (c *C) Coordinator(group string) (model.Broker, error) { if coord, err := c.pool.GetCoordinator(group); err == nil { return coord, nil } @@ -61,7 +61,7 @@ func (c *C) Coordinator(group string) (common.Broker, error) { return c.pool.GetCoordinator(group) } -func (c *C) Leader(topic string, partition int32) (common.Broker, error) { +func (c *C) Leader(topic 
string, partition int32) (model.Broker, error) { if leader, err := c.pool.GetLeader(topic, partition); err == nil { return leader, nil } diff --git a/cluster/pool.go b/cluster/pool.go index b4221a0..fe54191 100644 --- a/cluster/pool.go +++ b/cluster/pool.go @@ -3,16 +3,16 @@ package cluster import ( "sync" - "h12.me/kafka/common" + "h12.me/kafka/model" "h12.me/kafka/log" ) type brokerPool struct { - addrBroker map[string]common.Broker + addrBroker map[string]model.Broker idAddr map[int32]string - topicPartitionLeader map[topicPartition]common.Broker - groupCoordinator map[string]common.Broker - newBroker func(string) common.Broker + topicPartitionLeader map[topicPartition]model.Broker + groupCoordinator map[string]model.Broker + newBroker func(string) model.Broker mu sync.Mutex } @@ -21,30 +21,30 @@ type topicPartition struct { partition int32 } -func newBrokerPool(newBroker func(string) common.Broker) *brokerPool { +func newBrokerPool(newBroker func(string) model.Broker) *brokerPool { return &brokerPool{ - addrBroker: make(map[string]common.Broker), + addrBroker: make(map[string]model.Broker), idAddr: make(map[int32]string), - topicPartitionLeader: make(map[topicPartition]common.Broker), - groupCoordinator: make(map[string]common.Broker), + topicPartitionLeader: make(map[topicPartition]model.Broker), + groupCoordinator: make(map[string]model.Broker), newBroker: newBroker, } } -func (p *brokerPool) Brokers() (map[string]common.Broker, error) { +func (p *brokerPool) Brokers() (map[string]model.Broker, error) { if len(p.addrBroker) > 0 { return p.addrBroker, nil } return nil, ErrNoBrokerFound } -func (p *brokerPool) AddAddr(addr string) common.Broker { +func (p *brokerPool) AddAddr(addr string) model.Broker { p.mu.Lock() defer p.mu.Unlock() return p.addAddr(addr) } -func (p *brokerPool) addAddr(addr string) common.Broker { +func (p *brokerPool) addAddr(addr string) model.Broker { if broker, ok := p.addrBroker[addr]; ok { return broker } @@ -53,12 +53,12 @@ func (p 
*brokerPool) addAddr(addr string) common.Broker { return broker } -func (p *brokerPool) add(brokerID int32, addr string) common.Broker { +func (p *brokerPool) add(brokerID int32, addr string) model.Broker { p.idAddr[brokerID] = addr return p.addAddr(addr) } -func (p *brokerPool) Add(brokerID int32, addr string) common.Broker { +func (p *brokerPool) Add(brokerID int32, addr string) model.Broker { p.mu.Lock() defer p.mu.Unlock() return p.add(brokerID, addr) @@ -75,7 +75,7 @@ func (p *brokerPool) SetLeader(topic string, partition int32, brokerID int32) er return nil } -func (p *brokerPool) GetLeader(topic string, partition int32) (common.Broker, error) { +func (p *brokerPool) GetLeader(topic string, partition int32) (model.Broker, error) { p.mu.Lock() defer p.mu.Unlock() broker, ok := p.topicPartitionLeader[topicPartition{topic, partition}] @@ -104,7 +104,7 @@ func (p *brokerPool) SetCoordinator(consumerGroup string, brokerID int32, addr s p.groupCoordinator[consumerGroup] = broker } -func (p *brokerPool) GetCoordinator(consumerGroup string) (common.Broker, error) { +func (p *brokerPool) GetCoordinator(consumerGroup string) (model.Broker, error) { p.mu.Lock() defer p.mu.Unlock() broker, ok := p.groupCoordinator[consumerGroup] @@ -114,7 +114,7 @@ func (p *brokerPool) GetCoordinator(consumerGroup string) (common.Broker, error) return broker, nil } -func (p *brokerPool) find(brokerID int32) (common.Broker, error) { +func (p *brokerPool) find(brokerID int32) (model.Broker, error) { if addr, ok := p.idAddr[brokerID]; ok { if broker, ok := p.addrBroker[addr]; ok { return broker, nil diff --git a/consumer/consumer.go b/consumer/consumer.go index 0e64dc7..b104cfe 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -4,7 +4,7 @@ import ( "errors" "time" - "h12.me/kafka/common" + "h12.me/kafka/model" "h12.me/kafka/proto" ) @@ -24,10 +24,10 @@ type C struct { MinBytes int MaxBytes int OffsetRetention time.Duration - cluster common.Cluster + cluster model.Cluster } 
-func New(cluster common.Cluster) *C { +func New(cluster model.Cluster) *C { return &C{ cluster: cluster, MaxWaitTime: 100 * time.Millisecond, diff --git a/common/type.go b/model/type.go similarity index 96% rename from common/type.go rename to model/type.go index bfa0d4d..68b195e 100644 --- a/common/type.go +++ b/model/type.go @@ -1,4 +1,4 @@ -package common +package model import ( "io" diff --git a/producer/producer.go b/producer/producer.go index e8d671f..5c83ba7 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "h12.me/kafka/common" + "h12.me/kafka/model" "h12.me/kafka/proto" ) @@ -18,11 +18,11 @@ type P struct { LeaderRecoveryTime time.Duration RequiredAcks int16 AckTimeout time.Duration - cluster common.Cluster + cluster model.Cluster topicPartitioner *topicPartitioner } -func New(cluster common.Cluster) *P { +func New(cluster model.Cluster) *P { return &P{ cluster: cluster, topicPartitioner: newTopicPartitioner(), diff --git a/proto/api.go b/proto/api.go index 378a7d6..a25bd7a 100644 --- a/proto/api.go +++ b/proto/api.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "h12.me/kafka/common" + "h12.me/kafka/model" ) var ( @@ -14,7 +14,7 @@ var ( type client struct { id string - doer common.Broker + doer model.Broker } func (r *Response) ID() int32 { return r.CorrelationID } @@ -37,7 +37,7 @@ const clientID = "h12.me/kafka" type Metadata string -func (m Metadata) Fetch(b common.Broker) (*TopicMetadataResponse, error) { +func (m Metadata) Fetch(b model.Broker) (*TopicMetadataResponse, error) { topic := string(m) req := TopicMetadataRequest([]string{topic}) resp := TopicMetadataResponse{} @@ -63,7 +63,7 @@ func (m Metadata) Fetch(b common.Broker) (*TopicMetadataResponse, error) { type GroupCoordinator string -func (group GroupCoordinator) Fetch(b common.Broker) (*Broker, error) { +func (group GroupCoordinator) Fetch(b model.Broker) (*Broker, error) { req := GroupCoordinatorRequest(group) resp := 
GroupCoordinatorResponse{} if err := (client{clientID, b}).Do(&req, &resp); err != nil { @@ -83,7 +83,7 @@ type Payload struct { AckTimeout time.Duration } -func (p *Payload) Produce(c common.Cluster) error { +func (p *Payload) Produce(c model.Cluster) error { leader, err := c.Leader(p.Topic, p.Partition) if err != nil { return err @@ -97,7 +97,7 @@ func (p *Payload) Produce(c common.Cluster) error { return nil } -func (p *Payload) DoProduce(b common.Broker) error { +func (p *Payload) DoProduce(b model.Broker) error { req := ProduceRequest{ RequiredAcks: p.RequiredAcks, Timeout: int32(p.AckTimeout / time.Millisecond), @@ -146,7 +146,7 @@ type Messages struct { MaxWaitTime time.Duration } -func (m *Messages) Consume(c common.Cluster) (MessageSet, error) { +func (m *Messages) Consume(c model.Cluster) (MessageSet, error) { leader, err := c.Leader(m.Topic, m.Partition) if err != nil { return nil, err @@ -161,7 +161,7 @@ func (m *Messages) Consume(c common.Cluster) (MessageSet, error) { return ms, nil } -func (fr *Messages) DoConsume(c common.Broker) (messages MessageSet, err error) { +func (fr *Messages) DoConsume(c model.Broker) (messages MessageSet, err error) { req := FetchRequest{ ReplicaID: -1, MaxWaitTime: int32(fr.MaxWaitTime / time.Millisecond), @@ -228,7 +228,7 @@ type Offset struct { Retention time.Duration } -func (o *Offset) Commit(c common.Cluster) error { +func (o *Offset) Commit(c model.Cluster) error { coord, err := c.Coordinator(o.Group) if err != nil { return err @@ -242,7 +242,7 @@ func (o *Offset) Commit(c common.Cluster) error { return nil } -func (commit *Offset) DoCommit(b common.Broker) error { +func (commit *Offset) DoCommit(b model.Broker) error { req := OffsetCommitRequestV1{ ConsumerGroupID: commit.Group, OffsetCommitInTopicV1s: []OffsetCommitInTopicV1{ @@ -280,7 +280,7 @@ func (commit *Offset) DoCommit(b common.Broker) error { return fmt.Errorf("fail to commit offset: %v", commit) } -func (o *Offset) Fetch(c common.Cluster) (int64, error) { 
+func (o *Offset) Fetch(c model.Cluster) (int64, error) { coord, err := c.Coordinator(o.Group) if err != nil { return -1, err @@ -295,7 +295,7 @@ func (o *Offset) Fetch(c common.Cluster) (int64, error) { return offset, nil } -func (o *Offset) DoFetch(b common.Broker) (int64, error) { +func (o *Offset) DoFetch(b model.Broker) (int64, error) { req := OffsetFetchRequestV1{ ConsumerGroup: o.Group, PartitionInTopics: []PartitionInTopic{ @@ -331,7 +331,7 @@ type OffsetByTime struct { Time time.Time } -func (o *OffsetByTime) Fetch(c common.Cluster) (int64, error) { +func (o *OffsetByTime) Fetch(c model.Cluster) (int64, error) { leader, err := c.Leader(o.Topic, o.Partition) if err != nil { return -1, err @@ -346,7 +346,7 @@ func (o *OffsetByTime) Fetch(c common.Cluster) (int64, error) { return offset, nil } -func (o *OffsetByTime) DoFetch(b common.Broker) (int64, error) { +func (o *OffsetByTime) DoFetch(b model.Broker) (int64, error) { var milliSec int64 switch o.Time { case Latest: diff --git a/proto/offset.go b/proto/offset.go index 23aa9da..ee96534 100644 --- a/proto/offset.go +++ b/proto/offset.go @@ -4,12 +4,12 @@ import ( "fmt" "time" - "h12.me/kafka/common" + "h12.me/kafka/model" ) type GetTimeFunc func([]byte) (time.Time, error) -func (o *OffsetByTime) Search(cl common.Cluster, getTime GetTimeFunc) (int64, error) { +func (o *OffsetByTime) Search(cl model.Cluster, getTime GetTimeFunc) (int64, error) { earliest, err := (&OffsetByTime{ Topic: o.Topic, Partition: o.Partition, @@ -61,7 +61,7 @@ func (o *OffsetByTime) Search(cl common.Cluster, getTime GetTimeFunc) (int64, er return o.searchOffsetBefore(cl, earliest, mid, latest, getter) } -func (o *OffsetByTime) searchOffsetBefore(cl common.Cluster, min, mid, max int64, getter timeGetter) (int64, error) { +func (o *OffsetByTime) searchOffsetBefore(cl model.Cluster, min, mid, max int64, getter timeGetter) (int64, error) { const maxJitter = 1000 // time may be interleaved in a small range midTime, err := getter.get(mid) if 
err != nil { @@ -100,7 +100,7 @@ func (o *OffsetByTime) searchOffsetBefore(cl common.Cluster, min, mid, max int64 type timeGetter struct { Messages - cl common.Cluster + cl model.Cluster getTime GetTimeFunc } diff --git a/spec/spec.html b/spec/spec.html index f18277e..9ff2262 100644 --- a/spec/spec.html +++ b/spec/spec.html @@ -44,7 +44,7 @@
This is a notation for handling repeated structures. These will always be encoded as an int32 size containing the length N followed by N repetitions of the structure which can itself be made up of other primitive types. In the BNF grammars below we will show an array of a structure foo as [foo].
The BNFs below give an exact context free grammar for the request and response binary format. For each API I will give the request and response together followed by all the sub-definitions. The BNF is intentionally not compact in order to give human-readable name (for example I define a production for ErrorCode even though it is just an int16 in order to give it a symbolic name). As always in a BNF a sequence of productions indicates concatenation, so the MetadataRequest given below would be a sequence of bytes containing first a VersionId, then a ClientId, and then an array of TopicNames (each of which has its own definition). Productions are always given in camel case and primitive types in lower case. When there are multiple possible productions these are separated with '|' and may be enclosed in parenthesis for grouping. The top-level definition is always given first and subsequent sub-parts are indented.
-All requests and responses originate from the following grammar which will be incrementally describe through the rest of this document:
The response will always match the paired request (e.g. we will send a MetadataResponse in return to a MetadataRequest).
One structure common to both the produce and fetch requests is the message set format. A message in kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for the on-disk storage on the broker and the on-the-wire format.
+One structure common to both the produce and fetch requests is the message set format. A message in kafka is a key-value pair with a small amount of associated metadata. A message set is just a sequence of messages with offset and size information. This format happens to be used both for the on-disk storage on the broker and the on-the-wire format.
A message set is also the unit of compression in Kafka, and we allow messages to recursively contain compressed message sets to allow batch compression.
N.B., MessageSets are not preceded by an int32 like other array elements in the protocol.
Some people have asked why we don't use HTTP. There are a number of reasons, the best is that client implementors can make use of some of the more advanced TCP features--the ability to multiplex requests, the ability to simultaneously poll many connections, etc. We have also found HTTP libraries in many languages to be surprisingly shabby.
Others have asked if maybe we shouldn't support many different protocols. Prior experience with this was that it makes it very hard to add and test new features if they have to be ported across many protocol implementations. Our feeling is that most users don't really see multiple protocols as a feature, they just want a good reliable client in the language of their choice.
Another question is why we don't adopt XMPP, STOMP, AMQP or an existing protocol. The answer to this varies by protocol, but in general the problem is that the protocol does determine large parts of the implementation and we couldn't do what we are doing if we didn't have control over the protocol. Our belief is that it is possible to do better than existing messaging systems have in providing a truly distributed messaging system, and to do this we need to build something that works differently.
diff --git a/tools/kafpro/command.go b/tools/kafpro/command.go index 353db56..be200ab 100644 --- a/tools/kafpro/command.go +++ b/tools/kafpro/command.go @@ -11,8 +11,8 @@ import ( "sync/atomic" "time" - "h12.me/kafka/cluster" "h12.me/kafka/consumer" + "h12.me/kafka/model" "h12.me/kafka/proto" ) @@ -114,7 +114,7 @@ type ConsumeCommand struct { Count bool `long:"count"` } -func (cmd *ConsumeCommand) Exec(cl *cluster.C) error { +func (cmd *ConsumeCommand) Exec(cl model.Cluster) error { // TODO: detect format partitions, err := cl.Partitions(cmd.Topic) if err != nil { @@ -203,7 +203,7 @@ type CommitCommand struct { Retention int `long:"retention"` // millisecond } -func (cmd *CommitCommand) Exec(cl *cluster.C) error { +func (cmd *CommitCommand) Exec(cl model.Cluster) error { /* req := &broker.Request{ ClientID: clientID, diff --git a/tools/kafpro/main.go b/tools/kafpro/main.go index 6622ee3..09cbeb4 100644 --- a/tools/kafpro/main.go +++ b/tools/kafpro/main.go @@ -44,10 +44,7 @@ func main() { if err != nil { log.Fatal(err) } - c, err := cluster.New(broker.New, cfg.Brokers) - if err != nil { - log.Fatal(err) - } + c := cluster.New(broker.New, cfg.Brokers) //fmt.Println(toJSON(cfg)) switch parser.Active.Name { case "consume":