Skip to content

Commit

Permalink
[SPIRE Agent] add telemetry around LRU cache entry operations (#4335)
Browse files Browse the repository at this point in the history
* [SPIRE Agent] add telemetry around LRU cache entry operations

Signed-off-by: gordonhu7 <hu.gordon@hotmail.com>
Co-authored-by: gordonhu7 <hu.gordon@hotmail.com>
Co-authored-by: Evan Gilman <evan@spirl.com>
  • Loading branch information
3 people authored Jul 20, 2023
1 parent 00909f2 commit a3d3cb8
Show file tree
Hide file tree
Showing 10 changed files with 152 additions and 15 deletions.
5 changes: 5 additions & 0 deletions doc/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ The following metrics are emitted:
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connections that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
Expand Down
6 changes: 3 additions & 3 deletions pkg/agent/manager/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ type StaleEntry struct {
// Entry stale registration entry
Entry *common.RegistrationEntry
// SVIDs expiration time
ExpiresAt time.Time
SVIDExpiresAt time.Time
}

func New(log logrus.FieldLogger, trustDomain spiffeid.TrustDomain, bundle *Bundle, metrics telemetry.Metrics) *Cache {
Expand Down Expand Up @@ -429,8 +429,8 @@ func (c *Cache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,8 @@ func TestGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `SVIDExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[foo.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[foo.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down
23 changes: 21 additions & 2 deletions pkg/agent/manager/cache/lru_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/spire/pkg/agent/common/backoff"
"github.com/spiffe/spire/pkg/common/telemetry"
agentmetrics "github.com/spiffe/spire/pkg/common/telemetry/agent"
"github.com/spiffe/spire/proto/spire/common"
)

Expand Down Expand Up @@ -174,6 +175,13 @@ func (c *LRUCache) CountSVIDs() int {
return len(c.svids)
}

// CountRecords returns the number of entries currently held in the cache's
// records map. The cache's read lock is held while the length is read.
func (c *LRUCache) CountRecords() int {
	c.mu.RLock()
	total := len(c.records)
	c.mu.RUnlock()

	return total
}

func (c *LRUCache) MatchingRegistrationEntries(selectors []*common.Selector) []*common.RegistrationEntry {
set, setDone := allocSelectorSet(selectors...)
defer setDone()
Expand Down Expand Up @@ -214,6 +222,7 @@ func (c *LRUCache) NewSubscriber(selectors []*common.Selector) Subscriber {
// updated through a call to UpdateSVIDs.
func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.RegistrationEntry, *common.RegistrationEntry, *X509SVID) bool) {
c.mu.Lock()
defer func() { agentmetrics.SetEntriesMapSize(c.metrics, c.CountRecords()) }()
defer c.mu.Unlock()

// Remove bundles that no longer exist. The bundle for the agent trust
Expand Down Expand Up @@ -262,13 +271,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
fedRem, fedRemDone := allocStringSet()
defer fedRemDone()

entriesRemoved := 0
// Remove records for registration entries that no longer exist
for id, record := range c.records {
if _, ok := update.RegistrationEntries[id]; !ok {
c.log.WithFields(logrus.Fields{
telemetry.Entry: id,
telemetry.SPIFFEID: record.entry.SpiffeId,
}).Debug("Entry removed")
entriesRemoved++

// built a set of selectors for the record being removed, drop the
// record for each selector index, and add the entry selectors to
Expand All @@ -283,8 +294,11 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
delete(c.staleEntries, id)
}
}
agentmetrics.IncrementEntriesRemoved(c.metrics, entriesRemoved)

outdatedEntries := make(map[string]struct{})
entriesUpdated := 0
entriesCreated := 0

// Add/update records for registration entries in the update
for _, newEntry := range update.RegistrationEntries {
Expand Down Expand Up @@ -366,11 +380,15 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R
}
if existingEntry != nil {
log.Debug("Entry updated")
entriesUpdated++
} else {
log.Debug("Entry created")
entriesCreated++
}
}
}
agentmetrics.IncrementEntriesAdded(c.metrics, entriesCreated)
agentmetrics.IncrementEntriesUpdated(c.metrics, entriesUpdated)

// entries with active subscribers which are not cached will be put in staleEntries map;
// irrespective of what svid cache size as we cannot deny identity to a subscriber
Expand Down Expand Up @@ -422,6 +440,7 @@ func (c *LRUCache) UpdateEntries(update *UpdateEntries, checkSVID func(*common.R

func (c *LRUCache) UpdateSVIDs(update *UpdateSVIDs) {
c.mu.Lock()
defer func() { agentmetrics.SetSVIDMapSize(c.metrics, c.CountSVIDs()) }()
defer c.mu.Unlock()

// Allocate a set of selectors that
Expand Down Expand Up @@ -471,8 +490,8 @@ func (c *LRUCache) GetStaleEntries() []*StaleEntry {
}

staleEntries = append(staleEntries, &StaleEntry{
Entry: cachedEntry.entry,
ExpiresAt: expiresAt,
Entry: cachedEntry.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
76 changes: 74 additions & 2 deletions pkg/agent/manager/cache/lru_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/proto/spire/common"
"github.com/spiffe/spire/test/clock"
"github.com/spiffe/spire/test/fakes/fakemetrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -102,6 +103,19 @@ func TestLRUCacheCountSVIDs(t *testing.T) {
require.Equal(t, 1, cache.CountSVIDs())
}

// TestLRUCacheCountRecords verifies that CountRecords reports one record per
// registration entry passed to UpdateEntries when no SVIDs have been set.
func TestLRUCacheCountRecords(t *testing.T) {
	lru := newTestLRUCache(t)

	// Load two registration entries (FOO and BAR); no SVIDs are supplied.
	fooEntry := makeRegistrationEntry("FOO", "A")
	barEntry := makeRegistrationEntry("BAR", "B")
	lru.UpdateEntries(&UpdateEntries{
		Bundles:             makeBundles(bundleV1),
		RegistrationEntries: makeRegistrationEntries(fooEntry, barEntry),
	}, nil)

	require.Equal(t, 2, lru.CountRecords())
}

func TestLRUCacheBundleChanges(t *testing.T) {
cache := newTestLRUCache(t)

Expand Down Expand Up @@ -564,8 +578,8 @@ func TestLRUCacheGetStaleEntries(t *testing.T) {

// Assert that the entry again returns as stale. This time the `SVIDExpiresAt` field should be populated with the expiration of the SVID.
expectedEntries = []*StaleEntry{{
Entry: cache.records[bar.EntryId].entry,
ExpiresAt: expiredAt,
Entry: cache.records[bar.EntryId].entry,
SVIDExpiresAt: expiredAt,
}}
assert.Equal(t, expectedEntries, cache.GetStaleEntries())

Expand Down Expand Up @@ -872,6 +886,64 @@ func TestSubscribeToLRUCacheChanges(t *testing.T) {
}
}

// TestMetrics checks the sequence of telemetry emitted by the LRU cache as
// entries are added, SVIDs are set, and entries are subsequently changed.
// Each assertion compares against everything returned by AllMetrics so far,
// so the expected slice is grown step by step rather than restated.
func TestMetrics(t *testing.T) {
	lru := newTestLRUCache(t)
	recorder := fakemetrics.New()
	lru.metrics = recorder

	fooEntry := makeRegistrationEntry("FOO", "A")
	barEntry := makeRegistrationEntry("BAR", "B")
	bothEntries := &UpdateEntries{
		Bundles:             makeBundles(bundleV1),
		RegistrationEntries: makeRegistrationEntries(fooEntry, barEntry),
	}

	// Step 1: add both entries to an empty cache. Two entries are reported
	// as added and the records gauge is set to 2.
	lru.UpdateEntries(bothEntries, nil)
	want := []fakemetrics.MetricItem{
		{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
		{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 2},
		{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 0},
		{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
	}
	assert.Equal(t, want, recorder.AllMetrics())

	// Step 2: set an SVID for FOO only. The sole new metric is the SVID map
	// gauge, set to 1.
	svidUpdate := &UpdateSVIDs{
		X509SVIDs: makeX509SVIDs(fooEntry),
	}
	lru.UpdateSVIDs(svidUpdate)
	want = append(want,
		fakemetrics.MetricItem{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.SVIDMapSize}, Val: 1},
	)
	assert.Equal(t, want, recorder.AllMetrics())

	// Step 3: first push an update containing only a modified FOO (BAR is
	// dropped, FOO is updated), then push the original pair again (BAR is
	// re-added, FOO is updated back).
	fooUpdate := makeRegistrationEntry("FOO", "A", "B")
	lru.UpdateEntries(&UpdateEntries{
		Bundles:             makeBundles(bundleV1),
		RegistrationEntries: makeRegistrationEntries(fooUpdate),
	}, nil)
	lru.UpdateEntries(bothEntries, nil)
	want = append(want,
		// Metrics from the FOO-only update.
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 1},
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 0},
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 1},
		fakemetrics.MetricItem{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 1},
		// Metrics from restoring the original FOO and BAR entries.
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryRemoved}, Val: 0},
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryAdded}, Val: 1},
		fakemetrics.MetricItem{Type: fakemetrics.IncrCounterType, Key: []string{telemetry.EntryUpdated}, Val: 1},
		fakemetrics.MetricItem{Type: fakemetrics.SetGaugeType, Key: []string{telemetry.RecordMapSize}, Val: 2},
	)
	assert.Equal(t, want, recorder.AllMetrics())
}

func TestNewLRUCache(t *testing.T) {
// negative value
cache := newTestLRUCacheWithConfig(-5, clock.NewMock(t))
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ func (c *Cache) GetStaleEntries() []*cache.StaleEntry {
}

staleEntries = append(staleEntries, &cache.StaleEntry{
Entry: record.entry,
ExpiresAt: expiresAt,
Entry: record.entry,
SVIDExpiresAt: expiresAt,
})
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/manager/storecache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1049,8 +1049,8 @@ func TestGetStaleEntries(t *testing.T) {
Entry: barEntry,
},
{
Entry: fohEntry,
ExpiresAt: expiresAt,
Entry: fohEntry,
SVIDExpiresAt: expiresAt,
},
}
require.Equal(t, expectedStaleEntries, c.GetStaleEntries())
Expand Down
7 changes: 5 additions & 2 deletions pkg/agent/manager/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (m *manager) updateSVIDs(ctx context.Context, log logrus.FieldLogger, c SVI
csrs = append(csrs, csrRequest{
EntryID: entry.Entry.EntryId,
SpiffeID: entry.Entry.SpiffeId,
CurrentSVIDExpiresAt: entry.ExpiresAt,
CurrentSVIDExpiresAt: entry.SVIDExpiresAt,
})
}

Expand Down Expand Up @@ -158,7 +158,10 @@ func (m *manager) fetchSVIDs(ctx context.Context, csrs []csrRequest) (_ *cache.U

privateKeys := make(map[string]crypto.Signer, len(csrs))
for _, csr := range csrs {
log := m.c.Log.WithField("spiffe_id", csr.SpiffeID)
log := m.c.Log.WithFields(logrus.Fields{
"spiffe_id": csr.SpiffeID,
"entry_id": csr.EntryID,
})
if !csr.CurrentSVIDExpiresAt.IsZero() {
log = log.WithField("expires_at", csr.CurrentSVIDExpiresAt.Format(time.RFC3339))
}
Expand Down
23 changes: 23 additions & 0 deletions pkg/common/telemetry/agent/lru.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package agent

import "github.com/spiffe/spire/pkg/common/telemetry"

// IncrementEntriesAdded increments the counter keyed by telemetry.EntryAdded
// by entriesAdded.
func IncrementEntriesAdded(m telemetry.Metrics, entriesAdded int) {
	key := []string{telemetry.EntryAdded}
	m.IncrCounter(key, float32(entriesAdded))
}

// IncrementEntriesUpdated increments the counter keyed by
// telemetry.EntryUpdated by entriesUpdated.
func IncrementEntriesUpdated(m telemetry.Metrics, entriesUpdated int) {
	key := []string{telemetry.EntryUpdated}
	m.IncrCounter(key, float32(entriesUpdated))
}

// IncrementEntriesRemoved increments the counter keyed by
// telemetry.EntryRemoved by entriesRemoved.
func IncrementEntriesRemoved(m telemetry.Metrics, entriesRemoved int) {
	key := []string{telemetry.EntryRemoved}
	m.IncrCounter(key, float32(entriesRemoved))
}

// SetEntriesMapSize sets the gauge keyed by telemetry.RecordMapSize to
// recordMapSize.
func SetEntriesMapSize(m telemetry.Metrics, recordMapSize int) {
	key := []string{telemetry.RecordMapSize}
	m.SetGauge(key, float32(recordMapSize))
}

// SetSVIDMapSize sets the gauge keyed by telemetry.SVIDMapSize to
// svidMapSize.
func SetSVIDMapSize(m telemetry.Metrics, svidMapSize int) {
	key := []string{telemetry.SVIDMapSize}
	m.SetGauge(key, float32(svidMapSize))
}
15 changes: 15 additions & 0 deletions pkg/common/telemetry/names.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,15 @@ const (
// ElapsedTime tags some duration of time.
ElapsedTime = "elapsed_time"

// EntryAdded is the counter key for when an entry is added to the LRU cache
EntryAdded = "lru_cache_entry_add"

// EntryRemoved is the counter key for when an entry is removed from the LRU cache
EntryRemoved = "lru_cache_entry_remove"

// EntryUpdated is the counter key for when an LRU cache entry is updated
EntryUpdated = "lru_cache_entry_update"

// EndpointSpiffeID tags endpoint SPIFFE ID
EndpointSpiffeID = "endpoint_spiffe_id"

Expand Down Expand Up @@ -415,6 +424,9 @@ const (
// ReceivedUID is like Received, specific to uid.
ReceivedUID = "received_uid"

// RecordMapSize is the gauge key to hold the size of the LRU cache entries map
RecordMapSize = "lru_cache_record_map_size"

// RefreshHint tags a bundle refresh hint
RefreshHint = "refresh_hint"

Expand Down Expand Up @@ -493,6 +505,9 @@ const (
// with other tags to add clarity
Subject = "subject"

// SVIDMapSize is the gauge key for the size of the LRU cache SVID map
SVIDMapSize = "lru_cache_svid_map_size"

// SVIDResponseLatency tags latency for SVID response
SVIDResponseLatency = "svid_response_latency"

Expand Down

0 comments on commit a3d3cb8

Please sign in to comment.