From 22be6674eba881e7d9d22c55441047ae018067af Mon Sep 17 00:00:00 2001 From: groovili Date: Fri, 1 May 2020 20:14:51 +0300 Subject: [PATCH] raw streaming services --- .gitignore | 19 +++ LICENSE | 21 +++ Makefile | 3 + README.md | 9 ++ client/client.go | 110 +++++++++++++++ go.mod | 9 ++ go.sum | 64 +++++++++ proto/streaming.pb.go | 308 ++++++++++++++++++++++++++++++++++++++++++ proto/streaming.proto | 25 ++++ server/server.go | 157 +++++++++++++++++++++ 10 files changed, 725 insertions(+) create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 Makefile create mode 100644 README.md create mode 100644 client/client.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 proto/streaming.pb.go create mode 100644 proto/streaming.proto create mode 100644 server/server.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..0fd8dd6 --- /dev/null +++ b/.gitignore @@ -0,0 +1,19 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib +.DS_Store +.bin +!.bin/.gitkeep +.idea + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..707e292 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 Max Ivanov + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..eda7c10 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +gen-streaming: + protoc ./proto/streaming.proto --go_out=plugins=grpc:. ./proto/*.proto +gen: gen-streaming \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..3337c3d --- /dev/null +++ b/README.md @@ -0,0 +1,9 @@ +# grpc-file-streaming + +TODO: + +- Figure out mismatch in sent and received files size +- Handle errors on client and server side with status +- Implement bidirectional streaming +- Generate certificates and use secure connection +- Dockerize application \ No newline at end of file diff --git a/client/client.go b/client/client.go new file mode 100644 index 0000000..ce1d13d --- /dev/null +++ b/client/client.go @@ -0,0 +1,110 @@ +package main + +import ( + "context" + "fmt" + "grpc-file-streaming/proto" + "io" + "os" + "path" + "time" + + log "github.com/sirupsen/logrus" + grpc "google.golang.org/grpc" +) + +func main() { + if len(os.Args) < 2 { + log.Fatal("Provide filename as second arg") + } + + filename := os.Args[1] + f, err := os.Open(filename) + if err != nil { + log.Errorf("Failed to open file: %v", err) + return + } + defer func() { + if err := f.Close(); err != nil { + log.Errorf("Failed to close file: %v", err) + } + }() + + host, port := "0.0.0.0", "50051" + + con, err := grpc.Dial(fmt.Sprintf("%s:%s", host, port), grpc.WithInsecure()) + if err != nil { + log.Errorf("Failed to dial to %s:%s :", host, port, err) + return + } + defer func() { + log.Info("Closing gRPC connection..") + if err := con.Close(); err != nil { + log.Error(err) + } + }() + + client := proto.NewFileStreamingClient(con) + + _, name := path.Split(f.Name()) + stat, _ := f.Stat() + + req := &proto.FileRequest{ + Filename: name, + Size: stat.Size(), + Data: &proto.File{ + Content: nil, + }, + } + + var resp *proto.FileResponse + + bufSize := 1 << 5 // 0.5 MB + buf := make([]byte, bufSize) + + ctx, canc := context.WithTimeout(context.Background(), time.Minute) + defer canc() + + stream, err := client.SendStream(ctx) + if err != nil { + log.Errorf("Failed to call SendStream: %v", err) + return + } + + startTime := time.Now() + var offset int64 + for { + n, err := f.ReadAt(buf, offset) + if err != nil { + if err == io.EOF { + resp, err = stream.CloseAndRecv() + + break + } + + log.Errorf("Failed to read file part: %v", err) + _ = stream.CloseSend() + break + } + + req.Data.Content = buf + + if err := stream.Send(req); err != nil { + if err != io.EOF { + log.Errorf("Failed to send stream: %v", err) + } + + _ = stream.CloseSend() + + break + } + + offset += int64(n) + } + + if resp.Success { + log.Infof("Successfully send file with size %d mB", resp.Size>>10) + } + + log.Infof("Task took %d seconds", int(time.Now().Sub(startTime).Seconds())) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..335aeff --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module grpc-file-streaming + +go 1.14 + +require ( + github.com/golang/protobuf v1.3.3 + github.com/sirupsen/logrus v1.5.0 + google.golang.org/grpc v1.28.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..e109f31 --- /dev/null +++ b/go.sum @@ -0,0 +1,64 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/sirupsen/logrus v1.5.0 h1:1N5EYkVAPEywqZRJd7cwnRtCb6xJx7NH3T3WUTF980Q= +github.com/sirupsen/logrus v1.5.0/go.mod h1:+F7Ogzej0PZc/94MaYx/nvG9jOFMD2osvC3s+Squfpo= +github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a h1:oWX7TPOiFAMXLq8o0ikBYfCJVlRHBcsciT5bXOrH628= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.28.1 h1:C1QC6KzgSiLyBabDi87BbjaGreoRgGUF5nOyvfrAZ1k= +google.golang.org/grpc v1.28.1/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/proto/streaming.pb.go b/proto/streaming.pb.go new file mode 100644 index 0000000..3d8f580 --- /dev/null +++ b/proto/streaming.pb.go @@ -0,0 +1,308 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// source: proto/streaming.proto + +package proto + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package + +type File struct { + Content []byte `protobuf:"bytes,1,opt,name=Content,proto3" json:"Content,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *File) Reset() { *m = File{} } +func (m *File) String() string { return proto.CompactTextString(m) } +func (*File) ProtoMessage() {} +func (*File) Descriptor() ([]byte, []int) { + return fileDescriptor_5556cc946f1d51e2, []int{0} +} + +func (m *File) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_File.Unmarshal(m, b) +} +func (m *File) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_File.Marshal(b, m, deterministic) +} +func (m *File) XXX_Merge(src proto.Message) { + xxx_messageInfo_File.Merge(m, src) +} +func (m *File) XXX_Size() int { + return xxx_messageInfo_File.Size(m) +} +func (m *File) XXX_DiscardUnknown() { + xxx_messageInfo_File.DiscardUnknown(m) +} + +var xxx_messageInfo_File proto.InternalMessageInfo + +func (m *File) GetContent() []byte { + if m != nil { + return m.Content + } + return nil +} + +type FileRequest struct { + Filename string `protobuf:"bytes,1,opt,name=Filename,proto3" json:"Filename,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=Size,proto3" json:"Size,omitempty"` + Data *File `protobuf:"bytes,3,opt,name=Data,proto3" json:"Data,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FileRequest) Reset() { *m = FileRequest{} } +func (m *FileRequest) String() string { return proto.CompactTextString(m) } +func (*FileRequest) ProtoMessage() {} +func (*FileRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_5556cc946f1d51e2, []int{1} +} + +func (m *FileRequest) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FileRequest.Unmarshal(m, b) +} +func (m *FileRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FileRequest.Marshal(b, m, deterministic) +} +func (m *FileRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_FileRequest.Merge(m, src) +} +func (m *FileRequest) XXX_Size() int { + return xxx_messageInfo_FileRequest.Size(m) +} +func (m *FileRequest) XXX_DiscardUnknown() { + xxx_messageInfo_FileRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_FileRequest proto.InternalMessageInfo + +func (m *FileRequest) GetFilename() string { + if m != nil { + return m.Filename + } + return "" +} + +func (m *FileRequest) GetSize() int64 { + if m != nil { + return m.Size + } + return 0 +} + +func (m *FileRequest) GetData() *File { + if m != nil { + return m.Data + } + return nil +} + +type FileResponse struct { + Success bool `protobuf:"varint,1,opt,name=Success,proto3" json:"Success,omitempty"` + Size int64 `protobuf:"varint,2,opt,name=Size,proto3" json:"Size,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *FileResponse) Reset() { *m = FileResponse{} } +func (m *FileResponse) String() string { return proto.CompactTextString(m) } +func (*FileResponse) ProtoMessage() {} +func (*FileResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_5556cc946f1d51e2, []int{2} +} + +func (m *FileResponse) XXX_Unmarshal(b []byte) error { + return xxx_messageInfo_FileResponse.Unmarshal(m, b) +} +func (m *FileResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + return xxx_messageInfo_FileResponse.Marshal(b, m, deterministic) +} +func (m *FileResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_FileResponse.Merge(m, src) +} +func (m *FileResponse) XXX_Size() int { + return xxx_messageInfo_FileResponse.Size(m) +} +func (m *FileResponse) XXX_DiscardUnknown() { + xxx_messageInfo_FileResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_FileResponse proto.InternalMessageInfo + +func (m *FileResponse) GetSuccess() bool { + if m != nil { + return m.Success + } + return false +} + +func (m *FileResponse) GetSize() int64 { + if m != nil { + return m.Size + } + return 0 +} + +func init() { + proto.RegisterType((*File)(nil), "proto.File") + proto.RegisterType((*FileRequest)(nil), "proto.FileRequest") + proto.RegisterType((*FileResponse)(nil), "proto.FileResponse") +} + +func init() { + proto.RegisterFile("proto/streaming.proto", fileDescriptor_5556cc946f1d51e2) +} + +var fileDescriptor_5556cc946f1d51e2 = []byte{ + // 218 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2d, 0x28, 0xca, 0x2f, + 0xc9, 0xd7, 0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0xcd, 0xcc, 0x4b, 0xd7, 0x03, 0xf3, 0x85, 0x58, + 0xc1, 0x94, 0x92, 0x02, 0x17, 0x8b, 0x5b, 0x66, 0x4e, 0xaa, 0x90, 0x04, 0x17, 0xbb, 0x73, 0x7e, + 0x5e, 0x49, 0x6a, 0x5e, 0x89, 0x04, 0xa3, 0x02, 0xa3, 0x06, 0x4f, 0x10, 0x8c, 0xab, 0x14, 0xc7, + 0xc5, 0x0d, 0x52, 0x11, 0x94, 0x5a, 0x58, 0x9a, 0x5a, 0x5c, 0x22, 0x24, 0xc5, 0xc5, 0x01, 0xe2, + 0xe6, 0x25, 0xe6, 0xa6, 0x82, 0x55, 0x72, 0x06, 0xc1, 0xf9, 0x42, 0x42, 0x5c, 0x2c, 0xc1, 0x99, + 0x55, 0xa9, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xcc, 0x41, 0x60, 0xb6, 0x90, 0x3c, 0x17, 0x8b, 0x4b, + 0x62, 0x49, 0xa2, 0x04, 0xb3, 0x02, 0xa3, 0x06, 0xb7, 0x11, 0x37, 0xc4, 0x76, 0x3d, 0xb0, 0x89, + 0x60, 0x09, 0x25, 0x1b, 0x2e, 0x1e, 0x88, 0xf9, 0xc5, 0x05, 0xf9, 0x79, 0xc5, 0x60, 0x97, 0x04, + 0x97, 0x26, 0x27, 0xa7, 0x16, 0x17, 0x83, 0xcd, 0xe7, 0x08, 0x82, 0x71, 0xb1, 0x19, 0x6f, 0xe4, + 0xc5, 0xc5, 0x0b, 0xd2, 0x1d, 0x0c, 0xf3, 0x9d, 0x90, 0x25, 0x17, 0x57, 0x70, 0x6a, 0x5e, 0x0a, + 0x44, 0x40, 0x48, 0x08, 0xd9, 0x3e, 0x88, 0x0f, 0xa4, 0x84, 0x51, 0xc4, 0x20, 0xb6, 0x2a, 0x31, + 0x68, 0x30, 0x3a, 0xb1, 0x47, 0x41, 0x02, 0x25, 0x89, 0x0d, 0x4c, 0x19, 0x03, 0x02, 0x00, 0x00, + 0xff, 0xff, 0x82, 0xd3, 0x47, 0xf9, 0x3b, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConnInterface + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion6 + +// FileStreamingClient is the client API for FileStreaming service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type FileStreamingClient interface { + SendStream(ctx context.Context, opts ...grpc.CallOption) (FileStreaming_SendStreamClient, error) +} + +type fileStreamingClient struct { + cc grpc.ClientConnInterface +} + +func NewFileStreamingClient(cc grpc.ClientConnInterface) FileStreamingClient { + return &fileStreamingClient{cc} +} + +func (c *fileStreamingClient) SendStream(ctx context.Context, opts ...grpc.CallOption) (FileStreaming_SendStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &_FileStreaming_serviceDesc.Streams[0], "/proto.FileStreaming/SendStream", opts...) + if err != nil { + return nil, err + } + x := &fileStreamingSendStreamClient{stream} + return x, nil +} + +type FileStreaming_SendStreamClient interface { + Send(*FileRequest) error + CloseAndRecv() (*FileResponse, error) + grpc.ClientStream +} + +type fileStreamingSendStreamClient struct { + grpc.ClientStream +} + +func (x *fileStreamingSendStreamClient) Send(m *FileRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *fileStreamingSendStreamClient) CloseAndRecv() (*FileResponse, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(FileResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// FileStreamingServer is the server API for FileStreaming service. +type FileStreamingServer interface { + SendStream(FileStreaming_SendStreamServer) error +} + +// UnimplementedFileStreamingServer can be embedded to have forward compatible implementations. +type UnimplementedFileStreamingServer struct { +} + +func (*UnimplementedFileStreamingServer) SendStream(srv FileStreaming_SendStreamServer) error { + return status.Errorf(codes.Unimplemented, "method SendStream not implemented") +} + +func RegisterFileStreamingServer(s *grpc.Server, srv FileStreamingServer) { + s.RegisterService(&_FileStreaming_serviceDesc, srv) +} + +func _FileStreaming_SendStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(FileStreamingServer).SendStream(&fileStreamingSendStreamServer{stream}) +} + +type FileStreaming_SendStreamServer interface { + SendAndClose(*FileResponse) error + Recv() (*FileRequest, error) + grpc.ServerStream +} + +type fileStreamingSendStreamServer struct { + grpc.ServerStream +} + +func (x *fileStreamingSendStreamServer) SendAndClose(m *FileResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *fileStreamingSendStreamServer) Recv() (*FileRequest, error) { + m := new(FileRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _FileStreaming_serviceDesc = grpc.ServiceDesc{ + ServiceName: "proto.FileStreaming", + HandlerType: (*FileStreamingServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SendStream", + Handler: _FileStreaming_SendStream_Handler, + ClientStreams: true, + }, + }, + Metadata: "proto/streaming.proto", +} diff --git a/proto/streaming.proto b/proto/streaming.proto new file mode 100644 index 0000000..1355fcb --- /dev/null +++ b/proto/streaming.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package proto; +option go_package = "proto"; + +message File { + bytes Content = 1; +} + +message FileRequest { + string Filename = 1; + int64 Size = 2; + File Data = 3; +} + +message FileResponse { + bool Success = 1; + int64 Size = 2; +} + +service FileStreaming { + rpc SendStream(stream FileRequest) returns (FileResponse) {}; +} + + diff --git a/server/server.go b/server/server.go new file mode 100644 index 0000000..f7a1f67 --- /dev/null +++ b/server/server.go @@ -0,0 +1,157 @@ +package main + +import ( + "fmt" + "grpc-file-streaming/proto" + "io" + "io/ioutil" + "net" + "os" + "os/signal" + + log "github.com/sirupsen/logrus" + grpc "google.golang.org/grpc" +) + +type server struct { + tmpDir string +} + +func newServer() (*server, error) { + dir, err := os.Getwd() + if err != nil { + return nil, err + } + + tmpDir, err := ioutil.TempDir(fmt.Sprintf("%s%s", dir, string(os.PathSeparator)), "server_tmp_*") + if err != nil { + return nil, err + } + + return &server{tmpDir: tmpDir}, nil +} + +func (s *server) shutdown() error { + return os.RemoveAll(s.tmpDir) +} + +func (s *server) SendStream(stream proto.FileStreaming_SendStreamServer) error { + // receive first part of stream with required metadata + st, err := stream.Recv() + if err != nil { + return err + } + + // create file in temporary dir + f, err := os.Create(fmt.Sprintf("%s%s%s", s.tmpDir, string(os.PathSeparator), st.GetFilename())) + if err != nil { + return err + } + defer func() { + if err := f.Close(); err != nil { + log.Error(err) + } + }() + + expectedSize := st.GetSize() + var size int64 + + // write first chunk of data + n, err := f.Write(st.Data.Content) + if err != nil { + return err + } + + size += int64(n) + + // receive stream parts until EOF, this means client sent all parts + for { + // check deadline + if done := stream.Context().Err(); done != nil { + // TODO: remove file if deadline exceeded and sent gRPC status + return err + } + + st, err = stream.Recv() + if err != nil { + // all chunks are received + if err == io.EOF { + break + } + + log.Errorf("Error while receiving stream: %v", err) + return err + } + + n, err := f.Write(st.Data.Content) + if err != nil { + log.Errorf("Error while writing file: %v", err) + return err + } + + size += int64(n) + } + + resp := &proto.FileResponse{ + Success: false, + Size: size, + } + + if size == expectedSize { + resp.Success = true + } + + return stream.SendAndClose(resp) +} + +func main() { + host, port := "0.0.0.0", "50051" + + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt) + + l, err := net.Listen("tcp", fmt.Sprintf("%s:%s", host, port)) + if err != nil { + log.Fatalf("Failed to listen on %s:%s : %v", host, port, err) + } + defer func() { + log.Info("Closing tcp connection..") + + if err := l.Close(); err != nil { + log.Errorf("Failed to close tcp connection: %v", err) + } + }() + + service, err := newServer() + if err != nil { + log.Fatalf("Failed to create server: %v", err) + } + defer func() { + log.Info("Shutting down service..") + + if err := service.shutdown(); err != nil { + log.Error(err) + } + }() + + s := grpc.NewServer([]grpc.ServerOption{}...) + + proto.RegisterFileStreamingServer(s, service) + + go func() { + log.Info("Starting gRPC server..") + + if err := s.Serve(l); err != nil { + log.Error(err) + return + } + }() + + <-stop + log.Info("Stopping gRPC server...") + + // close server for new cons, finish ongoing + s.GracefulStop() + + log.Info("Server stopped") +}