Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Changed format of metrics published to Kafka as a JSON #21

Merged
merged 1 commit into from
Jul 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ It's used in the [snap framework](http://github.com/intelsdi-x/snap).
* [Configuration and Usage](#configuration-and-usage)
2. [Documentation](#documentation)
* [Kafka Quickstart](#kafka-quickstart)
* [Published data] (#published-data)
* [Examples](#examples)
* [Roadmap](#roadmap)
3. [Community Support](#community-support)
Expand Down Expand Up @@ -100,9 +101,40 @@ $ docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka

Read more about Kafka on [http://kafka.apache.org](http://kafka.apache.org/documentation.html)

### Published data

The plugin publishes all collected metrics serialized as JSON to Kafka. An example of published data is below:

```json
[
{
"timestamp": "2016-07-25T11:27:59.795548989+02:00",
"namespace": "/intel/mock/bar",
"data": 82,
"unit": "",
"tags": {
"plugin_running_on": "my-machine"
},
"version": 0,
"last_advertised_time": "2016-07-25T11:27:21.852064032+02:00"
},
{
"timestamp": "2016-07-25T11:27:59.795549268+02:00",
"namespace": "/intel/mock/foo",
"data": 72,
"unit": "",
"tags": {
"plugin_running_on": "my-machine"
},
"version": 0,
"last_advertised_time": "2016-07-25T11:27:21.852063228+02:00"
}
]
```

### Examples

Example running mock collector plugin, passthru processor plugin, and writing data to Kafka.
Example of running mock collector plugin, passthru processor plugin, and writing data to Kafka.

Make sure that your `$SNAP_PATH` is set, if not:
```
Expand All @@ -114,14 +146,15 @@ $ $SNAP_PATH/bin/snapd -l 1 -t 0
```
In another terminal window:

Load snap-plugin-collector-mock1 plugin:
Load snap-plugin-collector-mock2 plugin:
```
$ $SNAP_PATH/bin/snapctl plugin load $SNAP_PATH/plugin/snap-plugin-collector-mock1
$ $SNAP_PATH/bin/snapctl plugin load $SNAP_PATH/plugin/snap-plugin-collector-mock2
```
See available metrics for your system:
```
$ $SNAP_PATH/bin/snapctl metric list
```

Load snap-plugin-processor-passthru plugin:
```
$ $SNAP_PATH/bin/snapctl plugin load $SNAP_PATH/plugin/snap-plugin-processor-passthru
Expand Down
75 changes: 51 additions & 24 deletions kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"os"
"strings"

log "github.com/Sirupsen/logrus"
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
Expand All @@ -37,7 +37,7 @@ import (

const (
PluginName = "kafka"
PluginVersion = 7
PluginVersion = 8
PluginType = plugin.PublisherPluginType
)

Expand All @@ -51,52 +51,58 @@ func NewKafkaPublisher() *kafkaPublisher {
return &kafkaPublisher{}
}

type MetricToPublish struct {
// The timestamp from when the metric was created.
Timestamp time.Time `json:"timestamp"`
Namespace string `json:"namespace"`
Data interface{} `json:"data"`
Unit string `json:"unit"`
Tags map[string]string `json:"tags"`
Version_ int `json:"version"`
// Last advertised time is the last time the snap agent was told about a metric.
LastAdvertisedTime time.Time `json:"last_advertised_time"`
}

// Publish sends data to a Kafka server
func (k *kafkaPublisher) Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error {

// Inserted and modified codes from intelsdi-x/snap-plugin-publisher-file/file/file.go
logger := log.New()
logger.Println("Publishing started")
var metrics []plugin.MetricType
var mts []plugin.MetricType

switch contentType {
case plugin.SnapGOBContentType:
dec := gob.NewDecoder(bytes.NewBuffer(content))
if err := dec.Decode(&metrics); err != nil {
logger.Printf("Error decoding: error=%v content=%v", err, content)
return err
// decode incoming metrics types
if err := dec.Decode(&mts); err != nil {
fmt.Fprintf(os.Stderr, "Error: invalid incoming content: %v, err=%v", content, err)
return fmt.Errorf("Cannot decode incoming content, err=%v", err)
}
default:
logger.Printf("Error unknown content type '%v'", contentType)
return fmt.Errorf("Unknown content type '%s'", contentType)
}

logger.Printf("publishing %v metrics to %v", len(metrics), config)
// format metrics types to metrics to be published
metrics := formatMetricTypes(mts)

jsonOut, err := json.Marshal(metrics)
if err != nil {
return fmt.Errorf("Error while marshalling metrics to JSON: %v", err)
return fmt.Errorf("Cannot marshal metrics to JSON format, err=%v", err)
}

// Inserted codes end

topic := config["topic"].(ctypes.ConfigValueStr).Value
brokers := parseBrokerString(config["brokers"].(ctypes.ConfigValueStr).Value)
err = k.publish(topic, brokers, []byte(jsonOut))
return err

return k.publish(topic, brokers, []byte(jsonOut))
}

func (k *kafkaPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
cp := cpolicy.New()
config := cpolicy.NewPolicyNode()

r1, err := cpolicy.NewStringRule("topic", true)
r1, err := cpolicy.NewStringRule("topic", false, "snap")
handleErr(err)
r1.Description = "Kafka topic for publishing"

r2, _ := cpolicy.NewStringRule("brokers", true)
r2, _ := cpolicy.NewStringRule("brokers", false, "localhost:9092")
handleErr(err)
r2.Description = "List of brokers in the format: broker-ip:port;broker-ip:port (ex: 192.168.1.1:9092;172.16.9.99:9092"
r2.Description = "List of brokers separated by semicolon in the format: <broker-ip:port;broker-ip:port> (ex: \"192.168.1.1:9092;172.16.9.99:9092\")"

config.Add(r1, r2)
cp.Add([]string{""}, config)
Expand All @@ -107,7 +113,7 @@ func (k *kafkaPublisher) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
func (k *kafkaPublisher) publish(topic string, brokers []string, content []byte) error {
producer, err := sarama.NewSyncProducer(brokers, nil)
if err != nil {
return err
return fmt.Errorf("Cannot initialize a new Sarama SyncProducer using the given broker addresses (%v), err=%v", brokers, err)
}

_, _, err = producer.SendMessage(&sarama.ProducerMessage{
Expand All @@ -117,8 +123,29 @@ func (k *kafkaPublisher) publish(topic string, brokers []string, content []byte)
return err
}

// formatMetricTypes returns metrics in format to be publish as a JSON based on incoming metrics types;
// i.a. namespace is formatted as a single string
func formatMetricTypes(mts []plugin.MetricType) []MetricToPublish {
var metrics []MetricToPublish
for _, mt := range mts {
metrics = append(metrics, MetricToPublish{
Timestamp: mt.Timestamp(),
Namespace: mt.Namespace().String(),
Data: mt.Data(),
Unit: mt.Unit(),
Tags: mt.Tags(),
Version_: mt.Version(),
LastAdvertisedTime: mt.LastAdvertisedTime(),
})
}
return metrics
}
func parseBrokerString(brokerStr string) []string {
return strings.Split(brokerStr, ";")
// remove spaces from 'brokerStr'
brokers := strings.Replace(brokerStr, " ", "", -1)

// return split brokers separated by semicolon
return strings.Split(brokers, ";")
}

func handleErr(e error) {
Expand Down
10 changes: 5 additions & 5 deletions kafka/kafka_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import (

var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

var mockMetric = plugin.MetricType{
var mockMetricType = plugin.MetricType{
Namespace_: core.NewNamespace("mock", "foo"),
Data_: 1,
Timestamp_: time.Now(),
Expand Down Expand Up @@ -69,11 +69,11 @@ func TestPublish(t *testing.T) {

Convey("publish mock metrics and consume", func() {
contentType := plugin.SnapGOBContentType
metrics := []plugin.MetricType{
mockMetric,
mts := []plugin.MetricType{
mockMetricType,
}

enc.Encode(metrics)
enc.Encode(mts)

// set config items
config["brokers"] = ctypes.ConfigValueStr{Value: brokers}
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestPublish(t *testing.T) {
So(m.Value, ShouldNotBeNil)

Convey("check if marshalled metrics and published data to Kafka are equal", func() {
metricsAsJson, err := json.Marshal(metrics)
metricsAsJson, err := json.Marshal(formatMetricTypes(mts))
So(err, ShouldBeNil)
So(string(m.Value), ShouldEqual, string(metricsAsJson))
})
Expand Down
25 changes: 22 additions & 3 deletions kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,30 @@ package kafka

import (
"testing"
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/ctypes"
. "github.com/smartystreets/goconvey/convey"
)

func TestKafkaPlugin(t *testing.T) {
var mockMts = []plugin.MetricType{
*plugin.NewMetricType(core.NewNamespace("foo"), time.Now(), nil, "", 99),
}

func TestMetaData(t *testing.T) {
Convey("Meta returns proper metadata", t, func() {
meta := Meta()
So(meta, ShouldNotBeNil)
So(meta.Name, ShouldResemble, PluginName)
So(meta.Version, ShouldResemble, PluginVersion)
So(meta.Type, ShouldResemble, plugin.PublisherPluginType)
So(meta.Type, ShouldResemble, PluginType)
})
}

func TestKafkaPlugin(t *testing.T) {
Convey("Create Kafka Publisher", t, func() {
k := NewKafkaPublisher()
Convey("so kafka publisher should not be nil", func() {
Expand All @@ -47,7 +56,7 @@ func TestKafkaPlugin(t *testing.T) {
So(k, ShouldHaveSameTypeAs, &kafkaPublisher{})
})
configPolicy, err := k.GetConfigPolicy()
Convey("k.GetConfigPolicy()", func() {
Convey("Test GetConfigPolicy()", func() {
Convey("So config policy should not be nil", func() {
So(configPolicy, ShouldNotBeNil)
})
Expand All @@ -68,5 +77,15 @@ func TestKafkaPlugin(t *testing.T) {
So(errs.HasErrors(), ShouldBeFalse)
})
})

})
}

func TestFormatMetricTypes(t *testing.T) {
Convey("FormatMetricTypes returns metrics to publish", t, func() {
metrics := formatMetricTypes(mockMts)
So(metrics, ShouldNotBeEmpty)
// formatted metric has namespace represented as a single string
So(metrics[0].Namespace, ShouldEqual, mockMts[0].Namespace().String())
})
}