Skip to content

Commit

Permalink
Lazily initialize localPodInformer in a more readable way
Browse files Browse the repository at this point in the history
Instead of checking various conditions to determine whether
localPodInformer should be initialized, which is error-prone and looks
ugly, this patch adds a Generic interface for lazy evaluation and uses
it to initialize localPodInformer when it's necessary.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Nov 14, 2023
1 parent 5701d1a commit 2a0c329
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 16 deletions.
29 changes: 13 additions & 16 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ import (
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/util/lazy"
"antrea.io/antrea/pkg/util/podstore"
"antrea.io/antrea/pkg/version"
)
Expand Down Expand Up @@ -323,23 +324,19 @@ func run(o *Options) error {
externalEntityUpdateChannel = channel.NewSubscribableChannel("ExternalEntityUpdate", 100)
}

// Initialize localPodInformer for NPLAgent, AntreaIPAMController,
// StretchedNetworkPolicyController, and secondary network controller.
var localPodInformer cache.SharedIndexInformer
if o.enableNodePortLocal || enableBridgingMode || enableMulticlusterNP || enableFlowExporter ||
features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) ||
features.DefaultFeatureGate.Enabled(features.TrafficControl) {
// Lazily initialize localPodInformer when it's required by any module.
localPodInformer := lazy.New(func() cache.SharedIndexInformer {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
localPodInformer = coreinformers.NewFilteredPodInformer(
return coreinformers.NewFilteredPodInformer(
k8sClient,
metav1.NamespaceAll,
resyncPeriodDisabled,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController.
listOptions,
)
}
})

var mcDefaultRouteController *mcroute.MCDefaultRouteController
var mcStrechedNetworkPolicyController *mcroute.StretchedNetworkPolicyController
Expand Down Expand Up @@ -383,7 +380,7 @@ func run(o *Options) error {
mcStrechedNetworkPolicyController = mcroute.NewMCAgentStretchedNetworkPolicyController(
ofClient,
ifaceStore,
localPodInformer,
localPodInformer.Get(),
namespaceInformer,
labelIDInformer,
podUpdateChannel,
Expand Down Expand Up @@ -622,7 +619,7 @@ func run(o *Options) error {

var flowExporter *exporter.FlowExporter
if enableFlowExporter {
podStore := podstore.NewPodStore(localPodInformer)
podStore := podstore.NewPodStore(localPodInformer.Get())
flowExporterOptions := &flowexporter.FlowExporterOptions{
FlowCollectorAddr: o.flowCollectorAddr,
FlowCollectorProto: o.flowCollectorProto,
Expand Down Expand Up @@ -677,7 +674,7 @@ func run(o *Options) error {
nplController, err := npl.InitializeNPLAgent(
k8sClient,
serviceInformer,
localPodInformer,
localPodInformer.Get(),
o.nplStartPort,
o.nplEndPort,
nodeConfig.Name,
Expand All @@ -691,7 +688,7 @@ func run(o *Options) error {
// Antrea IPAM is needed by bridging mode and secondary network IPAM.
if enableAntreaIPAM {
ipamController, err := ipam.InitializeAntreaIPAMController(
crdClient, namespaceInformer, ipPoolInformer, localPodInformer, enableBridgingMode)
crdClient, namespaceInformer, ipPoolInformer, localPodInformer.Get(), enableBridgingMode)
if err != nil {
return fmt.Errorf("failed to start Antrea IPAM agent: %v", err)
}
Expand All @@ -701,7 +698,7 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
if err := secondarynetwork.Initialize(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer, nodeConfig.Name, cniPodInfoStore,
k8sClient, localPodInformer.Get(), nodeConfig.Name, cniPodInfoStore,
stopCh,
&o.config.SecondaryNetwork, ovsdbConnection); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
Expand All @@ -714,15 +711,15 @@ func run(o *Options) error {
ovsBridgeClient,
ovsCtlClient,
trafficControlInformer,
localPodInformer,
localPodInformer.Get(),
namespaceInformer,
podUpdateChannel)
go tcController.Run(stopCh)
}

// Start the localPodInformer
if localPodInformer != nil {
go localPodInformer.Run(stopCh)
if localPodInformer.Evaluated() {
go localPodInformer.Get().Run(stopCh)
}

informerFactory.Start(stopCh)
Expand Down
62 changes: 62 additions & 0 deletions pkg/util/lazy/lazy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lazy

import (
"sync"
"sync/atomic"
)

// Lazy defers the evaluation of getter until it's accessed the first access.
type Lazy[T any] interface {
// Get returns the value, evaluate it if necessary.
Get() T
// Evaluated returns whether the value has been evaluated or not.
Evaluated() bool
}

type lazy[T any] struct {
getter func() T
// res is the cached result.
res T
done uint32
m sync.Mutex
}

// New returns a new lazily evaluated value. The getter is executed only when it's accessed the first access.
func New[T any](getter func() T) Lazy[T] {
return &lazy[T]{getter: getter}
}

func (l *lazy[T]) Get() T {
if atomic.LoadUint32(&l.done) == 0 {
return l.doSlow()
}
return l.res
}

func (l *lazy[T]) doSlow() T {
l.m.Lock()
defer l.m.Unlock()
if l.done == 0 {
defer atomic.StoreUint32(&l.done, 1)
l.res = l.getter()
}
return l.res
}

func (l *lazy[T]) Evaluated() bool {
return atomic.LoadUint32(&l.done) == 1
}
48 changes: 48 additions & 0 deletions pkg/util/lazy/lazy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package lazy

import (
"testing"

"github.com/stretchr/testify/assert"
)

type foo struct{}

func TestLazy(t *testing.T) {
var called int
lazyFoo := New(func() *foo {
called++
return &foo{}
})
assert.False(t, lazyFoo.Evaluated())
assert.Equal(t, 0, called)

ch := make(chan *foo, 10)
for i := 0; i < 10; i++ {
go func() {
ch <- lazyFoo.Get()
}()
}
// Got the first result.
foo := <-ch
assert.True(t, lazyFoo.Evaluated())
// Got the rest 9 results, all of them should reference the same object.
for i := 1; i < 10; i++ {
assert.Same(t, foo, <-ch)
}
assert.Equal(t, 1, called, "The getter should only be called exactly once")
}

0 comments on commit 2a0c329

Please sign in to comment.