Cache and singleflight requests to kubelet #5408

Merged · 1 commit · Aug 26, 2024
Add microcaching and merging of parallel requests to kubelet in the k8s workload attestor

Adds a short-lived cache for responses from the kubelet and merges parallel requests for the pod list, reducing the memory and CPU usage of the k8s workload attestor plugin.

Signed-off-by: Kevin Nisbet <kevin.nisbet+github@xybyte.com>
knisbet committed Aug 25, 2024
commit d0d1228306e34c2779455e48de71969238ef40ef
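The change combines two techniques: a short-lived cache of the parsed pod list and golang.org/x/sync/singleflight to collapse concurrent fetches into a single kubelet request. Below is a minimal, standalone sketch of that combination, assuming a hypothetical Cache type and fetch callback; it illustrates the pattern only and is not the plugin's actual code (cache expiry is omitted here, while the real change clears the cache on a short timer).

```go
// Sketch of the cache + singleflight combination used to avoid redundant
// kubelet requests. Hypothetical names; not the plugin's actual code.
package podcache

import (
	"sync"

	"golang.org/x/sync/singleflight"
)

type Cache struct {
	mu     sync.RWMutex
	cached map[string]string // stand-in for the parsed pod list keyed by UID; nil when expired
	group  singleflight.Group
}

// Get returns the cached pod list if present; otherwise it merges concurrent
// callers so that only one fetch is issued to the kubelet at a time.
func (c *Cache) Get(fetch func() (map[string]string, error)) (map[string]string, error) {
	c.mu.RLock()
	cached := c.cached
	c.mu.RUnlock()
	if cached != nil {
		return cached, nil // fast path: still within the cache window
	}

	v, err, _ := c.group.Do("podList", func() (interface{}, error) {
		// Re-check: another caller may have filled the cache while we waited.
		c.mu.RLock()
		cached := c.cached
		c.mu.RUnlock()
		if cached != nil {
			return cached, nil
		}

		result, err := fetch()
		if err != nil {
			return nil, err
		}

		c.mu.Lock()
		c.cached = result // the real change clears this again on a short timer
		c.mu.Unlock()
		return result, nil
	})
	if err != nil {
		return nil, err
	}
	return v.(map[string]string), nil
}
```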
119 changes: 105 additions & 14 deletions pkg/agent/plugin/workloadattestor/k8s/k8s.go
@@ -27,6 +27,7 @@ import (
"github.com/spiffe/spire/pkg/common/pemutil"
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/valyala/fastjson"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
@@ -178,15 +179,33 @@ type Plugin struct {
config *k8sConfig
containerHelper ContainerHelper
sigstoreVerifier sigstore.Verifier

cachedPodList map[string]*fastjson.Value
singleflight singleflight.Group

shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
shutdownWG sync.WaitGroup
}

func New() *Plugin {
ctx, cancel := context.WithCancel(context.Background())

return &Plugin{
clock: clock.New(),
getenv: os.Getenv,
clock: clock.New(),
getenv: os.Getenv,
shutdownCtx: ctx,
shutdownCtxCancel: cancel,
}
}

func (p *Plugin) Close() error {
p.shutdownCtxCancel()
p.shutdownWG.Wait()

return nil
}

func (p *Plugin) SetLogger(log hclog.Logger) {
p.log = log
}
@@ -219,22 +238,15 @@ func (p *Plugin) Attest(ctx context.Context, req *workloadattestorv1.AttestReque
for attempt := 1; ; attempt++ {
log = log.With(telemetry.Attempt, attempt)

podListBytes, err := config.Client.GetPodList()
podList, err := p.getPodList(ctx, config.Client)
if err != nil {
return nil, err
}

var parser fastjson.Parser
podList, err := parser.ParseBytes(podListBytes)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to parse kubelet response: %v", err)
}

var attestResponse *workloadattestorv1.AttestResponse
for _, podValue := range podList.GetArray("items") {
for podKey, podValue := range podList {
if podKnown {
uidBytes := podValue.Get("metadata", "uid").GetStringBytes()
if string(uidBytes) != string(podUID) {
if podKey != string(podUID) {
// The pod holding the container is known. Skip unrelated pods.
continue
}
@@ -430,6 +442,34 @@ func (p *Plugin) getConfig() (*k8sConfig, ContainerHelper, sigstore.Verifier, er
return p.config, p.containerHelper, p.sigstoreVerifier, nil
}

func (p *Plugin) setPodListCache(podList map[string]*fastjson.Value, expires time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()

p.cachedPodList = podList

p.shutdownWG.Add(1)
go func() {
Member

In practice, kicking off this goroutine without any sort of lifetime management should be ok given its current responsibilities, but in general we push for goroutine hygiene as much as possible. If we can, it would be good to make sure that this goroutine does not outlive the plugin, e.g. when the plugin is being unloaded.

One way to accomplish this is to:

  1. Implement io.Closer on *Plugin. The plugin framework will invoke Close() when the plugin is unloaded.
  2. Add a wait group to the plugin that tracks the lifetime of this goroutine and that Close() waits for.

It might be overkill, considering this goroutine will only live for at most ~250ms at the time of close, but bonus points if you also add a context that can be cancelled inside of Close to bring it down immediately.
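A minimal, self-contained sketch of the suggested pattern follows; the names are condensed for illustration and are not the plugin's exact code, though the commit adopts essentially this structure.

```go
// Sketch of goroutine lifetime management via io.Closer, a wait group, and a
// cancellable context. Illustrative names, not the plugin's actual code.
package plugin

import (
	"context"
	"sync"
	"time"
)

type Plugin struct {
	shutdownCtx       context.Context
	shutdownCtxCancel context.CancelFunc
	shutdownWG        sync.WaitGroup
}

func New() *Plugin {
	ctx, cancel := context.WithCancel(context.Background())
	return &Plugin{shutdownCtx: ctx, shutdownCtxCancel: cancel}
}

// Close implements io.Closer; the plugin framework invokes it on unload.
func (p *Plugin) Close() error {
	p.shutdownCtxCancel() // wake any sleeping goroutines immediately
	p.shutdownWG.Wait()   // and wait for them to exit
	return nil
}

// scheduleExpiry runs fn after the given delay, or at shutdown if that comes
// first, in a goroutine whose lifetime is bounded by Close.
func (p *Plugin) scheduleExpiry(delay time.Duration, fn func()) {
	p.shutdownWG.Add(1)
	go func() {
		defer p.shutdownWG.Done()
		select {
		case <-time.After(delay): // cache TTL elapsed
		case <-p.shutdownCtx.Done(): // plugin is shutting down
		}
		fn()
	}()
}
```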

Contributor Author

Agreed, I should be able to post a fix this afternoon. I didn't bother for the same reasons you indicated; it shouldn't take long to add and test.

Contributor Author

I added the io.Closer and a context. I put the context on the struct, but I understand that storing a context in a struct can be debatable across projects; I'm happy to swap it out if there is a preference not to do that in this project.

Also, I didn't make the Close function wait for any in-progress Attest calls to complete. I didn't see that behaviour in any other plugins, and I didn't investigate deeply whether the service cancels and waits for in-progress Attest calls before unloading plugins. If an Attest call can still be in progress, there is a small window where the cache will be saved and then immediately released. I'm happy to address that if you'd like.

defer p.shutdownWG.Done()

select {
case <-p.clock.After(expires):
case <-p.shutdownCtx.Done():
}

p.mu.Lock()
defer p.mu.Unlock()
p.cachedPodList = nil
}()
}

func (p *Plugin) getPodListCache() map[string]*fastjson.Value {
p.mu.RLock()
defer p.mu.RUnlock()

return p.cachedPodList
}

func (p *Plugin) setContainerHelper(c ContainerHelper) {
p.mu.Lock()
defer p.mu.Unlock()
@@ -605,16 +645,67 @@ func (p *Plugin) getNodeName(name string, env string) string {
}
}

func (p *Plugin) getPodList(ctx context.Context, client *kubeletClient) (map[string]*fastjson.Value, error) {
result := p.getPodListCache()
if result != nil {
return result, nil
}

podList, err, _ := p.singleflight.Do("podList", func() (interface{}, error) {
result := p.getPodListCache()
if result != nil {
return result, nil
}

podListBytes, err := client.GetPodList(ctx)
if err != nil {
return nil, err
}

var parser fastjson.Parser
podList, err := parser.ParseBytes(podListBytes)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to parse kubelet response: %v", err)
}

items := podList.GetArray("items")
result = make(map[string]*fastjson.Value, len(items))

for _, podValue := range items {
uid := string(podValue.Get("metadata", "uid").GetStringBytes())

if uid == "" {
p.log.Warn("Pod has no UID", "pod", podValue)
continue
}

result[uid] = podValue
}

p.setPodListCache(result, p.config.PollRetryInterval/2)

return result, nil
})
if err != nil {
return nil, err
}

return podList.(map[string]*fastjson.Value), nil
}

type kubeletClient struct {
Transport *http.Transport
URL url.URL
Token string
}

func (c *kubeletClient) GetPodList() ([]byte, error) {
func (c *kubeletClient) GetPodList(ctx context.Context) ([]byte, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

url := c.URL
url.Path = "/pods"
req, err := http.NewRequest("GET", url.String(), nil)
req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil)
if err != nil {
return nil, status.Errorf(codes.Internal, "unable to create request: %v", err)
}
22 changes: 22 additions & 0 deletions pkg/agent/plugin/workloadattestor/k8s/k8s_test.go
@@ -123,6 +123,7 @@ func (s *Suite) SetupTest() {
}

func (s *Suite) TearDownTest() {
s.clock.Add(time.Minute)
s.setServer(nil)
os.RemoveAll(s.dir)
}
@@ -145,6 +146,7 @@ func (s *Suite) TestAttestWithPidInPodAfterRetry() {

resultCh := s.goAttest(p)

s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
@@ -173,6 +175,24 @@ func (s *Suite) TestAttestWithPidNotInPodCancelsEarly() {
s.Require().Nil(selectors)
}

func (s *Suite) TestAttestPodListCache() {
s.startInsecureKubelet()
p := s.loadInsecurePlugin()

s.addPodListResponse(podListFilePath)

s.requireAttestSuccessWithPod(p)
s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")

// The pod list is cached so we don't expect a request to kubelet
s.requireAttestSuccessWithPod(p)

// The cache expires after the clock advances by at least half the retry interval
s.clock.Add(time.Minute)
s.addPodListResponse(podListFilePath)
s.requireAttestSuccessWithPod(p)
}

func (s *Suite) TestAttestWithPidNotInPodAfterRetry() {
s.startInsecureKubelet()
p := s.loadInsecurePlugin()
@@ -185,6 +205,7 @@ func (s *Suite) TestAttestWithPidNotInPodAfterRetry() {

resultCh := s.goAttest(p)

s.clock.WaitForAfter(time.Minute, "waiting for cache expiry timer")
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
s.clock.Add(time.Second)
s.clock.WaitForAfter(time.Minute, "waiting for retry timer")
@@ -690,6 +711,7 @@ func (s *Suite) kubeletPort() int {
func (s *Suite) loadPlugin(configuration string) workloadattestor.WorkloadAttestor {
v1 := new(workloadattestor.V1)
p := s.newPlugin()

plugintest.Load(s.T(), builtin(p), v1,
plugintest.Configure(configuration),
)