Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
iksaif committed Oct 12, 2023
1 parent 70cab92 commit 6d9120b
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 0 deletions.
42 changes: 42 additions & 0 deletions flooder/cmd/flood/flood.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package flood

import (
"github.com/DataDog/datadog-go/v5/flooder/pkg/flood"
"os"
"time"

"github.com/spf13/cobra"
)

// floodCmd represents the base command when called without any subcommands
var floodCmd = &cobra.Command{
Use: "v5",
Short: "Sends a lot of statsd points.",
Run: func(cmd *cobra.Command, args []string) {
flood.Flood(cmd, args)
},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
err := floodCmd.Execute()
if err != nil {
os.Exit(1)
}
}

func init() {
floodCmd.Flags().StringP("address", "", "127.0.0.1:8125", "Address of the statsd server")
floodCmd.Flags().StringP("telemetry-address", "", "", "Address of the telemetry server")
floodCmd.Flags().BoolP("client-side-aggregation", "", false, "Enable client-side aggregation")
floodCmd.Flags().BoolP("extended-client-side-aggregation", "", false, "Enable extended client-side aggregation")
floodCmd.Flags().BoolP("channel-mode", "", false, "Enable channel mode")
floodCmd.Flags().IntP("channel-mode-buffer-size", "", 4096, "Set channel mode buffer size")
floodCmd.Flags().IntP("sender-queue-size", "", 512, "Set sender queue size")
floodCmd.Flags().DurationP("buffer-flush-interval", "", time.Duration(4)*time.Second, "Set buffer flush interval")
floodCmd.Flags().DurationP("write-timeout", "", time.Duration(100)*time.Millisecond, "Set write timeout")
floodCmd.Flags().StringSliceP("tags", "", []string{}, "Set tags")
floodCmd.Flags().IntP("points-per-10seconds", "", 100000, "Set points per 10 seconds")
floodCmd.Flags().BoolP("send-at-start-of-bucket", "", false, "Send all the points at the start of the 10 sec time bucket.")
}
7 changes: 7 additions & 0 deletions flooder/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package main

import "github.com/DataDog/datadog-go/v5/flooder/cmd/flood"

func main() {
flood.Execute()
}
190 changes: 190 additions & 0 deletions flooder/pkg/flood/flood.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
package flood

import (
"github.com/spf13/cobra"
"hash/fnv"
"log"
"strconv"
"strings"
"time"

"github.com/DataDog/datadog-go/v5/statsd"
)

type client struct {
client *statsd.Client
pointsPer10Seconds int
sendAtStartOfBucket bool
}

func initClient(command *cobra.Command) (*client, error) {
var options []statsd.Option

tags := []string{}

b, err := command.Flags().GetBool("client-side-aggregation")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithClientSideAggregation())
} else {
options = append(options, statsd.WithoutClientSideAggregation())
}
tags = append(tags, "client-side-aggregation:"+strconv.FormatBool(b))

b, err = command.Flags().GetBool("extended-client-side-aggregation")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithExtendedClientSideAggregation())
}
tags = append(tags, "extended-client-side-aggregation"+strconv.FormatBool(b))

b, err = command.Flags().GetBool("channel-mode")
if err != nil {
return nil, err
}
if b {
options = append(options, statsd.WithChannelMode())
}
tags = append(tags, "channel-mode:"+strconv.FormatBool(b))

i, err := command.Flags().GetInt("channel-mode-buffer-size")
if err != nil {
return nil, err
}
options = append(options, statsd.WithChannelModeBufferSize(i))
tags = append(tags, "channel-mode-buffer-size:"+strconv.Itoa(i))

i, err = command.Flags().GetInt("sender-queue-size")
if err != nil {
return nil, err
}
options = append(options, statsd.WithSenderQueueSize(i))
tags = append(tags, "sender-queue-size:"+strconv.Itoa(i))

d, err := command.Flags().GetDuration("buffer-flush-interval")
if err != nil {
return nil, err
}
options = append(options, statsd.WithBufferFlushInterval(d))
tags = append(tags, "buffer-flush-interval:"+d.String())

d, err = command.Flags().GetDuration("write-timeout")
if err != nil {
return nil, err
}
options = append(options, statsd.WithWriteTimeout(d))
tags = append(tags, "write-timeout:"+d.String())

pointsPer10Seconds, err := command.Flags().GetInt("points-per-10seconds")
if err != nil {
return nil, err
}
tags = append(tags, "points-per-10seconds:"+strconv.Itoa(pointsPer10Seconds))

sendAtStart, err := command.Flags().GetBool("send-at-start-of-bucket")
if err != nil {
return nil, err
}
tags = append(tags, "send-at-start-of-bucket:"+strconv.FormatBool(sendAtStart))

address, err := command.Flags().GetString("address")
if err != nil {
return nil, err
}
tags = append(tags, "address:"+address)

transport := "udp"
if strings.HasPrefix(address, statsd.UnixAddressPrefix) {
transport = "unix"
}
tags = append(tags, "transport:"+transport)

telemetryAddress, err := command.Flags().GetString("telemetry-address")
if err != nil {
return nil, err
}
if telemetryAddress == "" {
telemetryAddress = address
}
tags = append(tags, "telemetry-address:"+telemetryAddress)
options = append(options, statsd.WithTelemetryAddr(telemetryAddress))

telemetryTransport := "udp"
if strings.HasPrefix(telemetryAddress, statsd.UnixAddressPrefix) {
telemetryTransport = "unix"
}
tags = append(tags, "telemetry-transport:"+telemetryTransport)

t, err := command.Flags().GetStringSlice("tags")
tags = append(tags, t...)
h := hash(tags)
tags = append(tags, "client-hash:"+strconv.Itoa(int(h)))

options = append(options, statsd.WithTags(tags))

log.Printf("Tags: %v - Hash: %x", tags, h)

options = append(options, statsd.WithOriginDetection())

c, err := statsd.New(address, options...)
if err != nil {
return nil, err
}

return &client{
client: c,
pointsPer10Seconds: pointsPer10Seconds,
sendAtStartOfBucket: sendAtStart,
}, nil
}

func hash(s []string) uint32 {
h := fnv.New32a()
for _, e := range s {
h.Write([]byte(e))
}
return h.Sum32()
}

func Flood(command *cobra.Command, args []string) {
c, err := initClient(command)
if err != nil {
log.Fatal(err)
}
log.Printf("Sending %d points per 10 seconds", c.pointsPer10Seconds)

for {
t1 := time.Now()

for sent := 0; sent < c.pointsPer10Seconds; sent++ {
err := c.client.Incr("flood.dogstatsd.count", []string{}, 1)
if err != nil {
log.Printf("Error: %v", err)
}
if !c.sendAtStartOfBucket {
time.Sleep(time.Duration(8) * time.Second / time.Duration(c.pointsPer10Seconds))
}
}
err := c.client.Count("flood.dogstatsd.expected", int64(c.pointsPer10Seconds), []string{}, 1)
if err != nil {
log.Printf("Error: %v", err)
}

t2 := time.Now()

duration := t2.Sub(t1)
s := time.Duration(10)*time.Second - duration
if s > 0 {
// Sleep until the next bucket
log.Printf("Sleeping for %f seconds", s.Seconds())
time.Sleep(s)
} else {
log.Printf("We're %f seconds behind", s.Seconds()*-1)
}
}
c.client.Close()
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ go 1.13
require (
github.com/Microsoft/go-winio v0.5.0
github.com/golang/mock v1.6.0
github.com/spf13/cobra v1.7.0 // indirect
github.com/stretchr/testify v1.8.1
)
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
github.com/Microsoft/go-winio v0.5.0 h1:Elr9Wn+sGKPlkaBvwu4mTrxtmOp3F3yV9qhaHbXGjwU=
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
Expand Down

0 comments on commit 6d9120b

Please sign in to comment.