Skip to content

Commit

Permalink
identity: create CachingIdentityAllocator type
Browse files Browse the repository at this point in the history
This type encapsulates values which previously were package-level variables
into a specific type, `CachingIdentityAllocator`. This removes the need
to directly import `pkg/identity/cache`, and will make unit testing easier
in packages which need to allocate identities on demand, e.g. `pkg/endpoint`
in the future.

The daemon now has an `CachingIdentityAllocator` member, which it plumbs
down into various subsystems which need to allocate identities, specifically
clustermesh, endpoint, and policy.

Signed-off by: Ian Vernon <ian@cilium.io>
  • Loading branch information
Ian Vernon authored and ianvernon committed Oct 13, 2019
1 parent cf342e1 commit 83e576e
Show file tree
Hide file tree
Showing 52 changed files with 678 additions and 467 deletions.
5 changes: 3 additions & 2 deletions cilium-health/launch/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cilium/cilium/pkg/endpoint/regeneration"
healthDefaults "github.com/cilium/cilium/pkg/health/defaults"
"github.com/cilium/cilium/pkg/health/probe"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/launcher"
"github.com/cilium/cilium/pkg/logging/logfields"
Expand Down Expand Up @@ -234,7 +235,7 @@ type EndpointAdder interface {
//
// CleanupEndpoint() must be called before calling LaunchAsEndpoint() to ensure
// cleanup of prior cilium-health endpoint instances.
func LaunchAsEndpoint(baseCtx context.Context, owner regeneration.Owner, n *node.Node, mtuConfig mtu.Configuration, epMgr EndpointAdder, proxy endpoint.EndpointProxy) (*Client, error) {
func LaunchAsEndpoint(baseCtx context.Context, owner regeneration.Owner, n *node.Node, mtuConfig mtu.Configuration, epMgr EndpointAdder, proxy endpoint.EndpointProxy, allocator cache.IdentityAllocator) (*Client, error) {
var (
cmd = launcher.Launcher{}
info = &models.EndpointChangeRequest{
Expand Down Expand Up @@ -312,7 +313,7 @@ func LaunchAsEndpoint(baseCtx context.Context, owner regeneration.Owner, n *node
}

// Create the endpoint
ep, err := endpoint.NewEndpointFromChangeModel(owner, proxy, info)
ep, err := endpoint.NewEndpointFromChangeModel(owner, proxy, allocator, info)
if err != nil {
return nil, fmt.Errorf("Error while creating endpoint model: %s", err)
}
Expand Down
57 changes: 36 additions & 21 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ type Daemon struct {
iptablesManager rulesManager

endpointManager *endpointmanager.EndpointManager

identityAllocator *cache.CachingIdentityAllocator
}

// GetPolicyRepository returns the policy repository of the daemon
Expand Down Expand Up @@ -326,22 +328,9 @@ func NewDaemon(dp datapath.Datapath, iptablesManager rulesManager) (*Daemon, *en
// Must be done before calling policy.NewPolicyRepository() below.
identity.InitWellKnownIdentities()

epMgr := endpointmanager.NewEndpointManager(&endpointsynchronizer.EndpointSynchronizer{})
epMgr.InitMetrics()

// Cleanup on exit if running in tandem with Flannel.
if option.Config.FlannelUninstallOnExit {
cleanupFuncs.Add(func() {
for _, ep := range epMgr.GetEndpoints() {
ep.DeleteBPFProgramLocked()
}
})
}

d := Daemon{
svc: service.NewService(),
k8sSvcCache: k8s.NewServiceCache(),
policy: policy.NewPolicyRepository(),
prefixLengths: createPrefixLengthCounter(),
k8sResourceSynced: map[string]chan struct{}{},
buildEndpointSem: semaphore.NewWeighted(int64(numWorkerThreads())),
Expand All @@ -351,10 +340,35 @@ func NewDaemon(dp datapath.Datapath, iptablesManager rulesManager) (*Daemon, *en
datapath: dp,
nodeDiscovery: nodediscovery.NewNodeDiscovery(nodeMngr, mtuConfig),
iptablesManager: iptablesManager,
endpointManager: epMgr,
}

d.identityAllocator = cache.NewCachingIdentityAllocator(&d)
d.policy = policy.NewPolicyRepository(d.identityAllocator.GetIdentityCache())

// Propagate identity allocator down to packages which themselves do not
// have types to which we can add an allocator member.
//
// TODO: convert these package level variables to types for easier unit
// testing in the future.
ipcache.IdentityAllocator = d.identityAllocator
proxy.Allocator = d.identityAllocator

d.endpointManager = endpointmanager.NewEndpointManager(&endpointsynchronizer.EndpointSynchronizer{
Allocator: d.identityAllocator,
})
d.endpointManager.InitMetrics()

bootstrapStats.daemonInit.End(true)

// Cleanup on exit if running in tandem with Flannel.
if option.Config.FlannelUninstallOnExit {
cleanupFuncs.Add(func() {
for _, ep := range d.endpointManager.GetEndpoints() {
ep.DeleteBPFProgramLocked()
}
})
}

// Open or create BPF maps.
bootstrapStats.mapsInit.Start()
err = d.initMaps()
Expand Down Expand Up @@ -490,10 +504,10 @@ func NewDaemon(dp datapath.Datapath, iptablesManager rulesManager) (*Daemon, *en

// This needs to be done after the node addressing has been configured
// as the node address is required as suffix.
// well known identities have already been initialized above
// well known identities have already been initialized above.
// Ignore the channel returned by this function, as we want the global
// identity allocator to run asynchronously.
cache.InitIdentityAllocator(&d, k8s.CiliumClient(), nil)
d.identityAllocator.InitIdentityAllocator(k8s.CiliumClient(), nil)

d.bootstrapClusterMesh(nodeMngr)

Expand Down Expand Up @@ -547,11 +561,12 @@ func (d *Daemon) bootstrapClusterMesh(nodeMngr *nodemanager.Manager) {
} else {
log.WithField("path", path).Info("Initializing ClusterMesh routing")
clustermesh, err := clustermesh.NewClusterMesh(clustermesh.Configuration{
Name: "clustermesh",
ConfigDirectory: path,
NodeKeyCreator: nodeStore.KeyCreator,
ServiceMerger: &d.k8sSvcCache,
NodeManager: nodeMngr,
Name: "clustermesh",
ConfigDirectory: path,
NodeKeyCreator: nodeStore.KeyCreator,
ServiceMerger: &d.k8sSvcCache,
NodeManager: nodeMngr,
RemoteIdentityWatcher: d.identityAllocator,
})
if err != nil {
log.WithError(err).Fatal("Unable to initialize ClusterMesh")
Expand Down
5 changes: 2 additions & 3 deletions daemon/daemon_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/cilium/cilium/pkg/envoy"
"github.com/cilium/cilium/pkg/flowdebug"
"github.com/cilium/cilium/pkg/identity"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/k8s"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/labels"
Expand Down Expand Up @@ -925,7 +924,7 @@ func initEnv(cmd *cobra.Command) {

policy.SetPolicyEnabled(option.Config.EnablePolicy)

if err := cache.AddUserDefinedNumericIdentitySet(option.Config.FixedIdentityMapping); err != nil {
if err := identity.AddUserDefinedNumericIdentitySet(option.Config.FixedIdentityMapping); err != nil {
log.Fatalf("Invalid fixed identities provided: %s", err)
}

Expand Down Expand Up @@ -1381,7 +1380,7 @@ func (d *Daemon) instantiateAPI() *restapi.CiliumAPI {

// /identity/
api.PolicyGetIdentityHandler = newGetIdentityHandler(d)
api.PolicyGetIdentityIDHandler = newGetIdentityIDHandler(d)
api.PolicyGetIdentityIDHandler = newGetIdentityIDHandler(d.identityAllocator)

// /identity/endpoints
api.PolicyGetIdentityEndpointsHandler = newGetIdentityEndpointsIDHandler(d)
Expand Down
5 changes: 4 additions & 1 deletion daemon/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cilium/cilium/pkg/endpoint"
"github.com/cilium/cilium/pkg/endpointmanager"
"github.com/cilium/cilium/pkg/identity/cache"
"github.com/cilium/cilium/pkg/identity/identitymanager"
"github.com/cilium/cilium/pkg/kvstore"
"github.com/cilium/cilium/pkg/labels"
"github.com/cilium/cilium/pkg/lock"
Expand Down Expand Up @@ -144,7 +145,9 @@ func (ds *DaemonSuite) TearDownTest(c *C) {

// Release the identity allocator reference created by NewDaemon. This
// is done manually here as we have no Close() function daemon
cache.Close()
ds.d.identityAllocator.Close()

identitymanager.RemoveAll()

ds.d.Close()
}
Expand Down
4 changes: 2 additions & 2 deletions daemon/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (d *Daemon) createEndpoint(ctx context.Context, epTemplate *models.Endpoint
epTemplate.DatapathConfiguration.RequireRouting = &disabled
}

ep, err := endpoint.NewEndpointFromChangeModel(d, d.l7Proxy, epTemplate)
ep, err := endpoint.NewEndpointFromChangeModel(d, d.l7Proxy, d.identityAllocator, epTemplate)
if err != nil {
return invalidDataError(ep, fmt.Errorf("unable to parse endpoint parameters: %s", err))
}
Expand Down Expand Up @@ -362,7 +362,7 @@ func (h *patchEndpointID) Handle(params PatchEndpointIDParams) middleware.Respon

// Validate the template. Assignment afterwards is atomic.
// Note: newEp's labels are ignored.
newEp, err2 := endpoint.NewEndpointFromChangeModel(h.d, h.d.l7Proxy, epTemplate)
newEp, err2 := endpoint.NewEndpointFromChangeModel(h.d, h.d.l7Proxy, h.d.identityAllocator, epTemplate)
if err2 != nil {
return api.Error(PutEndpointIDInvalidCode, err2)
}
Expand Down
8 changes: 4 additions & 4 deletions daemon/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
metricErrorAllow = "allow"
)

func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSelector][]net.IP) (map[policyApi.FQDNSelector][]*identity.Identity, error) {
func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSelector][]net.IP, identityAllocator *secIDCache.CachingIdentityAllocator) (map[policyApi.FQDNSelector][]*identity.Identity, error) {
var err error

// Used to track identities which are allocated in calls to
Expand All @@ -82,7 +82,7 @@ func identitiesForFQDNSelectorIPs(selectorsWithIPsToUpdate map[policyApi.FQDNSel
}).Debug("getting identities for IPs associated with FQDNSelector")
var currentlyAllocatedIdentities []*identity.Identity
if currentlyAllocatedIdentities, err = ipcache.AllocateCIDRsForIPs(selectorIPs); err != nil {
secIDCache.ReleaseSlice(context.TODO(), nil, usedIdentities)
identityAllocator.ReleaseSlice(context.TODO(), nil, usedIdentities)
log.WithError(err).WithField("prefixes", selectorIPs).Warn(
"failed to allocate identities for IPs")
return nil, err
Expand Down Expand Up @@ -252,7 +252,7 @@ func (d *Daemon) bootstrapFQDN(restoredEndpoints *endpointRestoreState, preCache
func (d *Daemon) updateSelectors(ctx context.Context, selectorWithIPsToUpdate map[policyApi.FQDNSelector][]net.IP, selectorsWithoutIPs []policyApi.FQDNSelector) (wg *sync.WaitGroup, err error) {
// Convert set of selectors with IPs to update to set of selectors
// with identities corresponding to said IPs.
selectorsIdentities, err := identitiesForFQDNSelectorIPs(selectorWithIPsToUpdate)
selectorsIdentities, err := identitiesForFQDNSelectorIPs(selectorWithIPsToUpdate, d.identityAllocator)
if err != nil {
return &sync.WaitGroup{}, err
}
Expand Down Expand Up @@ -450,7 +450,7 @@ func (d *Daemon) notifyOnDNSMsg(lookupTime time.Time, ep *endpoint.Endpoint, epI
Port: uint16(serverPort),
}
} else if serverSecID, exists := ipcache.IPIdentityCache.LookupByIP(serverIP); exists {
secID := secIDCache.LookupIdentityByID(serverSecID.ID)
secID := d.identityAllocator.LookupIdentityByID(serverSecID.ID)
// TODO: handle IPv6
lr.LogRecord.DestinationEndpoint = accesslog.EndpointInfo{
IPv4: serverIP,
Expand Down
2 changes: 1 addition & 1 deletion daemon/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (d *Daemon) initHealth() {
if client == nil || err != nil {
var launchErr error
d.cleanupHealthEndpoint()
client, launchErr = health.LaunchAsEndpoint(ctx, d, &d.nodeDiscovery.LocalNode, d.mtuConfig, d.endpointManager, d.l7Proxy)
client, launchErr = health.LaunchAsEndpoint(ctx, d, &d.nodeDiscovery.LocalNode, d.mtuConfig, d.endpointManager, d.l7Proxy, d.identityAllocator)
if launchErr != nil {
if err != nil {
return fmt.Errorf("failed to restart endpoint (check failed: %q): %s", err, launchErr)
Expand Down
20 changes: 13 additions & 7 deletions daemon/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import (
"github.com/go-openapi/runtime/middleware"
)

type getIdentity struct{}
type getIdentity struct {
d *Daemon
}

func newGetIdentityHandler(d *Daemon) GetIdentityHandler { return &getIdentity{} }
func newGetIdentityHandler(d *Daemon) GetIdentityHandler { return &getIdentity{d: d} }

func (h *getIdentity) Handle(params GetIdentityParams) middleware.Responder {
log.WithField(logfields.Params, logfields.Repr(params)).Debug("GET /identity request")
Expand All @@ -37,9 +39,9 @@ func (h *getIdentity) Handle(params GetIdentityParams) middleware.Responder {
if params.Labels == nil {
// if labels is nil, return all identities from the kvstore
// This is in response to "identity list" command
identities = cache.GetIdentities()
identities = h.d.identityAllocator.GetIdentities()
} else {
identity := cache.LookupIdentity(labels.NewLabelsFromModel(params.Labels))
identity := h.d.identityAllocator.LookupIdentity(labels.NewLabelsFromModel(params.Labels))
if identity == nil {
return NewGetIdentityIDNotFound()
}
Expand All @@ -50,17 +52,21 @@ func (h *getIdentity) Handle(params GetIdentityParams) middleware.Responder {
return NewGetIdentityOK().WithPayload(identities)
}

type getIdentityID struct{}
type getIdentityID struct {
c *cache.CachingIdentityAllocator
}

func newGetIdentityIDHandler(d *Daemon) GetIdentityIDHandler { return &getIdentityID{} }
func newGetIdentityIDHandler(c *cache.CachingIdentityAllocator) GetIdentityIDHandler {
return &getIdentityID{c: c}
}

func (h *getIdentityID) Handle(params GetIdentityIDParams) middleware.Responder {
nid, err := identity.ParseNumericIdentity(params.ID)
if err != nil {
return NewGetIdentityIDBadRequest()
}

identity := cache.LookupIdentityByID(nid)
identity := h.c.LookupIdentityByID(nid)
if identity == nil {
return NewGetIdentityIDNotFound()
}
Expand Down
16 changes: 8 additions & 8 deletions daemon/k8s_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,11 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
},
},
},
repo: policy.NewPolicyRepository(),
repo: policy.NewPolicyRepository(nil),
}
},
setupWanted: func() wanted {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
r.AddList(api.Rules{
api.NewRule().
WithEndpointSelector(api.EndpointSelector{
Expand Down Expand Up @@ -107,7 +107,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
{
name: "have a rule with user labels and update it without user labels, all other rules should be deleted",
setupArgs: func() args {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
lbls := utils.GetPolicyLabels("production", "db", uuid, utils.ResourceTypeCiliumNetworkPolicy)
lbls = append(lbls, labels.ParseLabelArray("foo=bar")...).Sort()
r.AddList(api.Rules{
Expand Down Expand Up @@ -150,7 +150,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
}
},
setupWanted: func() wanted {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
r.AddList(api.Rules{
api.NewRule().
WithEndpointSelector(api.EndpointSelector{
Expand Down Expand Up @@ -179,7 +179,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
{
name: "have a rule without user labels and update it with user labels, all other rules should be deleted",
setupArgs: func() args {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
r.AddList(api.Rules{
{
EndpointSelector: api.EndpointSelector{
Expand Down Expand Up @@ -221,7 +221,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
}
},
setupWanted: func() wanted {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
lbls := utils.GetPolicyLabels("production", "db", uuid, utils.ResourceTypeCiliumNetworkPolicy)
lbls = append(lbls, labels.ParseLabelArray("foo=bar")...).Sort()
r.AddList(api.Rules{
Expand All @@ -247,7 +247,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
{
name: "have a rule policy installed with multiple rules and apply an empty spec should delete all rules installed",
setupArgs: func() args {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
r.AddList(api.Rules{
{
EndpointSelector: api.EndpointSelector{
Expand Down Expand Up @@ -292,7 +292,7 @@ func (ds *DaemonSuite) Test_addCiliumNetworkPolicyV2(c *C) {
}
},
setupWanted: func() wanted {
r := policy.NewPolicyRepository()
r := policy.NewPolicyRepository(nil)
r.AddList(api.Rules{})
return wanted{
err: nil,
Expand Down
Loading

0 comments on commit 83e576e

Please sign in to comment.