Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rate limiting queue for packet-in #2015

Merged
merged 2 commits into from
Apr 7, 2021

Conversation

GraysonWu
Copy link
Contributor

Before we add rate limiting mechanism in OVS, we use RateLimitingQueue for packet-in event in Antrea agent.
I have tested it using hping3 to simulate packet flood and agent can handle packet-in under the rate limit.

Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@GraysonWu please update the rate as suggested and provide CPU usage data with & without your change, otherwise it's hard to evaluate the value of the PR and whether it does address the issue. Also provide the parameters you use to run hping3 and generate traffic.

Comment on lines 65 to 69
if reason == uint8(PacketInReasonTF) {
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Inf, 10)}, string(reason))
} else {
featurePacketIn.packetInQueue = workqueue.NewNamedRateLimitingQueue(&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(5), 10)}, string(reason))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a rate limit of 5 is a bit too conservative, how about 100? With a burst size of 200?
I feel like we can also use this for Traceflow

@@ -108,7 +114,7 @@ func (c *client) subscribeFeaturePacketIn(featurePacketIn *featureStartPacketIn)
return nil
}

func (c *client) parsePacketIn(packetInQueue workqueue.Interface, packetHandlerReason uint8) {
func (c *client) parsePacketIn(packetInQueue workqueue.RateLimitingInterface, packetHandlerReason uint8) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we call packetInQueue.Forget(obj) after packetInQueue.Done(obj) even though it is a no-op (and add a comment to mention it is a no-op for the BucketRateLimiter implementation

@@ -71,7 +77,7 @@ func (f *featureStartPacketIn) ListenPacketIn() {
case pktIn := <-f.subscribeCh:
// Ensure that the queue doesn't grow too big. This is NOT to provide an exact guarantee.
if f.packetInQueue.Len() < packetInQueueSize {
f.packetInQueue.Add(pktIn)
f.packetInQueue.AddRateLimited(pktIn)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell from code, AddRateLimited adds a waitFor item to a channel of 1000 buckets if there is no available rate limter bucket, so it will be blocking when the channel is full. And it doesn't drop any items even it exceeds its rate limit.
Maybe the length check against the queue can avoid appending more items when it exceeds the length of the queue. However the length of the ratelimitingQueue is the items that are ready to be processed (their ratelimiting waiting time have expired), not including the items in the waitingForAddCh channel. If the consumers process the ready items very quickly, it could happen that there are 1000 waiting items in the waitingForAddCh and 0 ready items in the queue, and a waiting item is poped from the channel and pushed to the queue every 10ms (if the rate limit is 100), and each incoming packet-in event will block here for 10ms when calling AddRateLimited and they will not be handled properly as it will only be handled after 10s (1000*10ms) when Traceflow request or connection already expire.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to drop the packets here. Otherwise the backpressure will propagate to ofnet, and we will essentially block the loop that receives all messages from the switch (including bundle acknowledgements?): https://github.com/wenyingd/ofnet/blob/171b6795a2da8d488d38ae318ca3ce043481fc59/ofctrl/ofSwitch.go#L306

I am not a big fan of the current architecture: unless I am missing something, I think we have one channel / queue too many (subscribeCh and packetInQueue) and that everything could be done with a single channel.

My preference would be to change and simplify the code as follows:

  • remove packetInQueue altogether
  • make subscribeCh a buffered channel
  • change PacketRcvd (ofctfl_bridge.go) to drop the packet if the subscribe channel is full (avoid backpressure to ofnet)
  • when calling SubscribePacketIn, provide a rate limiter to control how fast packets can be dequeued by a consumer (this ensures low CPU usage of consumer)

maybe we can define a simple queue like this one:

type PacketInQueue struct {
    rateLimiter *rate.Limiter
    packetsCh chan *ofctrl.PacketIn
}

func NewPacketInQueue(size int, r rate.Limit) *PacketInQueue {
    return &PacketInQueue{rateLimiter: rate.NewLimiter(r, 1), packetsCh: make(chan *ofctrl.PacketIn, size)}
}

func (q *PacketInQueue) AddOrDrop(packet *ofctrl.PacketIn) bool {
    select {
        case q.packetsCh <- packet:
            return true
        default:
            // channel is full
            return false
    }
}

func (q *PacketInQueue) GetRateLimited(stopCh <-chan struct{}) *ofctrl.PacketIn {
    when := q.rateLimiter.Reserve().Delay()
    t := time.NewTimer(when)
    defer t.Stop()
    for {
        select {
            case <- stopCh:
                return nil
            case <- t.C:
                break
       }
    }
    for {
        select {
            case <- stopCh:
                return nil
            case packet := <- q.packetsCh:
                return packet
       }
    }
}

// receiver side:
go func() {
    for {
        packet := q.GetRateLimited(stopCh)
        if packet == nil {
            return
        }
        // call all registered handlers
    }
}

Of course we should continue to have one instance of PacketInQueue per packet type so that Traceflow packets get their own queue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After my test (hping3 --flood 10.10.1.114 -p 80 -2), there is no significant difference in CPU usage whether we use this rateLimitQueue. I guess continuously calling AddRateLimited at a high rate also costs a lot of CPU? I think @antoninbas 's simplified code looks good and we can drop the packet over rate in this way. To make sure the Traceflow won't be choked or dropped by other packet-in messages, maybe make TraceFlow has its own queue would be better?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I wrote at the end, yes Traceflow should have its own queue

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad miss the end statement.
Then let me try if this way can save the CPU usage.

Before we add rate limiting mechanism in OVS, we use RateLimitingQueue for packet-in event in Antrea agent.
@GraysonWu
Copy link
Contributor Author

GraysonWu commented Apr 1, 2021

Pushed with what @antoninbas suggested here #2015 (comment). Tested using hping3 --flood 10.10.1.114 -p 80 -2.
With channel buffer size 200, rate.Limit(100, 1), CPU usage:

top - 23:58:23 up 5 days, 21:21,  0 users,  load average: 5.88, 2.67, 1.39
Tasks:   3 total,   1 running,   2 sleeping,   0 stopped,   0 zombie
%Cpu(s): 39.9 us, 44.9 sy,  0.0 ni,  0.8 id,  0.0 wa,  0.0 hi, 14.4 si,  0.0 st
MiB Mem :   1987.6 total,     71.8 free,    554.7 used,   1361.0 buff/cache
MiB Swap:      0.0 total,      0.0 free,      0.0 used.   1246.2 avail Mem

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
      1 root      20   0 1340984  45516  24148 S  57.8   2.2   1:04.76 antrea-agent
     68 root      20   0    4240   2908   2324 S   0.0   0.1   0:00.01 bash
     77 root      20   0    6092   2468   1956 R   0.0   0.1   0:00.01 top

With channel buffer size 200, rate.Limit(rate.Inf, 1), CPU usage:

top - 00:10:30 up 5 days, 21:34,  0 users,  load average: 5.63, 2.54, 1.61
Tasks:   3 total,   1 running,   2 sleeping,   0 stopped,   0 zombie
%Cpu(s): 42.6 us, 47.3 sy,  0.0 ni,  0.5 id,  0.0 wa,  0.0 hi,  9.5 si,  0.0 st
MiB Mem :   1987.6 total,     75.5 free,    595.8 used,   1316.3 buff/cache
MiB Swap:      0.0 total,      0.0 free,      0.0 used.   1213.8 avail Mem

    PID USER      PR  NI    VIRT    RES    SHR S  %CPU  %MEM     TIME+ COMMAND
      1 root      20   0 1342260  75116  22680 S  73.8   3.7   1:07.24 antrea-agent
     66 root      20   0    4240   2284   1832 S   0.0   0.1   0:00.01 bash
     75 root      20   0    6092   3136   2624 R   0.0   0.2   0:00.00 top

@antoninbas
Copy link
Contributor

Thanks @GraysonWu. At leasts we are not making things a bit better in terms of both CPU & memory :)
TBH I was expecting worst results without the rate limiting. Are you using NP logging or NP with reject action for your benchmark? Would you consider starting another hping from a different Pod as well?
Another benefit IMO of the change is that we will no longer be blocking the message handling loop in ofnet because of the unbuffered packet in receive channel, which could have affected other operations.

@GraysonWu
Copy link
Contributor Author

Thanks @GraysonWu. At leasts we are not making things a bit better in terms of both CPU & memory :)
TBH I was expecting worst results without the rate limiting. Are you using NP logging or NP with reject action for your benchmark? Would you consider starting another hping from a different Pod as well?
Another benefit IMO of the change is that we will no longer be blocking the message handling loop in ofnet because of the unbuffered packet in receive channel, which could have affected other operations.

It's NP with reject action. Let me try more hping to see the result.

@antoninbas
Copy link
Contributor

@GraysonWu could your try logging as well?

@GraysonWu
Copy link
Contributor Author

@GraysonWu could your try logging as well?

Sure.

@GraysonWu GraysonWu force-pushed the rate-limit-queue branch 2 times, most recently from 4911d6a to 9222dd5 Compare April 1, 2021 04:22
@codecov-io
Copy link

codecov-io commented Apr 1, 2021

Codecov Report

Merging #2015 (e5066af) into main (08ea67c) will decrease coverage by 23.13%.
The diff coverage is 83.87%.

Impacted file tree graph

@@             Coverage Diff             @@
##             main    #2015       +/-   ##
===========================================
- Coverage   65.12%   41.99%   -23.14%     
===========================================
  Files         197      256       +59     
  Lines       17407    18773     +1366     
===========================================
- Hits        11336     7883     -3453     
- Misses       4883     9794     +4911     
+ Partials     1188     1096       -92     
Flag Coverage Δ
kind-e2e-tests 41.99% <83.87%> (-14.18%) ⬇️
unit-tests ?

Flags with carried forward coverage won't be shown. Click here to find out more.

Impacted Files Coverage Δ
pkg/agent/openflow/client.go 46.80% <ø> (-16.86%) ⬇️
pkg/ovs/openflow/ofctrl_bridge.go 50.00% <82.60%> (+2.17%) ⬆️
pkg/agent/openflow/packetin.go 59.25% <87.50%> (-3.54%) ⬇️
pkg/controller/networkpolicy/crd_utils.go 0.00% <0.00%> (-88.61%) ⬇️
pkg/controller/networkpolicy/endpoint_querier.go 2.85% <0.00%> (-88.58%) ⬇️
...g/controller/networkpolicy/clusternetworkpolicy.go 0.00% <0.00%> (-87.15%) ⬇️
pkg/controller/networkpolicy/status_controller.go 0.00% <0.00%> (-86.85%) ⬇️
...kg/controller/networkpolicy/antreanetworkpolicy.go 0.00% <0.00%> (-85.00%) ⬇️
pkg/controller/networkpolicy/clustergroup.go 1.37% <0.00%> (-84.74%) ⬇️
pkg/agent/util/iptables/lock.go 0.00% <0.00%> (-81.82%) ⬇️
... and 179 more

Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

code looks good to me, let's see if @tnqn has an objection to the change

pkg/agent/openflow/packetin.go Outdated Show resolved Hide resolved
pkg/agent/openflow/client.go Outdated Show resolved Hide resolved
Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found an issue. It will affect the measurements you did previously, so I believe these needs to be measured again.

pkg/ovs/openflow/ofctrl_bridge.go Show resolved Hide resolved
antoninbas
antoninbas previously approved these changes Apr 5, 2021
Copy link
Contributor

@antoninbas antoninbas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignore my earlier comment, it was incorrect. I just checked the Go documentation :)
LGTM

pkg/agent/openflow/packetin.go Outdated Show resolved Hide resolved
pkg/agent/openflow/packetin.go Outdated Show resolved Hide resolved
@GraysonWu
Copy link
Contributor Author

/test-all

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants