Skip to content

Commit

Permalink
Keep the cached Endpoints the same as those installed in OVS
Browse files Browse the repository at this point in the history
The main purpose of this PR is to avoid potential inconsistencies between
the cached Endpoints and those installed in OVS, like #4681, #4692.

This PR also updates:

- Method UninstallEndpointFlows of ofClient, support deleting flows of
  multiple Endpoints.
- Remove possible groups when a Service is deleted.
- Log something when a group for a Service is not created.
- Optimize and unify log.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Mar 9, 2023
1 parent 3b224c4 commit 6913627
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 134 deletions.
57 changes: 39 additions & 18 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Client interface {
InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error
// UninstallEndpointFlows removes flows of the Endpoint installed by
// InstallEndpointFlows.
UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error
UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error

// InstallServiceFlows installs flows for accessing Service NodePort, LoadBalancer and ClusterIP. It installs the
// flow that uses the group/bucket to do service LB. If the affinityTimeout is not zero, it also installs the flow
Expand Down Expand Up @@ -452,21 +452,36 @@ func (c *client) modifyFlows(cache *flowCategoryCache, flowCacheKey string, flow

// deleteFlows deletes all the flows in the flow cache indexed by the provided flowCacheKey.
func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) error {
fCacheI, ok := cache.Load(flowCacheKey)
if !ok {
// no matching flows found in the cache
return nil
return c.deleteFlowsWithMultipleKeys(cache, []string{flowCacheKey})
}

// deleteFlowsWithMultipleKeys uninstalls the flows with different flowCache keys and remove them from the cache on success.
// It will skip the keys which are not in the cache. All flows will be uninstalled via a bundle.
func (c *client) deleteFlowsWithMultipleKeys(cache *flowCategoryCache, keys []string) error {
// allFlows keeps the flows we will delete via a bundle.
var allFlows []binding.Flow
for _, key := range keys {
flows, ok := cache.Load(key)
// If a flow cache entry of the key does not exist, skip it.
if !ok {
klog.V(2).InfoS("Cached flow with provided key was found", "key", key)
continue
}
for _, flow := range flows.(flowCache) {
allFlows = append(allFlows, flow)
}
}
fCache := fCacheI.(flowCache)
// Delete flows from OVS.
delFlows := make([]binding.Flow, 0, len(fCache))
for _, flow := range fCache {
delFlows = append(delFlows, flow)
if len(allFlows) == 0 {
return nil
}
if err := c.ofEntryOperations.DeleteAll(delFlows); err != nil {
err := c.ofEntryOperations.DeleteAll(allFlows)
if err != nil {
return err
}
cache.Delete(flowCacheKey)
// Delete the keys and corresponding flows from the flow cache.
for _, key := range keys {
cache.Delete(key)
}
return nil
}

Expand Down Expand Up @@ -681,16 +696,22 @@ func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []pro
return c.addFlowsWithMultipleKeys(c.featureService.cachedFlows, keyToFlows)
}

func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint proxy.Endpoint) error {
func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

port, err := endpoint.Port()
if err != nil {
return fmt.Errorf("error when getting port: %w", err)
// keyToFlows is a map from the flows' cache key to the flows.
flowCacheKeys := make([]string, 0, len(endpoints))

for _, endpoint := range endpoints {
port, err := endpoint.Port()
if err != nil {
return fmt.Errorf("error when getting port: %w", err)
}
flowCacheKeys = append(flowCacheKeys, generateEndpointFlowCacheKey(endpoint.IP(), port, protocol))
}
cacheKey := generateEndpointFlowCacheKey(endpoint.IP(), port, protocol)
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)

return c.deleteFlowsWithMultipleKeys(c.featureService.cachedFlows, flowCacheKeys)
}

func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@ func Test_client_InstallEndpointFlows(t *testing.T) {
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(len(tc.endpoints))
m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1)

assert.NoError(t, fc.InstallEndpointFlows(tc.protocol, tc.endpoints))
var flows []string
Expand All @@ -1048,8 +1048,8 @@ func Test_client_InstallEndpointFlows(t *testing.T) {
}
assert.ElementsMatch(t, tc.expectedFlows, flows)

assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, tc.endpoints))
for _, ep := range tc.endpoints {
assert.NoError(t, fc.UninstallEndpointFlows(tc.protocol, ep))
endpointPort, _ := ep.Port()
cacheKey := generateEndpointFlowCacheKey(ep.IP(), endpointPort, tc.protocol)
_, ok := fc.featureService.cachedFlows.Load(cacheKey)
Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6913627

Please sign in to comment.