diff --git a/schema/google/showcase/v1beta1/sequence.proto b/schema/google/showcase/v1beta1/sequence.proto index ecd23b052..4c2a6bbe0 100644 --- a/schema/google/showcase/v1beta1/sequence.proto +++ b/schema/google/showcase/v1beta1/sequence.proto @@ -217,6 +217,7 @@ message AttemptSequenceRequest { (google.api.resource_reference).type = "showcase.googleapis.com/Sequence", (google.api.field_behavior) = REQUIRED ]; + } message AttemptStreamingSequenceRequest { @@ -224,7 +225,13 @@ message AttemptStreamingSequenceRequest { (google.api.resource_reference).type = "showcase.googleapis.com/StreamingSequence", (google.api.field_behavior) = REQUIRED ]; - + + // used to send the index of the last failed message + // in the string "content" of an AttemptStreamingSequenceResponse + // needed for stream resumption logic testing + int32 last_fail_index = 2 [ + (google.api.field_behavior) = OPTIONAL + ]; } // The response message for the Echo methods. diff --git a/server/services/sequence_service.go b/server/services/sequence_service.go index 08672affb..09736c3cc 100644 --- a/server/services/sequence_service.go +++ b/server/services/sequence_service.go @@ -202,6 +202,7 @@ func (s *sequenceServerImpl) CreateStreamingSequence(ctx context.Context, in *pb func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSequenceRequest, stream pb.SequenceService_AttemptStreamingSequenceServer) error { received := time.Now() name := in.GetName() + lastFailIndex := in.GetLastFailIndex() if name == "" { return status.Errorf( codes.InvalidArgument, @@ -241,7 +242,11 @@ func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSeq st = status.New(codes.OutOfRange, "Attempt exceeded predefined responses") } - for idx, word := range content { + if lastFailIndex < 0 { + lastFailIndex = 0 + } + + for idx, word := range content[lastFailIndex:] { if idx >= respIndex { break } @@ -282,7 +287,6 @@ func (s *sequenceServerImpl) AttemptStreamingSequence(in *pb.AttemptStreamingSeq AttemptDelay: attDelay, Status: st.Proto(), }) - return st.Err() } diff --git a/server/services/sequence_service_test.go b/server/services/sequence_service_test.go index 04a5e4a16..8675b2f0c 100644 --- a/server/services/sequence_service_test.go +++ b/server/services/sequence_service_test.go @@ -354,6 +354,73 @@ func TestStreamingSequenceRetry(t *testing.T) { } } +func TestStreamingSequenceWithLastFailIndex(t *testing.T) { + s := NewSequenceServer() + responses := []*pb.StreamingSequence_Response{ + { + Status: status.New(codes.Unavailable, "Unavailable").Proto(), + Delay: ptypes.DurationProto(1 * time.Second), + }, + { + Status: status.New(codes.Unavailable, "Unavailable").Proto(), + Delay: ptypes.DurationProto(2 * time.Second), + }, + { + Status: status.New(codes.OK, "OK").Proto(), + }, + } + + seq, err := s.CreateStreamingSequence(context.Background(), &pb.CreateStreamingSequenceRequest{ + StreamingSequence: &pb.StreamingSequence{Responses: responses, Content: "Hello World, nice to see you"}, + }) + if err != nil { + t.Errorf("CreateSequence(retry): unexpected err %+v", err) + } + + timeout := 5 * time.Second + _, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + delay := 100 * time.Millisecond + stream := &mockStreamSequence{} + + for n, r := range responses { + res := status.FromProto(r.GetStatus()) + // by passing the LastFailIndex as 3, we force the response to be the 3rd index of content, which is "to" + // the number of responses will still be the same though - the length of the sequence + err = s.AttemptStreamingSequence(&pb.AttemptStreamingSequenceRequest{Name: seq.GetName(), LastFailIndex: 3}, stream) + if c := status.Code(err); c != res.Code() { + t.Errorf("%s: status #%d was %v wanted %v", t.Name(), n, c, res.Code()) + } + + if n != len(responses)-1 { + time.Sleep(delay) + delay *= 2 + } + } + + r := streamingReport(seq.GetName()) + report, err := s.GetStreamingSequenceReport(context.Background(), &pb.GetStreamingSequenceReportRequest{Name: r}) + if err != nil { + t.Errorf("GetSequenceReport(retry): unexpected err %+v", err) + } + + attempts := report.GetAttempts() + if len(attempts) != len(responses) { + t.Errorf("%s: expected number of attempts to be %d but was %d", t.Name(), len(responses), len(attempts)) + } + + for n, a := range attempts { + if got, want := a.GetAttemptNumber(), int32(n); got != want { + t.Errorf("%s: expected attempt #%d but was #%d", t.Name(), want, got) + } + + if got, want := a.GetStatus().GetCode(), responses[n].GetStatus().GetCode(); got != want { + t.Errorf("%s: expected response %v but was %v", t.Name(), want, got) + } + } +} + func TestStreamingSequenceOutOfRange(t *testing.T) { s := NewSequenceServer()