Skip to content
This repository was archived by the owner on May 29, 2018. It is now read-only.

Commit 05edc67

Browse files
committed
Append/Follow
1 parent 3d737e8 commit 05edc67

File tree

6 files changed

+310
-182
lines changed

6 files changed

+310
-182
lines changed

client.go

Lines changed: 51 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"bytes"
55
"errors"
66
"fmt"
7-
"github.com/garyburd/go-websocket/websocket"
7+
"github.com/sqs/go-websocket/websocket"
88
"io"
99
"net"
1010
"net/http"
@@ -13,55 +13,47 @@ import (
1313
"time"
1414
)
1515

16-
func Get(u *url.URL) (io.ReadCloser, error) {
17-
req, err := http.NewRequest("HEAD", u.String(), nil)
18-
if err != nil {
19-
return nil, err
16+
func Follow(u *url.URL) (io.ReadCloser, error) {
17+
ws, resp, err := newClient(u, "FOLLOW")
18+
if err == websocket.ErrBadHandshake {
19+
err = errorFromResponse(resp, nil)
2020
}
21-
resp, err := http.DefaultClient.Do(req)
2221
if err != nil {
2322
return nil, err
2423
}
25-
if resp.StatusCode != http.StatusOK {
26-
switch resp.StatusCode {
27-
case http.StatusNotFound:
28-
return nil, os.ErrNotExist
29-
default:
30-
return nil, fmt.Errorf("head failed with status %d", resp.StatusCode)
31-
}
32-
}
33-
34-
ws, err := newClient(u, "GET")
35-
if err != nil {
36-
return nil, err
24+
if resp.StatusCode == http.StatusOK {
25+
return resp.Body, nil
3726
}
3827

39-
op, r, err := ws.NextReader()
40-
if err != nil {
41-
return nil, err
42-
}
43-
if op != websocket.OpText {
44-
return nil, errors.New("websocket op is not text")
45-
}
46-
47-
return &webSocketReadCloser{r, ws}, nil
28+
return &webSocketReadCloser{ws}, nil
4829
}
4930

5031
type webSocketReadCloser struct {
51-
io.Reader
5232
ws *websocket.Conn
5333
}
5434

5535
func (r *webSocketReadCloser) Read(p []byte) (n int, err error) {
56-
return r.Reader.Read(p)
36+
op, rdr, err := r.ws.NextReader()
37+
if err != nil {
38+
return 0, err
39+
}
40+
if op != websocket.OpText {
41+
return 0, errors.New("websocket op is not text")
42+
}
43+
n, err = rdr.Read(p)
44+
45+
if err == io.EOF {
46+
return r.Read(p)
47+
}
48+
return
5749
}
5850

5951
func (r *webSocketReadCloser) Close() error {
6052
return r.ws.Close()
6153
}
6254

63-
func Put(u *url.URL, r io.Reader) error {
64-
w, err := OpenPut(u)
55+
func Append(u *url.URL, r io.Reader) error {
56+
w, err := OpenAppend(u)
6557
if err != nil {
6658
return err
6759
}
@@ -71,36 +63,24 @@ func Put(u *url.URL, r io.Reader) error {
7163
return err
7264
}
7365

74-
func OpenPut(u *url.URL) (io.WriteCloser, error) {
75-
req, err := http.NewRequest("HEAD", u.String(), nil)
76-
if err != nil {
77-
return nil, err
78-
}
79-
resp, err := http.DefaultClient.Do(req)
66+
func OpenAppend(u *url.URL) (io.WriteCloser, error) {
67+
ws, resp, err := newClient(u, "APPEND")
8068
if err != nil {
81-
return nil, err
82-
}
83-
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
84-
switch resp.StatusCode {
85-
default:
86-
return nil, fmt.Errorf("head failed with status %d", resp.StatusCode)
69+
if err == websocket.ErrBadHandshake {
70+
return nil, errorFromResponse(resp, nil)
8771
}
88-
}
89-
90-
ws, err := newClient(u, "PUT")
91-
if err != nil {
9272
return nil, err
9373
}
9474

95-
return &putWriteCloser{new(bytes.Buffer), ws}, nil
75+
return &appendWriteCloser{new(bytes.Buffer), ws}, nil
9676
}
9777

98-
type putWriteCloser struct {
78+
type appendWriteCloser struct {
9979
io.Writer
10080
ws *websocket.Conn
10181
}
10282

103-
func (pw *putWriteCloser) Write(p []byte) (n int, err error) {
83+
func (pw *appendWriteCloser) Write(p []byte) (n int, err error) {
10484
pw.ws.SetWriteDeadline(time.Now().Add(writeWait))
10585
w, err := pw.ws.NextWriter(websocket.OpText)
10686
if err != nil {
@@ -110,15 +90,31 @@ func (pw *putWriteCloser) Write(p []byte) (n int, err error) {
11090
return w.Write(p)
11191
}
11292

113-
func (pw *putWriteCloser) Close() error {
93+
func (pw *appendWriteCloser) Close() error {
11494
return pw.ws.Close()
11595
}
11696

117-
func newClient(u *url.URL, method string) (*websocket.Conn, error) {
97+
func newClient(u *url.URL, method string) (*websocket.Conn, *http.Response, error) {
11898
c, err := net.Dial("tcp", u.Host)
11999
if err != nil {
120-
return nil, err
100+
return nil, nil, err
101+
}
102+
return websocket.NewClient(c, u, http.Header{xVerb: []string{method}}, readBufSize, writeBufSize)
103+
}
104+
105+
// errorFromResponse returns err if err != nil, or another non-nil error if resp
106+
// indicates a non-HTTP 200 response.
107+
func errorFromResponse(resp *http.Response, err error) error {
108+
if err != nil {
109+
return err
110+
}
111+
if resp.StatusCode != http.StatusOK {
112+
switch resp.StatusCode {
113+
case http.StatusNotFound:
114+
return os.ErrNotExist
115+
default:
116+
return fmt.Errorf("HTTP status %d", resp.StatusCode)
117+
}
121118
}
122-
ws, _, err := websocket.NewClient(c, u, http.Header{xMethod: []string{method}}, readBufSize, writeBufSize)
123-
return ws, err
119+
return nil
124120
}

client_test.go

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,36 +5,40 @@ import (
55
"io"
66
"net/url"
77
"os"
8+
"path/filepath"
89
"testing"
910
"time"
1011
)
1112

12-
type getTest struct {
13+
type followTest struct {
1314
path string
1415
body string
1516
writeFiles map[string]string
1617
err error
1718
}
1819

19-
func TestGet(t *testing.T) {
20-
tests := []getTest{
20+
func TestFollow(t *testing.T) {
21+
t.Parallel()
22+
23+
tests := []followTest{
2124
{path: "/foo1", body: "bar", writeFiles: map[string]string{"/foo1": "bar"}},
2225

2326
{path: "/doesntexist", err: os.ErrNotExist},
2427
}
2528
for _, test := range tests {
26-
testGet(t, test)
29+
testFollow(t, test)
2730
}
2831
}
2932

30-
func testGet(t *testing.T, test getTest) {
33+
func testFollow(t *testing.T, test followTest) {
3134
label := test.path
3235

3336
server := newTestServer()
3437
defer server.close()
3538

3639
for path, data := range test.writeFiles {
37-
w, err := server.fs.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL)
40+
path = filepath.Join(server.dir, path)
41+
w, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC|os.O_EXCL, 0644)
3842
if err != nil {
3943
t.Fatalf("%s: fs.WriterOpen: %s", label, err)
4044
}
@@ -49,12 +53,12 @@ func testGet(t *testing.T, test getTest) {
4953
}
5054

5155
u, _ := url.Parse(server.URL + test.path)
52-
r, err := Get(u)
56+
r, err := Follow(u)
5357
if err == nil {
5458
defer r.Close()
5559
}
5660
if test.err != err {
57-
t.Errorf("%s: Get: want error %v, got %v", label, test.err, err)
61+
t.Errorf("%s: Follow: want error %v, got %v", label, test.err, err)
5862
return
5963
}
6064
if test.err != nil {
@@ -67,53 +71,57 @@ func testGet(t *testing.T, test getTest) {
6771
}
6872
}
6973

70-
type putTest struct {
74+
type appendTest struct {
7175
path string
7276
data io.Reader
7377
fileData string
7478
err error
7579
}
7680

77-
func TestPut(t *testing.T) {
78-
tests := []putTest{
81+
func TestAppend(t *testing.T) {
82+
t.Parallel()
83+
84+
tests := []appendTest{
7985
{path: "/foo1", data: bytes.NewReader([]byte("bar")), fileData: "bar"},
8086
{path: "/foo2", data: bytes.NewBuffer([]byte("bar")), fileData: "bar"},
81-
{path: "/foo3", data: &slowReader{R: &fixedReader{R: bytes.NewReader([]byte("quxx")), N: 2}, Wait: time.Millisecond * 25}, fileData: "quxx"},
87+
{path: "/foo3", data: &slowReader{R: &fixedReader{R: bytes.NewReader([]byte("quxx")), N: 2}, Wait: time.Millisecond * 10}, fileData: "quxx"},
8288
}
8389
for _, test := range tests {
84-
testPut(t, test)
90+
testAppend(t, test)
8591
}
8692
}
8793

88-
func testPut(t *testing.T, test putTest) {
94+
func testAppend(t *testing.T, test appendTest) {
8995
label := test.path
9096

9197
server := newTestServer()
9298
defer server.close()
99+
fpath := filepath.Join(server.dir, test.path)
93100

94101
u, _ := url.Parse(server.URL + test.path)
95-
err := Put(u, test.data)
102+
err := Append(u, test.data)
96103
if test.err != err {
97-
t.Errorf("%s: Put: want error %v, got %v", label, test.err, err)
104+
t.Errorf("%s: Append: want error %v, got %v", label, test.err, err)
98105
return
99106
}
100107
if test.err != nil {
101108
return
102109
}
103110

104-
time.Sleep(50 * time.Millisecond)
111+
time.Sleep(10 * time.Millisecond)
105112

106-
_, err = server.fs.Stat(test.path)
113+
_, err = os.Stat(fpath)
107114
if err != nil {
108115
t.Errorf("%s: Stat: %s", label, err)
109116
return
110117
}
111118

112-
f, err := server.fs.Open(test.path)
119+
f, err := os.Open(fpath)
113120
if err != nil {
114121
t.Errorf("%s: Open: %s", label, err)
115122
return
116123
}
124+
defer f.Close()
117125
fileData := string(readAll(t, f))
118126
if test.fileData != fileData {
119127
t.Errorf("%s: want fileData == %q, got %q", label, test.fileData, fileData)

cmd/httpfstream-server/server.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package main
33
import (
44
"flag"
55
"fmt"
6-
"github.com/gorilla/handlers"
76
"github.com/sourcegraph/httpfstream"
8-
"github.com/sourcegraph/rwvfs"
97
"log"
108
"net/http"
119
"os"
@@ -36,9 +34,9 @@ func main() {
3634

3735
os.MkdirAll(*root, 0700)
3836

39-
h := httpfstream.New(rwvfs.OS(*root))
37+
h := httpfstream.New(*root)
4038
h.Log = log.New(os.Stderr, "", 0)
41-
http.Handle("/", handlers.CombinedLoggingHandler(os.Stdout, h))
39+
http.Handle("/", h)
4240

4341
log.Printf("Starting server on %s\n", *bindAddr)
4442
err := http.ListenAndServe(*bindAddr, nil)

http_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
package httpfstream
22

33
import (
4-
"github.com/sourcegraph/rwvfs"
54
"io"
65
"io/ioutil"
76
"log"
87
"net/http"
98
"net/http/httptest"
9+
"net/url"
1010
"os"
1111
"testing"
1212
)
1313

1414
type testServer struct {
1515
*httptest.Server
1616
dir string
17-
fs rwvfs.FileSystem
1817
}
1918

2019
func newTestServer() testServer {
@@ -28,14 +27,12 @@ func newTestServer() testServer {
2827
}
2928

3029
rootMux := http.NewServeMux()
31-
fs := rwvfs.OS(dir)
32-
h := New(fs)
30+
h := New(dir)
3331
h.Log = log.New(os.Stderr, "", 0)
3432
rootMux.Handle("/", h)
3533
return testServer{
3634
Server: httptest.NewServer(rootMux),
3735
dir: dir,
38-
fs: fs,
3936
}
4037
}
4138

@@ -44,10 +41,16 @@ func (s testServer) close() {
4441
os.RemoveAll(s.dir)
4542
}
4643

47-
func readAll(t *testing.T, rdr io.Reader) []byte {
48-
if c, ok := rdr.(io.Closer); ok {
49-
defer c.Close()
44+
func httpGET(t *testing.T, u *url.URL) string {
45+
resp, err := http.Get(u.String())
46+
if err != nil {
47+
t.Fatalf("httpGET %s: %s", u, err)
5048
}
49+
defer resp.Body.Close()
50+
return string(readAll(t, resp.Body))
51+
}
52+
53+
func readAll(t *testing.T, rdr io.Reader) []byte {
5154
data, err := ioutil.ReadAll(rdr)
5255
if err != nil {
5356
t.Fatal("ReadAll", err)

0 commit comments

Comments
 (0)