feat: allow list items to be processed in parallel #738

Open · wants to merge 1 commit into master

100 changes: 83 additions & 17 deletions pkg/cache/cluster.go
@@ -56,6 +56,8 @@ const (
// Limit is required to avoid memory spikes during cache initialization.
// The default limit of 50 is chosen based on experiments.
defaultListSemaphoreWeight = 50
// defaultListItemSemaphoreWeight limits the number of items processed in parallel for each k8s list.
defaultListItemSemaphoreWeight = int64(1)
// defaultEventProcessingInterval is the default interval for processing events
defaultEventProcessingInterval = 100 * time.Millisecond
)
@@ -164,15 +166,16 @@ type ListRetryFunc func(err error) bool
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
log := textlogger.NewLogger(textlogger.NewConfig())
cache := &clusterCache{
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
eventMetaCh: nil,
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
config: config,
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
eventMetaCh: nil,
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
listItemSemaphoreWeight: defaultListItemSemaphoreWeight,

Review comment: this is great that it maintains existing behavior by default

resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
config: config,
kubectl: &kube.KubectlCmd{
Log: log,
Tracer: tracing.NopTracer{},
@@ -219,8 +222,9 @@ type clusterCache struct {
// size of a page for list operations pager.
listPageSize int64
// number of pages to prefetch for list pager.
listPageBufferSize int32
listSemaphore WeightedSemaphore
listPageBufferSize int32
listSemaphore WeightedSemaphore
listItemSemaphoreWeight int64

// retry options for list operations
listRetryLimit int32
@@ -262,6 +266,35 @@ type clusterCacheSync struct {
resyncTimeout time.Duration
}

// listItemTaskLimiter limits the number of list items processed in parallel.
type listItemTaskLimiter struct {
sem WeightedSemaphore
wg sync.WaitGroup
}

// Run executes the given task concurrently, blocking if the pool is at capacity.
func (t *listItemTaskLimiter) Run(ctx context.Context, task func()) error {
t.wg.Add(1)
if err := t.sem.Acquire(ctx, 1); err != nil {
t.wg.Done()
return fmt.Errorf("failed to acquire semaphore: %w", err)
}

go func() {
defer t.wg.Done()
defer t.sem.Release(1)

task()
}()

return nil
}

// Wait blocks until all submitted tasks have completed.
func (t *listItemTaskLimiter) Wait() {
t.wg.Wait()
}
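
A minimal, self-contained sketch (not part of this diff) of how the limiter above is meant to be used. It swaps the package's WeightedSemaphore interface for golang.org/x/sync/semaphore directly, and the item-processing work is illustrative:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

type taskLimiter struct {
	sem *semaphore.Weighted
	wg  sync.WaitGroup
}

func (t *taskLimiter) Run(ctx context.Context, task func()) error {
	t.wg.Add(1)
	if err := t.sem.Acquire(ctx, 1); err != nil {
		t.wg.Done()
		return fmt.Errorf("failed to acquire semaphore: %w", err)
	}
	go func() {
		defer t.wg.Done()
		defer t.sem.Release(1)
		task()
	}()
	return nil
}

func (t *taskLimiter) Wait() { t.wg.Wait() }

func main() {
	limiter := &taskLimiter{sem: semaphore.NewWeighted(4)} // at most 4 tasks in flight
	for i := 0; i < 10; i++ {
		n := i
		if err := limiter.Run(context.Background(), func() {
			fmt.Println("processing item", n) // stand-in for newResource()
		}); err != nil {
			break
		}
	}
	limiter.Wait() // every submitted task has finished here
}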

// ListRetryFuncNever never retries on errors
func ListRetryFuncNever(_ error) bool {
return false
@@ -446,6 +479,13 @@ func (c *clusterCache) newResource(un *unstructured.Unstructured) *Resource {
return resource
}

func (c *clusterCache) newListItemTaskLimiter() *listItemTaskLimiter {
return &listItemTaskLimiter{
sem: semaphore.NewWeighted(c.listItemSemaphoreWeight),
wg: sync.WaitGroup{},
}
}

func (c *clusterCache) setNode(n *Resource) {
key := n.ResourceKey()
c.resources[key] = n
@@ -629,17 +669,33 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso

// loadInitialState loads the state of all the resources retrieved by the given resource client.
func (c *clusterCache) loadInitialState(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, lock bool) (string, error) {
var items []*Resource
var (
items []*Resource
listLock = sync.Mutex{}
limiter = c.newListItemTaskLimiter()
)

resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
return listPager.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
items = append(items, c.newResource(un))
if err := limiter.Run(ctx, func() {
newRes := c.newResource(un)
listLock.Lock()
items = append(items, newRes)
listLock.Unlock()
}); err != nil {
return fmt.Errorf("failed to process list item: %w", err)
}
}
return nil
})
})
Review comment on lines 678 to 694:

Just to call this out as a trade-off: I think this will cause a change in ordering when using concurrency. But this is only when it's enabled. Not a bad trade-off, just something to be aware of.
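
To make that concrete, a toy sketch (the weight of 4 and the trivial work are assumptions for illustration) where results land in completion order rather than submission order:

package main

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/semaphore"
)

func main() {
	sem := semaphore.NewWeighted(4)
	var (
		mu  sync.Mutex
		wg  sync.WaitGroup
		out []int
	)
	for i := 0; i < 8; i++ {
		_ = sem.Acquire(context.Background(), 1)
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			defer sem.Release(1)
			mu.Lock()
			out = append(out, n) // completion order, not submission order
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	fmt.Println(out) // may print e.g. [0 2 1 3 4 6 5 7]
}

With the default weight of 1, each Acquire blocks until the previous goroutine releases, so appends happen in submission order and the existing behavior is preserved.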


// Wait until all items have completed processing.
limiter.Wait()

if err != nil {
return "", fmt.Errorf("failed to load initial state of resource %s: %w", api.GroupKind.String(), err)
}
@@ -938,19 +994,29 @@ func (c *clusterCache) sync() error {
lock.Unlock()

return c.processApi(client, api, func(resClient dynamic.ResourceInterface, ns string) error {
limiter := c.newListItemTaskLimiter()

resourceVersion, err := c.listResources(ctx, resClient, func(listPager *pager.ListPager) error {
return listPager.EachListItem(context.Background(), metav1.ListOptions{}, func(obj runtime.Object) error {
if un, ok := obj.(*unstructured.Unstructured); !ok {
return fmt.Errorf("object %s/%s has an unexpected type", un.GroupVersionKind().String(), un.GetName())
} else {
newRes := c.newResource(un)
lock.Lock()
c.setNode(newRes)
lock.Unlock()
if err := limiter.Run(ctx, func() {
newRes := c.newResource(un)
lock.Lock()
c.setNode(newRes)
lock.Unlock()
}); err != nil {
return fmt.Errorf("failed to process list item: %w", err)
}
}
return nil
})
})

// Wait until all items have completed processing.
limiter.Wait()

if err != nil {
if c.isRestrictedResource(err) {
keep := false
46 changes: 39 additions & 7 deletions pkg/cache/cluster_test.go
@@ -147,8 +147,39 @@ func getChildren(cluster *clusterCache, un *unstructured.Unstructured) []*Resour
return hierarchy[1:]
}

// Benchmark_sync is meant to simulate cluster initialization when populateResourceInfoHandler does nontrivial work.
func Benchmark_sync(t *testing.B) {
// BenchmarkSync benchmarks cluster initialization when populateResourceInfoHandler does nontrivial work.
// The benchmark is executed using different list item semaphore weights.
func BenchmarkSync(b *testing.B) {
b.Run("weight=1,overhead=100μs", func(bb *testing.B) {
runBenchmarkSync(bb, 1, 100*time.Microsecond)
})
b.Run("weight=2,overhead=100μs", func(bb *testing.B) {
runBenchmarkSync(bb, 2, 100*time.Microsecond)
})
b.Run("weight=4,overhead=100μs", func(bb *testing.B) {
runBenchmarkSync(bb, 4, 100*time.Microsecond)
})
b.Run("weight=8,overhead=100μs", func(bb *testing.B) {
runBenchmarkSync(bb, 8, 100*time.Microsecond)
})

b.Run("weight=1,overhead=500μs", func(bb *testing.B) {
runBenchmarkSync(bb, 1, 500*time.Microsecond)
})
b.Run("weight=2,overhead=500μs", func(bb *testing.B) {
runBenchmarkSync(bb, 2, 500*time.Microsecond)
})
b.Run("weight=4,overhead=500μs", func(bb *testing.B) {
runBenchmarkSync(bb, 4, 500*time.Microsecond)
})
b.Run("weight=8,overhead=500μs", func(bb *testing.B) {
runBenchmarkSync(bb, 8, 500*time.Microsecond)
})
}
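
These sub-benchmarks sweep the semaphore weight (1, 2, 4, 8) against two simulated per-item overheads. Assuming the package layout in this diff, they can be run with the standard Go tooling:

go test -bench=BenchmarkSync -run='^$' ./pkg/cache/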

func runBenchmarkSync(b *testing.B, weight int64, overhead time.Duration) {
b.Helper()

resources := []runtime.Object{}
for i := 0; i < 100; i++ {
resources = append(resources, &corev1.Pod{
@@ -174,18 +205,19 @@ })
})
}

c := newCluster(t, resources...)
c := newCluster(b, resources...)
c.listItemSemaphoreWeight = weight

c.populateResourceInfoHandler = func(_ *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
time.Sleep(10 * time.Microsecond)
time.Sleep(overhead)
return nil, false
}

t.ResetTimer()
b.ResetTimer()

for n := 0; n < t.N; n++ {
for n := 0; n < b.N; n++ {
err := c.sync()
require.NoError(t, err)
require.NoError(b, err)
}
}

7 changes: 7 additions & 0 deletions pkg/cache/settings.go
@@ -102,6 +102,13 @@ func SetListSemaphore(listSemaphore WeightedSemaphore) UpdateSettingsFunc {
}
}

// SetListItemSemaphoreWeight sets the weight limiting the number of k8s list items processed in parallel.
func SetListItemSemaphoreWeight(listItemSemaphoreWeight int64) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.listItemSemaphoreWeight = listItemSemaphoreWeight
}
}
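
A minimal wiring sketch for the new option (the module path and the weight of 8 are illustrative assumptions, not taken from this PR):

package example

import (
	"k8s.io/client-go/rest"

	"github.com/argoproj/gitops-engine/pkg/cache"
)

func newClusterCache(config *rest.Config) cache.ClusterCache {
	// Allow up to 8 list items to be processed concurrently during sync;
	// the default weight of 1 keeps today's sequential behavior.
	return cache.NewClusterCache(config, cache.SetListItemSemaphoreWeight(8))
}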

// SetResyncTimeout updates cluster re-sync timeout
func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc {
return func(cache *clusterCache) {
9 changes: 9 additions & 0 deletions pkg/cache/settings_test.go
@@ -72,3 +72,12 @@ func TestSetEventsProcessingInterval(t *testing.T) {
cache.Invalidate(SetEventProcessingInterval(interval))
assert.Equal(t, interval, cache.eventProcessingInterval)
}

func TestSetListItemSemaphoreWeight(t *testing.T) {
cache := NewClusterCache(&rest.Config{})
assert.Equal(t, defaultListItemSemaphoreWeight, cache.listItemSemaphoreWeight)

weight := int64(8)
cache.Invalidate(SetListItemSemaphoreWeight(weight))
assert.Equal(t, weight, cache.listItemSemaphoreWeight)
}