From 89f44c8ed8afcd24c792e7d97c7a3b982127678a Mon Sep 17 00:00:00 2001
From: Pengyuan Bian
Date: Wed, 15 Jan 2020 23:09:37 -0800
Subject: [PATCH] Remove retry of individual time series (#19896)

* remove sd adapter retry

* only retry deadline exceeded and unavailable.

* lint
---
 .../stackdriver/metric/bufferedClient.go      | 32 ++-------
 .../stackdriver/metric/bufferedClient_test.go | 69 ++-----------------
 2 files changed, 11 insertions(+), 90 deletions(-)

diff --git a/mixer/adapter/stackdriver/metric/bufferedClient.go b/mixer/adapter/stackdriver/metric/bufferedClient.go
index 9b99653087d5..d98e7c0d2263 100644
--- a/mixer/adapter/stackdriver/metric/bufferedClient.go
+++ b/mixer/adapter/stackdriver/metric/bufferedClient.go
@@ -165,8 +165,9 @@ func (b *buffered) Send() {
 	// We need to build framework level support for these kinds of async tasks. Perhaps a generic batching adapter
 	// can handle some of this complexity?
 	if err != nil {
-		ets := handleError(err, timeSeries)
-		b.updateRetryBuffer(ets)
+		if isRetryable(status.Code(err)) {
+			b.updateRetryBuffer(timeSeries)
+		}
 		b.l.Errorf("%d time series was sent and Stackdriver returned: %v\n", len(timeSeries), err) // nolint: errcheck
 		if isOutOfOrderError(err) {
 			b.l.Debugf("Given data: %v", timeSeries)
@@ -190,31 +191,6 @@ func (b *buffered) Close() error {
 	return b.closeMe.Close()
 }
 
-// handleError extract out timeseries that fails to create from response status.
-// If no specific timeseries listed in error response, retry all time series in batch.
-func handleError(err error, tsSent []*monitoringpb.TimeSeries) []*monitoringpb.TimeSeries {
-	errorTS := make([]*monitoringpb.TimeSeries, 0)
-	retryAll := true
-	s, ok := status.FromError(err)
-	if !ok {
-		return errorTS
-	}
-	sd := s.Details()
-	for _, i := range sd {
-		if t, ok := i.(*monitoringpb.CreateTimeSeriesError); ok {
-			retryAll = false
-			if !isRetryable(codes.Code(t.GetStatus().Code)) {
-				continue
-			}
-			errorTS = append(errorTS, t.GetTimeSeries())
-		}
-	}
-	if isRetryable(status.Code(err)) && retryAll {
-		errorTS = append(errorTS, tsSent...)
-	}
-	return errorTS
-}
-
 func (b *buffered) updateRetryBuffer(errorTS []*monitoringpb.TimeSeries) {
 	retryCounter := map[uint64]int{}
 	for _, ts := range errorTS {
@@ -239,7 +215,7 @@ func (b *buffered) updateRetryBuffer(errorTS []*monitoringpb.TimeSeries) {
 
 func isRetryable(c codes.Code) bool {
 	switch c {
-	case codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, codes.Internal, codes.Unavailable:
+	case codes.DeadlineExceeded, codes.Unavailable:
 		return true
 	}
 	return false
diff --git a/mixer/adapter/stackdriver/metric/bufferedClient_test.go b/mixer/adapter/stackdriver/metric/bufferedClient_test.go
index 48d1d3500e98..351567ad8bc3 100644
--- a/mixer/adapter/stackdriver/metric/bufferedClient_test.go
+++ b/mixer/adapter/stackdriver/metric/bufferedClient_test.go
@@ -22,8 +22,6 @@ import (
 	"time"
 
 	"github.com/gogo/protobuf/proto"
-	"github.com/golang/protobuf/ptypes"
-	"github.com/golang/protobuf/ptypes/any"
 	gax "github.com/googleapis/gax-go/v2"
 	xcontext "golang.org/x/net/context"
 	monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
@@ -213,7 +211,7 @@ func TestBuffered_Close(t *testing.T) {
 }
 
 func createRetryPushFn(pushCount *int, expReqTS [][]*monitoring.TimeSeries, withError []bool, failedTS [][]*monitoring.TimeSeries,
-	errorCode [][]codes.Code, overallCode codes.Code, t *testing.T) pushFunc {
+	overallCode codes.Code, t *testing.T) pushFunc {
 	retryPushFn := func(ctx xcontext.Context, req *monitoring.CreateTimeSeriesRequest, opts ...gax.CallOption) error {
 		if len(expReqTS) != len(withError) || len(expReqTS) != len(failedTS) || len(expReqTS) <= *pushCount {
 			t.Errorf("args size does not match or potential out of bound. abort push operation %v %v %v %v",
@@ -249,24 +247,9 @@ func createRetryPushFn(pushCount *int, expReqTS [][]*monitoring.TimeSeries, with
 			return nil
 		}
 
-		details := make([]*any.Any, 0, len(failedTS[*pushCount]))
-		for i, fts := range failedTS[*pushCount] {
-			ec := codes.Canceled
-			if errorCode != nil {
-				ec = errorCode[*pushCount][i]
-			}
-			em, _ := ptypes.MarshalAny(&monitoring.CreateTimeSeriesError{
-				TimeSeries: fts,
-				Status: &grpcstatus.Status{
-					Code: int32(ec),
-				},
-			})
-			details = append(details, em)
-		}
 		return status.ErrorProto(&grpcstatus.Status{
 			Code:    int32(overallCode),
 			Message: "A subset of time series had errors.",
-			Details: details,
 		})
 	}
 	return retryPushFn
@@ -283,23 +266,15 @@ func TestBuffered_Retry(t *testing.T) {
 		failedTS [][]*monitoring.TimeSeries
 	}{
 		{
-			name:        "retry subset",
-			pushCount:   2,
-			requestTS:   [][]*monitoring.TimeSeries{in, in[:2]},
-			withError:   []bool{true, false},
-			overallCode: codes.Internal,
-			failedTS:    [][]*monitoring.TimeSeries{in[:2], {}},
-		},
-		{
-			name:        "retry all",
+			name:        "retry",
 			pushCount:   2,
 			requestTS:   [][]*monitoring.TimeSeries{in, in},
 			withError:   []bool{true, false},
-			overallCode: codes.Internal,
+			overallCode: codes.Unavailable,
 			failedTS:    [][]*monitoring.TimeSeries{{}, {}},
 		},
 		{
-			name:        "do not retry all",
+			name:        "do not retry",
 			pushCount:   1,
 			requestTS:   [][]*monitoring.TimeSeries{in},
 			withError:   []bool{true},
@@ -314,7 +289,7 @@ func TestBuffered_Retry(t *testing.T) {
 			env := test.NewEnv(t)
 			b := buffered{
 				l:                   env,
-				pushMetrics:         createRetryPushFn(&pushCount, tt.requestTS, tt.withError, tt.failedTS, nil, tt.overallCode, t),
+				pushMetrics:         createRetryPushFn(&pushCount, tt.requestTS, tt.withError, tt.failedTS, tt.overallCode, t),
 				timeSeriesBatchSize: 100,
 				retryLimit:          5,
 				retryBuffer:         []*monitoring.TimeSeries{},
@@ -347,7 +322,7 @@ func TestBuffered_RetryCombine(t *testing.T) {
 	env := test.NewEnv(t)
 	b := buffered{
 		l:                   env,
-		pushMetrics:         createRetryPushFn(&pushCount, requestTS, withError, failedTS, nil, codes.Internal, t),
+		pushMetrics:         createRetryPushFn(&pushCount, requestTS, withError, failedTS, codes.Unavailable, t),
 		timeSeriesBatchSize: 100,
 		retryLimit:          5,
 		retryBuffer:         []*monitoring.TimeSeries{},
@@ -383,7 +358,7 @@ func TestBuffered_RetryMaxAttempt(t *testing.T) {
 	}
 	b := buffered{
 		l:                   l,
-		pushMetrics:         createRetryPushFn(&pushCount, requestTS, withError, failedTS, nil, codes.Internal, t),
+		pushMetrics:         createRetryPushFn(&pushCount, requestTS, withError, failedTS, codes.Unavailable, t),
 		timeSeriesBatchSize: 100,
 		retryLimit:          2,
 		retryBuffer:         []*monitoring.TimeSeries{},
@@ -403,33 +378,3 @@ func TestBuffered_RetryMaxAttempt(t *testing.T) {
 		t.Errorf("push call count is not expected, got %v want 3", pushCount)
 	}
 }
-
-func TestBuffered_IgnoreInvalidArguments(t *testing.T) {
-	in := []*monitoring.TimeSeries{makeTS(m1, mr1, 1, 1), makeTS(m2, mr2, 1, 1), makeTS(m3, mr3, 1, 1)}
-	pushCount := 0
-	l := test.NewEnv(t).Logger()
-	// Only the first two TS will be retried, the third one will be ignored since it is out of order.
-	requestTS := [][]*monitoring.TimeSeries{in, in[0:2]}
-	failedTS := [][]*monitoring.TimeSeries{in, {}}
-	withError := []bool{true, false}
-	errorMessage := [][]codes.Code{{codes.ResourceExhausted, codes.Internal, codes.InvalidArgument}}
-	b := buffered{
-		l:                   l,
-		pushMetrics:         createRetryPushFn(&pushCount, requestTS, withError, failedTS, errorMessage, codes.Internal, t),
-		timeSeriesBatchSize: 100,
-		retryLimit:          5,
-		retryBuffer:         []*monitoring.TimeSeries{},
-		retryCounter:        map[uint64]int{},
-		pushInterval:        100 * time.Millisecond,
-		mergeTrigger:        1000,
-		mergedTS:            make(map[uint64]*monitoring.TimeSeries),
-	}
-	b.Record(in)
-	b.mergeTimeSeries()
-	b.Send()
-
-	b.Send()
-	if pushCount != 2 {
-		t.Errorf("push call count is not expected, got %v want 2", pushCount)
-	}
-}
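
Note on the new behavior: after this patch the adapter no longer parses CreateTimeSeriesError details out of the RPC status to retry individual series; the whole batch is re-buffered only when the overall status code is DeadlineExceeded or Unavailable, and is dropped otherwise. The standalone Go sketch below illustrates that decision path under stated assumptions: the decide helper and its batchSize parameter are illustrative names, not part of the adapter.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isRetryable mirrors the predicate introduced above: after this change,
// only DeadlineExceeded and Unavailable are treated as transient.
func isRetryable(c codes.Code) bool {
	switch c {
	case codes.DeadlineExceeded, codes.Unavailable:
		return true
	}
	return false
}

// decide sketches the simplified Send() error path: rather than extracting
// per-series failures from the status details, the entire batch is either
// re-buffered for a later retry or dropped, based on the overall code.
func decide(err error, batchSize int) string {
	if err == nil {
		return "sent"
	}
	if isRetryable(status.Code(err)) {
		return fmt.Sprintf("re-buffer all %d series", batchSize)
	}
	return "drop batch"
}

func main() {
	fmt.Println(decide(status.Error(codes.Unavailable, "backend overloaded"), 3)) // re-buffer all 3 series
	fmt.Println(decide(status.Error(codes.Internal, "server error"), 3))          // drop batch
	fmt.Println(decide(nil, 3))                                                   // sent
}

This also explains the test updates: codes.Internal, which previously triggered a retry, no longer does, so the retry tests switch their overallCode from codes.Internal to codes.Unavailable, and the per-series test TestBuffered_IgnoreInvalidArguments is removed along with the errorCode plumbing in createRetryPushFn.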