From 6d9120baa0133d6381b1cd4cf4664780b1abbff9 Mon Sep 17 00:00:00 2001 From: Corentin Chary Date: Tue, 10 Oct 2023 14:16:08 +0200 Subject: [PATCH] wip --- flooder/cmd/flood/flood.go | 42 ++++++++ flooder/main.go | 7 ++ flooder/pkg/flood/flood.go | 190 +++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 8 ++ 5 files changed, 248 insertions(+) create mode 100644 flooder/cmd/flood/flood.go create mode 100644 flooder/main.go create mode 100644 flooder/pkg/flood/flood.go diff --git a/flooder/cmd/flood/flood.go b/flooder/cmd/flood/flood.go new file mode 100644 index 000000000..f5ef08541 --- /dev/null +++ b/flooder/cmd/flood/flood.go @@ -0,0 +1,42 @@ +package flood + +import ( + "github.com/DataDog/datadog-go/v5/flooder/pkg/flood" + "os" + "time" + + "github.com/spf13/cobra" +) + +// floodCmd represents the base command when called without any subcommands +var floodCmd = &cobra.Command{ + Use: "v5", + Short: "Sends a lot of statsd points.", + Run: func(cmd *cobra.Command, args []string) { + flood.Flood(cmd, args) + }, +} + +// Execute adds all child commands to the root command and sets flags appropriately. +// This is called by main.main(). It only needs to happen once to the rootCmd. +func Execute() { + err := floodCmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func init() { + floodCmd.Flags().StringP("address", "", "127.0.0.1:8125", "Address of the statsd server") + floodCmd.Flags().StringP("telemetry-address", "", "", "Address of the telemetry server") + floodCmd.Flags().BoolP("client-side-aggregation", "", false, "Enable client-side aggregation") + floodCmd.Flags().BoolP("extended-client-side-aggregation", "", false, "Enable extended client-side aggregation") + floodCmd.Flags().BoolP("channel-mode", "", false, "Enable channel mode") + floodCmd.Flags().IntP("channel-mode-buffer-size", "", 4096, "Set channel mode buffer size") + floodCmd.Flags().IntP("sender-queue-size", "", 512, "Set sender queue size") + floodCmd.Flags().DurationP("buffer-flush-interval", "", time.Duration(4)*time.Second, "Set buffer flush interval") + floodCmd.Flags().DurationP("write-timeout", "", time.Duration(100)*time.Millisecond, "Set write timeout") + floodCmd.Flags().StringSliceP("tags", "", []string{}, "Set tags") + floodCmd.Flags().IntP("points-per-10seconds", "", 100000, "Set points per 10 seconds") + floodCmd.Flags().BoolP("send-at-start-of-bucket", "", false, "Send all the points at the start of the 10 sec time bucket.") +} diff --git a/flooder/main.go b/flooder/main.go new file mode 100644 index 000000000..a99c1ec4a --- /dev/null +++ b/flooder/main.go @@ -0,0 +1,7 @@ +package main + +import "github.com/DataDog/datadog-go/v5/flooder/cmd/flood" + +func main() { + flood.Execute() +} diff --git a/flooder/pkg/flood/flood.go b/flooder/pkg/flood/flood.go new file mode 100644 index 000000000..e90ce36f3 --- /dev/null +++ b/flooder/pkg/flood/flood.go @@ -0,0 +1,190 @@ +package flood + +import ( + "github.com/spf13/cobra" + "hash/fnv" + "log" + "strconv" + "strings" + "time" + + "github.com/DataDog/datadog-go/v5/statsd" +) + +type client struct { + client *statsd.Client + pointsPer10Seconds int + sendAtStartOfBucket bool +} + +func initClient(command *cobra.Command) (*client, error) { + var options []statsd.Option + + tags := []string{} + + b, err := command.Flags().GetBool("client-side-aggregation") + if err != nil { + return nil, err + } + if b { + options = append(options, statsd.WithClientSideAggregation()) + } else { + options = append(options, statsd.WithoutClientSideAggregation()) + } + tags = append(tags, "client-side-aggregation:"+strconv.FormatBool(b)) + + b, err = command.Flags().GetBool("extended-client-side-aggregation") + if err != nil { + return nil, err + } + if b { + options = append(options, statsd.WithExtendedClientSideAggregation()) + } + tags = append(tags, "extended-client-side-aggregation"+strconv.FormatBool(b)) + + b, err = command.Flags().GetBool("channel-mode") + if err != nil { + return nil, err + } + if b { + options = append(options, statsd.WithChannelMode()) + } + tags = append(tags, "channel-mode:"+strconv.FormatBool(b)) + + i, err := command.Flags().GetInt("channel-mode-buffer-size") + if err != nil { + return nil, err + } + options = append(options, statsd.WithChannelModeBufferSize(i)) + tags = append(tags, "channel-mode-buffer-size:"+strconv.Itoa(i)) + + i, err = command.Flags().GetInt("sender-queue-size") + if err != nil { + return nil, err + } + options = append(options, statsd.WithSenderQueueSize(i)) + tags = append(tags, "sender-queue-size:"+strconv.Itoa(i)) + + d, err := command.Flags().GetDuration("buffer-flush-interval") + if err != nil { + return nil, err + } + options = append(options, statsd.WithBufferFlushInterval(d)) + tags = append(tags, "buffer-flush-interval:"+d.String()) + + d, err = command.Flags().GetDuration("write-timeout") + if err != nil { + return nil, err + } + options = append(options, statsd.WithWriteTimeout(d)) + tags = append(tags, "write-timeout:"+d.String()) + + pointsPer10Seconds, err := command.Flags().GetInt("points-per-10seconds") + if err != nil { + return nil, err + } + tags = append(tags, "points-per-10seconds:"+strconv.Itoa(pointsPer10Seconds)) + + sendAtStart, err := command.Flags().GetBool("send-at-start-of-bucket") + if err != nil { + return nil, err + } + tags = append(tags, "send-at-start-of-bucket:"+strconv.FormatBool(sendAtStart)) + + address, err := command.Flags().GetString("address") + if err != nil { + return nil, err + } + tags = append(tags, "address:"+address) + + transport := "udp" + if strings.HasPrefix(address, statsd.UnixAddressPrefix) { + transport = "unix" + } + tags = append(tags, "transport:"+transport) + + telemetryAddress, err := command.Flags().GetString("telemetry-address") + if err != nil { + return nil, err + } + if telemetryAddress == "" { + telemetryAddress = address + } + tags = append(tags, "telemetry-address:"+telemetryAddress) + options = append(options, statsd.WithTelemetryAddr(telemetryAddress)) + + telemetryTransport := "udp" + if strings.HasPrefix(telemetryAddress, statsd.UnixAddressPrefix) { + telemetryTransport = "unix" + } + tags = append(tags, "telemetry-transport:"+telemetryTransport) + + t, err := command.Flags().GetStringSlice("tags") + tags = append(tags, t...) + h := hash(tags) + tags = append(tags, "client-hash:"+strconv.Itoa(int(h))) + + options = append(options, statsd.WithTags(tags)) + + log.Printf("Tags: %v - Hash: %x", tags, h) + + options = append(options, statsd.WithOriginDetection()) + + c, err := statsd.New(address, options...) + if err != nil { + return nil, err + } + + return &client{ + client: c, + pointsPer10Seconds: pointsPer10Seconds, + sendAtStartOfBucket: sendAtStart, + }, nil +} + +func hash(s []string) uint32 { + h := fnv.New32a() + for _, e := range s { + h.Write([]byte(e)) + } + return h.Sum32() +} + +func Flood(command *cobra.Command, args []string) { + c, err := initClient(command) + if err != nil { + log.Fatal(err) + } + log.Printf("Sending %d points per 10 seconds", c.pointsPer10Seconds) + + for { + t1 := time.Now() + + for sent := 0; sent < c.pointsPer10Seconds; sent++ { + err := c.client.Incr("flood.dogstatsd.count", []string{}, 1) + if err != nil { + log.Printf("Error: %v", err) + } + if !c.sendAtStartOfBucket { + time.Sleep(time.Duration(8) * time.Second / time.Duration(c.pointsPer10Seconds)) + } + } + err := c.client.Count("flood.dogstatsd.expected", int64(c.pointsPer10Seconds), []string{}, 1) + if err != nil { + log.Printf("Error: %v", err) + } + + t2 := time.Now() + + duration := t2.Sub(t1) + s := time.Duration(10)*time.Second - duration + if s > 0 { + // Sleep until the next bucket + log.Printf("Sleeping for %f seconds", s.Seconds()) + time.Sleep(s) + } else { + log.Printf("We're %f seconds behind", s.Seconds()*-1) + } + } + c.client.Close() +} diff --git a/go.mod b/go.mod index 703c81dbc..b6a6d9883 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,6 @@ go 1.13 require ( github.com/Microsoft/go-winio v0.5.0 github.com/golang/mock v1.6.0 + github.com/spf13/cobra v1.7.0 // indirect github.com/stretchr/testify v1.8.1 ) diff --git a/go.sum b/go.sum index c0e31b595..a170d9aab 100644 --- a/go.sum +++ b/go.sum @@ -1,14 +1,22 @@ github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU= github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84= +github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= +github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= +github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= +github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= +github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=