Skip to content

Commit

Permalink
Remove retry of individual time series (istio#19896)
Browse files Browse the repository at this point in the history
* remove sd adapter retry

* only retry deadline exceeded and unavailable.

* lint
  • Loading branch information
bianpengyuan authored and istio-testing committed Jan 16, 2020
1 parent 069f7d1 commit 89f44c8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 90 deletions.
32 changes: 4 additions & 28 deletions mixer/adapter/stackdriver/metric/bufferedClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,9 @@ func (b *buffered) Send() {
// We need to build framework level support for these kinds of async tasks. Perhaps a generic batching adapter
// can handle some of this complexity?
if err != nil {
ets := handleError(err, timeSeries)
b.updateRetryBuffer(ets)
if isRetryable(status.Code(err)) {
b.updateRetryBuffer(timeSeries)
}
b.l.Errorf("%d time series was sent and Stackdriver returned: %v\n", len(timeSeries), err) // nolint: errcheck
if isOutOfOrderError(err) {
b.l.Debugf("Given data: %v", timeSeries)
Expand All @@ -190,31 +191,6 @@ func (b *buffered) Close() error {
return b.closeMe.Close()
}

// handleError selects, from a failed push, the time series that should be sent again.
//
// If the gRPC status carried by err lists CreateTimeSeriesError details, only the
// series whose own status code is retryable are returned. If it lists none, the
// whole batch tsSent is returned, provided the overall status code of err is
// retryable. When status.FromError reports that err does not carry a gRPC
// status, the result is empty.
func handleError(err error, tsSent []*monitoringpb.TimeSeries) []*monitoringpb.TimeSeries {
	retry := make([]*monitoringpb.TimeSeries, 0)
	st, isStatus := status.FromError(err)
	if !isStatus {
		return retry
	}

	// Tracks whether the response named individual series; once it does,
	// the batch-wide fallback below no longer applies.
	sawPerSeriesError := false
	for _, detail := range st.Details() {
		tsErr, matches := detail.(*monitoringpb.CreateTimeSeriesError)
		if !matches {
			continue
		}
		sawPerSeriesError = true
		if isRetryable(codes.Code(tsErr.GetStatus().Code)) {
			retry = append(retry, tsErr.GetTimeSeries())
		}
	}

	if !sawPerSeriesError && isRetryable(status.Code(err)) {
		retry = append(retry, tsSent...)
	}
	return retry
}

func (b *buffered) updateRetryBuffer(errorTS []*monitoringpb.TimeSeries) {
retryCounter := map[uint64]int{}
for _, ts := range errorTS {
Expand All @@ -239,7 +215,7 @@ func (b *buffered) updateRetryBuffer(errorTS []*monitoringpb.TimeSeries) {

func isRetryable(c codes.Code) bool {
switch c {
case codes.Canceled, codes.DeadlineExceeded, codes.ResourceExhausted, codes.Aborted, codes.Internal, codes.Unavailable:
case codes.DeadlineExceeded, codes.Unavailable:
return true
}
return false
Expand Down
69 changes: 7 additions & 62 deletions mixer/adapter/stackdriver/metric/bufferedClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ import (
"time"

"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
gax "github.com/googleapis/gax-go/v2"
xcontext "golang.org/x/net/context"
monitoring "google.golang.org/genproto/googleapis/monitoring/v3"
Expand Down Expand Up @@ -213,7 +211,7 @@ func TestBuffered_Close(t *testing.T) {
}

func createRetryPushFn(pushCount *int, expReqTS [][]*monitoring.TimeSeries, withError []bool, failedTS [][]*monitoring.TimeSeries,
errorCode [][]codes.Code, overallCode codes.Code, t *testing.T) pushFunc {
overallCode codes.Code, t *testing.T) pushFunc {
retryPushFn := func(ctx xcontext.Context, req *monitoring.CreateTimeSeriesRequest, opts ...gax.CallOption) error {
if len(expReqTS) != len(withError) || len(expReqTS) != len(failedTS) || len(expReqTS) <= *pushCount {
t.Errorf("args size does not match or potential out of bound. abort push operation %v %v %v %v",
Expand Down Expand Up @@ -249,24 +247,9 @@ func createRetryPushFn(pushCount *int, expReqTS [][]*monitoring.TimeSeries, with
return nil
}

details := make([]*any.Any, 0, len(failedTS[*pushCount]))
for i, fts := range failedTS[*pushCount] {
ec := codes.Canceled
if errorCode != nil {
ec = errorCode[*pushCount][i]
}
em, _ := ptypes.MarshalAny(&monitoring.CreateTimeSeriesError{
TimeSeries: fts,
Status: &grpcstatus.Status{
Code: int32(ec),
},
})
details = append(details, em)
}
return status.ErrorProto(&grpcstatus.Status{
Code: int32(overallCode),
Message: "A subset of time series had errors.",
Details: details,
})
}
return retryPushFn
Expand All @@ -283,23 +266,15 @@ func TestBuffered_Retry(t *testing.T) {
failedTS [][]*monitoring.TimeSeries
}{
{
name: "retry subset",
pushCount: 2,
requestTS: [][]*monitoring.TimeSeries{in, in[:2]},
withError: []bool{true, false},
overallCode: codes.Internal,
failedTS: [][]*monitoring.TimeSeries{in[:2], {}},
},
{
name: "retry all",
name: "retry",
pushCount: 2,
requestTS: [][]*monitoring.TimeSeries{in, in},
withError: []bool{true, false},
overallCode: codes.Internal,
overallCode: codes.Unavailable,
failedTS: [][]*monitoring.TimeSeries{{}, {}},
},
{
name: "do not retry all",
name: "do not retry",
pushCount: 1,
requestTS: [][]*monitoring.TimeSeries{in},
withError: []bool{true},
Expand All @@ -314,7 +289,7 @@ func TestBuffered_Retry(t *testing.T) {
env := test.NewEnv(t)
b := buffered{
l: env,
pushMetrics: createRetryPushFn(&pushCount, tt.requestTS, tt.withError, tt.failedTS, nil, tt.overallCode, t),
pushMetrics: createRetryPushFn(&pushCount, tt.requestTS, tt.withError, tt.failedTS, tt.overallCode, t),
timeSeriesBatchSize: 100,
retryLimit: 5,
retryBuffer: []*monitoring.TimeSeries{},
Expand Down Expand Up @@ -347,7 +322,7 @@ func TestBuffered_RetryCombine(t *testing.T) {
env := test.NewEnv(t)
b := buffered{
l: env,
pushMetrics: createRetryPushFn(&pushCount, requestTS, withError, failedTS, nil, codes.Internal, t),
pushMetrics: createRetryPushFn(&pushCount, requestTS, withError, failedTS, codes.Unavailable, t),
timeSeriesBatchSize: 100,
retryLimit: 5,
retryBuffer: []*monitoring.TimeSeries{},
Expand Down Expand Up @@ -383,7 +358,7 @@ func TestBuffered_RetryMaxAttempt(t *testing.T) {
}
b := buffered{
l: l,
pushMetrics: createRetryPushFn(&pushCount, requestTS, withError, failedTS, nil, codes.Internal, t),
pushMetrics: createRetryPushFn(&pushCount, requestTS, withError, failedTS, codes.Unavailable, t),
timeSeriesBatchSize: 100,
retryLimit: 2,
retryBuffer: []*monitoring.TimeSeries{},
Expand All @@ -403,33 +378,3 @@ func TestBuffered_RetryMaxAttempt(t *testing.T) {
t.Errorf("push call count is not expected, got %v want 3", pushCount)
}
}

// TestBuffered_IgnoreInvalidArguments records three time series, lets the first
// push fail with a per-series error code for each of them, and verifies that a
// second Send results in exactly two pushes overall.
func TestBuffered_IgnoreInvalidArguments(t *testing.T) {
	series := []*monitoring.TimeSeries{makeTS(m1, mr1, 1, 1), makeTS(m2, mr2, 1, 1), makeTS(m3, mr3, 1, 1)}
	pushes := 0
	logger := test.NewEnv(t).Logger()

	// The first push carries all three series and fails, naming every series in
	// the error. The second push is expected to carry only the first two: the
	// third is reported with InvalidArgument and is expected not to be retried.
	wantRequests := [][]*monitoring.TimeSeries{series, series[0:2]}
	failed := [][]*monitoring.TimeSeries{series, {}}
	failures := []bool{true, false}
	perSeriesCodes := [][]codes.Code{{codes.ResourceExhausted, codes.Internal, codes.InvalidArgument}}

	client := buffered{
		l:                   logger,
		pushMetrics:         createRetryPushFn(&pushes, wantRequests, failures, failed, perSeriesCodes, codes.Internal, t),
		timeSeriesBatchSize: 100,
		retryLimit:          5,
		retryBuffer:         []*monitoring.TimeSeries{},
		retryCounter:        map[uint64]int{},
		pushInterval:        100 * time.Millisecond,
		mergeTrigger:        1000,
		mergedTS:            make(map[uint64]*monitoring.TimeSeries),
	}

	client.Record(series)
	client.mergeTimeSeries()

	// Two sends: the initial attempt, then the retry.
	client.Send()
	client.Send()

	if pushes != 2 {
		t.Errorf("push call count is not expected, got %v want 2", pushes)
	}
}

0 comments on commit 89f44c8

Please sign in to comment.