
Commit

Merge pull request #942 from urso/feature/kafka-output
Proposal: kafka output
tsg committed Feb 10, 2016
2 parents bd92dd8 + 2f64b7e commit 1a6c611
Showing 198 changed files with 20,974 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
@@ -61,6 +61,7 @@ https://github.com/elastic/beats/compare/v1.1.0...master[Check the HEAD diff]
- Add ability to create Elasticsearch mapping on startup {pull}639[639]
- Add option to elasticsearch output to pass http parameters in index operations {issue}805[805]
- Improve logstash and elasticsearch backoff behavior. {pull}927[927]
- Add experimental Kafka output. {pull}942[942]

*Packetbeat*
- Change the DNS library used throughout the dns package to github.com/miekg/dns. {pull}803[803]
12 changes: 12 additions & 0 deletions glide.yaml
@@ -38,3 +38,15 @@ import:
version: v1.2.0
- package: github.com/miekg/dns
version: 85b661b2a6fc95a5a83e66d7730c4bc0b6e9c99e
- package: github.com/Shopify/sarama
version: v1.8.0
- package: github.com/klauspost/crc32
version: 6973dcf6594efa905c08260fe9120cae92ab4305
- package: github.com/golang/snappy
version: 894fd4616c897c201d223c3c0c128e8c648c96a2
- package: github.com/eapache/queue
version: ded5959c0d4e360646dc9e9908cff48666781367
- package: github.com/eapache/go-resiliency
version: b86b1ec0dd4209a588dc1285cdd471e73525c0b3
subpackages:
- breaker
75 changes: 75 additions & 0 deletions libbeat/docs/outputconfig.asciidoc
@@ -18,6 +18,7 @@ You can configure multiple outputs for exporting the correlated transactions. Cu
* <<elasticsearch-output,Elasticsearch>>
* <<logstash-output,Logstash>>
* <<redis-output,Redis (DEPRECATED)>>
* <<kafka-output,Kafka>>
* <<file-output,File>>
* <<console-output,Console>>

@@ -523,6 +524,80 @@ setting does not affect how events are published.

Setting `bulk_max_size` to values less than or equal to 0 disables buffering in libbeat.

[[kafka-output]]
==== Kafka Output
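
A minimal example configuration might look like the following sketch. It assumes the same `output:` YAML layout used by the other outputs; the broker addresses and the topic name are placeholders:

[source,yaml]
------------------------------------------------------------------------------
output:

  # Experimental Kafka output
  kafka:
    # Placeholder broker addresses; list the brokers of your cluster here.
    hosts: ["kafka1:9092", "kafka2:9092"]

    # Publish all events to a single topic ...
    topic: "beats"

    # ... or, alternatively, derive the topic from the event type:
    #use_type: true
------------------------------------------------------------------------------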

===== hosts

The list of Kafka broker addresses to connect to.

===== topic

The Kafka topic used for produced events. If `use_type` is set to true, this setting is ignored.

===== use_type

Set the Kafka topic from the event type. If `use_type` is false, `topic` must be configured. The default is false.

===== client_id

The configurable ClientID used for logging, debugging, and auditing purposes. The default is "beats".

===== worker

The number of concurrent load-balanced Kafka output workers.

===== max_retries

The number of times to retry publishing an event after a publishing failure.
After the specified number of retries, the events are typically dropped.
Some Beats, such as Filebeat, ignore the `max_retries` setting and retry until all
events are published.

Set `max_retries` to a value less than 0 to retry until all events are published.

The default is 3.

===== bulk_max_size

The maximum number of events to bulk in a single Kafka request. The default is 2048.

===== timeout

The number of seconds to wait for responses from the Kafka brokers before timing
out. The default is 30 (seconds).

===== broker_timeout

The maximum duration a broker waits for the required number of ACKs. The default is 10s.

===== keep_alive

Keep-alive period for an active network connection. If 0s, keep-alives are disabled. The default is 0 seconds.

===== compression

The output compression codec. Must be one of `none`, `snappy`, or `gzip`. The default is `snappy`.

===== max_message_bytes

The maximum permitted size of JSON-encoded messages. Larger messages are dropped. The default is 1000000 (bytes). This value should be equal to or less than the broker's `message.max.bytes`.

===== required_acks

The ACK reliability level required from the broker. 0=no response, 1=wait for local commit, -1=wait for all replicas to commit. The default is 1.

NOTE: If set to 0, no ACKs are returned by Kafka. Messages might be lost silently on error.
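
For illustration, the reliability-related options described above can be combined as in the following sketch; the values shown are examples, not recommendations, and the broker address and topic name are placeholders:

[source,yaml]
------------------------------------------------------------------------------
output:
  kafka:
    hosts: ["kafka1:9092"]
    topic: "beats"

    # Wait for all in-sync replicas to commit before an event counts as ACKed.
    required_acks: -1

    # Retry a failed publish up to 3 times before dropping the events
    # (Filebeat ignores max_retries and retries until all events are published).
    max_retries: 3

    # Trade some CPU for smaller messages by using gzip instead of snappy.
    compression: gzip

    # Keep JSON-encoded messages at or below the broker's message.max.bytes.
    max_message_bytes: 1000000
------------------------------------------------------------------------------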

===== flush_interval

The number of seconds to wait for new events between two producer API calls.

===== tls

Configuration options for TLS parameters like the root CA for Kafka connections. See
<<configuration-output-tls>> for more information.

[[file-output]]
==== File Output

Expand Down
188 changes: 188 additions & 0 deletions libbeat/outputs/kafka/client.go
@@ -0,0 +1,188 @@
package kafka

import (
"encoding/json"
"expvar"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
)

// client is a Kafka output client that wraps a sarama.AsyncProducer.
type client struct {
hosts []string
topic string
useType bool
config sarama.Config

producer sarama.AsyncProducer

wg sync.WaitGroup

isConnected int32
}

// msgRef tracks the outstanding messages of a published batch and invokes the
// callback once every message in the batch has either been ACKed or failed.
type msgRef struct {
count int32
err atomic.Value
batch []common.MapStr
cb func([]common.MapStr, error)
}

var (
ackedEvents = expvar.NewInt("libbeatKafkaPublishedAndAckedEvents")
eventsNotAcked = expvar.NewInt("libbeatKafkaPublishedButNotAckedEvents")
publishEventsCallCount = expvar.NewInt("libbeatKafkaPublishEventsCallCount")
)

func newKafkaClient(hosts []string, topic string, useType bool, cfg *sarama.Config) (*client, error) {
c := &client{
hosts: hosts,
useType: useType,
topic: topic,
config: *cfg,
}
return c, nil
}

func (c *client) Connect(timeout time.Duration) error {
debugf("connect: %v", c.hosts)

c.config.Net.DialTimeout = timeout

// try to connect
producer, err := sarama.NewAsyncProducer(c.hosts, &c.config)
if err != nil {
logp.Err("Kafka connect fails with: %v", err)
return err
}

c.producer = producer

c.wg.Add(2)
go c.successWorker(producer.Successes())
go c.errorWorker(producer.Errors())
atomic.StoreInt32(&c.isConnected, 1)

return nil
}

func (c *client) Close() error {
if c.IsConnected() {
debugf("closed kafka client")

c.producer.AsyncClose()
c.wg.Wait()
atomic.StoreInt32(&c.isConnected, 0)
c.producer = nil
}
return nil
}

func (c *client) IsConnected() bool {
return atomic.LoadInt32(&c.isConnected) != 0
}

func (c *client) AsyncPublishEvent(
cb func(error),
event common.MapStr,
) error {
return c.AsyncPublishEvents(func(_ []common.MapStr, err error) {
cb(err)
}, []common.MapStr{event})
}

func (c *client) AsyncPublishEvents(
cb func([]common.MapStr, error),
events []common.MapStr,
) error {
publishEventsCallCount.Add(1)
debugf("publish events")

ref := &msgRef{
count: int32(len(events)),
batch: events,
cb: cb,
}

ch := c.producer.Input()

for _, event := range events {
		topic := c.topic
		if c.useType {
			// With use_type enabled, every event is expected to carry a
			// string "type" field that selects the destination topic.
			topic = event["type"].(string)
		}

jsonEvent, err := json.Marshal(event)
if err != nil {
ref.done()
continue
}

msg := &sarama.ProducerMessage{
Metadata: ref,
Topic: topic,
Value: sarama.ByteEncoder(jsonEvent),
}

ch <- msg
}

return nil
}

func (c *client) successWorker(ch <-chan *sarama.ProducerMessage) {
defer c.wg.Done()
defer debugf("Stop kafka ack worker")

for msg := range ch {
ref := msg.Metadata.(*msgRef)
ref.done()
}
}

func (c *client) errorWorker(ch <-chan *sarama.ProducerError) {
defer c.wg.Done()
defer debugf("Stop kafka error handler")

for errMsg := range ch {
msg := errMsg.Msg
ref := msg.Metadata.(*msgRef)
ref.fail(errMsg.Err)
}
}

func (r *msgRef) done() {
r.dec()
}

func (r *msgRef) fail(err error) {
debugf("Kafka publish failed with: %v", err)

r.err.Store(err)
r.dec()
}

// dec decrements the count of outstanding messages; once the whole batch has
// been accounted for, it reports the batch result through the callback.
func (r *msgRef) dec() {
i := atomic.AddInt32(&r.count, -1)
if i > 0 {
return
}

debugf("finished kafka batch")

var err error
v := r.err.Load()
if v != nil {
err = v.(error)
eventsNotAcked.Add(int64(len(r.batch)))
r.cb(r.batch, err)
} else {
ackedEvents.Add(int64(len(r.batch)))
r.cb(nil, nil)
}
}
