diff --git a/example/basic/main.go b/example/basic/main.go index 06567750838..879ca975b85 100644 --- a/example/basic/main.go +++ b/example/basic/main.go @@ -65,7 +65,7 @@ func initMeter() *push.Controller { if err != nil { log.Panicf("failed to initialize metric stdout exporter %v", err) } - batcher := defaultkeys.New(selector, metricsdk.DefaultLabelEncoder(), true) + batcher := defaultkeys.New(selector, metricsdk.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exporter, time.Second) pusher.Start() diff --git a/exporter/metric/dogstatsd/dogstatsd.go b/exporter/metric/dogstatsd/dogstatsd.go new file mode 100644 index 00000000000..01c71a65964 --- /dev/null +++ b/exporter/metric/dogstatsd/dogstatsd.go @@ -0,0 +1,75 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dogstatsd // import "go.opentelemetry.io/otel/exporter/metric/dogstatsd" + +import ( + "bytes" + + "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + export "go.opentelemetry.io/otel/sdk/export/metric" +) + +type ( + Config = statsd.Config + + // Exporter implements a dogstatsd-format statsd exporter, + // which encodes label sets as independent fields in the + // output. + // + // TODO: find a link for this syntax. It's been copied out of + // code, not a specification: + // + // https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go + Exporter struct { + *statsd.Exporter + *statsd.LabelEncoder + + ReencodedLabelsCount int + } +) + +var ( + _ export.Exporter = &Exporter{} + _ export.LabelEncoder = &Exporter{} +) + +// New returns a new Dogstatsd-syntax exporter. This type implements +// the metric.LabelEncoder interface, allowing the SDK's unique label +// encoding to be pre-computed for the exporter and stored in the +// LabelSet. +func New(config Config) (*Exporter, error) { + exp := &Exporter{ + LabelEncoder: statsd.NewLabelEncoder(), + } + + var err error + exp.Exporter, err = statsd.NewExporter(config, exp) + return exp, err +} + +// AppendName is part of the stats-internal adapter interface. +func (*Exporter) AppendName(rec export.Record, buf *bytes.Buffer) { + _, _ = buf.WriteString(rec.Descriptor().Name()) +} + +// AppendTags is part of the stats-internal adapter interface. +func (e *Exporter) AppendTags(rec export.Record, buf *bytes.Buffer) { + encoded, inefficient := e.LabelEncoder.ForceEncode(rec.Labels()) + _, _ = buf.WriteString(encoded) + + if inefficient { + e.ReencodedLabelsCount++ + } +} diff --git a/exporter/metric/dogstatsd/dogstatsd_test.go b/exporter/metric/dogstatsd/dogstatsd_test.go new file mode 100644 index 00000000000..c4fc8609dc8 --- /dev/null +++ b/exporter/metric/dogstatsd/dogstatsd_test.go @@ -0,0 +1,69 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package dogstatsd_test + +import ( + "bytes" + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/exporter/metric/dogstatsd" + "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + "go.opentelemetry.io/otel/exporter/metric/test" + export "go.opentelemetry.io/otel/sdk/export/metric" + sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/counter" +) + +// TestDogstatsLabels that labels are formatted in the correct style, +// whether or not the provided labels were encoded by a statsd label +// encoder. +func TestDogstatsLabels(t *testing.T) { + for inefficientCount, encoder := range []export.LabelEncoder{ + statsd.NewLabelEncoder(), // inefficientCount == 0 + sdk.NewDefaultLabelEncoder(), // inefficientCount == 1 + } { + t.Run(fmt.Sprintf("%T", encoder), func(t *testing.T) { + ctx := context.Background() + checkpointSet := test.NewCheckpointSet(encoder) + + desc := export.NewDescriptor("test.name", export.CounterKind, nil, "", "", core.Int64NumberKind, false) + cagg := counter.New() + _ = cagg.Update(ctx, core.NewInt64Number(123), desc) + cagg.Checkpoint(ctx, desc) + + checkpointSet.Add(desc, cagg, key.New("A").String("B")) + + var buf bytes.Buffer + exp, err := dogstatsd.New(dogstatsd.Config{ + Writer: &buf, + }) + require.Nil(t, err) + require.Equal(t, 0, exp.ReencodedLabelsCount) + + err = exp.Export(ctx, checkpointSet) + require.Nil(t, err) + + require.Equal(t, inefficientCount, exp.ReencodedLabelsCount) + + require.Equal(t, "test.name:123|c|#A:B\n", buf.String()) + }) + } +} diff --git a/exporter/metric/dogstatsd/example_test.go b/exporter/metric/dogstatsd/example_test.go new file mode 100644 index 00000000000..01c5350f72e --- /dev/null +++ b/exporter/metric/dogstatsd/example_test.go @@ -0,0 +1,87 @@ +package dogstatsd_test + +import ( + "context" + "fmt" + "io" + "log" + "sync" + "time" + + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/metric" + "go.opentelemetry.io/otel/exporter/metric/dogstatsd" + "go.opentelemetry.io/otel/sdk/metric/batcher/ungrouped" + "go.opentelemetry.io/otel/sdk/metric/controller/push" + "go.opentelemetry.io/otel/sdk/metric/selector/simple" +) + +func ExampleNew() { + // Create a "server" + wg := &sync.WaitGroup{} + wg.Add(1) + + reader, writer := io.Pipe() + + go func() { + defer wg.Done() + + for { + var buf [4096]byte + n, err := reader.Read(buf[:]) + if err == io.EOF { + return + } else if err != nil { + log.Fatal("Read err: ", err) + } else if n >= len(buf) { + log.Fatal("Read small buffer: ", n) + } else { + fmt.Print(string(buf[0:n])) + } + } + }() + + // Create a meter + selector := simple.NewWithExactMeasure() + exporter, err := dogstatsd.New(dogstatsd.Config{ + // The Writer field provides test support. + Writer: writer, + + // In real code, use the URL field: + // + // URL: fmt.Sprint("unix://", path), + }) + if err != nil { + log.Fatal("Could not initialize dogstatsd exporter:", err) + } + // The ungrouped batcher ensures that the export sees the full + // set of labels as dogstatsd tags. + batcher := ungrouped.New(selector, false) + + // The pusher automatically recognizes that the exporter + // implements the LabelEncoder interface, which ensures the + // export encoding for labels is encoded in the LabelSet. + pusher := push.New(batcher, exporter, time.Hour) + pusher.Start() + + ctx := context.Background() + + key := key.New("key") + + // pusher implements the metric.MeterProvider interface: + meter := pusher.GetMeter("example") + + // Create and update a single counter: + counter := meter.NewInt64Counter("a.counter", metric.WithKeys(key)) + labels := meter.Labels(key.String("value")) + + counter.Add(ctx, 100, labels) + + // Flush the exporter, close the pipe, and wait for the reader. + pusher.Stop() + writer.Close() + wg.Wait() + + // Output: + // a.counter:100|c|#key:value +} diff --git a/exporter/metric/internal/statsd/conn.go b/exporter/metric/internal/statsd/conn.go new file mode 100644 index 00000000000..5f4581c3623 --- /dev/null +++ b/exporter/metric/internal/statsd/conn.go @@ -0,0 +1,288 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsd + +// See https://github.com/b/statsd_spec for the best-available statsd +// syntax specification. See also +// https://github.com/statsd/statsd/edit/master/docs/metric_types.md + +import ( + "bytes" + "context" + "fmt" + "io" + "net" + "net/url" + "strconv" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/unit" + export "go.opentelemetry.io/otel/sdk/export/metric" + "go.opentelemetry.io/otel/sdk/export/metric/aggregator" +) + +type ( + // Config supports common options that apply to statsd exporters. + Config struct { + // URL describes the destination for exporting statsd data. + // e.g., udp://host:port + // tcp://host:port + // unix:///socket/path + URL string + + // Writer is an alternate to providing a URL. When Writer is + // non-nil, URL will be ignored and the exporter will write to + // the configured Writer interface. + Writer io.Writer + + // MaxPacketSize this limits the packet size for packet-oriented transports. + MaxPacketSize int + + // TODO support Dial and Write timeouts + } + + // Exporter is common type meant to implement concrete statsd + // exporters. + Exporter struct { + adapter Adapter + config Config + conn net.Conn + writer io.Writer + buffer bytes.Buffer + } + + // Adapter supports statsd syntax variations, primarily plain + // statsd vs. dogstatsd. + Adapter interface { + AppendName(export.Record, *bytes.Buffer) + AppendTags(export.Record, *bytes.Buffer) + } +) + +const ( + formatCounter = "c" + formatHistogram = "h" + formatGauge = "g" + formatTiming = "ms" + + MaxPacketSize = 1 << 16 +) + +var ( + _ export.Exporter = &Exporter{} + + ErrInvalidScheme = fmt.Errorf("Invalid statsd transport") +) + +// NewExport returns a common implementation for exporters that Export +// statsd syntax. +func NewExporter(config Config, adapter Adapter) (*Exporter, error) { + if config.MaxPacketSize <= 0 { + config.MaxPacketSize = MaxPacketSize + } + var writer io.Writer + var conn net.Conn + var err error + if config.Writer != nil { + writer = config.Writer + } else { + conn, err = dial(config.URL) + if conn != nil { + writer = conn + } + } + // TODO: If err != nil, we return it _with_ a valid exporter; the + // exporter should attempt to re-dial if it's retryable. Add a + // Start() and Stop() API. + return &Exporter{ + adapter: adapter, + config: config, + conn: conn, + writer: writer, + }, err +} + +// dial connects to a statsd service using several common network +// types. Presently "udp" and "unix" datagram socket connections are +// supported. +func dial(endpoint string) (net.Conn, error) { + dest, err := url.Parse(endpoint) + if err != nil { + return nil, err + } + + // TODO: Support tcp destination, need configurable timeouts first. + + scheme := dest.Scheme + switch scheme { + case "udp", "udp4", "udp6": + udpAddr, err := net.ResolveUDPAddr(scheme, dest.Host) + locAddr := &net.UDPAddr{} + if err != nil { + return nil, err + } + conn, err := net.DialUDP(scheme, locAddr, udpAddr) + if err != nil { + return nil, err + } + return conn, err + case "unix", "unixgram": + scheme = "unixgram" + locAddr := &net.UnixAddr{} + + sockAddr, err := net.ResolveUnixAddr(scheme, dest.Path) + if err != nil { + return nil, err + } + conn, err := net.DialUnix(scheme, locAddr, sockAddr) + if err != nil { + return nil, err + } + return conn, err + } + return nil, ErrInvalidScheme +} + +// Export is common code for any statsd-based metric.Exporter implementation. +func (e *Exporter) Export(_ context.Context, checkpointSet export.CheckpointSet) error { + buf := &e.buffer + buf.Reset() + + var aggErr error + var sendErr error + + checkpointSet.ForEach(func(rec export.Record) { + before := buf.Len() + + if err := e.formatMetric(rec, buf); err != nil && aggErr == nil { + aggErr = err + return + } + + if buf.Len() < e.config.MaxPacketSize { + return + } + if before == 0 { + // A single metric >= packet size + if err := e.send(buf.Bytes()); err != nil && sendErr == nil { + sendErr = err + } + buf.Reset() + return + } + + // Send and copy the leftover + if err := e.send(buf.Bytes()[:before]); err != nil && sendErr == nil { + sendErr = err + } + + leftover := buf.Len() - before + + copy(buf.Bytes()[0:leftover], buf.Bytes()[before:]) + + buf.Truncate(leftover) + }) + if err := e.send(buf.Bytes()); err != nil && sendErr == nil { + sendErr = err + } + if sendErr != nil { + return sendErr + } + return aggErr +} + +// send writes a complete buffer to the writer as a blocking call. +func (e *Exporter) send(buf []byte) error { + for len(buf) != 0 { + n, err := e.writer.Write(buf) + if err != nil { + return err + } + buf = buf[n:] + } + return nil +} + +// formatMetric formats an individual export record. For some records +// this will emit a single statistic, for some it will emit more than +// one. +func (e *Exporter) formatMetric(rec export.Record, buf *bytes.Buffer) error { + desc := rec.Descriptor() + agg := rec.Aggregator() + + // TODO handle non-Points Distribution/MaxSumCount by + // formatting individual quantiles, the sum, and the count as + // single statistics. For the dogstatsd variation, assuming + // open-source systems like Veneur add support, figure out the + // proper encoding for "d"-type distribution data. + + if pts, ok := agg.(aggregator.Points); ok { + var format string + if desc.Unit() == unit.Milliseconds { + format = formatTiming + } else { + format = formatHistogram + } + points, err := pts.Points() + if err != nil { + return err + } + for _, pt := range points { + e.formatSingleStat(rec, pt, format, buf) + } + + } else if sum, ok := agg.(aggregator.Sum); ok { + sum, err := sum.Sum() + if err != nil { + return err + } + e.formatSingleStat(rec, sum, formatCounter, buf) + + } else if lv, ok := agg.(aggregator.LastValue); ok { + lv, _, err := lv.LastValue() + if err != nil { + return err + } + e.formatSingleStat(rec, lv, formatGauge, buf) + } + return nil +} + +// formatSingleStat encodes a single item of statsd data followed by a +// newline. +func (e *Exporter) formatSingleStat(rec export.Record, val core.Number, fmtStr string, buf *bytes.Buffer) { + e.adapter.AppendName(rec, buf) + _, _ = buf.WriteRune(':') + writeNumber(buf, val, rec.Descriptor().NumberKind()) + _, _ = buf.WriteRune('|') + _, _ = buf.WriteString(fmtStr) + e.adapter.AppendTags(rec, buf) + _, _ = buf.WriteRune('\n') +} + +func writeNumber(buf *bytes.Buffer, num core.Number, kind core.NumberKind) { + var tmp [128]byte + var conv []byte + switch kind { + case core.Int64NumberKind: + conv = strconv.AppendInt(tmp[:0], num.AsInt64(), 10) + case core.Float64NumberKind: + conv = strconv.AppendFloat(tmp[:0], num.AsFloat64(), 'g', -1, 64) + case core.Uint64NumberKind: + conv = strconv.AppendUint(tmp[:0], num.AsUint64(), 10) + + } + _, _ = buf.Write(conv) +} diff --git a/exporter/metric/internal/statsd/conn_test.go b/exporter/metric/internal/statsd/conn_test.go new file mode 100644 index 00000000000..4bb3d419a64 --- /dev/null +++ b/exporter/metric/internal/statsd/conn_test.go @@ -0,0 +1,341 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsd_test + +import ( + "bytes" + "context" + "fmt" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/api/unit" + "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + "go.opentelemetry.io/otel/exporter/metric/test" + export "go.opentelemetry.io/otel/sdk/export/metric" + sdk "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/aggregator/array" + "go.opentelemetry.io/otel/sdk/metric/aggregator/counter" + "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" +) + +// withTagsAdapter tests a dogstatsd-style statsd exporter. +type withTagsAdapter struct { + *statsd.LabelEncoder +} + +func (*withTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) { + _, _ = buf.WriteString(rec.Descriptor().Name()) +} + +func (ta *withTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) { + encoded, _ := ta.LabelEncoder.ForceEncode(rec.Labels()) + _, _ = buf.WriteString(encoded) +} + +func newWithTagsAdapter() *withTagsAdapter { + return &withTagsAdapter{ + statsd.NewLabelEncoder(), + } +} + +// noTagsAdapter simulates a plain-statsd exporter that appends tag +// values to the metric name. +type noTagsAdapter struct { +} + +func (*noTagsAdapter) AppendName(rec export.Record, buf *bytes.Buffer) { + _, _ = buf.WriteString(rec.Descriptor().Name()) + + for _, tag := range rec.Labels().Ordered() { + _, _ = buf.WriteString(".") + _, _ = buf.WriteString(tag.Value.Emit()) + } +} + +func (*noTagsAdapter) AppendTags(rec export.Record, buf *bytes.Buffer) { +} + +func newNoTagsAdapter() *noTagsAdapter { + return &noTagsAdapter{} +} + +type testWriter struct { + vec []string +} + +func (w *testWriter) Write(b []byte) (int, error) { + w.vec = append(w.vec, string(b)) + return len(b), nil +} + +func testNumber(desc *export.Descriptor, v float64) core.Number { + if desc.NumberKind() == core.Float64NumberKind { + return core.NewFloat64Number(v) + } + return core.NewInt64Number(int64(v)) +} + +func gaugeAgg(desc *export.Descriptor, v float64) export.Aggregator { + ctx := context.Background() + gagg := gauge.New() + _ = gagg.Update(ctx, testNumber(desc, v), desc) + gagg.Checkpoint(ctx, desc) + return gagg +} + +func counterAgg(desc *export.Descriptor, v float64) export.Aggregator { + ctx := context.Background() + cagg := counter.New() + _ = cagg.Update(ctx, testNumber(desc, v), desc) + cagg.Checkpoint(ctx, desc) + return cagg +} + +func measureAgg(desc *export.Descriptor, v float64) export.Aggregator { + ctx := context.Background() + magg := array.New() + _ = magg.Update(ctx, testNumber(desc, v), desc) + magg.Checkpoint(ctx, desc) + return magg +} + +func TestBasicFormat(t *testing.T) { + type adapterOutput struct { + adapter statsd.Adapter + expected string + } + + for _, ao := range []adapterOutput{{ + adapter: newWithTagsAdapter(), + expected: `counter:%s|c|#A:B,C:D +gauge:%s|g|#A:B,C:D +measure:%s|h|#A:B,C:D +timer:%s|ms|#A:B,C:D +`}, { + adapter: newNoTagsAdapter(), + expected: `counter.B.D:%s|c +gauge.B.D:%s|g +measure.B.D:%s|h +timer.B.D:%s|ms +`}, + } { + adapter := ao.adapter + expected := ao.expected + t.Run(fmt.Sprintf("%T", adapter), func(t *testing.T) { + for _, nkind := range []core.NumberKind{ + core.Float64NumberKind, + core.Int64NumberKind, + } { + t.Run(nkind.String(), func(t *testing.T) { + ctx := context.Background() + writer := &testWriter{} + config := statsd.Config{ + Writer: writer, + MaxPacketSize: 1024, + } + exp, err := statsd.NewExporter(config, adapter) + if err != nil { + t.Fatal("New error: ", err) + } + + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) + cdesc := export.NewDescriptor( + "counter", export.CounterKind, nil, "", "", nkind, false) + gdesc := export.NewDescriptor( + "gauge", export.GaugeKind, nil, "", "", nkind, false) + mdesc := export.NewDescriptor( + "measure", export.MeasureKind, nil, "", "", nkind, false) + tdesc := export.NewDescriptor( + "timer", export.MeasureKind, nil, "", unit.Milliseconds, nkind, false) + + labels := []core.KeyValue{ + key.New("A").String("B"), + key.New("C").String("D"), + } + const value = 123.456 + + checkpointSet.Add(cdesc, counterAgg(cdesc, value), labels...) + checkpointSet.Add(gdesc, gaugeAgg(gdesc, value), labels...) + checkpointSet.Add(mdesc, measureAgg(mdesc, value), labels...) + checkpointSet.Add(tdesc, measureAgg(tdesc, value), labels...) + + err = exp.Export(ctx, checkpointSet) + require.Nil(t, err) + + var vfmt string + if nkind == core.Int64NumberKind { + fv := float64(value) + vfmt = strconv.FormatInt(int64(fv), 10) + } else { + vfmt = strconv.FormatFloat(value, 'g', -1, 64) + } + + require.Equal(t, 1, len(writer.vec)) + require.Equal(t, fmt.Sprintf(expected, vfmt, vfmt, vfmt, vfmt), writer.vec[0]) + }) + } + }) + } +} + +func makeLabels(offset, nkeys int) []core.KeyValue { + r := make([]core.KeyValue, nkeys) + for i := range r { + r[i] = key.New(fmt.Sprint("k", offset+i)).String(fmt.Sprint("v", offset+i)) + } + return r +} + +type splitTestCase struct { + name string + setup func(add func(int)) + check func(expected, got []string, t *testing.T) +} + +var splitTestCases = []splitTestCase{ + // These test use the number of keys to control where packets + // are split. + {"Simple", + func(add func(int)) { + add(1) + add(1000) + add(1) + }, + func(expected, got []string, t *testing.T) { + require.EqualValues(t, expected, got) + }, + }, + {"LastBig", + func(add func(int)) { + add(1) + add(1) + add(1000) + }, + func(expected, got []string, t *testing.T) { + require.Equal(t, 2, len(got)) + require.EqualValues(t, []string{ + expected[0] + expected[1], + expected[2], + }, got) + }, + }, + {"FirstBig", + func(add func(int)) { + add(1000) + add(1) + add(1) + add(1000) + add(1) + add(1) + }, + func(expected, got []string, t *testing.T) { + require.Equal(t, 4, len(got)) + require.EqualValues(t, []string{ + expected[0], + expected[1] + expected[2], + expected[3], + expected[4] + expected[5], + }, got) + }, + }, + {"OneBig", + func(add func(int)) { + add(1000) + }, + func(expected, got []string, t *testing.T) { + require.EqualValues(t, expected, got) + }, + }, + {"LastSmall", + func(add func(int)) { + add(1000) + add(1) + }, + func(expected, got []string, t *testing.T) { + require.EqualValues(t, expected, got) + }, + }, + {"Overflow", + func(add func(int)) { + for i := 0; i < 1000; i++ { + add(1) + } + }, + func(expected, got []string, t *testing.T) { + require.Less(t, 1, len(got)) + require.Equal(t, strings.Join(expected, ""), strings.Join(got, "")) + }, + }, + {"Empty", + func(add func(int)) { + }, + func(expected, got []string, t *testing.T) { + require.Equal(t, 0, len(got)) + }, + }, + {"AllBig", + func(add func(int)) { + add(1000) + add(1000) + add(1000) + }, + func(expected, got []string, t *testing.T) { + require.EqualValues(t, expected, got) + }, + }, +} + +func TestPacketSplit(t *testing.T) { + for _, tcase := range splitTestCases { + t.Run(tcase.name, func(t *testing.T) { + ctx := context.Background() + writer := &testWriter{} + config := statsd.Config{ + Writer: writer, + MaxPacketSize: 1024, + } + adapter := newWithTagsAdapter() + exp, err := statsd.NewExporter(config, adapter) + if err != nil { + t.Fatal("New error: ", err) + } + + checkpointSet := test.NewCheckpointSet(adapter.LabelEncoder) + desc := export.NewDescriptor("counter", export.CounterKind, nil, "", "", core.Int64NumberKind, false) + + var expected []string + + offset := 0 + tcase.setup(func(nkeys int) { + labels := makeLabels(offset, nkeys) + offset += nkeys + expect := fmt.Sprint("counter:100|c", adapter.LabelEncoder.Encode(labels), "\n") + expected = append(expected, expect) + checkpointSet.Add(desc, counterAgg(desc, 100), labels...) + }) + + err = exp.Export(ctx, checkpointSet) + require.Nil(t, err) + + tcase.check(expected, writer.vec, t) + }) + } +} diff --git a/exporter/metric/internal/statsd/labels.go b/exporter/metric/internal/statsd/labels.go new file mode 100644 index 00000000000..efaebd013a2 --- /dev/null +++ b/exporter/metric/internal/statsd/labels.go @@ -0,0 +1,84 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsd + +import ( + "bytes" + "sync" + + "go.opentelemetry.io/otel/api/core" + export "go.opentelemetry.io/otel/sdk/export/metric" +) + +// LabelEncoder encodes metric labels in the dogstatsd syntax. +// +// TODO: find a link for this syntax. It's been copied out of code, +// not a specification: +// +// https://github.com/stripe/veneur/blob/master/sinks/datadog/datadog.go +type LabelEncoder struct { + pool sync.Pool +} + +// sameCheck is used to test whether label encoders are the same. +type sameCheck interface { + isStatsd() +} + +var _ export.LabelEncoder = &LabelEncoder{} + +// NewLabelEncoder returns a new encoder for dogstatsd-syntax metric +// labels. +func NewLabelEncoder() *LabelEncoder { + return &LabelEncoder{ + pool: sync.Pool{ + New: func() interface{} { + return &bytes.Buffer{} + }, + }, + } +} + +// Encode emits a string like "|#key1:value1,key2:value2". +func (e *LabelEncoder) Encode(labels []core.KeyValue) string { + buf := e.pool.Get().(*bytes.Buffer) + defer e.pool.Put(buf) + buf.Reset() + + delimiter := "|#" + + for _, kv := range labels { + _, _ = buf.WriteString(delimiter) + _, _ = buf.WriteString(string(kv.Key)) + _, _ = buf.WriteRune(':') + _, _ = buf.WriteString(kv.Value.Emit()) + delimiter = "," + } + return buf.String() +} + +func (e *LabelEncoder) isStatsd() {} + +// ForceEncode returns a statsd label encoding, even if the exported +// labels were encoded by a different type of encoder. Returns a +// boolean to indicate whether the labels were in fact re-encoded, to +// test for (and warn about) efficiency. +func (e *LabelEncoder) ForceEncode(labels export.Labels) (string, bool) { + if _, ok := labels.Encoder().(sameCheck); ok { + return labels.Encoded(), false + } + + return e.Encode(labels.Ordered()), true +} diff --git a/exporter/metric/internal/statsd/labels_test.go b/exporter/metric/internal/statsd/labels_test.go new file mode 100644 index 00000000000..d3070de6684 --- /dev/null +++ b/exporter/metric/internal/statsd/labels_test.go @@ -0,0 +1,73 @@ +// Copyright 2019, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package statsd_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/api/core" + "go.opentelemetry.io/otel/api/key" + "go.opentelemetry.io/otel/exporter/metric/internal/statsd" + export "go.opentelemetry.io/otel/sdk/export/metric" + sdk "go.opentelemetry.io/otel/sdk/metric" +) + +var testLabels = []core.KeyValue{ + key.New("A").String("B"), + key.New("C").String("D"), + key.New("E").Float64(1.5), +} + +func TestLabelSyntax(t *testing.T) { + encoder := statsd.NewLabelEncoder() + + require.Equal(t, `|#A:B,C:D,E:1.5`, encoder.Encode(testLabels)) + + require.Equal(t, `|#A:B`, encoder.Encode([]core.KeyValue{ + key.New("A").String("B"), + })) + + require.Equal(t, "", encoder.Encode(nil)) +} + +func TestLabelForceEncode(t *testing.T) { + defaultLabelEncoder := sdk.NewDefaultLabelEncoder() + statsdLabelEncoder := statsd.NewLabelEncoder() + + exportLabelsDefault := export.NewLabels(testLabels, defaultLabelEncoder.Encode(testLabels), defaultLabelEncoder) + exportLabelsStatsd := export.NewLabels(testLabels, statsdLabelEncoder.Encode(testLabels), statsdLabelEncoder) + + statsdEncoding := exportLabelsStatsd.Encoded() + require.NotEqual(t, statsdEncoding, exportLabelsDefault.Encoded()) + + forced, repeat := statsdLabelEncoder.ForceEncode(exportLabelsDefault) + require.Equal(t, statsdEncoding, forced) + require.True(t, repeat) + + forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsStatsd) + require.Equal(t, statsdEncoding, forced) + require.False(t, repeat) + + // Check that this works for an embedded implementation. + exportLabelsEmbed := export.NewLabels(testLabels, statsdEncoding, struct { + *statsd.LabelEncoder + }{LabelEncoder: statsdLabelEncoder}) + + forced, repeat = statsdLabelEncoder.ForceEncode(exportLabelsEmbed) + require.Equal(t, statsdEncoding, forced) + require.False(t, repeat) +} diff --git a/exporter/metric/stdout/stdout_test.go b/exporter/metric/stdout/stdout_test.go index aa80ed22036..3865bbaefe5 100644 --- a/exporter/metric/stdout/stdout_test.go +++ b/exporter/metric/stdout/stdout_test.go @@ -79,7 +79,7 @@ func TestStdoutTimestamp(t *testing.T) { before := time.Now() - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) ctx := context.Background() desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Int64NumberKind, false) @@ -125,7 +125,7 @@ func TestStdoutTimestamp(t *testing.T) { func TestStdoutCounterFormat(t *testing.T) { fix := newFixture(t, stdout.Options{}) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.CounterKind, nil, "", "", core.Int64NumberKind, false) cagg := counter.New() @@ -142,7 +142,7 @@ func TestStdoutCounterFormat(t *testing.T) { func TestStdoutGaugeFormat(t *testing.T) { fix := newFixture(t, stdout.Options{}) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Float64NumberKind, false) gagg := gauge.New() @@ -159,7 +159,7 @@ func TestStdoutGaugeFormat(t *testing.T) { func TestStdoutMaxSumCount(t *testing.T) { fix := newFixture(t, stdout.Options{}) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) magg := maxsumcount.New() @@ -179,7 +179,7 @@ func TestStdoutMeasureFormat(t *testing.T) { PrettyPrint: true, }) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) magg := array.New() @@ -223,7 +223,7 @@ func TestStdoutMeasureFormat(t *testing.T) { func TestStdoutAggError(t *testing.T) { fix := newFixture(t, stdout.Options{}) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.MeasureKind, nil, "", "", core.Float64NumberKind, false) magg := ddsketch.New(ddsketch.NewDefaultConfig(), desc) @@ -242,7 +242,7 @@ func TestStdoutAggError(t *testing.T) { func TestStdoutGaugeNotSet(t *testing.T) { fix := newFixture(t, stdout.Options{}) - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) desc := export.NewDescriptor("test.name", export.GaugeKind, nil, "", "", core.Float64NumberKind, false) gagg := gauge.New() diff --git a/sdk/export/metric/aggregator/aggregator.go b/sdk/export/metric/aggregator/aggregator.go index 8e968d5ef72..0a7dbf0bfe4 100644 --- a/sdk/export/metric/aggregator/aggregator.go +++ b/sdk/export/metric/aggregator/aggregator.go @@ -53,6 +53,11 @@ type ( LastValue() (core.Number, time.Time, error) } + // Points returns the raw set of values that were aggregated. + Points interface { + Points() ([]core.Number, error) + } + // MaxSumCount supports the Max, Sum, and Count interfaces. MaxSumCount interface { Sum diff --git a/sdk/metric/aggregator/array/array.go b/sdk/metric/aggregator/array/array.go index e3d7572cb9e..5c61b791d16 100644 --- a/sdk/metric/aggregator/array/array.go +++ b/sdk/metric/aggregator/array/array.go @@ -29,17 +29,18 @@ import ( type ( Aggregator struct { lock sync.Mutex - current Points - checkpoint Points + current points + checkpoint points ckptSum core.Number } - Points []core.Number + points []core.Number ) var _ export.Aggregator = &Aggregator{} var _ aggregator.MaxSumCount = &Aggregator{} var _ aggregator.Distribution = &Aggregator{} +var _ aggregator.Points = &Aggregator{} // New returns a new array aggregator, which aggregates recorded // measurements by storing them in an array. This type uses a mutex @@ -74,6 +75,11 @@ func (c *Aggregator) Quantile(q float64) (core.Number, error) { return c.checkpoint.Quantile(q) } +// Points returns access to the raw data set. +func (c *Aggregator) Points() ([]core.Number, error) { + return c.checkpoint, nil +} + // Checkpoint saves the current state and resets the current state to // the empty set, taking a lock to prevent concurrent Update() calls. func (c *Aggregator) Checkpoint(ctx context.Context, desc *export.Descriptor) { @@ -133,8 +139,8 @@ func (c *Aggregator) sort(kind core.NumberKind) { } } -func combine(a, b Points, kind core.NumberKind) Points { - result := make(Points, 0, len(a)+len(b)) +func combine(a, b points, kind core.NumberKind) points { + result := make(points, 0, len(a)+len(b)) for len(a) != 0 && len(b) != 0 { if a[0].CompareNumber(kind, b[0]) < 0 { @@ -150,25 +156,25 @@ func combine(a, b Points, kind core.NumberKind) Points { return result } -func (p *Points) Len() int { +func (p *points) Len() int { return len(*p) } -func (p *Points) Less(i, j int) bool { +func (p *points) Less(i, j int) bool { // Note this is specialized for int64, because float64 is // handled by `sort.Float64s` and uint64 numbers never appear // in this data. return int64((*p)[i]) < int64((*p)[j]) } -func (p *Points) Swap(i, j int) { +func (p *points) Swap(i, j int) { (*p)[i], (*p)[j] = (*p)[j], (*p)[i] } // Quantile returns the least X such that Pr(x=q, where X is an // element of the data set. This uses the "Nearest-Rank" definition // of a quantile. -func (p *Points) Quantile(q float64) (core.Number, error) { +func (p *points) Quantile(q float64) (core.Number, error) { if len(*p) == 0 { return core.Number(0), aggregator.ErrEmptyDataSet } diff --git a/sdk/metric/batcher/test/test.go b/sdk/metric/batcher/test/test.go index 69941e0e11c..3bbeb70c265 100644 --- a/sdk/metric/batcher/test/test.go +++ b/sdk/metric/batcher/test/test.go @@ -55,7 +55,7 @@ var ( // SdkEncoder uses a non-standard encoder like K1~V1&K2~V2 SdkEncoder = &Encoder{} // GroupEncoder uses the SDK default encoder - GroupEncoder = sdk.DefaultLabelEncoder() + GroupEncoder = sdk.NewDefaultLabelEncoder() // Gauge groups are (labels1), (labels2+labels3) // Counter groups are (labels1+labels2), (labels3) diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index 89867f17446..71d4e755255 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture { bf := &benchFixture{ B: b, } - bf.sdk = sdk.New(bf, sdk.DefaultLabelEncoder()) + bf.sdk = sdk.New(bf, sdk.NewDefaultLabelEncoder()) return bf } diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index fcb586875ad..709c87a3690 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -76,7 +76,7 @@ func New(batcher export.Batcher, exporter export.Exporter, period time.Duration) lencoder, _ := exporter.(export.LabelEncoder) if lencoder == nil { - lencoder = sdk.DefaultLabelEncoder() + lencoder = sdk.NewDefaultLabelEncoder() } return &Controller{ diff --git a/sdk/metric/controller/push/push_test.go b/sdk/metric/controller/push/push_test.go index 0894ad616e3..89bdb6a197a 100644 --- a/sdk/metric/controller/push/push_test.go +++ b/sdk/metric/controller/push/push_test.go @@ -64,7 +64,7 @@ var _ push.Clock = mockClock{} var _ push.Ticker = mockTicker{} func newFixture(t *testing.T) testFixture { - checkpointSet := test.NewCheckpointSet(sdk.DefaultLabelEncoder()) + checkpointSet := test.NewCheckpointSet(sdk.NewDefaultLabelEncoder()) batcher := &testBatcher{ t: t, diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 3b11aeaa8c4..5fdf86e7fe3 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -69,7 +69,7 @@ func TestInputRangeTestCounter(t *testing.T) { t: t, agg: cagg, } - sdk := sdk.New(batcher, sdk.DefaultLabelEncoder()) + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -104,7 +104,7 @@ func TestInputRangeTestMeasure(t *testing.T) { t: t, agg: magg, } - sdk := sdk.New(batcher, sdk.DefaultLabelEncoder()) + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -139,7 +139,7 @@ func TestDisabledInstrument(t *testing.T) { t: t, agg: nil, } - sdk := sdk.New(batcher, sdk.DefaultLabelEncoder()) + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) measure := sdk.NewFloat64Measure("measure.name", metric.WithAbsolute(true)) measure.Record(ctx, -1, sdk.Labels()) @@ -154,7 +154,7 @@ func TestRecordNaN(t *testing.T) { t: t, agg: gauge.New(), } - sdk := sdk.New(batcher, sdk.DefaultLabelEncoder()) + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) var sdkErr error sdk.SetErrorHandler(func(handleErr error) { @@ -188,7 +188,7 @@ func TestSDKLabelEncoder(t *testing.T) { } func TestDefaultLabelEncoder(t *testing.T) { - encoder := sdk.DefaultLabelEncoder() + encoder := sdk.NewDefaultLabelEncoder() encoded := encoder.Encode([]core.KeyValue{key.String("A", "B"), key.String("C", "D")}) require.Equal(t, `A=B,C=D`, encoded) } diff --git a/sdk/metric/example_test.go b/sdk/metric/example_test.go index 478aa08824a..e79d3ebf8c9 100644 --- a/sdk/metric/example_test.go +++ b/sdk/metric/example_test.go @@ -37,7 +37,7 @@ func ExampleNew() { if err != nil { panic(fmt.Sprintln("Could not initialize stdout exporter:", err)) } - batcher := defaultkeys.New(selector, sdk.DefaultLabelEncoder(), true) + batcher := defaultkeys.New(selector, sdk.NewDefaultLabelEncoder(), true) pusher := push.New(batcher, exporter, time.Second) pusher.Start() defer pusher.Stop() diff --git a/sdk/metric/labelencoder.go b/sdk/metric/labelencoder.go index 1be80d9d592..a2bb0798d54 100644 --- a/sdk/metric/labelencoder.go +++ b/sdk/metric/labelencoder.go @@ -35,7 +35,7 @@ type defaultLabelEncoder struct { var _ export.LabelEncoder = &defaultLabelEncoder{} -func DefaultLabelEncoder() export.LabelEncoder { +func NewDefaultLabelEncoder() export.LabelEncoder { return &defaultLabelEncoder{ pool: sync.Pool{ New: func() interface{} { diff --git a/sdk/metric/monotone_test.go b/sdk/metric/monotone_test.go index fbb4ab93dda..e446953cb0a 100644 --- a/sdk/metric/monotone_test.go +++ b/sdk/metric/monotone_test.go @@ -70,7 +70,7 @@ func TestMonotoneGauge(t *testing.T) { batcher := &monotoneBatcher{ t: t, } - sdk := sdk.New(batcher, sdk.DefaultLabelEncoder()) + sdk := sdk.New(batcher, sdk.NewDefaultLabelEncoder()) sdk.SetErrorHandler(func(error) { t.Fatal("Unexpected") }) diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 5fb62b33e4c..6e17802dc28 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -290,7 +290,7 @@ func stressTest(t *testing.T, impl testImpl) { lused: map[string]bool{}, } cc := concurrency() - sdk := sdk.New(fixture, sdk.DefaultLabelEncoder()) + sdk := sdk.New(fixture, sdk.NewDefaultLabelEncoder()) fixture.wg.Add(cc + 1) for i := 0; i < cc; i++ {