Skip to content

Commit 807e49a

Browse files
joamakijulianwiedmann
authored andcommitted
datapath/tables: Use Table[*LocalNode] in NodeAddressController
Switch over to querying Table[*LocalNode] instead of using LocalNodeStore. This simplifies the processing loop as we now only have tables as inputs and can thus use statedb.WatchSet. Signed-off-by: Jussi Maki <jussi@isovalent.com>
1 parent 62f4856 commit 807e49a

File tree

1 file changed

+29
-66
lines changed

1 file changed

+29
-66
lines changed

pkg/datapath/tables/node_address.go

Lines changed: 29 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,16 @@ import (
1717
"github.com/cilium/hive/job"
1818
"github.com/cilium/statedb"
1919
"github.com/cilium/statedb/index"
20-
"github.com/cilium/stream"
2120
"github.com/spf13/pflag"
2221
"k8s.io/apimachinery/pkg/util/sets"
2322

2423
"github.com/cilium/cilium/pkg/defaults"
2524
"github.com/cilium/cilium/pkg/ip"
2625
"github.com/cilium/cilium/pkg/logging/logfields"
2726
"github.com/cilium/cilium/pkg/node"
27+
"github.com/cilium/cilium/pkg/node/addressing"
28+
nodeTypes "github.com/cilium/cilium/pkg/node/types"
2829
"github.com/cilium/cilium/pkg/option"
29-
"github.com/cilium/cilium/pkg/rate"
3030
"github.com/cilium/cilium/pkg/time"
3131
)
3232

@@ -155,7 +155,7 @@ var (
155155
//
156156
// The Table[NodeAddress] contains the actual assigned addresses on the node,
157157
// but not for example external Kubernetes node addresses that may be merely
158-
// NATd to a private address. Those can be queried through [node.LocalNodeStore].
158+
// NATd to a private address. Those can be queried through Table[*node.LocalNode].
159159
NodeAddressCell = cell.Module(
160160
"node-address",
161161
"Table of node addresses derived from system network devices",
@@ -216,13 +216,12 @@ type nodeAddressControllerParams struct {
216216
Routes statedb.Table[*Route]
217217
NodeAddresses statedb.RWTable[NodeAddress]
218218
AddressScopeMax AddressScopeMax
219-
LocalNode *node.LocalNodeStore
219+
Nodes statedb.Table[*node.LocalNode]
220220
}
221221

222222
type nodeAddressController struct {
223223
nodeAddressControllerParams
224224

225-
k8sIPv4, k8sIPv6 netip.Addr
226225
fallbackAddresses fallbackAddresses
227226
}
228227

@@ -241,88 +240,52 @@ func (n *nodeAddressController) register() {
241240
n.Lifecycle.Append(
242241
cell.Hook{
243242
OnStart: func(ctx cell.HookContext) error {
244-
if node, err := n.LocalNode.Get(ctx); err == nil {
245-
n.updateK8sNodeIPs(node)
246-
}
247-
248243
// Perform an initial synchronous reconciliation to populate the table.
249244
// This ensures that dependent cells see the initial state when they start.
250245
// The watch channels returned here will be the initial channels for the run loop.
251-
initialDevicesWatch, initialRoutesWatch := n.reconcile()
246+
ws := n.reconcile()
252247

253248
// Start the background job for continuous reconciliation.
254249
n.Jobs.Add(job.OneShot("node-address-update", func(ctx context.Context, reporter cell.Health) error {
255-
return n.run(ctx, reporter, initialDevicesWatch, initialRoutesWatch)
250+
return n.run(ctx, reporter, ws)
256251
}))
257252
return nil
258253
},
259254
})
260255
}
261256

262-
func (n *nodeAddressController) updateK8sNodeIPs(node node.LocalNode) (updated bool) {
263-
if ip := node.GetNodeIP(true); ip != nil {
264-
if newIP, ok := netip.AddrFromSlice(ip); ok {
265-
if newIP != n.k8sIPv6 {
266-
n.k8sIPv6 = newIP
267-
updated = true
268-
}
269-
}
270-
}
271-
if ip := node.GetNodeIP(false); ip != nil {
272-
if newIP, ok := netip.AddrFromSlice(ip); ok {
273-
if newIP != n.k8sIPv4 {
274-
n.k8sIPv4 = newIP
275-
updated = true
276-
}
277-
}
278-
}
279-
return
280-
}
281-
282-
func (n *nodeAddressController) run(ctx context.Context, reporter cell.Health, initialDevicesWatch, initialRoutesWatch <-chan struct{}) error {
283-
localNodeChanges := stream.ToChannel(ctx, stream.Debounce(n.LocalNode, nodeAddressControllerMinInterval))
284-
limiter := rate.NewLimiter(nodeAddressControllerMinInterval, 1)
285-
286-
// Use the initial watch channels provided from the OnStart hook.
287-
devicesWatch := initialDevicesWatch
288-
routesWatch := initialRoutesWatch
289-
257+
func (n *nodeAddressController) run(ctx context.Context, reporter cell.Health, ws *statedb.WatchSet) error {
290258
for {
291-
// Rate-limit to batch multiple updates together.
292-
if err := limiter.Wait(ctx); err != nil {
293-
return err
294-
}
295-
296-
// Wait for changes from any input source.
297-
select {
298-
case <-ctx.Done():
259+
// Wait for changes
260+
if _, err := ws.Wait(ctx, nodeAddressControllerMinInterval); err != nil {
299261
return nil
300-
case <-devicesWatch:
301-
// A device has changed.
302-
case <-routesWatch:
303-
// A route has changed.
304-
case localNode, ok := <-localNodeChanges:
305-
if !ok {
306-
localNodeChanges = nil
307-
continue
308-
}
309-
// Update Kubernetes node IPs. Reconciliation will happen after rate-limiting.
310-
n.updateK8sNodeIPs(localNode)
311262
}
312263

313-
// Perform the full reconciliation and get new watch channels for the next iteration.
314-
devicesWatch, routesWatch = n.reconcile()
264+
// Perform the full reconciliation and get new watch set
265+
ws = n.reconcile()
315266
}
316267
}
317268

318269
// reconcile performs a full reconciliation of the NodeAddress table. It computes
319270
// the desired state from the Devices table and updates the NodeAddress table
320271
// to match it. It returns the read transaction and new watch channels for Devices and Routes.
321-
func (n *nodeAddressController) reconcile() (<-chan struct{}, <-chan struct{}) {
272+
func (n *nodeAddressController) reconcile() *statedb.WatchSet {
273+
ws := statedb.NewWatchSet()
274+
322275
rtxn := n.DB.ReadTxn()
276+
277+
var k8sIPv4, k8sIPv6 netip.Addr
278+
if localNode, _, watch, found := n.Nodes.GetWatch(rtxn, node.LocalNodeQuery); found {
279+
k8sIPv4, _ = netip.AddrFromSlice(addressing.ExtractNodeIP[nodeTypes.Address](localNode.IPAddresses, false))
280+
k8sIPv6, _ = netip.AddrFromSlice(addressing.ExtractNodeIP[nodeTypes.Address](localNode.IPAddresses, true))
281+
ws.Add(watch)
282+
}
283+
323284
// Get iterators for the current state and new watch channels.
324285
allDevices, devicesWatch := n.Devices.AllWatch(rtxn)
286+
ws.Add(devicesWatch)
325287
localRoutes, routesWatch := n.Routes.PrefixWatch(rtxn, RouteIDIndex.Query(RouteID{Table: RT_TABLE_LOCAL}))
288+
ws.Add(routesWatch)
326289

327290
// A map to hold the desired state of node addresses, keyed by device name.
328291
newAddrsByDevice := make(map[string][]NodeAddress)
@@ -331,7 +294,7 @@ func (n *nodeAddressController) reconcile() (<-chan struct{}, <-chan struct{}) {
331294
// Get addresses from devices
332295
n.fallbackAddresses.clear()
333296
for dev := range allDevices {
334-
deviceAddrs := n.getAddressesFromDevice(dev)
297+
deviceAddrs := n.getAddressesFromDevice(dev, k8sIPv4, k8sIPv6)
335298
if deviceAddrs == nil {
336299
continue
337300
}
@@ -427,7 +390,7 @@ func (n *nodeAddressController) reconcile() (<-chan struct{}, <-chan struct{}) {
427390
n.update(wtxn, nil, n.Health, deletedDevName)
428391
}
429392
wtxn.Commit()
430-
return devicesWatch, routesWatch
393+
return ws
431394
}
432395

433396
// updates the node addresses of a single device.
@@ -497,7 +460,7 @@ func (n *nodeAddressController) shouldUseDeviceForNodeAddress(dev *Device) bool
497460
return true
498461
}
499462

500-
func (n *nodeAddressController) getAddressesFromDevice(dev *Device) []NodeAddress {
463+
func (n *nodeAddressController) getAddressesFromDevice(dev *Device, k8sIPv4, k8sIPv6 netip.Addr) []NodeAddress {
501464
if !n.shouldUseDeviceForNodeAddress(dev) {
502465
return nil
503466
}
@@ -527,7 +490,7 @@ func (n *nodeAddressController) getAddressesFromDevice(dev *Device) []NodeAddres
527490
index := len(addrs)
528491
isPublic := ip.IsPublicAddr(addr.Addr.AsSlice())
529492
if addr.Addr.Is4() {
530-
if addr.Addr.Unmap() == n.k8sIPv4.Unmap() {
493+
if addr.Addr.Unmap() == k8sIPv4.Unmap() {
531494
// Address matches the K8s Node IP. Force this to be picked.
532495
ipv4PublicIndex = index
533496
ipv4PrivateIndex = index
@@ -541,7 +504,7 @@ func (n *nodeAddressController) getAddressesFromDevice(dev *Device) []NodeAddres
541504
}
542505

543506
if addr.Addr.Is6() {
544-
if addr.Addr == n.k8sIPv6 {
507+
if addr.Addr == k8sIPv6 {
545508
// Address matches the K8s Node IP. Force this to be picked.
546509
ipv6PublicIndex = index
547510
ipv6PrivateIndex = index

0 commit comments

Comments
 (0)