Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPIRE Agent] add telemetry around LRU cache entry operations #4335

Merged
Next Next commit
[SPIRE Agent] add telemetry around LRU cache entry operations
Signed-off-by: gordonhu7 <hu.gordon@hotmail.com>
  • Loading branch information
gordonhu7 committed Jul 11, 2023
commit 2270bc98a783f8ae65b11196d7c0f3eafc907dec
6 changes: 3 additions & 3 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type StaleEntry struct {
// Entry stale registration entry
Entry *common.RegistrationEntry
// SVIDs expiration time
ExpiresAt time.Time
SVIDExpiresAt time.Time
}

func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache {
Expand Down Expand Up @@ -429,8 +429,8 @@ func (c *Cache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ func TestGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[foo.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[foo.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down
27 changes: 25 additions & 2 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ import (
const (
DefaultSVIDCacheMaxSize = 1000
SVIDSyncInterval = 500 * time.Millisecond
SVIDMapSize = "lru_cache_svid_map_size"
RecordMapSize = "lru_cache_record_map_size"
EntryAdded = "lru_cache_entry_add"
EntryUpdated = "lru_cache_entry_update"
EntryRemoved = "lru_cache_entry_remove"
)

// Cache caches each registration entry, bundles, and JWT SVIDs for the agent.
Expand Down Expand Up @@ -174,6 +179,13 @@ func (c *LRUCache) CountSVIDs() int {
return len(c.svids)
}

func (c *LRUCache) CountRecords() int {
c.mu.RLock()
defer c.mu.RUnlock()

return len(c.records)
}

func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry {
set, setDone := allocSelectorSet(selectors...)
defer setDone()
Expand Down Expand Up @@ -214,6 +226,7 @@ func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber {
// updated through a call to UpdateSVIDs.
func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool) {
c.mu.Lock()
defer func() { c.metrics.SetGauge([]string{RecordMapSize}, float32(c.CountRecords())) }()
defer c.mu.Unlock()

// Remove bundles that no longer exist. The bundle for the agent trust
Expand Down Expand Up @@ -262,13 +275,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
fedRem, fedRemDone := allocStringSet()
defer fedRemDone()

entriesRemoved := 0
// Remove records for registration entries that no longer exist
for id, record := range c.records {
if _, ok := update.RegistrationEntries[id]; !ok {
c.log.WithFields(logrus.Fields{
telemetry.Entry: id,
telemetry.SPIFFEID: record.entry.SpiffeId,
}).Debug("Entry removed")
entriesRemoved++

// built a set of selectors for the record being removed, drop the
// record for each selector index, and add the entry selectors to
Expand All @@ -283,8 +298,11 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
delete(c.staleEntries, id)
}
}
c.metrics.IncrCounter([]string{EntryRemoved}, float32(entriesRemoved))
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved

outdatedEntries := make(map[string]struct{})
entriesUpdated := 0
entriesCreated := 0

// Add/update records for registration entries in the update
for _, newEntry := range update.RegistrationEntries {
Expand Down Expand Up @@ -366,11 +384,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
}
if existingEntry != nil {
log.Debug("Entry updated")
entriesUpdated++
} else {
log.Debug("Entry created")
entriesCreated++
}
}
}
c.metrics.IncrCounter([]string{EntryUpdated}, float32(entriesUpdated))
c.metrics.IncrCounter([]string{EntryAdded}, float32(entriesCreated))

// entries with active subscribers which are not cached will be put in staleEntries map;
// irrespective of what svid cache size as we cannot deny identity to a subscriber
Expand Down Expand Up @@ -422,6 +444,7 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R

func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
c.mu.Lock()
defer func() { c.metrics.SetGauge([]string{SVIDMapSize}, float32(c.CountSVIDs())) }()
defer c.mu.Unlock()

// Allocate a set of selectors that
Expand Down Expand Up @@ -471,8 +494,8 @@ func (c *LRUCache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
59 changes: 57 additions & 2 deletions pkg/agent/manager/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/proto/spire/common"
"github.com/spiffe/spire/test/clock"
"github.com/spiffe/spire/test/fakes/fakemetrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -102,6 +103,19 @@ func TestLRUCacheCountSVIDs(t *testing.T) {
require.Equal(t, 1, cache.CountSVIDs())
}

func TestLRUCacheCountRecords(t *testing.T) {
cache := newTestLRUCache(t)
// populate the cache with FOO and BAR without SVIDS
foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}
cache.UpdateEntries(updateEntries, nil)
require.Equal(t, 2, cache.CountRecords())
}

func TestLRUCacheBundleChanges(t *testing.T) {
cache := newTestLRUCache(t)

Expand Down Expand Up @@ -564,8 +578,8 @@ func TestLRUCacheGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `ExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[bar.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[bar.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down Expand Up @@ -872,6 +886,47 @@ func TestSubscribeToLRUCacheChanges(t *testing.T) {
}
}

func TestMetrics(t *testing.T) {
cache := newTestLRUCache(t)
fakeMetrics := fakemetrics.New()
cache.metrics = fakeMetrics

foo := makeRegistrationEntry("FOO", "A")
bar := makeRegistrationEntry("BAR", "B")
updateEntries := &UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(foo, bar),
}
updateSVIDs := &UpdateSVIDs{
X509SVIDs: makeX509SVIDs(foo),
}
cache.UpdateEntries(updateEntries, nil)
cache.UpdateSVIDs(updateSVIDs)

fooUpdate := makeRegistrationEntry("FOO", "A", "B")
cache.UpdateEntries(&UpdateEntries{
Bundles: makeBundles(bundleV1),
RegistrationEntries: makeRegistrationEntries(fooUpdate),
}, nil)
cache.UpdateEntries(updateEntries, nil)

assert.Equal(t, []fakemetrics.MetricItem{
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 2},
{Type: fakemetrics.SetGaugeType, Key: []string{SVIDMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 0},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryRemoved}, Val: 0},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryUpdated}, Val: 1},
{Type: fakemetrics.IncrCounterType, Key: []string{EntryAdded}, Val: 1},
{Type: fakemetrics.SetGaugeType, Key: []string{RecordMapSize}, Val: 2},
}, fakeMetrics.AllMetrics())
}

func TestNewLRUCache(t *testing.T) {
// negative value
cache := newTestLRUCacheWithConfig(-5, clock.NewMock(t))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
}

staleEntries = append(staleEntries, &cache.StaleEntry{
Entry: record.entry,
ExpiresAt: expiresAt,
Entry: record.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,8 +1049,8 @@ func TestGetStaleEntries(t *testing.T) {
Entry: barEntry,
},
{
Entry: fohEntry,
ExpiresAt: expiresAt,
Entry: fohEntry,
SVIDExpiresAt: expiresAt,
},
}
require.Equal(t, expectedStaleEntries, c.GetStaleEntries())
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVI
csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
CurrentSVIDExpiresAt: entry.SVIDExpiresAt,
})
}

Expand Down Expand Up @@ -158,7 +158,10 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U

privateKeys := make(map[string]crypto.Signer, len(csrs))
for _, csr := range csrs {
log := m.c.Log.WithField("spiffe_id", csr.SpiffeID)
log := m.c.Log.WithFields(logrus.Fields{
"spiffe_id": csr.SpiffeID,
"entry_id": csr.EntryID,
hugordon7 marked this conversation as resolved.
Show resolved Hide resolved
})
if !csr.CurrentSVIDExpiresAt.IsZero() {
log = log.WithField("expires_at", csr.CurrentSVIDExpiresAt.Format(time.RFC3339))
}
Expand Down