Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ var (
maxLengthBytes string
maxAge int
maxSegmentSizeBytes string
consumerOffest string
consumerOffset string
printStatsV bool
rate int
variableRate int
Expand All @@ -41,6 +41,7 @@ var (
batchSize int
exitOnError bool
debugLogs bool
runDuration int
)

func init() {
Expand All @@ -58,14 +59,15 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().IntVarP(&variableRate, "variable-rate", "", 0, "Variable rate to value")
baseCmd.PersistentFlags().IntVarP(&variableBody, "variable-body", "", 0, "Variable body size")
baseCmd.PersistentFlags().IntVarP(&fixedBody, "fixed-body", "", 0, "Body size")
baseCmd.PersistentFlags().IntVarP(&runDuration, "time", "", 0, "Run Duration in seconds ( stop the test)")
baseCmd.PersistentFlags().BoolVarP(&exitOnError, "exit-on-error", "", true, "Close the app in case of error")
baseCmd.PersistentFlags().BoolVarP(&printStatsV, "print-stats", "", true, "Print stats")
baseCmd.PersistentFlags().BoolVarP(&debugLogs, "debug-logs", "", false, "Enable debug logs")
baseCmd.PersistentFlags().StringSliceVarP(&streams, "streams", "", []string{"perf-test-go"}, "Stream names")
baseCmd.PersistentFlags().StringVarP(&maxLengthBytes, "max-length-bytes", "", "0", "Stream max length bytes, e.g. 10MB, 50GB, etc.")
baseCmd.PersistentFlags().IntVarP(&maxAge, "max-age", "", 0, "Stream Age in hours, e.g. 1,2.. 24 , etc.")
baseCmd.PersistentFlags().StringVarP(&maxSegmentSizeBytes, "stream-max-segment-size-bytes", "", "500MB", "Stream segment size bytes, e.g. 10MB, 1GB, etc.")
baseCmd.PersistentFlags().StringVarP(&consumerOffest, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next of number")
baseCmd.PersistentFlags().StringVarP(&consumerOffset, "consumer-offset", "", "first", "Staring consuming, ex: first,last,next or random")
baseCmd.AddCommand(versionCmd)
baseCmd.AddCommand(newSilent())
}
Expand Down
84 changes: 58 additions & 26 deletions perfTest/cmd/silent.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"encoding/binary"
"fmt"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
Expand Down Expand Up @@ -45,6 +46,25 @@ var (
simulEnvironment *stream.Environment
)

func checkRunDuration() {
if runDuration > 0 {
start := time.Now()
ticker := time.NewTicker(10 * time.Second)
go func() {
for {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Seconds()
if v >= float64(runDuration) {
logInfo("Stopping after %s seconds", runDuration)
os.Exit(0)
}
}
}
}()
}
}

func printStats() {
if printStatsV {
start := time.Now()
Expand All @@ -54,20 +74,24 @@ func printStats() {
select {
case _ = <-ticker.C:
v := time.Now().Sub(start).Milliseconds()
start = time.Now()

PMessagesPerSecond := float64(atomic.LoadInt32(&publisherMessageCount)) / float64(v) * 1000
CMessagesPerSecond := float64(atomic.LoadInt32(&consumerMessageCount)) / float64(v) * 1000
ConfirmedMessagesPerSecond := float64(atomic.LoadInt32(&confirmedMessageCount)) / float64(v) * 1000

//if PMessagesPerSecond > 0 ||
// ConfirmedMessagesPerSecond > 0 ||
// CMessagesPerSecond > 0 ||
// consumersCloseCount > 0 ||
// publishErrors > 0 {
logInfo("Published %8.2f msg/s | Confirmed %8.2f msg/s | Consumed %8.2f msg/s | Cons. closed %3v | Pub errors %3v | %3v | %3v | msg sent: %3v |",
PMessagesPerSecond, ConfirmedMessagesPerSecond, CMessagesPerSecond, consumersCloseCount, publishErrors, decodeRate(), decodeBody(), atomic.LoadInt64(&messagesSent))
//}
}
}

}()
tickerReset := time.NewTicker(1 * time.Minute)
go func() {
for {
select {
case _ = <-tickerReset.C:
start = time.Now()

atomic.SwapInt32(&publisherMessageCount, 0)
atomic.SwapInt32(&consumerMessageCount, 0)
atomic.SwapInt32(&confirmedMessageCount, 0)
Expand Down Expand Up @@ -141,12 +165,14 @@ func startSimulation() error {
checkErr(err)
}
printStats()
checkRunDuration()

return err
}

func checkErr(err error) {
if err != nil {
logError("error: %s", err)
if exitOnError {
os.Exit(1)
}
Expand All @@ -172,10 +198,6 @@ func initStreams() error {
}

for _, streamName := range streams {
streamMetadata, err := env.StreamMetaData(streamName)
checkErr(err)
logInfo("stream %s, meta data: %s", streamName, streamMetadata)

err = env.DeclareStream(
streamName,
stream.NewStreamOptions().
Expand All @@ -195,6 +217,11 @@ func initStreams() error {
return err
}
}

streamMetadata, err := env.StreamMetaData(streamName)
checkErr(err)
logInfo("stream %s, meta data: %s", streamName, streamMetadata)

}
logInfo("End Init streams :%s\n", streams)
return env.Close()
Expand Down Expand Up @@ -240,27 +267,22 @@ func startPublisher(streamName string) error {
handlePublishError(chPublishError)

var arr []message.StreamMessage
var body string
var body []byte
for z := 0; z < batchSize; z++ {
body = fmt.Sprintf("simul_message")

if fixedBody > 0 {
body = ""
for i := 0; i < fixedBody; i++ {
body += "s"
}
body = make([]byte, fixedBody)
} else {
if variableBody > 0 {
body = ""
rand.Seed(time.Now().UnixNano())
n := rand.Intn(variableBody)
for i := 0; i < n; i++ {
body += "s"
}
body = make([]byte, rand.Intn(variableBody))
}
}

arr = append(arr, amqp.NewMessage([]byte(body)))
n := time.Now().UnixNano()
var buff = make([]byte, 8)
binary.BigEndian.PutUint64(buff, uint64(n))
msg := amqp.NewMessage(append(buff, body...))
arr = append(arr, msg)
}

go func(prod *ha.ReliableProducer, messages []message.StreamMessage) {
Expand Down Expand Up @@ -335,18 +357,28 @@ func handleConsumerClose(channelClose stream.ChannelClose) {
func startConsumer(consumerName string, streamName string) error {

handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
//logError("consumerMessageCount StoreOffset: %s", consumerMessageCount)
atomic.AddInt32(&consumerMessageCount, 1)

}
offsetSpec := stream.OffsetSpecification{}.Last()
switch consumerOffest {
switch consumerOffset {
case "last":
offsetSpec = stream.OffsetSpecification{}.Last()
case "first":
offsetSpec = stream.OffsetSpecification{}.First()
case "next":
offsetSpec = stream.OffsetSpecification{}.Next()
case "random":
rand.Seed(time.Now().UnixNano())
n := rand.Intn(3)
switch n {
case 0:
offsetSpec = stream.OffsetSpecification{}.First()
case 1:
offsetSpec = stream.OffsetSpecification{}.Next()
case 2:
offsetSpec = stream.OffsetSpecification{}.Last()
}
}

logInfo("Starting consumer number: %s, form %s", consumerName, offsetSpec)
Expand Down
2 changes: 1 addition & 1 deletion pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ type StreamMetadata struct {
func (sm StreamMetadata) String() string {
replicas := ""
for _, replica := range sm.Replicas {
replicas += fmt.Sprintf("%s:%s", replica.Host, replica.Port)
replicas += fmt.Sprintf(" - %s:%s", replica.Host, replica.Port)
}
return fmt.Sprintf("leader %s:%s, followers %s ", sm.Leader.Host, sm.Leader.Port, replicas)
}
Expand Down