Skip to content

Commit

Permalink
fix conflicts between concurrency and request counts
Browse files Browse the repository at this point in the history
Signed-off-by: ii2day <ji.li@daocloud.io>
  • Loading branch information
ii2day committed Oct 17, 2023
1 parent 45b512a commit 70bea9f
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 55 deletions.
10 changes: 2 additions & 8 deletions pkg/loadRequest/loadDns/dns.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,16 @@ func DnsRequest(logger *zap.Logger, reqData *DnsRequestData) (result *v1beta1.DN

w := &Work{
Concurrency: config.AgentConfig.Configmap.NetdnsDefaultConcurrency,
RequestTimeSecond: reqData.DurationInSecond,
QPS: reqData.Qps,
Timeout: reqData.PerRequestTimeoutInMs,
Msg: new(dns.Msg).SetQuestion(reqData.TargetDomain, reqData.DnsType),
Protocol: string(reqData.Protocol),
ServerAddr: reqData.DnsServerAddr,
EnableLatencyMetric: reqData.EnableLatencyMetric,
Logger: logger.Named("dns-client"),
}
w.Init()

// The monitoring task timed out
if duration > 0 {
go func() {
time.Sleep(duration)
w.Stop()
}()
}
logger.Sugar().Infof("begin to request %v for duration %v ", w.ServerAddr, duration.String())
w.Run()
logger.Sugar().Infof("finish all request %v for %s ", w.report.totalCount, w.ServerAddr)
Expand Down
49 changes: 35 additions & 14 deletions pkg/loadRequest/loadDns/dns_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/kdoctor-io/kdoctor/pkg/k8s/apis/system/v1beta1"
"github.com/kdoctor-io/kdoctor/pkg/utils/stats"
"github.com/miekg/dns"
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sync"
"time"
Expand Down Expand Up @@ -56,23 +57,30 @@ type Work struct {
// Timeout in seconds.
Timeout int

/// RequestTimeSecond request in second
RequestTimeSecond int

// Qps is the rate limit in queries per second.
QPS int

EnableLatencyMetric bool

initOnce sync.Once
results chan *result
stopCh chan struct{}
startTime metav1.Time
report *report
Logger *zap.Logger

initOnce sync.Once
results chan *result
stopCh chan struct{}
qosTokenBucket chan struct{}
startTime metav1.Time
report *report
}

// Init initializes internal data-structures
func (b *Work) Init() {
b.initOnce.Do(func() {
b.results = make(chan *result, maxResult)
b.stopCh = make(chan struct{}, b.Concurrency)
b.qosTokenBucket = make(chan struct{}, b.QPS)
})
}

Expand All @@ -87,6 +95,26 @@ func (b *Work) Run() {
runReporter(b.report)
}()

// Send qps number of tokens to the channel qosTokenBucket every second to the coroutine for execution
go func() {
c := time.After(time.Duration(b.RequestTimeSecond) * time.Second)
ticker := time.NewTicker(time.Second)
for {
select {
case <-c:
// Reach request duration stop request
b.Logger.Sugar().Debugf("request finish remaining number of tokens len: %d", len(b.qosTokenBucket))
b.Stop()
return
case <-ticker.C:
b.Logger.Sugar().Debugf("request token channel len: %d", len(b.qosTokenBucket))
for i := 0; i < b.QPS; i++ {
b.qosTokenBucket <- struct{}{}
}
}
}
}()

b.runWorkers()
b.Finish()
}
Expand All @@ -100,6 +128,7 @@ func (b *Work) Stop() {

func (b *Work) Finish() {
close(b.results)
close(b.qosTokenBucket)
total := metav1.Now().Sub(b.startTime.Time)
// Wait until the reporter is done.
<-b.report.done
Expand All @@ -117,10 +146,6 @@ func (b *Work) makeRequest(client *dns.Client, conn *dns.Conn, wg *sync.WaitGrou
}

func (b *Work) runWorker() {
var ticker *time.Ticker
if b.QPS > 0 {
ticker = time.NewTicker(time.Duration(1e6*b.Concurrency/(b.QPS)) * time.Microsecond)
}
client := new(dns.Client)
client.Net = b.Protocol
client.Timeout = time.Duration(b.Timeout) * time.Millisecond
Expand All @@ -139,12 +164,8 @@ func (b *Work) runWorker() {
case <-b.stopCh:
wg.Wait()
return
default:
if b.QPS > 0 {
<-ticker.C
}
case <-b.qosTokenBucket:
wg.Add(1)

// check connect close
// if close new connect
if conn == nil {
Expand Down
9 changes: 2 additions & 7 deletions pkg/loadRequest/loadHttp/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func HttpRequest(logger *zap.Logger, reqData *HttpRequestData) *v1beta1.HttpMetr

w := &Work{
Request: req,
RequestTimeSecond: reqData.RequestTimeSecond,
Concurrency: config.AgentConfig.Configmap.NethttpDefaultConcurrency,
QPS: reqData.Qps,
Timeout: reqData.PerRequestTimeoutMS,
Expand All @@ -88,16 +89,10 @@ func HttpRequest(logger *zap.Logger, reqData *HttpRequestData) *v1beta1.HttpMetr
ExpectStatusCode: reqData.ExpectStatusCode,
RequestBody: reqData.Body,
EnableLatencyMetric: reqData.EnableLatencyMetric,
Logger: logger.Named("http-client"),
}
logger.Sugar().Infof("do http requests work=%v", w)
w.Init()

// The monitoring task timed out
go func() {
time.Sleep(duration)
w.Stop()
}()

logger.Sugar().Infof("begin to request %v for duration %v ", w.Request.URL, duration.String())
w.Run()
logger.Sugar().Infof("finish all request %v for %s ", w.report.totalCount, w.Request.URL)
Expand Down
63 changes: 42 additions & 21 deletions pkg/loadRequest/loadHttp/http_requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,21 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"github.com/kdoctor-io/kdoctor/pkg/k8s/apis/system/v1beta1"
config "github.com/kdoctor-io/kdoctor/pkg/types"
"github.com/kdoctor-io/kdoctor/pkg/utils/stats"
"golang.org/x/net/http2"
"io"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"net/http/httptrace"
"net/url"
"strconv"
"sync"
"time"

"github.com/kdoctor-io/kdoctor/pkg/k8s/apis/system/v1beta1"
config "github.com/kdoctor-io/kdoctor/pkg/types"
"github.com/kdoctor-io/kdoctor/pkg/utils/stats"
"go.uber.org/zap"

"golang.org/x/net/http2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Max size of the buffer of result channel.
Expand Down Expand Up @@ -114,6 +117,9 @@ type Work struct {
// Timeout in seconds.
Timeout int

/// RequestTimeSecond request in second
RequestTimeSecond int

// Qps is the rate limit in queries per second.
QPS int

Expand Down Expand Up @@ -142,19 +148,23 @@ type Work struct {
// Optional.
EnableLatencyMetric bool

initOnce sync.Once
results chan *result
stopCh chan struct{}
start time.Duration
startTime metav1.Time
report *report
Logger *zap.Logger

initOnce sync.Once
results chan *result
stopCh chan struct{}
qosTokenBucket chan struct{}
start time.Duration
startTime metav1.Time
report *report
}

// Init initializes internal data-structures
func (b *Work) Init() {
b.initOnce.Do(func() {
b.results = make(chan *result, MaxResultChannelSize)
b.stopCh = make(chan struct{}, b.Concurrency)
b.qosTokenBucket = make(chan struct{}, b.QPS)
})
}

Expand All @@ -170,6 +180,25 @@ func (b *Work) Run() {
runReporter(b.report)
}()

// Send qps number of tokens to the channel qosTokenBucket every second to the coroutine for execution
go func() {
c := time.After(time.Duration(b.RequestTimeSecond) * time.Second)
ticker := time.NewTicker(time.Second)
for {
select {
case <-c:
// Reach request duration stop request
b.Logger.Sugar().Debugf("request finish remaining number of tokens len: %d", len(b.qosTokenBucket))
b.Stop()
return
case <-ticker.C:
b.Logger.Sugar().Debugf("request token channel len: %d", len(b.qosTokenBucket))
for i := 0; i < b.QPS; i++ {
b.qosTokenBucket <- struct{}{}
}
}
}
}()
b.runWorkers()
b.Finish()
}
Expand All @@ -183,6 +212,7 @@ func (b *Work) Stop() {

func (b *Work) Finish() {
close(b.results)
close(b.qosTokenBucket)
total := b.now() - b.start
// Wait until the reporter is done.
<-b.report.done
Expand Down Expand Up @@ -253,11 +283,6 @@ func (b *Work) makeRequest(c *http.Client, wg *sync.WaitGroup) {
}

func (b *Work) runWorker() {
var ticker *time.Ticker
if b.QPS > 0 {
ticker = time.NewTicker(time.Duration(1e6*b.Concurrency/(b.QPS)) * time.Microsecond)
}

tr := &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{b.Cert},
Expand Down Expand Up @@ -297,14 +322,10 @@ func (b *Work) runWorker() {
wg.Wait()
client.CloseIdleConnections()
return
default:
if b.QPS > 0 {
<-ticker.C
}
case <-b.qosTokenBucket:
wg.Add(1)
go b.makeRequest(client, wg)
}

}
}

Expand Down
2 changes: 0 additions & 2 deletions test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,6 @@ deploy_project:
HELM_OPTION+=" --set kdoctorAgent.ingress.enable=false " ; \
fi ; \
HELM_OPTION+=" --set feature.aggregateReport.enabled=true " ; \
HELM_OPTION+=" --set feature.nethttp_defaultConcurrency=10 " ; \
HELM_OPTION+=" --set feature.netdns_defaultConcurrency=10 " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.cpu=100m " ; \
HELM_OPTION+=" --set kdoctorAgent.resources.requests.memory=128Mi " ; \
HELM_OPTION+=" --set feature.aggregateReport.controller.reportHostPath=/var/run/kdoctor/controller " ; \
Expand Down
6 changes: 3 additions & 3 deletions test/e2e/common/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
// qps
expectRequestCount := float64(rs.Spec.Request.QPS * rs.Spec.Request.DurationInSecond)
realRequestCount := float64(m.Metrics.RequestCounts)
if math.Abs(realRequestCount-expectRequestCount)/expectRequestCount > 0.1 {
if math.Abs(realRequestCount-expectRequestCount)/expectRequestCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.1,real request count: %d,expect request count:%d", int(realRequestCount), int(expectRequestCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
Expand Down Expand Up @@ -298,7 +298,7 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
realCount := float64(m.Metrics.RequestCounts)
// report request count
reportRequestCount += m.Metrics.RequestCounts
if math.Abs(realCount-expectCount)/expectCount > 0.1 {
if math.Abs(realCount-expectCount)/expectCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.1,real request count: %d,expect request count:%d", int(realCount), int(expectCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
Expand Down Expand Up @@ -363,7 +363,7 @@ func CompareResult(f *frame.Framework, name, taskKind string, podIPs []string, n
realCount := float64(m.Metrics.RequestCounts)
// report request count
reportRequestCount += m.Metrics.RequestCounts
if math.Abs(realCount-expectCount)/expectCount > 0.1 {
if math.Abs(realCount-expectCount)/expectCount > 0.05 {
return GetResultFromReport(r), fmt.Errorf("The error in the number of requests is greater than 0.1, real request count: %d,expect request count:%d ", int(realCount), int(expectCount))
}
if float64(m.Metrics.SuccessCounts)/float64(m.Metrics.RequestCounts) != m.SucceedRate {
Expand Down

0 comments on commit 70bea9f

Please sign in to comment.