Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change secondary network Pod controller to subscribe to CNIServer events #5767

Merged
merged 1 commit into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ import (
"antrea.io/antrea/pkg/agent/querier"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/stats"
support "antrea.io/antrea/pkg/agent/supportbundlecollection"
Expand Down Expand Up @@ -535,7 +534,6 @@ func run(o *Options) error {
}

var cniServer *cniserver.CNIServer
var cniPodInfoStore cnipodcache.CNIPodInfoStore
var externalNodeController *externalnode.ExternalNodeController
var localExternalNodeInformer cache.SharedIndexInformer

Expand All @@ -554,17 +552,9 @@ func run(o *Options) error {
networkConfig,
networkReadyCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
cniPodInfoStore = cnipodcache.NewCNIPodInfoStore()
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, cniPodInfoStore)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel, nil)
if err != nil {
return fmt.Errorf("error initializing CNI server: %v", err)
}
err = cniServer.Initialize(ovsBridgeClient, ofClient, ifaceStore, podUpdateChannel)
if err != nil {
return fmt.Errorf("error initializing CNI server with cniPodInfoStore cache: %v", err)
}
} else {
listOptions := func(options *metav1.ListOptions) {
Expand Down Expand Up @@ -700,8 +690,8 @@ func run(o *Options) error {
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
if err := secondarynetwork.Initialize(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(), nodeConfig.Name, cniPodInfoStore,
stopCh,
k8sClient, localPodInformer.Get(), nodeConfig.Name,
podUpdateChannel, stopCh,
&o.config.SecondaryNetwork, ovsdbConnection); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
}
Expand Down
19 changes: 8 additions & 11 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
agenttypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/ovs/ovsconfig"
Expand Down Expand Up @@ -72,8 +71,6 @@ type podConfigurator struct {
// podUpdateNotifier is used for notifying updates of local Pods to other components which may benefit from this
// information, i.e. NetworkPolicyController, EgressController.
podUpdateNotifier channel.Notifier
// consumed by secondary network creation.
podInfoStore cnipodcache.CNIPodInfoStore
}

func newPodConfigurator(
Expand All @@ -86,7 +83,6 @@ func newPodConfigurator(
isOvsHardwareOffloadEnabled bool,
disableTXChecksumOffload bool,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) (*podConfigurator, error) {
ifConfigurator, err := newInterfaceConfigurator(ovsDatapathType, isOvsHardwareOffloadEnabled, disableTXChecksumOffload)
if err != nil {
Expand All @@ -100,7 +96,6 @@ func newPodConfigurator(
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podInfoStore: podInfoStore,
}, nil
}

Expand Down Expand Up @@ -243,7 +238,8 @@ func (pc *podConfigurator) configureInterfaces(
}

var containerConfig *interfacestore.InterfaceConfig
if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil {
if containerConfig, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS,
hostIface, containerIface, result.IPs, result.VLANID, containerAccess); err != nil {
return fmt.Errorf("failed to connect to ovs for container %s: %v", containerID, err)
}
defer func() {
Expand Down Expand Up @@ -486,7 +482,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
return nil
}

func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, containerConfig *interfacestore.InterfaceConfig) error {
func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName, netNS string, containerConfig *interfacestore.InterfaceConfig) error {
// create OVS Port and add attach container configuration into external_ids
containerID := containerConfig.ContainerID
klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
Expand Down Expand Up @@ -519,8 +515,9 @@ func (pc *podConfigurator) connectInterfaceToOVSCommon(ovsPortName string, conta
event := agenttypes.PodUpdate{
PodName: containerConfig.PodName,
PodNamespace: containerConfig.PodNamespace,
IsAdd: true,
ContainerID: containerConfig.ContainerID,
NetNS: netNS,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder how it handles restart case if it relies on the netNS which is not persistent anywhere. There may be two cases it can't handle:

  1. If agent is restarted after CNI event is handled but before PodController handles it, it can't create no longer create the secondary interface.
  2. After agent is restarted, it can't clean up the secondary interface as the cache is always empty?

Should it cache netNS in containerConfig and use containerConfig to initialize podCache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Today we do not persist any secondary interface configuration, and I plan to implement that in the next PR. We can definitely save net NS with the primary interface OVS port and I thought about that too. But I want to mention that only address the case antrea-agent restarts after Pod's primary interfaces are created but before any secondary interface is created, as only secondary interface creation (not deletion) will require net NS. I am still thinking is it worthwhile to cover that corner case or not.

IsAdd: true,
}
pc.podUpdateNotifier.Notify(event)
return nil
Expand Down Expand Up @@ -548,8 +545,8 @@ func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interface
event := agenttypes.PodUpdate{
PodName: containerConfig.PodName,
PodNamespace: containerConfig.PodNamespace,
IsAdd: false,
ContainerID: containerConfig.ContainerID,
IsAdd: false,
}
pc.podUpdateNotifier.Notify(event)
klog.Infof("Removed interfaces for container %s", containerID)
Expand Down Expand Up @@ -577,8 +574,8 @@ func (pc *podConfigurator) connectInterceptedInterface(
if err = pc.routeClient.MigrateRoutesToGw(hostIface.Name); err != nil {
return fmt.Errorf("connectInterceptedInterface failed to migrate: %w", err)
}
_, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, hostIface,
containerIface, containerIPs, 0, containerAccess)
_, err = pc.connectInterfaceToOVS(podName, podNamespace, containerID, containerNetNS,
hostIface, containerIface, containerIPs, 0, containerAccess)
return err
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
podName string,
podNamespace string,
containerID string,
netNS string,
hostIface *current.Interface,
containerIface *current.Interface,
ips []*current.IPConfig,
Expand All @@ -38,7 +39,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
// Use the outer veth interface name as the OVS port name.
ovsPortName := hostIface.Name
containerConfig := buildContainerConfig(ovsPortName, containerID, podName, podNamespace, containerIface, ips, vlanID)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig)
}

func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.InterfaceConfig, containerAccess *containerAccessArbitrator) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/pod_configuration_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func createPodConfigurator(controller *gomock.Controller, testIfaceConfigurator
mockOFClient = openflowtest.NewMockClient(controller)
ifaceStore = interfacestore.NewInterfaceStore()
mockRoute = routetest.NewMockInterface(controller)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100), nil)
configurator, _ := newPodConfigurator(mockOVSBridgeClient, mockOFClient, mockRoute, ifaceStore, gwMAC, "system", false, false, channel.NewSubscribableChannel("PodUpdate", 100))
configurator.ifConfigurator = testIfaceConfigurator
return configurator
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/agent/cniserver/pod_configuration_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
podName string,
podNamespace string,
containerID string,
netNS string,
hostIface *current.Interface,
containerIface *current.Interface,
ips []*current.IPConfig,
Expand All @@ -87,7 +88,7 @@ func (pc *podConfigurator) connectInterfaceToOVS(
// HNSEndpoint/HostComputeEndpoint, the current implementation will still work. It will choose the synchronized
// way to create OVS port.
if hostInterfaceExistsFunc(hostIfAlias) {
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, containerConfig)
return containerConfig, pc.connectInterfaceToOVSCommon(ovsPortName, netNS, containerConfig)
}
klog.V(2).Infof("Adding OVS port %s for container %s", ovsPortName, containerID)
ovsAttachInfo := BuildOVSPortExternalIDs(containerConfig)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/cniserver/secondary.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
)

func NewSecondaryInterfaceConfigurator(ovsBridgeClient ovsconfig.OVSBridgeClient) (*podConfigurator, error) {
return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil, nil)
return newPodConfigurator(ovsBridgeClient, nil, nil, nil, nil, ovsconfig.OVSDatapathSystem, false, false, nil)
}

// ConfigureSriovSecondaryInterface configures a SR-IOV secondary interface for a Pod.
Expand Down
31 changes: 2 additions & 29 deletions pkg/agent/cniserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/util"
cnipb "antrea.io/antrea/pkg/apis/cni/v1beta1"
"antrea.io/antrea/pkg/cni"
Expand Down Expand Up @@ -115,7 +114,6 @@ type CNIServer struct {
// Enable AntreaIPAM for secondary networks implementd by other CNIs.
enableSecondaryNetworkIPAM bool
disableTXChecksumOffload bool
secondaryNetworkEnabled bool
networkConfig *config.NetworkConfig
// networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it.
networkReadyCh <-chan struct{}
Expand Down Expand Up @@ -523,13 +521,6 @@ func (s *CNIServer) CmdAdd(ctx context.Context, request *cnipb.CniCmdRequest) (*
// mark success as true to avoid rollback
success = true

if s.secondaryNetworkEnabled {
// Go cache the CNI server info at CNIConfigInfo cache, for podWatch usage
cniInfo := &cnipodcache.CNIConfigInfo{CNIVersion: cniVersion, PodName: podName, PodNamespace: podNamespace,
ContainerID: cniConfig.ContainerId, ContainerNetNS: netNS, PodCNIDeleted: false}
s.podConfigurator.podInfoStore.AddCNIConfigInfo(cniInfo)
}

return resultToResponse(cniResult), nil
}

Expand Down Expand Up @@ -558,16 +549,7 @@ func (s *CNIServer) cmdDel(_ context.Context, cniConfig *CNIConfig) (*cnipb.CniC
return s.configInterfaceFailureResponse(err), nil
}
klog.InfoS("CmdDel for container succeeded", "container", cniConfig.ContainerId)
if s.secondaryNetworkEnabled {
podName := string(cniConfig.K8S_POD_NAME)
podNamespace := string(cniConfig.K8S_POD_NAMESPACE)
containerInfo := s.podConfigurator.podInfoStore.GetCNIConfigInfoByContainerID(podName, podNamespace, cniConfig.ContainerId)
if containerInfo != nil {
// Update PodCNIDeleted = true.
// This is to let Podwatch controller know that the CNI server cleaned up this Pod's primary network configuration.
s.podConfigurator.podInfoStore.SetPodCNIDeleted(containerInfo)
}
}

return &cnipb.CniCmdResponse{CniResult: []byte("")}, nil
}

Expand Down Expand Up @@ -652,21 +634,12 @@ func (s *CNIServer) Initialize(
ofClient openflow.Client,
ifaceStore interfacestore.InterfaceStore,
podUpdateNotifier channel.Notifier,
podInfoStore cnipodcache.CNIPodInfoStore,
) error {
var err error
// If podInfoStore is not nil, secondaryNetwork configuration is supported.
if podInfoStore != nil {
s.secondaryNetworkEnabled = true
} else {
s.secondaryNetworkEnabled = false
}

s.podConfigurator, err = newPodConfigurator(
ovsBridgeClient, ofClient, s.routeClient, ifaceStore, s.nodeConfig.GatewayConfig.MAC,
ovsBridgeClient.GetOVSDatapathType(), ovsBridgeClient.IsHardwareOffloadEnabled(),
s.disableTXChecksumOffload,
podUpdateNotifier, podInfoStore)
s.disableTXChecksumOffload, podUpdateNotifier)
if err != nil {
return fmt.Errorf("error during initialize podConfigurator: %v", err)
}
Expand Down
Loading
Loading