Adding publisher event cache (elastic#14568)
* Adding publisher event cache

* Adding CHANGELOG entry

* Store ptr to EventCache so copy-by-value uses same cache

* Remove mutex

* Adding back nil guard for map

* Pass cache by pointer

* Adding debug logging

* Set default number of partitions for Kafka cluster to 15

* Guarding per-event debug logs with logp.IsDebug

* Update tests

* WIP: make testReadFromKafkaTopic work with multiple partitions

* Create consumers outside goroutine

* Fix compile errors

* Check channel state

* Fixing up tests

* [WIP] Getting Docker logs for all containers on failure

* [WIP] Temporarily increase timeout for integration tests

* Remove temporary Makefile changes introduced for debugging

* Reduce number of default Kafka partitions to 3 (was 15)
ycombinator authored Dec 6, 2019
1 parent cc00d01 commit b5270dd
Showing 8 changed files with 204 additions and 46 deletions.
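
For context on the changes below: beat.Event.Meta is a plain Go map, and the Kafka output previously wrote the selected topic and partition into it while other parts of the pipeline could read it concurrently (the bug tracked in {issue}14542). The following standalone sketch — hypothetical names, not code from this commit — reproduces that class of race; run it with `go run -race` to see the detector fire.

package main

import "sync"

// event mimics beat.Event in the one respect that matters here: Meta is an
// unguarded map shared between the pipeline and the output (simplified).
type event struct {
	Meta map[string]interface{}
}

func main() {
	ev := &event{Meta: map[string]interface{}{}}

	var wg sync.WaitGroup
	wg.Add(2)

	// Output goroutine: caches the chosen partition in Meta (pre-fix behavior).
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			ev.Meta["partition"] = int32(i % 3)
		}
	}()

	// Pipeline goroutine: reads Meta concurrently.
	go func() {
		defer wg.Done()
		for i := 0; i < 1000; i++ {
			_, _ = ev.Meta["partition"]
		}
	}()

	wg.Wait()
}

The commit's remedy is to move this per-output metadata out of the shared Meta map and into a new publisher.EventCache owned by the publisher.Event (see libbeat/publisher/event.go below).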
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -135,6 +135,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
- Fix memory leak in kubernetes autodiscover provider and add_kubernetes_metadata processor happening when pods are terminated without sending a delete event. {pull}14259[14259]
- Fix kubernetes `metaGenerator.ResourceMetadata` when parent reference controller is nil {issue}14320[14320] {pull}14329[14329]
- Allow users to configure only `cluster_uuid` setting under `monitoring` namespace. {pull}14338[14338]
- Fix bug with potential concurrent reads and writes from event.Meta map by Kafka output. {issue}14542[14542] {pull}14568[14568]

*Auditbeat*

26 changes: 22 additions & 4 deletions filebeat/input/kafka/kafka_integration_test.go
@@ -147,16 +147,21 @@ func TestInput(t *testing.T) {
input.Run()

timeout := time.After(30 * time.Second)
for _, m := range messages {
for range messages {
select {
case event := <-events:
text, err := event.Fields.GetValue("message")
v, err := event.Fields.GetValue("message")
if err != nil {
t.Fatal(err)
}
assert.Equal(t, text, m.message)
text, ok := v.(string)
if !ok {
t.Fatal("could not get message text from event")
}
msg := findMessage(t, text, messages)
assert.Equal(t, text, msg.message)

checkMatchingHeaders(t, event, m.headers)
checkMatchingHeaders(t, event, msg.headers)
case <-timeout:
t.Fatal("timeout waiting for incoming events")
}
@@ -251,6 +256,19 @@ func TestInputWithMultipleEvents(t *testing.T) {
}
}

func findMessage(t *testing.T, text string, msgs []testMessage) *testMessage {
var msg *testMessage
for _, m := range msgs {
if text == m.message {
msg = &m
break
}
}

assert.NotNil(t, msg)
return msg
}

func checkMatchingHeaders(
t *testing.T, event beat.Event, expected []sarama.RecordHeader,
) {
38 changes: 25 additions & 13 deletions libbeat/outputs/kafka/client.go
@@ -162,19 +162,27 @@ func (c *client) String() string {
func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
event := &data.Content
msg := &message{partition: -1, data: *data}
if event.Meta != nil {
if value, ok := event.Meta["partition"]; ok {
if partition, ok := value.(int32); ok {
msg.partition = partition
}

value, err := data.Cache.GetValue("partition")
if err == nil {
if logp.IsDebug(debugSelector) {
debugf("got event.Meta[\"partition\"] = %v", value)
}
if partition, ok := value.(int32); ok {
msg.partition = partition
}
}

if value, ok := event.Meta["topic"]; ok {
if topic, ok := value.(string); ok {
msg.topic = topic
}
value, err = data.Cache.GetValue("topic")
if err == nil {
if logp.IsDebug(debugSelector) {
debugf("got event.Meta[\"topic\"] = %v", value)
}
if topic, ok := value.(string); ok {
msg.topic = topic
}
}

if msg.topic == "" {
topic, err := c.topic.Select(event)
if err != nil {
@@ -184,15 +192,19 @@ func (c *client) getEventMessage(data *publisher.Event) (*message, error) {
return nil, errNoTopicsSelected
}
msg.topic = topic
if event.Meta == nil {
event.Meta = map[string]interface{}{}
if logp.IsDebug(debugSelector) {
debugf("setting event.Meta[\"topic\"] = %v", topic)
}
if _, err := data.Cache.Put("topic", topic); err != nil {
return nil, fmt.Errorf("setting kafka topic in publisher event failed: %v", err)
}
event.Meta["topic"] = topic
}

serializedEvent, err := c.codec.Encode(c.index, event)
if err != nil {
logp.Debug("kafka", "Failed event: %v", event)
if logp.IsDebug(debugSelector) {
debugf("failed event: %v", event)
}
return nil, err
}

4 changes: 3 additions & 1 deletion libbeat/outputs/kafka/kafka.go
@@ -37,9 +37,11 @@ const (
// NOTE: maxWaitRetry has no effect on mode, as logstash client currently does
// not return ErrTempBulkFailure
defaultMaxWaitRetry = 60 * time.Second

debugSelector = "kafka"
)

var debugf = logp.MakeDebug("kafka")
var debugf = logp.MakeDebug(debugSelector)

var (
errNoTopicSet = errors.New("No topic configured")
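
The new debugSelector constant lets the hot path guard per-event logging with logp.IsDebug, so the arguments to debugf — including rendering a whole event with %v — are never built unless the kafka debug selector is enabled. A minimal sketch of the pattern, using only the logp APIs that appear in this diff; the event value is made up:

package main

import "github.com/elastic/beats/libbeat/logp"

const debugSelector = "kafka"

var debugf = logp.MakeDebug(debugSelector)

func main() {
	event := map[string]interface{}{"message": "hello"} // hypothetical event

	// Without the guard, the arguments would be boxed and passed on every
	// call even when kafka debug logging is off; with it, the call is skipped.
	if logp.IsDebug(debugSelector) {
		debugf("failed event: %v", event)
	}
}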
139 changes: 116 additions & 23 deletions libbeat/outputs/kafka/kafka_integration_test.go
@@ -243,33 +243,63 @@ func TestKafkaPublish(t *testing.T) {
validate = makeValidateFmtStr(fmt.(string))
}

for i, d := range expected {
validate(t, stored[i].Value, d)
seenMsgs := map[string]struct{}{}
for _, s := range stored {
msg := validate(t, s.Value, expected)
seenMsgs[msg] = struct{}{}
}
assert.Equal(t, len(expected), len(seenMsgs))
})
}
}

func validateJSON(t *testing.T, value []byte, event beat.Event) {
func validateJSON(t *testing.T, value []byte, events []beat.Event) string {
var decoded map[string]interface{}
err := json.Unmarshal(value, &decoded)
if err != nil {
t.Errorf("can not json decode event value: %v", value)
return
return ""
}

msg := decoded["message"].(string)
event := findEvent(events, msg)
if event == nil {
t.Errorf("could not find expected event with message: %v", msg)
return ""
}

assert.Equal(t, decoded["type"], event.Fields["type"])
assert.Equal(t, decoded["message"], event.Fields["message"])

return msg
}

func makeValidateFmtStr(fmt string) func(*testing.T, []byte, beat.Event) {
func makeValidateFmtStr(fmt string) func(*testing.T, []byte, []beat.Event) string {
fmtString := fmtstr.MustCompileEvent(fmt)
return func(t *testing.T, value []byte, event beat.Event) {
expectedMessage, err := fmtString.Run(&event)
return func(t *testing.T, value []byte, events []beat.Event) string {
msg := string(value)
event := findEvent(events, msg)
if event == nil {
t.Errorf("could not find expected event with message: %v", msg)
return ""
}

_, err := fmtString.Run(event)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, string(expectedMessage), string(value))

return msg
}
}

func findEvent(events []beat.Event, msg string) *beat.Event {
for _, e := range events {
if e.Fields["message"] == msg {
return &e
}
}

return nil
}

func strDefault(a, defaults string) string {
@@ -307,7 +337,49 @@ func newTestConsumer(t *testing.T) sarama.Consumer {
return consumer
}

var testTopicOffsets = map[string]int64{}
// topicOffsetMap is a thread-safe map from topic => partition => offset
type topicOffsetMap struct {
m map[string]map[int32]int64
mu sync.RWMutex
}

func (m *topicOffsetMap) GetOffset(topic string, partition int32) int64 {
m.mu.RLock()
defer m.mu.RUnlock()

if m.m == nil {
return sarama.OffsetOldest
}

topicMap, ok := m.m[topic]
if !ok {
return sarama.OffsetOldest
}

offset, ok := topicMap[partition]
if !ok {
return sarama.OffsetOldest
}

return offset
}

func (m *topicOffsetMap) SetOffset(topic string, partition int32, offset int64) {
m.mu.Lock()
defer m.mu.Unlock()

if m.m == nil {
m.m = map[string]map[int32]int64{}
}

if _, ok := m.m[topic]; !ok {
m.m[topic] = map[int32]int64{}
}

m.m[topic][partition] = offset
}

var testTopicOffsets = topicOffsetMap{}
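
A short usage sketch of topicOffsetMap under the semantics defined above (topic name and offset values are illustrative): lookups for unseen topic/partition pairs fall back to sarama.OffsetOldest, and SetOffset lazily allocates the nested maps, so the zero value is ready to use.

// Sketch, same package as the test helpers above.
func exampleOffsets() {
	var offsets topicOffsetMap

	// Unseen topic/partition pairs start from the oldest available offset.
	start := offsets.GetOffset("test-topic", 0) // == sarama.OffsetOldest

	// SetOffset allocates the nested maps on first use; concurrent
	// readers observe the update via the RWMutex.
	offsets.SetOffset("test-topic", 0, 42)

	_ = start
	_ = offsets.GetOffset("test-topic", 0) // now 42
}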

func testReadFromKafkaTopic(
t *testing.T, topic string, nMessages int,
@@ -318,31 +390,52 @@
consumer.Close()
}()

offset, found := testTopicOffsets[topic]
if !found {
offset = sarama.OffsetOldest
}

partitionConsumer, err := consumer.ConsumePartition(topic, 0, offset)
partitions, err := consumer.Partitions(topic)
if err != nil {
t.Fatal(err)
}
defer func() {
partitionConsumer.Close()
}()

timer := time.After(timeout)
done := make(chan struct{})
msgs := make(chan *sarama.ConsumerMessage)
for _, partition := range partitions {
offset := testTopicOffsets.GetOffset(topic, partition)
partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
if err != nil {
t.Fatal(err)
}
defer func() {
partitionConsumer.Close()
}()

go func(p int32, pc sarama.PartitionConsumer) {
for {
select {
case msg, ok := <-pc.Messages():
if !ok {
break
}
testTopicOffsets.SetOffset(topic, p, msg.Offset+1)
msgs <- msg
case <-done:
break
}
}
}(partition, partitionConsumer)
}

var messages []*sarama.ConsumerMessage
for i := 0; i < nMessages; i++ {
timer := time.After(timeout)

for len(messages) < nMessages {
select {
case msg := <-partitionConsumer.Messages():
case msg := <-msgs:
messages = append(messages, msg)
testTopicOffsets[topic] = msg.Offset + 1
case <-timer:
break
}
}

close(done)
return messages
}

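
One Go caveat for readers of testReadFromKafkaTopic above: break inside a select exits only the select, not the enclosing for, so case <-done: break leaves each consumer goroutine looping after done is closed, and the case <-timer: break in the collection loop likewise does not abort the wait. A labeled loop is the idiomatic fix; the sketch below is a hedged rework of the per-partition goroutine, not what the commit ships:

	go func(p int32, pc sarama.PartitionConsumer) {
	consume:
		for {
			select {
			case msg, ok := <-pc.Messages():
				if !ok {
					break consume // message channel closed: stop consuming
				}
				testTopicOffsets.SetOffset(topic, p, msg.Offset+1)
				msgs <- msg
			case <-done:
				break consume // test finished: let the goroutine exit
			}
		}
	}(partition, partitionConsumer)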
11 changes: 7 additions & 4 deletions libbeat/outputs/kafka/partition.go
@@ -127,11 +127,14 @@ func (p *messagePartitioner) Partition(
}

msg.partition = partition
event := &msg.data.Content
if event.Meta == nil {
event.Meta = map[string]interface{}{}

if logp.IsDebug(debugSelector) {
debugf("setting event.Meta[\"partition\"] = %v", partition)
}
if _, err := msg.data.Cache.Put("partition", partition); err != nil {
return 0, fmt.Errorf("setting kafka partition in publisher event failed: %v", err)
}
event.Meta["partition"] = partition

p.partitions = numPartitions
return msg.partition, nil
}
28 changes: 28 additions & 0 deletions libbeat/publisher/event.go
@@ -19,6 +19,7 @@ package publisher

import (
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
)

// Batch is used to pass a batch of events to the outputs and asynchronously listening
@@ -41,11 +42,38 @@ type Batch interface {
type Event struct {
Content beat.Event
Flags EventFlags
Cache EventCache
}

// EventFlags provides additional flags/option types for used with the outputs.
type EventFlags uint8

// EventCache provides a space for outputs to define per-event metadata
// that's intended to be used only within the scope of an output
type EventCache struct {
m common.MapStr
}

// Put lets outputs put key-value pairs into the event cache
func (ec *EventCache) Put(key string, value interface{}) (interface{}, error) {
if ec.m == nil {
// uninitialized map
ec.m = common.MapStr{}
}

return ec.m.Put(key, value)
}

// GetValue lets outputs retrieve values from the event cache by key
func (ec *EventCache) GetValue(key string) (interface{}, error) {
if ec.m == nil {
// uninitialized map
return nil, common.ErrKeyNotFound
}

return ec.m.GetValue(key)
}

const (
// GuaranteedSend requires an output to not drop the event on failure, but
// retry until ACK.
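
A minimal usage sketch of the new cache, assuming the common.MapStr semantics relied on above (Put returns the previous value, GetValue returns common.ErrKeyNotFound for absent keys); the topic string is made up. Per the commit messages, outputs reach the cache through a pointer to the publisher.Event so copies work against the same underlying map.

package main

import (
	"fmt"

	"github.com/elastic/beats/libbeat/publisher"
)

func main() {
	var ev publisher.Event

	// Put lazily initializes the underlying common.MapStr on first use.
	if _, err := ev.Cache.Put("topic", "logs-test"); err != nil {
		panic(err)
	}

	// GetValue returns the stored value, or an error for keys never stored.
	if v, err := ev.Cache.GetValue("topic"); err == nil {
		fmt.Println("cached topic:", v.(string)) // "logs-test"
	}

	if _, err := ev.Cache.GetValue("partition"); err != nil {
		fmt.Println("no partition cached yet:", err) // common.ErrKeyNotFound
	}
}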
3 changes: 2 additions & 1 deletion testing/environments/docker/kafka/run.sh
@@ -21,7 +21,8 @@ mkdir -p ${KAFKA_LOGS_DIR}
${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties \
--override delete.topic.enable=true --override advertised.host.name=${KAFKA_ADVERTISED_HOST} \
--override listeners=PLAINTEXT://0.0.0.0:9092 \
--override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 &
--override logs.dir=${KAFKA_LOGS_DIR} --override log.flush.interval.ms=200 \
--override num.partitions=3 &

wait_for_port 9092

