diff --git a/security/pkg/nodeagent/sds/monitoring.go b/security/pkg/nodeagent/sds/monitoring.go index 3f60ee797f71..252514b2c7e9 100644 --- a/security/pkg/nodeagent/sds/monitoring.go +++ b/security/pkg/nodeagent/sds/monitoring.go @@ -18,7 +18,7 @@ import ( "istio.io/pkg/monitoring" ) -// TODO: Create an endpoint in Citadel agent to expose metrics. +// TODO(JimmyCYJ): Create an endpoint in Citadel agent to expose metrics. // Previously exposed metrics: pending_push_per_connection, stale_conn_count_per_connection, // pushes_per_connection, push_errors_per_connection, pushed_root_cert_expiry_timestamp, pushed_server_cert_expiry_timestamp var ( @@ -45,6 +45,13 @@ var ( "total_stale_connections", "The total number of stale SDS connections.", ) + + // totalSecretUpdateFailureCounts records total number of secret update failures reported by + // proxy in SDS request field. + totalSecretUpdateFailureCounts = monitoring.NewSum( + "total_secret_update_failures", + "The total number of dynamic secret update failures reported by proxy.", + ) ) func init() { @@ -53,5 +60,6 @@ func init() { totalPushErrorCounts, totalActiveConnCounts, totalStaleConnCounts, + totalSecretUpdateFailureCounts, ) } diff --git a/security/pkg/nodeagent/sds/sdsservice.go b/security/pkg/nodeagent/sds/sdsservice.go index cbafe3466032..6836611882e2 100644 --- a/security/pkg/nodeagent/sds/sdsservice.go +++ b/security/pkg/nodeagent/sds/sdsservice.go @@ -262,24 +262,29 @@ func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecre token = t } - // Update metric for metrics. + // Update metrics. totalActiveConnCounts.Increment() - + if discReq.ErrorDetail != nil { + totalSecretUpdateFailureCounts.Increment() + } // When nodeagent receives StreamSecrets request, if there is cached secret which matches // request's , then this request is a confirmation request. // nodeagent stops sending response to envoy in this case. if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) { - sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, versionInfo %q\n", - conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo) + sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+ + "error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo, + discReq.ErrorDetail.GoString()) continue } if firstRequestFlag { - sdsServiceLog.Debugf("%s received first SDS request from proxy %q, versionInfo %q\n", - conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo) + sdsServiceLog.Debugf("%s received first SDS request from proxy %q, version info "+ + "%q, error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo, + discReq.ErrorDetail.GoString()) } else { - sdsServiceLog.Debugf("%s received SDS request from proxy %q, versionInfo %q\n", - conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo) + sdsServiceLog.Debugf("%s received SDS request from proxy %q, version info %q, "+ + "error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo, + discReq.ErrorDetail.GoString()) } // In ingress gateway agent mode, if the first SDS request is received but kubernetes secret is not ready, diff --git a/security/pkg/nodeagent/sds/sdsservice_test.go b/security/pkg/nodeagent/sds/sdsservice_test.go index cf5316f48d43..52982cced2d5 100644 --- a/security/pkg/nodeagent/sds/sdsservice_test.go +++ b/security/pkg/nodeagent/sds/sdsservice_test.go @@ -31,14 +31,15 @@ import ( "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" sds "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" "github.com/gogo/protobuf/types" - "go.opencensus.io/stats/view" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "k8s.io/apimachinery/pkg/util/uuid" + rpc "istio.io/gogo-genproto/googleapis/google/rpc" "istio.io/istio/security/pkg/nodeagent/cache" "istio.io/istio/security/pkg/nodeagent/model" + "istio.io/istio/security/pkg/nodeagent/util" ) var ( @@ -374,10 +375,10 @@ type notifyMsg struct { func waitForNotificationToProceed(t *testing.T, notifyChan chan notifyMsg, proceedNotice string) { for { if notify := <-notifyChan; notify.Err != nil { - t.Errorf("get error from stream one: %v", notify.Message) + t.Errorf("get error from stream: %v", notify.Message) } else { if notify.Message != proceedNotice { - t.Errorf("push signal does not match, expected %s bug got %s", proceedNotice, + t.Errorf("push signal does not match, expected %s but got %s", proceedNotice, notify.Message) } return @@ -433,6 +434,12 @@ func TestStreamSecretsPush(t *testing.T) { // lifetime, reset since it may be updated in other test case. atomic.StoreInt64(&connectionNumber, 0) + initialTotalPush, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get initial value from metric totalPush: %v", err) + } + expectedTotalPush := 0 + socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID())) server, st := createSDSServer(t, socket) defer server.Stop() @@ -441,11 +448,13 @@ func TestStreamSecretsPush(t *testing.T) { proxyID := "sidecar~127.0.0.1~SecretsPushStreamOne~local" notifyChanOne := make(chan notifyMsg) go testSDSStreamOne(streamOne, proxyID, notifyChanOne) + expectedTotalPush += 2 connTwo, streamTwo := createSDSStream(t, socket) proxyIDTwo := "sidecar~127.0.0.1~SecretsPushStreamTwo~local" notifyChanTwo := make(chan notifyMsg) go testSDSStreamTwo(streamTwo, proxyIDTwo, notifyChanTwo) + expectedTotalPush++ waitForNotificationToProceed(t, notifyChanOne, "notify push secret") // verify that the first SDS request sent by two streams do not hit cache. @@ -507,6 +516,16 @@ func TestStreamSecretsPush(t *testing.T) { if len(sdsClients) != 0 { t.Fatalf("sdsClients, got %d, expected 0", len(sdsClients)) } + + totalPushVal, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get value from metric totalPush: %v", err) + } + totalPushVal -= initialTotalPush + if totalPushVal != float64(expectedTotalPush) { + t.Errorf("unexpected metric totalPush: expected %v but got %v", expectedTotalPush, + totalPushVal) + } } func testSDSStreamMultiplePush(stream sds.SecretDiscoveryService_StreamSecretsClient, proxyID string, @@ -546,6 +565,7 @@ func testSDSStreamMultiplePush(stream sds.SecretDiscoveryService_StreamSecretsCl notifyChan <- notifyMsg{Err: errMisMatch, Message: errMisMatch.Error()} } } + notifyChan <- notifyMsg{Err: nil, Message: "close stream"} } @@ -555,6 +575,10 @@ func TestStreamSecretsMultiplePush(t *testing.T) { // reset connectionNumber since since its value is kept in memory for all unit test cases lifetime, reset since it may be updated in other test case. atomic.StoreInt64(&connectionNumber, 0) + initialTotalPush, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get initial value from metric totalPush: %v", err) + } socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID())) server, st := createSDSServer(t, socket) defer server.Stop() @@ -581,9 +605,156 @@ func TestStreamSecretsMultiplePush(t *testing.T) { pushSecret); err != nil { t.Fatalf("failed to send push notificiation to proxy %q", conID) } + notifyChan <- notifyMsg{Err: nil, Message: "receive secret"} conn.Close() waitForNotificationToProceed(t, notifyChan, "close stream") + + totalPushVal, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get value from metric totalPush: %v", err) + } + totalPushVal -= initialTotalPush + if totalPushVal != float64(1) { + t.Errorf("unexpected metric totalPush: expected 1 but got %v", totalPushVal) + } +} + +func testSDSStreamUpdateFailures(stream sds.SecretDiscoveryService_StreamSecretsClient, proxyID string, + notifyChan chan notifyMsg) { + req := &api.DiscoveryRequest{ + ResourceNames: []string{testResourceName}, + Node: &core.Node{ + Id: proxyID, + }, + // Set a non-empty version info so that StreamSecrets() starts a cache check, and cache miss + // metric is updated accordingly. + VersionInfo: "initial_version", + } + + // Send first request and + if err := stream.Send(req); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Send failed: %v", err)} + } + resp, err := stream.Recv() + if err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Recv failed: %v", err)} + } + if err := verifySDSSResponse(resp, fakePrivateKey, fakeCertificateChain); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf( + "first SDS response verification failed: %v", err)} + } + + // Send second request as an ACK and wait for notifyPush + req.VersionInfo = resp.VersionInfo + req.ResponseNonce = resp.Nonce + if err = stream.Send(req); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Send failed: %v", err)} + } + notifyChan <- notifyMsg{Err: nil, Message: "notify push secret"} + if notify := <-notifyChan; notify.Message == "receive secret" { + resp, err = stream.Recv() + if err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Recv failed: %v", err)} + } + if err := verifySDSSResponse(resp, fakePushPrivateKey, fakePushCertificateChain); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf( + "second SDS response verification failed: %v", err)} + } + } + + // Send third request and simulate the scenario that the second push causes secret update failure. + // The version info and response nonce in the third request should match the second request. + req.ErrorDetail = &rpc.Status{ + Code: int32(rpc.INTERNAL), + Message: "fake error", + } + if err = stream.Send(req); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Send failed: %v", err)} + } + // The third request does not hit cache, so it is not on hold but replied with key/cert immediately. + resp, err = stream.Recv() + if err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf("stream.Recv failed: %v", err)} + } + if err := verifySDSSResponse(resp, fakePrivateKey, fakeCertificateChain); err != nil { + notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf( + "third SDS response verification failed: %v", err)} + } + notifyChan <- notifyMsg{Err: nil, Message: "close stream"} +} + +// TestStreamSecretsUpdateFailures verifies that update failures reported by Envoy proxy is correctly +// recorded by metrics. +func TestStreamSecretsUpdateFailures(t *testing.T) { + // reset connectionNumber since since its value is kept in memory for all unit test cases lifetime, + // reset since it may be updated in other test case. + atomic.StoreInt64(&connectionNumber, 0) + + socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID())) + server, st := createSDSServer(t, socket) + defer server.Stop() + + initialTotalPush, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get initial value from metric totalPush: %v", err) + } + initialTotalUpdateFailures, err := util.GetMetricsCounterValue("total_secret_update_failures") + if err != nil { + t.Errorf("Fail to get initial value from metric totalSecretUpdateFailureCounts: %v", err) + } + + conn, stream := createSDSStream(t, socket) + proxyID := "sidecar~127.0.0.1~SecretsUpdateFailure~local" + notifyChan := make(chan notifyMsg) + go testSDSStreamUpdateFailures(stream, proxyID, notifyChan) + + waitForNotificationToProceed(t, notifyChan, "notify push secret") + // verify that the first SDS request does not hit cache, and that the second SDS request hits cache. + waitForSecretCacheCheck(t, st, false, 1) + waitForSecretCacheCheck(t, st, true, 1) + + // simulate logic in constructConnectionID() function. + conID := proxyID + "-1" + pushSecret := &model.SecretItem{ + CertificateChain: fakePushCertificateChain, + PrivateKey: fakePushPrivateKey, + ResourceName: testResourceName, + Version: time.Now().String(), + } + // Test push new secret to proxy. + if err := NotifyProxy(cache.ConnKey{ConnectionID: conID, ResourceName: testResourceName}, + pushSecret); err != nil { + t.Fatalf("failed to send push notificiation to proxy %q: %v", conID, err) + } + // load pushed secret into cache, this is needed to detect an ACK request. + st.secrets.Store(cache.ConnKey{ConnectionID: conID, ResourceName: testResourceName}, pushSecret) + notifyChan <- notifyMsg{Err: nil, Message: "receive secret"} + + // verify that Envoy rejects previous push and send another SDS request with error info. This SDS + // request has original version info that does not match pushed secret in cache. + waitForSecretCacheCheck(t, st, false, 2) + + waitForNotificationToProceed(t, notifyChan, "close stream") + conn.Close() + + totalSecretUpdateFailureVal, err := util.GetMetricsCounterValue("total_secret_update_failures") + if err != nil { + t.Errorf("Fail to get value from metric totalSecretUpdateFailureCounts: %v", err) + } + totalSecretUpdateFailureVal -= initialTotalUpdateFailures + if totalSecretUpdateFailureVal != float64(1) { + t.Errorf("unexpected metric totalSecretUpdateFailureCounts: expected 1 but got %v", + totalSecretUpdateFailureVal) + } + totalPushVal, err := util.GetMetricsCounterValue("total_pushes") + if err != nil { + t.Errorf("Fail to get value from metric totalPush: %v", err) + } + totalPushVal -= initialTotalPush + if totalPushVal != float64(3) { + t.Errorf("unexpected metric totalPush: expected 3 but got %v", totalPushVal) + } } func verifySDSSResponse(resp *api.DiscoveryResponse, expectedPrivateKey []byte, expectedCertChain []byte) error { @@ -855,11 +1026,11 @@ func checkStaledConnCount(t *testing.T) { // Manually clear staled clients instead of waiting for ticker. clearStaledClients() metricName := "total_stale_connections" - rows, err := view.RetrieveData(metricName) + staleConnections, err := util.GetMetricsCounterValue(metricName) if err != nil { - t.Errorf("unexpected error: %v", err.Error()) + t.Errorf("Failed to get metric value for %s: %v", metricName, err) } - if len(rows) == 1 && rows[0].Data.(*view.SumData).Value != 0 { - t.Errorf("expect %q to be 0, got %f", metricName, rows[0].Data.(*view.SumData).Value) + if staleConnections != float64(0) { + t.Errorf("expect %q to be 0, got %f", metricName, staleConnections) } } diff --git a/security/pkg/nodeagent/util/util.go b/security/pkg/nodeagent/util/util.go index c5fd94c11c51..80622ca93f90 100644 --- a/security/pkg/nodeagent/util/util.go +++ b/security/pkg/nodeagent/util/util.go @@ -19,6 +19,8 @@ import ( "encoding/pem" "fmt" "time" + + "go.opencensus.io/stats/view" ) // parseCertAndGetExpiryTimestamp parses certificate and returns cert expire time, or return error @@ -34,3 +36,20 @@ func ParseCertAndGetExpiryTimestamp(certByte []byte) (time.Time, error) { } return cert.NotAfter, nil } + +// GetMetricsCounterValue returns counter value in float64. For test purpose only. +func GetMetricsCounterValue(metricName string) (float64, error) { + rows, err := view.RetrieveData(metricName) + if err != nil { + return float64(0), err + } + if len(rows) == 0 { + return 0, nil + } + if len(rows) > 1 { + return float64(0), fmt.Errorf("unexpected number of data for view %s: %d", + metricName, len(rows)) + } + + return rows[0].Data.(*view.SumData).Value, nil +}