Skip to content

Commit

Permalink
Utilizing new client and overhauling Accumulator interface
Browse files Browse the repository at this point in the history
Fixes #280
Fixes #281
  • Loading branch information
sparrc committed Oct 16, 2015
1 parent 73f1ed4 commit a8aaaa9
Showing 1 changed file with 111 additions and 39 deletions.
150 changes: 111 additions & 39 deletions accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,62 +7,134 @@ import (
"sync"
"time"

"github.com/influxdb/influxdb/client"
oldclient "github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/client/v2"
)

// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
sync.Mutex
type Accumulator interface {
Add(measurement string, fields map[string]interface{},
tags map[string]string, t ...time.Time)

client.BatchPoints
BatchPoints() client.BatchPoints

Debug bool
SetDefaultTags(tags map[string]string)
AddDefaultTag(key, value string)

Prefix string
Prefix() string
SetPrefix(prefix string)

Config *ConfiguredPlugin
Debug() bool
SetDebug(enabled bool)
}

// deepcopy returns a deep copy of the BatchPoints object. This is primarily so
// we can do multithreaded output flushing (see Agent.flush)
func (bp *BatchPoints) deepcopy() *BatchPoints {
bp.Lock()
defer bp.Unlock()
func NewAccumulator(
precision string,
database string,
config *ConfiguredPlugin,
) (Accumulator, error) {
var err error
acc := accumulator{}
bpconfig := client.BatchPointsConfig{
Precision: precision,
Database: database,
}
if acc.batch, err = client.NewBatchPoints(bpconfig); err != nil {
return nil, err
}
acc.config = config
return &acc, nil
}

var bpc BatchPoints
bpc.Time = bp.Time
bpc.Precision = bp.Precision
type accumulator struct {
sync.Mutex

bpc.Tags = make(map[string]string)
for k, v := range bp.Tags {
bpc.Tags[k] = v
}
batch client.BatchPoints

defaultTags map[string]string

var pts []client.Point
for _, pt := range bp.Points {
var ptc client.Point
debug bool

ptc.Measurement = pt.Measurement
ptc.Time = pt.Time
ptc.Precision = pt.Precision
ptc.Raw = pt.Raw
config *ConfiguredPlugin

prefix string
}

func (ac *accumulator) Add(
measurement string,
fields map[string]interface{},
tags map[string]string,
t ...time.Time,
) {
ac.Lock()
defer ac.Unlock()

ptc.Tags = make(map[string]string)
ptc.Fields = make(map[string]interface{})
if len(t) > 0 {
timestamp := t[0]
} else {
timestamp := time.Now()
}

for k, v := range pt.Tags {
ptc.Tags[k] = v
if ac.config != nil {
if !ac.config.ShouldPass(measurement, tags) {
return
}
}

for k, v := range pt.Fields {
ptc.Fields[k] = v
for k, v := range ac.defaultTags {
if _, ok := tags[k]; !ok {
tags[k] = v
}
pts = append(pts, ptc)
}

bpc.Points = pts
return &bpc
pt := client.NewPoint(measurement, tags, fields, timestamp)
if ac.debug {
fmt.Println(pt.PrecisionString(ac.batch.Precision()))
}
ac.batch.AddPoint(pt)
}

func (ac *accumulator) BatchPoints() client.BatchPoints {
ac.Lock()
defer ac.Unlock()
return ac.batch
}

func (ac *accumulator) SetDefaultTags(tags map[string]string) {
ac.defaultTags = tags
}

func (ac *accumulator) AddDefaultTag(key, value string) {
ac.defaultTags[key] = value
}

func (ac *accumulator) Prefix() string {
return ac.prefix
}

func (ac *accumulator) SetPrefix(prefix string) {
ac.prefix = prefix
}

func (ac *accumulator) Debug() bool {
return ac.debug
}

func (ac *accumulator) SetDebug(enabled bool) {
ac.debug = enabled
}

// BatchPoints is used to send a batch of data in a single write from telegraf
// to influx
type BatchPoints struct {
sync.Mutex

oldclient.BatchPoints

Debug bool

Prefix string

Config *ConfiguredPlugin
}

// Add adds a measurement
Expand Down Expand Up @@ -116,7 +188,7 @@ func (bp *BatchPoints) AddFieldsWithTime(
// fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
// }

// bp.Points = append(bp.Points, client.Point{
// bp.Points = append(bp.Points, oldclient.Point{
// Measurement: measurement,
// Tags: tags,
// Fields: fields,
Expand Down Expand Up @@ -171,7 +243,7 @@ func (bp *BatchPoints) AddFields(
fmt.Printf("> [%s] %s %s\n", strings.Join(tg, " "), measurement, strings.Join(vals, " "))
}

bp.Points = append(bp.Points, client.Point{
bp.Points = append(bp.Points, oldclient.Point{
Measurement: measurement,
Tags: tags,
Fields: fields,
Expand Down

0 comments on commit a8aaaa9

Please sign in to comment.