Skip to content

Commit

Permalink
Merge pull request grpc-ecosystem#200 from grpc-ecosystem/fix/bidi-ze…
Browse files Browse the repository at this point in the history
…ro-length

Avoid Internal Server Error on zero-length input for bidi streaming
  • Loading branch information
yugui authored Jul 27, 2016
2 parents a8f25bd + 67bfd71 commit c8ec92d
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 6 deletions.
7 changes: 5 additions & 2 deletions examples/examplepb/flow_combination.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions examples/examplepb/stream.pb.gw.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 114 additions & 0 deletions examples/integration_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"encoding/json"
"fmt"
"io"
Expand All @@ -9,6 +10,7 @@ import (
"reflect"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -152,6 +154,8 @@ func TestABE(t *testing.T) {
testABELookup(t)
testABELookupNotFound(t)
testABEList(t)
testABEBulkEcho(t)
testABEBulkEchoZeroLength(t)
testAdditionalBindings(t)
}

Expand Down Expand Up @@ -527,6 +531,116 @@ func testABEList(t *testing.T) {
}
}

func testABEBulkEcho(t *testing.T) {
reqr, reqw := io.Pipe()
var wg sync.WaitGroup
var want []*sub.StringMessage
wg.Add(1)
go func() {
defer wg.Done()
defer reqw.Close()
var m jsonpb.Marshaler
for i := 0; i < 1000; i++ {
msg := sub.StringMessage{Value: proto.String(fmt.Sprintf("message %d", i))}
buf, err := m.MarshalToString(&msg)
if err != nil {
t.Errorf("m.Marshal(%v) failed with %v; want success", &msg, err)
return
}
if _, err := fmt.Fprintln(reqw, buf); err != nil {
t.Errorf("fmt.Fprintln(reqw, %q) failed with %v; want success", buf, err)
return
}
want = append(want, &msg)
}
}()

url := "http://localhost:8080/v1/example/a_bit_of_everything/echo"
req, err := http.NewRequest("POST", url, reqr)
if err != nil {
t.Errorf("http.NewRequest(%q, %q, reqr) failed with %v; want success", "POST", url, err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Transfer-Encoding", "chunked")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("http.Post(%q, %q, req) failed with %v; want success", url, "application/json", err)
return
}
defer resp.Body.Close()
if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("resp.StatusCode = %d; want %d", got, want)
}

var got []*sub.StringMessage
wg.Add(1)
go func() {
defer wg.Done()

dec := json.NewDecoder(resp.Body)
for i := 0; ; i++ {
var item struct {
Result json.RawMessage `json:"result"`
Error map[string]interface{} `json:"error"`
}
err := dec.Decode(&item)
if err == io.EOF {
break
}
if err != nil {
t.Errorf("dec.Decode(&item) failed with %v; want success; i = %d", err, i)
}
if len(item.Error) != 0 {
t.Errorf("item.Error = %#v; want empty; i = %d", item.Error, i)
continue
}
var msg sub.StringMessage
if err := jsonpb.UnmarshalString(string(item.Result), &msg); err != nil {
t.Errorf("jsonpb.UnmarshalString(%q, &msg) failed with %v; want success", item.Result, err)
}
got = append(got, &msg)
}
}()

wg.Wait()
if !reflect.DeepEqual(got, want) {
t.Errorf("got = %v; want %v", got, want)
}
}

func testABEBulkEchoZeroLength(t *testing.T) {
url := "http://localhost:8080/v1/example/a_bit_of_everything/echo"
req, err := http.NewRequest("POST", url, bytes.NewReader(nil))
if err != nil {
t.Errorf("http.NewRequest(%q, %q, bytes.NewReader(nil)) failed with %v; want success", "POST", url, err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Transfer-Encoding", "chunked")
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("http.Post(%q, %q, req) failed with %v; want success", url, "application/json", err)
return
}
defer resp.Body.Close()
if got, want := resp.StatusCode, http.StatusOK; got != want {
t.Errorf("resp.StatusCode = %d; want %d", got, want)
}

dec := json.NewDecoder(resp.Body)
var item struct {
Result json.RawMessage `json:"result"`
Error map[string]interface{} `json:"error"`
}
if err := dec.Decode(&item); err == nil {
t.Errorf("dec.Decode(&item) succeeded; want io.EOF; item = %#v", item)
} else if err != io.EOF {
t.Errorf("dec.Decode(&item) failed with %v; want success", err)
return
}
}

func testAdditionalBindings(t *testing.T) {
for i, f := range []func() *http.Response{
func() *http.Response {
Expand Down
7 changes: 5 additions & 2 deletions protoc-gen-grpc-gateway/gengateway/template.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ var (
return nil
}
if err := handleSend(); err != nil {
if err := stream.CloseSend(); err != nil {
grpclog.Printf("Failed to terminate client stream: %v", err)
if cerr := stream.CloseSend(); cerr != nil {
grpclog.Printf("Failed to terminate client stream: %v", cerr)
}
if err == io.EOF {
return stream, metadata, nil
}
return nil, metadata, err
}
Expand Down

0 comments on commit c8ec92d

Please sign in to comment.