Skip to content

Commit

Permalink
Log error detail in SDS request (istio#15930)
Browse files Browse the repository at this point in the history
* log SDS update failures reported by proxy

* add test

* revise

* fix test

* update

* revise

* update comment

* lint
  • Loading branch information
JimmyCYJ authored and istio-testing committed Aug 6, 2019
1 parent fd2091e commit 737bfb5
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 16 deletions.
10 changes: 9 additions & 1 deletion security/pkg/nodeagent/sds/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"istio.io/pkg/monitoring"
)

// TODO: Create an endpoint in Citadel agent to expose metrics.
// TODO(JimmyCYJ): Create an endpoint in Citadel agent to expose metrics.
// Previously exposed metrics: pending_push_per_connection, stale_conn_count_per_connection,
// pushes_per_connection, push_errors_per_connection, pushed_root_cert_expiry_timestamp, pushed_server_cert_expiry_timestamp
var (
Expand All @@ -45,6 +45,13 @@ var (
"total_stale_connections",
"The total number of stale SDS connections.",
)

// totalSecretUpdateFailureCounts records total number of secret update failures reported by
// proxy in SDS request <error_detail> field.
totalSecretUpdateFailureCounts = monitoring.NewSum(
"total_secret_update_failures",
"The total number of dynamic secret update failures reported by proxy.",
)
)

func init() {
Expand All @@ -53,5 +60,6 @@ func init() {
totalPushErrorCounts,
totalActiveConnCounts,
totalStaleConnCounts,
totalSecretUpdateFailureCounts,
)
}
21 changes: 13 additions & 8 deletions security/pkg/nodeagent/sds/sdsservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,24 +262,29 @@ func (s *sdsservice) StreamSecrets(stream sds.SecretDiscoveryService_StreamSecre
token = t
}

// Update metric for metrics.
// Update metrics.
totalActiveConnCounts.Increment()

if discReq.ErrorDetail != nil {
totalSecretUpdateFailureCounts.Increment()
}
// When nodeagent receives StreamSecrets request, if there is cached secret which matches
// request's <token, resourceName, Version>, then this request is a confirmation request.
// nodeagent stops sending response to envoy in this case.
if discReq.VersionInfo != "" && s.st.SecretExist(conID, resourceName, token, discReq.VersionInfo) {
sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, versionInfo %q\n",
conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo)
sdsServiceLog.Debugf("%s received SDS ACK from proxy %q, version info %q, "+
"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
discReq.ErrorDetail.GoString())
continue
}

if firstRequestFlag {
sdsServiceLog.Debugf("%s received first SDS request from proxy %q, versionInfo %q\n",
conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo)
sdsServiceLog.Debugf("%s received first SDS request from proxy %q, version info "+
"%q, error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
discReq.ErrorDetail.GoString())
} else {
sdsServiceLog.Debugf("%s received SDS request from proxy %q, versionInfo %q\n",
conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo)
sdsServiceLog.Debugf("%s received SDS request from proxy %q, version info %q, "+
"error details %s\n", conIDresourceNamePrefix, discReq.Node.Id, discReq.VersionInfo,
discReq.ErrorDetail.GoString())
}

// In ingress gateway agent mode, if the first SDS request is received but kubernetes secret is not ready,
Expand Down
185 changes: 178 additions & 7 deletions security/pkg/nodeagent/sds/sdsservice_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,15 @@ import (
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
sds "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/gogo/protobuf/types"
"go.opencensus.io/stats/view"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"k8s.io/apimachinery/pkg/util/uuid"

rpc "istio.io/gogo-genproto/googleapis/google/rpc"
"istio.io/istio/security/pkg/nodeagent/cache"
"istio.io/istio/security/pkg/nodeagent/model"
"istio.io/istio/security/pkg/nodeagent/util"
)

var (
Expand Down Expand Up @@ -374,10 +375,10 @@ type notifyMsg struct {
func waitForNotificationToProceed(t *testing.T, notifyChan chan notifyMsg, proceedNotice string) {
for {
if notify := <-notifyChan; notify.Err != nil {
t.Errorf("get error from stream one: %v", notify.Message)
t.Errorf("get error from stream: %v", notify.Message)
} else {
if notify.Message != proceedNotice {
t.Errorf("push signal does not match, expected %s bug got %s", proceedNotice,
t.Errorf("push signal does not match, expected %s but got %s", proceedNotice,
notify.Message)
}
return
Expand Down Expand Up @@ -433,6 +434,12 @@ func TestStreamSecretsPush(t *testing.T) {
// lifetime, reset since it may be updated in other test case.
atomic.StoreInt64(&connectionNumber, 0)

initialTotalPush, err := util.GetMetricsCounterValue("total_pushes")
if err != nil {
t.Errorf("Fail to get initial value from metric totalPush: %v", err)
}
expectedTotalPush := 0

socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID()))
server, st := createSDSServer(t, socket)
defer server.Stop()
Expand All @@ -441,11 +448,13 @@ func TestStreamSecretsPush(t *testing.T) {
proxyID := "sidecar~127.0.0.1~SecretsPushStreamOne~local"
notifyChanOne := make(chan notifyMsg)
go testSDSStreamOne(streamOne, proxyID, notifyChanOne)
expectedTotalPush += 2

connTwo, streamTwo := createSDSStream(t, socket)
proxyIDTwo := "sidecar~127.0.0.1~SecretsPushStreamTwo~local"
notifyChanTwo := make(chan notifyMsg)
go testSDSStreamTwo(streamTwo, proxyIDTwo, notifyChanTwo)
expectedTotalPush++

waitForNotificationToProceed(t, notifyChanOne, "notify push secret")
// verify that the first SDS request sent by two streams do not hit cache.
Expand Down Expand Up @@ -507,6 +516,16 @@ func TestStreamSecretsPush(t *testing.T) {
if len(sdsClients) != 0 {
t.Fatalf("sdsClients, got %d, expected 0", len(sdsClients))
}

totalPushVal, err := util.GetMetricsCounterValue("total_pushes")
if err != nil {
t.Errorf("Fail to get value from metric totalPush: %v", err)
}
totalPushVal -= initialTotalPush
if totalPushVal != float64(expectedTotalPush) {
t.Errorf("unexpected metric totalPush: expected %v but got %v", expectedTotalPush,
totalPushVal)
}
}

func testSDSStreamMultiplePush(stream sds.SecretDiscoveryService_StreamSecretsClient, proxyID string,
Expand Down Expand Up @@ -546,6 +565,7 @@ func testSDSStreamMultiplePush(stream sds.SecretDiscoveryService_StreamSecretsCl
notifyChan <- notifyMsg{Err: errMisMatch, Message: errMisMatch.Error()}
}
}

notifyChan <- notifyMsg{Err: nil, Message: "close stream"}
}

Expand All @@ -555,6 +575,10 @@ func TestStreamSecretsMultiplePush(t *testing.T) {
// reset connectionNumber, since its value is kept in memory for the lifetime of all unit test cases and may have been updated by another test case.
atomic.StoreInt64(&connectionNumber, 0)

initialTotalPush, err := util.GetMetricsCounterValue("total_pushes")
if err != nil {
t.Errorf("Fail to get initial value from metric totalPush: %v", err)
}
socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID()))
server, st := createSDSServer(t, socket)
defer server.Stop()
Expand All @@ -581,9 +605,156 @@ func TestStreamSecretsMultiplePush(t *testing.T) {
pushSecret); err != nil {
t.Fatalf("failed to send push notificiation to proxy %q", conID)
}

notifyChan <- notifyMsg{Err: nil, Message: "receive secret"}
conn.Close()
waitForNotificationToProceed(t, notifyChan, "close stream")

totalPushVal, err := util.GetMetricsCounterValue("total_pushes")
if err != nil {
t.Errorf("Fail to get value from metric totalPush: %v", err)
}
totalPushVal -= initialTotalPush
if totalPushVal != float64(1) {
t.Errorf("unexpected metric totalPush: expected 1 but got %v", totalPushVal)
}
}

// testSDSStreamUpdateFailures plays the proxy side of a single SDS stream. It sends an initial
// request, ACKs the first response, waits for the test goroutine to push a new secret, and then
// sends a request carrying an ErrorDetail to simulate a rejected secret update.
//
// Every failure is reported to the test goroutine over notifyChan and the sequence carries on;
// progress signals ("notify push secret", "close stream") are sent over the same channel.
func testSDSStreamUpdateFailures(stream sds.SecretDiscoveryService_StreamSecretsClient, proxyID string,
	notifyChan chan notifyMsg) {
	// reportErr forwards a failure to the test goroutine. format is expected to hold exactly one
	// verb, which is filled with err.
	reportErr := func(err error, format string) {
		notifyChan <- notifyMsg{Err: err, Message: fmt.Sprintf(format, err)}
	}

	// The version info is deliberately non-empty so that StreamSecrets() starts a cache check, and
	// the cache miss metric is updated accordingly.
	req := &api.DiscoveryRequest{
		ResourceNames: []string{testResourceName},
		Node:          &core.Node{Id: proxyID},
		VersionInfo:   "initial_version",
	}

	// Request #1: the initial request, answered with the original key/cert.
	if sendErr := stream.Send(req); sendErr != nil {
		reportErr(sendErr, "stream.Send failed: %v")
	}
	resp, err := stream.Recv()
	if err != nil {
		reportErr(err, "stream.Recv failed: %v")
	}
	if verifyErr := verifySDSSResponse(resp, fakePrivateKey, fakeCertificateChain); verifyErr != nil {
		reportErr(verifyErr, "first SDS response verification failed: %v")
	}

	// Request #2: an ACK echoing the version and nonce of the first response. Afterwards tell the
	// test goroutine that it may push a new secret.
	req.VersionInfo = resp.VersionInfo
	req.ResponseNonce = resp.Nonce
	if err = stream.Send(req); err != nil {
		reportErr(err, "stream.Send failed: %v")
	}
	notifyChan <- notifyMsg{Message: "notify push secret"}

	// Block until the test goroutine answers; only read the pushed secret when told to.
	if signal := <-notifyChan; signal.Message == "receive secret" {
		resp, err = stream.Recv()
		if err != nil {
			reportErr(err, "stream.Recv failed: %v")
		}
		if verifyErr := verifySDSSResponse(resp, fakePushPrivateKey, fakePushCertificateChain); verifyErr != nil {
			reportErr(verifyErr, "second SDS response verification failed: %v")
		}
	}

	// Request #3: simulate the scenario that the pushed secret caused an update failure. The
	// version info and response nonce are left untouched, so they match the second request.
	req.ErrorDetail = &rpc.Status{
		Code:    int32(rpc.INTERNAL),
		Message: "fake error",
	}
	if err = stream.Send(req); err != nil {
		reportErr(err, "stream.Send failed: %v")
	}
	// The third request does not hit cache, so it is not put on hold but answered with key/cert
	// immediately.
	resp, err = stream.Recv()
	if err != nil {
		reportErr(err, "stream.Recv failed: %v")
	}
	if verifyErr := verifySDSSResponse(resp, fakePrivateKey, fakeCertificateChain); verifyErr != nil {
		reportErr(verifyErr, "third SDS response verification failed: %v")
	}

	notifyChan <- notifyMsg{Message: "close stream"}
}

// TestStreamSecretsUpdateFailures verifies that an update failure reported by the Envoy proxy is
// correctly recorded by the total_secret_update_failures metric, and that total_pushes grows by
// the expected amount over the same exchange.
func TestStreamSecretsUpdateFailures(t *testing.T) {
	// connectionNumber is kept in memory for the lifetime of all unit test cases and may have been
	// updated by another test case, so reset it.
	atomic.StoreInt64(&connectionNumber, 0)

	socket := fmt.Sprintf("/tmp/gotest%s.sock", string(uuid.NewUUID()))
	server, st := createSDSServer(t, socket)
	defer server.Stop()

	// The counters are shared by the whole test binary, so snapshot them and assert on deltas.
	pushesBefore, err := util.GetMetricsCounterValue("total_pushes")
	if err != nil {
		t.Errorf("Fail to get initial value from metric totalPush: %v", err)
	}
	failuresBefore, err := util.GetMetricsCounterValue("total_secret_update_failures")
	if err != nil {
		t.Errorf("Fail to get initial value from metric totalSecretUpdateFailureCounts: %v", err)
	}

	const proxyID = "sidecar~127.0.0.1~SecretsUpdateFailure~local"
	conn, stream := createSDSStream(t, socket)
	notifyChan := make(chan notifyMsg)
	go testSDSStreamUpdateFailures(stream, proxyID, notifyChan)

	waitForNotificationToProceed(t, notifyChan, "notify push secret")
	// The first SDS request must not hit cache, while the second one (the ACK) must.
	waitForSecretCacheCheck(t, st, false, 1)
	waitForSecretCacheCheck(t, st, true, 1)

	// Mirrors the logic in constructConnectionID().
	const conID = proxyID + "-1"
	connKey := cache.ConnKey{ConnectionID: conID, ResourceName: testResourceName}
	pushSecret := &model.SecretItem{
		CertificateChain: fakePushCertificateChain,
		PrivateKey:       fakePushPrivateKey,
		ResourceName:     testResourceName,
		Version:          time.Now().String(),
	}
	// Push the new secret to the proxy.
	if err := NotifyProxy(connKey, pushSecret); err != nil {
		t.Fatalf("failed to send push notificiation to proxy %q: %v", conID, err)
	}
	// The pushed secret has to be in the cache as well; this is needed to detect an ACK request.
	st.secrets.Store(connKey, pushSecret)
	notifyChan <- notifyMsg{Message: "receive secret"}

	// Envoy rejects the previous push and sends another SDS request with error info. That request
	// carries the original version info, which does not match the pushed secret in cache.
	waitForSecretCacheCheck(t, st, false, 2)

	waitForNotificationToProceed(t, notifyChan, "close stream")
	conn.Close()

	// Exactly one request in the exchange carried an ErrorDetail.
	failuresAfter, err := util.GetMetricsCounterValue("total_secret_update_failures")
	if err != nil {
		t.Errorf("Fail to get value from metric totalSecretUpdateFailureCounts: %v", err)
	}
	if reported := failuresAfter - failuresBefore; reported != 1 {
		t.Errorf("unexpected metric totalSecretUpdateFailureCounts: expected 1 but got %v",
			reported)
	}

	// The stream helper receives three responses; presumably each one counts as a push.
	pushesAfter, err := util.GetMetricsCounterValue("total_pushes")
	if err != nil {
		t.Errorf("Fail to get value from metric totalPush: %v", err)
	}
	if pushed := pushesAfter - pushesBefore; pushed != 3 {
		t.Errorf("unexpected metric totalPush: expected 3 but got %v", pushed)
	}
}

func verifySDSSResponse(resp *api.DiscoveryResponse, expectedPrivateKey []byte, expectedCertChain []byte) error {
Expand Down Expand Up @@ -855,11 +1026,11 @@ func checkStaledConnCount(t *testing.T) {
// Manually clear staled clients instead of waiting for ticker.
clearStaledClients()
metricName := "total_stale_connections"
rows, err := view.RetrieveData(metricName)
staleConnections, err := util.GetMetricsCounterValue(metricName)
if err != nil {
t.Errorf("unexpected error: %v", err.Error())
t.Errorf("Failed to get metric value for %s: %v", metricName, err)
}
if len(rows) == 1 && rows[0].Data.(*view.SumData).Value != 0 {
t.Errorf("expect %q to be 0, got %f", metricName, rows[0].Data.(*view.SumData).Value)
if staleConnections != float64(0) {
t.Errorf("expect %q to be 0, got %f", metricName, staleConnections)
}
}
19 changes: 19 additions & 0 deletions security/pkg/nodeagent/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"encoding/pem"
"fmt"
"time"

"go.opencensus.io/stats/view"
)

// ParseCertAndGetExpiryTimestamp parses certificate and returns cert expire time, or returns error
Expand All @@ -34,3 +36,20 @@ func ParseCertAndGetExpiryTimestamp(certByte []byte) (time.Time, error) {
}
return cert.NotAfter, nil
}

// GetMetricsCounterValue returns the value of the counter exported through the view named
// metricName, as a float64. For test purpose only.
//
// A view that has not recorded any row yet yields 0 and a nil error. An error is returned when
// view.RetrieveData fails, when the view holds more than one row, or when the row's data is not
// sum data (previously this last case caused a panic in the type assertion).
func GetMetricsCounterValue(metricName string) (float64, error) {
	rows, err := view.RetrieveData(metricName)
	if err != nil {
		return 0, err
	}
	if len(rows) == 0 {
		// Nothing has been recorded for this view yet.
		return 0, nil
	}
	if len(rows) > 1 {
		return 0, fmt.Errorf("unexpected number of data for view %s: %d",
			metricName, len(rows))
	}

	// Use the checked form of the assertion: a view with a different aggregation (or a nil
	// data pointer) must surface as an error to the caller rather than crash the test binary.
	sum, ok := rows[0].Data.(*view.SumData)
	if !ok || sum == nil {
		return 0, fmt.Errorf("unexpected data type for view %s: %T",
			metricName, rows[0].Data)
	}
	return sum.Value, nil
}

0 comments on commit 737bfb5

Please sign in to comment.