Skip to content

Commit

Permalink
Adding a channel mode to the client
Browse files Browse the repository at this point in the history
Receive metrics through a channel is much slower than using mutexes but
allow the client to have a timeout and drop metrics if needed.
  • Loading branch information
hush-hush committed Apr 17, 2020
1 parent 45ce2c1 commit 0260f5c
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 36 deletions.
7 changes: 7 additions & 0 deletions statsd/benchmark_report_metric_noop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// +build !go1.13

package statsd_test

import "testing"

func reportMetric(*testing.B, float64, string) {}
9 changes: 9 additions & 0 deletions statsd/benchmark_report_metric_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build go1.13

package statsd_test

import "testing"

func reportMetric(b *testing.B, value float64, unit string) {
b.ReportMetric(value, unit)
}
52 changes: 52 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ var (
DefaultWriteTimeoutUDS = 1 * time.Millisecond
// DefaultTelemetry is the default value for the Telemetry option
DefaultTelemetry = true
// DefaultReceivingingMode is the default behavior when sending metrics
DefaultReceivingMode = MutexMode
// DefaultChannelModeBufferSize is the default size of the channel holding incoming metrics
DefaultChannelModeBufferSize = 4096
)

// Options contains the configuration options for a client.
Expand Down Expand Up @@ -60,6 +64,28 @@ type Options struct {
// Telemetry is a set of metrics automatically injected by the client in the
// dogstatsd stream to be able to monitor the client itself.
Telemetry bool
// ReceiveMode determins the behavior of the client when receiving to many
// metrics. The client will either drop the metrics if its buffers are
// full (ChannelMode mode) or block the caller until the metric can be
// handled (MutexMode mode). By default the client will MutexMode. This
// option should be set to ChannelMode only when use under very high
// load.
//
// MutexMode uses a mutex internally which is much faster than
// channel but causes some lock contention when used with a high number
// of threads. Mutex are sharded based on the metrics name which
// limit mutex contention when goroutines send different metrics.
//
// ChannelMode: uses channel (of ChannelModeBufferSize size) to send
// metrics and drop metrics if the channel is full. Sending metrics in
// this mode is slower that MutexMode (because of the channel), but
// will not block the application. This mode is made for application
// using many goroutines, sending the same metrics at a very high
// volume. The goal is to not slow down the application at the cost of
// dropping metrics and having a lower max throughput.
ReceiveMode ReceivingMode
// ChannelModeBufferSize is the size of the channel holding incoming metrics
ChannelModeBufferSize int
}

func resolveOptions(options []Option) (*Options, error) {
Expand All @@ -74,6 +100,8 @@ func resolveOptions(options []Option) (*Options, error) {
SenderQueueSize: DefaultSenderQueueSize,
WriteTimeoutUDS: DefaultWriteTimeoutUDS,
Telemetry: DefaultTelemetry,
ReceiveMode: DefaultReceivingMode,
ChannelModeBufferSize: DefaultChannelModeBufferSize,
}

for _, option := range options {
Expand Down Expand Up @@ -168,3 +196,27 @@ func WithoutTelemetry() Option {
return nil
}
}

// WithChannelMode will use channel to receive metrics
func WithChannelMode() Option {
return func(o *Options) error {
o.ReceiveMode = ChannelMode
return nil
}
}

// WithMutexModeMode will use mutext to receive metrics
func WithMutexMode() Option {
return func(o *Options) error {
o.ReceiveMode = MutexMode
return nil
}
}

// WithChannelModeBufferSize the channel buffer size when using "drop mode"
func WithChannelModeBufferSize(bufferSize int) Option {
return func(o *Options) error {
o.ChannelModeBufferSize = bufferSize
return nil
}
}
53 changes: 43 additions & 10 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ const (
serviceCheck
)

type ReceivingMode int

const (
MutexMode ReceivingMode = iota
ChannelMode
)

type metric struct {
metricType metricType
namespace string
Expand Down Expand Up @@ -208,13 +215,15 @@ type Client struct {
wg sync.WaitGroup
bufferShards []*worker
closerLock sync.Mutex
receiveMode ReceivingMode
}

// ClientMetrics contains metrics about the client
type ClientMetrics struct {
TotalMetrics uint64
TotalEvents uint64
TotalServiceChecks uint64
TotalMetrics uint64
TotalEvents uint64
TotalServiceChecks uint64
TotalDroppedOnReceive uint64
}

// Verify that Client implements the ClientInterface.
Expand Down Expand Up @@ -297,11 +306,16 @@ func newWithWriter(w statsdWriter, o *Options, writerName string) (*Client, erro
o.SenderQueueSize = DefaultUDPBufferPoolSize
}

c.receiveMode = o.ReceiveMode
c.bufferPool = newBufferPool(o.BufferPoolSize, o.MaxBytesPerPayload, o.MaxMessagesPerPayload)
c.buffer = c.bufferPool.borrowBuffer()
c.sender = newSender(w, o.SenderQueueSize, c.bufferPool)
for i := 0; i < o.BufferShardCount; i++ {
c.bufferShards = append(c.bufferShards, newWorker(c.bufferPool, c.sender))
w := newWorker(c.bufferPool, c.sender)
c.bufferShards = append(c.bufferShards, w)
if c.receiveMode == ChannelMode {
w.startReceivingMetric(o.ChannelModeBufferSize) // TODO make it configurable
}
}
c.flushTime = o.BufferFlushInterval
c.stop = make(chan struct{}, 1)
Expand Down Expand Up @@ -377,10 +391,11 @@ func (c *Client) flushTelemetry() []metric {
m = append(m, metric{metricType: count, name: name, ivalue: value, tags: c.telemetryTags, rate: 1})
}

clientMetrics := c.flushTelemetryMetrics()
clientMetrics := c.FlushTelemetryMetrics()
telemetryCount("datadog.dogstatsd.client.metrics", int64(clientMetrics.TotalMetrics))
telemetryCount("datadog.dogstatsd.client.events", int64(clientMetrics.TotalEvents))
telemetryCount("datadog.dogstatsd.client.service_checks", int64(clientMetrics.TotalServiceChecks))
telemetryCount("datadog.dogstatsd.client.metric_dropped_on_receive", int64(clientMetrics.TotalDroppedOnReceive))

senderMetrics := c.sender.flushTelemetryMetrics()
telemetryCount("datadog.dogstatsd.client.packets_sent", int64(senderMetrics.TotalSentPayloads))
Expand Down Expand Up @@ -408,11 +423,12 @@ func (c *Client) Flush() error {
return nil
}

func (c *Client) flushTelemetryMetrics() ClientMetrics {
func (c *Client) FlushTelemetryMetrics() ClientMetrics {
return ClientMetrics{
TotalMetrics: atomic.SwapUint64(&c.metrics.TotalMetrics, 0),
TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0),
TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0),
TotalMetrics: atomic.SwapUint64(&c.metrics.TotalMetrics, 0),
TotalEvents: atomic.SwapUint64(&c.metrics.TotalEvents, 0),
TotalServiceChecks: atomic.SwapUint64(&c.metrics.TotalServiceChecks, 0),
TotalDroppedOnReceive: atomic.SwapUint64(&c.metrics.TotalDroppedOnReceive, 0),
}
}

Expand Down Expand Up @@ -441,8 +457,19 @@ func (c *Client) send(m metric) error {
if c == nil {
return ErrNoClient
}

h := hashString32(m.name)
return c.bufferShards[h%uint32(len(c.bufferShards))].processMetric(m)
worker := c.bufferShards[h%uint32(len(c.bufferShards))]

if c.receiveMode == ChannelMode {
select {
case worker.inputMetrics <- m:
default:
atomic.AddUint64(&c.metrics.TotalDroppedOnReceive, 1)
}
return nil
}
return worker.processMetric(m)
}

// Gauge measures the value of a metric at a particular time.
Expand Down Expand Up @@ -537,6 +564,12 @@ func (c *Client) Close() error {
}
close(c.stop)

if c.receiveMode == ChannelMode {
for _, w := range c.bufferShards {
w.stopReceivingMetric()
}
}

// Wait for the threads to stop
c.wg.Wait()

Expand Down
85 changes: 70 additions & 15 deletions statsd/statsd_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package statsd_test

import (
"fmt"
"io"
"log"
"net"
"os"
Expand All @@ -11,7 +12,7 @@ import (
"github.com/DataDog/datadog-go/statsd"
)

func setupUDSClientServer(b *testing.B) (*statsd.Client, net.Listener) {
func setupUDSClientServer(b *testing.B, options []statsd.Option) (*statsd.Client, net.Listener) {
sockAddr := "/tmp/test.sock"
if err := os.RemoveAll(sockAddr); err != nil {
log.Fatal(err)
Expand All @@ -28,14 +29,14 @@ func setupUDSClientServer(b *testing.B) (*statsd.Client, net.Listener) {
}
}
}()
client, err := statsd.New("unix://"+sockAddr, statsd.WithMaxMessagesPerPayload(1024))
client, err := statsd.New("unix://"+sockAddr, options...)
if err != nil {
b.Error(err)
}
return client, conn
}

func setupUDPClientServer(b *testing.B) (*statsd.Client, *net.UDPConn) {
func setupUDPClientServer(b *testing.B, options []statsd.Option) (*statsd.Client, *net.UDPConn) {
addr, err := net.ResolveUDPAddr("udp", ":0")
if err != nil {
b.Error(err)
Expand All @@ -44,24 +45,31 @@ func setupUDPClientServer(b *testing.B) (*statsd.Client, *net.UDPConn) {
if err != nil {
b.Error(err)
}
client, err := statsd.New(conn.LocalAddr().String(), statsd.WithMaxMessagesPerPayload(1024))

client, err := statsd.New(conn.LocalAddr().String(), options...)
if err != nil {
b.Error(err)
}
return client, conn
}

func benchmarkStatsd(b *testing.B, transport string) {
var client *statsd.Client
if transport == "udp" {
var conn *net.UDPConn
client, conn = setupUDPClientServer(b)
defer conn.Close()
func setupClient(b *testing.B, transport string, sendingMode statsd.ReceivingMode) (*statsd.Client, io.Closer) {
options := []statsd.Option{statsd.WithMaxMessagesPerPayload(1024), statsd.WithoutTelemetry()}
if sendingMode == statsd.MutexMode {
options = append(options, statsd.WithMutexMode())
} else {
var conn net.Listener
client, conn = setupUDSClientServer(b)
defer conn.Close()
options = append(options, statsd.WithChannelMode())
}

if transport == "udp" {
return setupUDPClientServer(b, options)
}
return setupUDSClientServer(b, options)
}

func benchmarkStatsdDifferentMetrics(b *testing.B, transport string, sendingMode statsd.ReceivingMode) {
client, conn := setupClient(b, transport, sendingMode)
defer conn.Close()

n := int32(0)
b.ResetTimer()
Expand All @@ -74,11 +82,58 @@ func benchmarkStatsd(b *testing.B, transport string) {
}
})
client.Flush()
t := client.FlushTelemetryMetrics()
reportMetric(b, float64(t.TotalDroppedOnReceive)/float64(t.TotalMetrics)*100, "%_dropRate")

b.StopTimer()
client.Close()
}

func benchmarkStatsdSameMetrics(b *testing.B, transport string, sendingMode statsd.ReceivingMode) {
client, conn := setupClient(b, transport, sendingMode)
defer conn.Close()

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
client.Gauge("test.metric", 1, []string{"tag:tag"}, 1)
}
})
client.Flush()
t := client.FlushTelemetryMetrics()
reportMetric(b, float64(t.TotalDroppedOnReceive)/float64(t.TotalMetrics)*100, "%_dropRate")

b.StopTimer()
client.Close()
}

func BenchmarkStatsdUDP(b *testing.B) { benchmarkStatsd(b, "udp") }
// UDP
func BenchmarkStatsdUDPSameMetricMutext(b *testing.B) {
benchmarkStatsdSameMetrics(b, "udp", statsd.MutexMode)
}
func BenchmarkStatsdUDPSameMetricChannel(b *testing.B) {
benchmarkStatsdSameMetrics(b, "udp", statsd.ChannelMode)
}

func BenchmarkStatsdUDS(b *testing.B) { benchmarkStatsd(b, "uds") }
func BenchmarkStatsdUDPDifferentMetricMutext(b *testing.B) {
benchmarkStatsdDifferentMetrics(b, "udp", statsd.MutexMode)
}
func BenchmarkStatsdUDPDifferentMetricChannel(b *testing.B) {
benchmarkStatsdDifferentMetrics(b, "udp", statsd.ChannelMode)
}

// UDS
func BenchmarkStatsdUDSSameMetricMutext(b *testing.B) {
benchmarkStatsdSameMetrics(b, "uds", statsd.MutexMode)
}
func BenchmarkStatsdUDSSameMetricChannel(b *testing.B) {
benchmarkStatsdSameMetrics(b, "uds", statsd.ChannelMode)
}

func BenchmarkStatsdUDPSifferentMetricMutext(b *testing.B) {
benchmarkStatsdDifferentMetrics(b, "uds", statsd.MutexMode)
}
func BenchmarkStatsdUDSDifferentMetricChannel(b *testing.B) {
benchmarkStatsdDifferentMetrics(b, "uds", statsd.ChannelMode)
}
23 changes: 12 additions & 11 deletions statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,18 @@ func TestTelemetry(t *testing.T) {
metrics := client.flushTelemetry()

expectedMetricsName := map[string]int64{
"datadog.dogstatsd.client.metrics": 9,
"datadog.dogstatsd.client.events": 1,
"datadog.dogstatsd.client.service_checks": 1,
"datadog.dogstatsd.client.packets_sent": 0,
"datadog.dogstatsd.client.bytes_sent": 0,
"datadog.dogstatsd.client.packets_dropped": 0,
"datadog.dogstatsd.client.bytes_dropped": 0,
"datadog.dogstatsd.client.packets_dropped_queue": 0,
"datadog.dogstatsd.client.bytes_dropped_queue": 0,
"datadog.dogstatsd.client.packets_dropped_writer": 0,
"datadog.dogstatsd.client.bytes_dropped_writer": 0,
"datadog.dogstatsd.client.metrics": 9,
"datadog.dogstatsd.client.events": 1,
"datadog.dogstatsd.client.service_checks": 1,
"datadog.dogstatsd.client.metric_dropped_on_receive": 0,
"datadog.dogstatsd.client.packets_sent": 0,
"datadog.dogstatsd.client.bytes_sent": 0,
"datadog.dogstatsd.client.packets_dropped": 0,
"datadog.dogstatsd.client.bytes_dropped": 0,
"datadog.dogstatsd.client.packets_dropped_queue": 0,
"datadog.dogstatsd.client.bytes_dropped_queue": 0,
"datadog.dogstatsd.client.packets_dropped_writer": 0,
"datadog.dogstatsd.client.bytes_dropped_writer": 0,
}

telemetryTags := []string{clientTelemetryTag, clientVersionTelemetryTag, "client_transport:udp"}
Expand Down
Loading

0 comments on commit 0260f5c

Please sign in to comment.