Skip to content

Commit

Permalink
Ring buffer for notifications
Browse files Browse the repository at this point in the history
Results from BenchmarkListener:
Current code:
1000000	      1540 ns/op	     109 B/op	       1 allocs/op

New:
1000000	      1139 ns/op	      16 B/op	       1 allocs/op
  • Loading branch information
ash2k committed Sep 26, 2017
1 parent b188868 commit 1940f5a
Show file tree
Hide file tree
Showing 11 changed files with 192 additions and 18 deletions.
1 change: 1 addition & 0 deletions staging/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ filegroup(
"//staging/src/k8s.io/client-go/tools/reference:all-srcs",
"//staging/src/k8s.io/client-go/tools/remotecommand:all-srcs",
"//staging/src/k8s.io/client-go/transport:all-srcs",
"//staging/src/k8s.io/client-go/util/buffer:all-srcs",
"//staging/src/k8s.io/client-go/util/cert:all-srcs",
"//staging/src/k8s.io/client-go/util/exec:all-srcs",
"//staging/src/k8s.io/client-go/util/flowcontrol:all-srcs",
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiextensions-apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/tools/cache/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/client-go/rest:go_default_library",
"//vendor/k8s.io/client-go/tools/pager:go_default_library",
"//vendor/k8s.io/client-go/util/buffer:go_default_library",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ func BenchmarkListener(b *testing.B) {
var swg sync.WaitGroup
swg.Add(b.N)
b.SetParallelism(concurrencyLevel)
// Preallocate enough space so that benchmark does not run out of it
pl := newProcessListener(&ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
swg.Done()
},
}, 0, 0, time.Now())
}, 0, 0, time.Now(), 1024*1024)
var wg wait.Group
defer wg.Wait() // Wait for .run and .pop to stop
defer close(pl.addCh) // Tell .run and .pop to stop
Expand Down
39 changes: 22 additions & 17 deletions staging/src/k8s.io/client-go/tools/cache/shared_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/buffer"

"github.com/golang/glog"
)
Expand Down Expand Up @@ -92,8 +93,13 @@ func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEve
// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
type InformerSynced func() bool

// syncedPollPeriod controls how often you look at the status of your sync funcs
const syncedPollPeriod = 100 * time.Millisecond
const (
// syncedPollPeriod controls how often you look at the status of your sync funcs
syncedPollPeriod = 100 * time.Millisecond

// initialBufferSize is the initial number of event notifications that can be buffered.
initialBufferSize = 1024
)

// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the controller should shutdown
Expand Down Expand Up @@ -313,7 +319,7 @@ func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEv
}
}

listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now())
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)

if !s.started {
s.processor.addListener(listener)
Expand Down Expand Up @@ -465,6 +471,13 @@ type processorListener struct {

handler ResourceEventHandler

// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing

// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
Expand All @@ -477,11 +490,12 @@ type processorListener struct {
resyncLock sync.Mutex
}

func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time) *processorListener {
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
Expand All @@ -499,25 +513,16 @@ func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop

// pendingNotifications is an unbounded slice that holds all notifications not yet distributed
// there is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better
var pendingNotifications []interface{}
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
if len(pendingNotifications) == 0 { // Nothing to pop
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
notification = nil
} else {
notification = pendingNotifications[0]
pendingNotifications[0] = nil
pendingNotifications = pendingNotifications[1:]
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
Expand All @@ -528,7 +533,7 @@ func (p *processorListener) pop() {
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
pendingNotifications = append(pendingNotifications, notificationToAdd)
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions staging/src/k8s.io/client-go/util/buffer/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["ring_growing.go"],
visibility = ["//visibility:public"],
)

go_test(
name = "go_default_test",
srcs = ["ring_growing_test.go"],
library = ":go_default_library",
deps = ["//vendor/github.com/stretchr/testify/assert:go_default_library"],
)

filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)

filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
72 changes: 72 additions & 0 deletions staging/src/k8s.io/client-go/util/buffer/ring_growing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package buffer

// RingGrowing is a growing ring buffer.
// Not thread safe.
type RingGrowing struct {
data []interface{}
n int // Size of Data
beg int // First available element
readable int // Number of data items available
}

// NewRingGrowing constructs a new RingGrowing instance with provided parameters.
func NewRingGrowing(initialSize int) *RingGrowing {
return &RingGrowing{
data: make([]interface{}, initialSize),
n: initialSize,
}
}

// ReadOne reads (consumes) first item from the buffer if it is available, otherwise returns false.
func (r *RingGrowing) ReadOne() (data interface{}, ok bool) {
if r.readable == 0 {
return nil, false
}
r.readable--
element := r.data[r.beg]
r.data[r.beg] = nil // Remove reference to the object to help GC
if r.beg == r.n-1 {
// Was the last element
r.beg = 0
} else {
r.beg++
}
return element, true
}

// WriteOne adds an item to the end of the buffer, growing it if it is full.
func (r *RingGrowing) WriteOne(data interface{}) {
if r.readable == r.n {
// Time to grow
newN := r.n * 2
newData := make([]interface{}, newN)
to := r.beg + r.readable
if to <= r.n {
copy(newData, r.data[r.beg:to])
} else {
copied := copy(newData, r.data[r.beg:])
copy(newData[copied:], r.data[:(to%r.n)])
}
r.beg = 0
r.data = newData
r.n = newN
}
r.data[(r.readable+r.beg)%r.n] = data
r.readable++
}
50 changes: 50 additions & 0 deletions staging/src/k8s.io/client-go/util/buffer/ring_growing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package buffer

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestGrowth(t *testing.T) {
t.Parallel()
x := 10
g := NewRingGrowing(1)
for i := 0; i < x; i++ {
assert.Equal(t, i, g.readable)
g.WriteOne(i)
}
read := 0
for g.readable > 0 {
v, ok := g.ReadOne()
assert.True(t, ok)
assert.Equal(t, read, v)
read++
}
assert.Equalf(t, x, read, "expected to have read %d items: %d", x, read)
assert.Zerof(t, g.readable, "expected readable to be zero: %d", g.readable)
assert.Equalf(t, g.n, 16, "expected N to be 16: %d", g.n)
}

func TestEmpty(t *testing.T) {
t.Parallel()
g := NewRingGrowing(1)
_, ok := g.ReadOne()
assert.False(t, ok)
}
4 changes: 4 additions & 0 deletions staging/src/k8s.io/kube-aggregator/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions staging/src/k8s.io/sample-apiserver/Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 1940f5a

Please sign in to comment.