Skip to content
This repository was archived by the owner on Sep 12, 2018. It is now read-only.

Commit 1c68bdb

Browse files
committed
Uses streams internally for ipc Get/Put Content
This is done because libchan/spdystream does not currently support sending serialzied objects of size larger than 16MB See docker-archive-public/docker.libchan#65
1 parent 3f95694 commit 1c68bdb

File tree

4 files changed

+37
-26
lines changed

4 files changed

+37
-26
lines changed

storagedriver/ipc/client.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package ipc
22

33
import (
4+
"bytes"
45
"encoding/json"
56
"io"
7+
"io/ioutil"
68
"net"
79
"os"
810
"os/exec"
@@ -116,7 +118,7 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
116118
return nil, err
117119
}
118120

119-
var response GetContentResponse
121+
var response ReadStreamResponse
120122
err = receiver.Receive(&response)
121123
if err != nil {
122124
return nil, err
@@ -126,22 +128,26 @@ func (driver *StorageDriverClient) GetContent(path string) ([]byte, error) {
126128
return nil, response.Error
127129
}
128130

129-
return response.Content, nil
131+
defer response.Reader.Close()
132+
contents, err := ioutil.ReadAll(response.Reader)
133+
if err != nil {
134+
return nil, err
135+
}
136+
return contents, nil
130137
}
131138

132139
func (driver *StorageDriverClient) PutContent(path string, contents []byte) error {
133140
receiver, remoteSender := libchan.Pipe()
134141

135-
params := map[string]interface{}{"Path": path, "Contents": contents}
142+
params := map[string]interface{}{"Path": path, "Reader": WrapReader(bytes.NewReader(contents))}
136143
err := driver.sender.Send(&Request{Type: "PutContent", Parameters: params, ResponseChannel: remoteSender})
137144
if err != nil {
138145
return err
139146
}
140147

141-
var response PutContentResponse
148+
var response WriteStreamResponse
142149
err = receiver.Receive(&response)
143150
if err != nil {
144-
panic(err)
145151
return err
146152
}
147153

@@ -177,7 +183,7 @@ func (driver *StorageDriverClient) ReadStream(path string, offset uint64) (io.Re
177183
func (driver *StorageDriverClient) WriteStream(path string, offset, size uint64, reader io.ReadCloser) error {
178184
receiver, remoteSender := libchan.Pipe()
179185

180-
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReadCloser(reader)}
186+
params := map[string]interface{}{"Path": path, "Offset": offset, "Size": size, "Reader": WrapReader(reader)}
181187
err := driver.sender.Send(&Request{Type: "WriteStream", Parameters: params, ResponseChannel: remoteSender})
182188
if err != nil {
183189
return err

storagedriver/ipc/ipc.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"io"
7+
"io/ioutil"
78
"reflect"
89

910
"github.com/docker/libchan"
@@ -23,8 +24,14 @@ func (r noWriteReadWriteCloser) Write(p []byte) (n int, err error) {
2324
return 0, errors.New("Write unsupported")
2425
}
2526

26-
func WrapReadCloser(readCloser io.ReadCloser) io.ReadWriteCloser {
27-
return noWriteReadWriteCloser{readCloser}
27+
func WrapReader(reader io.Reader) io.ReadWriteCloser {
28+
if readWriteCloser, ok := reader.(io.ReadWriteCloser); ok {
29+
return readWriteCloser
30+
} else if readCloser, ok := reader.(io.ReadCloser); ok {
31+
return noWriteReadWriteCloser{readCloser}
32+
} else {
33+
return noWriteReadWriteCloser{ioutil.NopCloser(reader)}
34+
}
2835
}
2936

3037
type responseError struct {
@@ -46,15 +53,6 @@ func (err *responseError) Error() string {
4653
return fmt.Sprintf("%s: %s", err.Type, err.Message)
4754
}
4855

49-
type GetContentResponse struct {
50-
Content []byte
51-
Error *responseError
52-
}
53-
54-
type PutContentResponse struct {
55-
Error *responseError
56-
}
57-
5856
type ReadStreamResponse struct {
5957
Reader io.ReadWriteCloser
6058
Error *responseError

storagedriver/ipc/server.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package ipc
22

33
import (
4+
"bytes"
45
"io"
6+
"io/ioutil"
57
"net"
68
"os"
79

@@ -44,24 +46,29 @@ func receive(driver storagedriver.StorageDriver, receiver libchan.Receiver) {
4446
}
4547

4648
func handleRequest(driver storagedriver.StorageDriver, request Request) {
47-
4849
switch request.Type {
4950
case "GetContent":
5051
path, _ := request.Parameters["Path"].(string)
5152
content, err := driver.GetContent(path)
52-
response := GetContentResponse{
53-
Content: content,
54-
Error: ResponseError(err),
53+
var response ReadStreamResponse
54+
if err != nil {
55+
response = ReadStreamResponse{Error: ResponseError(err)}
56+
} else {
57+
response = ReadStreamResponse{Reader: WrapReader(bytes.NewReader(content))}
5558
}
5659
err = request.ResponseChannel.Send(&response)
5760
if err != nil {
5861
panic(err)
5962
}
6063
case "PutContent":
6164
path, _ := request.Parameters["Path"].(string)
62-
contents, _ := request.Parameters["Contents"].([]byte)
63-
err := driver.PutContent(path, contents)
64-
response := PutContentResponse{
65+
reader, _ := request.Parameters["Reader"].(io.ReadCloser)
66+
contents, err := ioutil.ReadAll(reader)
67+
defer reader.Close()
68+
if err == nil {
69+
err = driver.PutContent(path, contents)
70+
}
71+
response := WriteStreamResponse{
6572
Error: ResponseError(err),
6673
}
6774
err = request.ResponseChannel.Send(&response)
@@ -82,7 +89,7 @@ func handleRequest(driver storagedriver.StorageDriver, request Request) {
8289
if err != nil {
8390
response = ReadStreamResponse{Error: ResponseError(err)}
8491
} else {
85-
response = ReadStreamResponse{Reader: WrapReadCloser(reader)}
92+
response = ReadStreamResponse{Reader: WrapReader(reader)}
8693
}
8794
err = request.ResponseChannel.Send(&response)
8895
if err != nil {

storagedriver/testsuites/testsuites.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ func (suite *DriverSuite) TestWriteReadStreams4(c *C) {
128128
func (suite *DriverSuite) TestContinueStreamAppend(c *C) {
129129
filename := randomString(32)
130130

131-
chunkSize := uint64(32)
131+
chunkSize := uint64(10 * 1024 * 1024)
132132

133133
contentsChunk1 := []byte(randomString(chunkSize))
134134
contentsChunk2 := []byte(randomString(chunkSize))

0 commit comments

Comments
 (0)