Skip to content

Commit e917f9a

Browse files
authored
Merge pull request #56 from smacker/grpc_max_msg_size
Grpc max msg size
2 parents 0ac15b5 + 3e83361 commit e917f9a

File tree

7 files changed

+124
-52
lines changed

7 files changed

+124
-52
lines changed

cmd/dummy/main.go

+13-21
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,40 @@
11
package main
22

33
import (
4-
"os"
4+
"context"
55

66
"github.com/src-d/lookout"
77
"github.com/src-d/lookout/dummy"
8+
"github.com/src-d/lookout/util/flags"
9+
"github.com/src-d/lookout/util/grpchelper"
810

9-
"github.com/jessevdk/go-flags"
1011
"google.golang.org/grpc"
1112
_ "google.golang.org/grpc/grpclog/glogger"
1213
)
1314

14-
const maxMsgSize = 1024 * 1024 * 100 // 100mb
15-
1615
var (
1716
version = "local_build_1"
18-
parser = flags.NewParser(nil, flags.Default)
17+
parser = flags.NewParser()
1918
)
2019

2120
type ServeCommand struct {
21+
flags.CommonOptions
2222
Analyzer string `long:"analyzer" default:"ipv4://localhost:10302" env:"LOOKOUT_ANALYZER" description:"gRPC URL to bind the analyzer to"`
2323
DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"grPC URL of the data server"`
2424
}
2525

2626
func (c *ServeCommand) Execute(args []string) error {
2727
var err error
28-
c.DataServer, err = lookout.ToGoGrpcAddress(c.DataServer)
28+
c.DataServer, err = grpchelper.ToGoGrpcAddress(c.DataServer)
2929
if err != nil {
3030
return err
3131
}
3232

33-
conn, err := grpc.Dial(c.DataServer,
33+
conn, err := grpchelper.DialContext(
34+
context.Background(),
35+
c.DataServer,
3436
grpc.WithInsecure(),
35-
grpc.WithDefaultCallOptions(grpc.FailFast(false), grpc.MaxCallRecvMsgSize(maxMsgSize)),
37+
grpc.WithDefaultCallOptions(grpc.FailFast(false)),
3638
)
3739
if err != nil {
3840
return err
@@ -43,10 +45,10 @@ func (c *ServeCommand) Execute(args []string) error {
4345
DataClient: lookout.NewDataClient(conn),
4446
}
4547

46-
server := grpc.NewServer()
48+
server := grpchelper.NewServer()
4749
lookout.RegisterAnalyzerServer(server, a)
4850

49-
lis, err := lookout.Listen(c.Analyzer)
51+
lis, err := grpchelper.Listen(c.Analyzer)
5052
if err != nil {
5153
return err
5254
}
@@ -60,15 +62,5 @@ func main() {
6062
panic(err)
6163
}
6264

63-
if _, err := parser.Parse(); err != nil {
64-
if err, ok := err.(*flags.Error); ok {
65-
if err.Type == flags.ErrHelp {
66-
os.Exit(0)
67-
}
68-
69-
parser.WriteHelp(os.Stdout)
70-
}
71-
72-
os.Exit(1)
73-
}
65+
flags.RunMain(parser)
7466
}

cmd/lookout/event.go

+9-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import (
99
"github.com/src-d/lookout"
1010
"github.com/src-d/lookout/service/bblfsh"
1111
"github.com/src-d/lookout/service/git"
12+
"github.com/src-d/lookout/util/flags"
13+
"github.com/src-d/lookout/util/grpchelper"
1214
"google.golang.org/grpc"
1315

1416
gogit "gopkg.in/src-d/go-git.v4"
@@ -17,6 +19,7 @@ import (
1719
)
1820

1921
type EventCommand struct {
22+
flags.CommonOptions
2023
DataServer string `long:"data-server" default:"ipv4://localhost:10301" env:"LOOKOUT_DATA_SERVER" description:"gRPC URL to bind the data server to"`
2124
Bblfshd string `long:"bblfshd" default:"ipv4://localhost:9432" env:"LOOKOUT_BBLFSHD" description:"gRPC URL of the Bblfshd server"`
2225
GitDir string `long:"git-dir" default:"." env:"GIT_DIR" description:"path to the .git directory to analyze"`
@@ -84,13 +87,13 @@ func (c *EventCommand) makeDataServerHandler() (*lookout.DataServerHandler, erro
8487
loader := git.NewStorerCommitLoader(c.repo.Storer)
8588
dataService = git.NewService(loader)
8689

87-
c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd)
90+
c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd)
8891
if err != nil {
8992
return nil, err
9093
}
9194
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
9295
defer cancel()
93-
bblfshConn, err := grpc.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock())
96+
bblfshConn, err := grpchelper.DialContext(timeoutCtx, c.Bblfshd, grpc.WithInsecure(), grpc.WithBlock())
9497
if err != nil {
9598
log.Warningf("bblfsh service is unavailable. No UAST will be provided to analyzer. Error: %s", err)
9699
} else {
@@ -110,10 +113,10 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul
110113
setGrpcLogger()
111114
}
112115

113-
grpcSrv := grpc.NewServer()
116+
grpcSrv := grpchelper.NewServer()
114117
lookout.RegisterDataServer(grpcSrv, srv)
115118

116-
lis, err := lookout.Listen(c.DataServer)
119+
lis, err := grpchelper.Listen(c.DataServer)
117120
if err != nil {
118121
return nil, err
119122
}
@@ -126,19 +129,18 @@ func (c *EventCommand) bindDataServer(srv *lookout.DataServerHandler, serveResul
126129
func (c *EventCommand) analyzerClient() (lookout.AnalyzerClient, error) {
127130
var err error
128131

129-
c.Args.Analyzer, err = lookout.ToGoGrpcAddress(c.Args.Analyzer)
132+
c.Args.Analyzer, err = grpchelper.ToGoGrpcAddress(c.Args.Analyzer)
130133
if err != nil {
131134
return nil, err
132135
}
133136

134137
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Second)
135138
defer cancel()
136-
conn, err := grpc.DialContext(
139+
conn, err := grpchelper.DialContext(
137140
timeoutCtx,
138141
c.Args.Analyzer,
139142
grpc.WithInsecure(),
140143
grpc.WithBlock(),
141-
grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)),
142144
)
143145
if err != nil {
144146
return nil, err

cmd/lookout/main.go

+3-15
Original file line numberDiff line numberDiff line change
@@ -4,37 +4,25 @@ import (
44
stdlog "log"
55
"os"
66

7-
"github.com/jessevdk/go-flags"
7+
"github.com/src-d/lookout/util/flags"
88
"google.golang.org/grpc/grpclog"
99
"gopkg.in/src-d/go-log.v1"
1010
)
1111

12-
const maxMsgSize = 1024 * 1024 * 100 // 100mb
13-
1412
var (
1513
name = "lookout"
1614
version = "undefined"
1715
build = "undefined"
1816
)
1917

20-
var parser = flags.NewParser(nil, flags.Default)
18+
var parser = flags.NewParser()
2119

2220
func init() {
2321
log.DefaultLogger = log.New(log.Fields{"app": name})
2422
}
2523

2624
func main() {
27-
if _, err := parser.Parse(); err != nil {
28-
if err, ok := err.(*flags.Error); ok {
29-
if err.Type == flags.ErrHelp {
30-
os.Exit(0)
31-
}
32-
33-
parser.WriteHelp(os.Stdout)
34-
}
35-
36-
os.Exit(1)
37-
}
25+
flags.RunMain(parser)
3826
}
3927

4028
func setGrpcLogger() {

cmd/lookout/serve.go

+9-6
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"github.com/src-d/lookout/provider/github"
1111
"github.com/src-d/lookout/service/bblfsh"
1212
"github.com/src-d/lookout/service/git"
13+
"github.com/src-d/lookout/util/flags"
14+
"github.com/src-d/lookout/util/grpchelper"
1315

1416
"google.golang.org/grpc"
1517
"gopkg.in/src-d/go-billy.v4/osfs"
@@ -25,6 +27,7 @@ func init() {
2527
}
2628

2729
type ServeCommand struct {
30+
flags.CommonOptions
2831
ConfigFile string `long:"config" short:"c" default:"config.yml" env:"LOOKOUT_CONFIG_FILE" description:"path to configuration file"`
2932
GithubUser string `long:"github-user" env:"GITHUB_USER" description:"user for the GitHub API"`
3033
GithubToken string `long:"github-token" env:"GITHUB_TOKEN" description:"access token for the GitHub API"`
@@ -106,12 +109,12 @@ func (c *ServeCommand) initPoster() (lookout.Poster, error) {
106109
}
107110

108111
func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.AnalyzerClient, error) {
109-
addr, err := lookout.ToGoGrpcAddress(conf.Addr)
112+
addr, err := grpchelper.ToGoGrpcAddress(conf.Addr)
110113
if err != nil {
111114
return nil, err
112115
}
113116

114-
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithBlock())
117+
conn, err := grpchelper.DialContext(context.Background(), addr, grpc.WithInsecure(), grpc.WithBlock())
115118
if err != nil {
116119
return nil, err
117120
}
@@ -121,12 +124,12 @@ func (c *ServeCommand) startAnalyzer(conf lookout.AnalyzerConfig) (lookout.Analy
121124

122125
func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) {
123126
var err error
124-
c.Bblfshd, err = lookout.ToGoGrpcAddress(c.Bblfshd)
127+
c.Bblfshd, err = grpchelper.ToGoGrpcAddress(c.Bblfshd)
125128
if err != nil {
126129
return nil, err
127130
}
128131

129-
bblfshConn, err := grpc.Dial(c.Bblfshd, grpc.WithInsecure())
132+
bblfshConn, err := grpchelper.DialContext(context.Background(), c.Bblfshd, grpc.WithInsecure())
130133
if err != nil {
131134
return nil, err
132135
}
@@ -147,9 +150,9 @@ func (c *ServeCommand) initDataHadler() (*lookout.DataServerHandler, error) {
147150
}
148151

149152
func (c *ServeCommand) startServer(srv *lookout.DataServerHandler) error {
150-
grpcSrv := grpc.NewServer()
153+
grpcSrv := grpchelper.NewServer()
151154
lookout.RegisterDataServer(grpcSrv, srv)
152-
lis, err := lookout.Listen(c.DataServer)
155+
lis, err := grpchelper.Listen(c.DataServer)
153156
if err != nil {
154157
return err
155158
}

dummy/dummy_test.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77

88
"github.com/src-d/lookout"
99
"github.com/src-d/lookout/service/git"
10+
"github.com/src-d/lookout/util/grpchelper"
1011

1112
"github.com/stretchr/testify/suite"
1213
"google.golang.org/grpc"
@@ -45,7 +46,7 @@ func (s *DummySuite) SetupSuite() {
4546
}
4647
lookout.RegisterDataServer(s.apiServer, server)
4748

48-
lis, err := lookout.Listen("ipv4://0.0.0.0:9991")
49+
lis, err := grpchelper.Listen("ipv4://0.0.0.0:9991")
4950
require.NoError(err)
5051

5152
go s.apiServer.Serve(lis)
@@ -86,7 +87,7 @@ func (s *DummySuite) Test() {
8687
s.analyzerServer = grpc.NewServer()
8788
lookout.RegisterAnalyzerServer(s.analyzerServer, a)
8889

89-
lis, err := lookout.Listen("ipv4://0.0.0.0:9995")
90+
lis, err := grpchelper.Listen("ipv4://0.0.0.0:9995")
9091
require.NoError(err)
9192

9293
done := make(chan error)

util/flags/flags.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package flags
2+
3+
import (
4+
"os"
5+
6+
flags "github.com/jessevdk/go-flags"
7+
"github.com/src-d/lookout/util/grpchelper"
8+
)
9+
10+
// CommonOptions contains common flags for all commands
11+
type CommonOptions struct {
12+
GrpcMaxMsgSize int `long:"grpc-max-message-size" default:"100" env:"LOOKOUT_GRPC_MAX_MSG_SIZE" description:"max. message size to send/receive to/from clients (in MB)"`
13+
}
14+
15+
// GetGrpcMaxMsgSize implements GrpcMaxMsgSizer interface
16+
func (o *CommonOptions) GetGrpcMaxMsgSize() int {
17+
return o.GrpcMaxMsgSize
18+
}
19+
20+
// GrpcMaxMsgSizer is used to get gRPC maximum message size
21+
type GrpcMaxMsgSizer interface {
22+
GetGrpcMaxMsgSize() int
23+
}
24+
25+
// NewParser returns new flags.Parser
26+
func NewParser() *flags.Parser {
27+
parser := flags.NewParser(nil, flags.Default)
28+
parser.CommandHandler = func(command flags.Commander, args []string) error {
29+
if s, ok := command.(GrpcMaxMsgSizer); ok {
30+
grpchelper.SetMaxMessageSize(s.GetGrpcMaxMsgSize())
31+
}
32+
return command.Execute(args)
33+
}
34+
return parser
35+
}
36+
37+
// RunMain parses arguments and runs commands
38+
func RunMain(parser *flags.Parser) {
39+
if _, err := parser.Parse(); err != nil {
40+
if err, ok := err.(*flags.Error); ok {
41+
if err.Type == flags.ErrHelp {
42+
os.Exit(0)
43+
}
44+
45+
parser.WriteHelp(os.Stdout)
46+
}
47+
48+
os.Exit(1)
49+
}
50+
}

common.go renamed to util/grpchelper/helper.go

+37-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,30 @@
1-
package lookout
1+
package grpchelper
22

33
import (
4+
"context"
45
"fmt"
56
"net"
67
"net/url"
8+
"os"
9+
10+
"google.golang.org/grpc"
11+
log "gopkg.in/src-d/go-log.v1"
712
)
813

14+
var maxMessageSize = 100 * 1024 * 1024 // 100mb
15+
16+
// SetMaxMessageSize overrides default grpc max. message size to send/receive to/from clients
17+
func SetMaxMessageSize(size int) {
18+
if size >= 2048 {
19+
// Setting the hard limit of message size to less than 2GB since
20+
// it may overflow an int value, and it should be big enough
21+
log.Errorf(fmt.Errorf("max-message-size too big (limit is 2047MB): %d", size), "SetMaxMessageSize")
22+
os.Exit(1)
23+
}
24+
25+
maxMessageSize = size * 1024 * 1024
26+
}
27+
928
//TODO: https://github.com/grpc/grpc-go/issues/1911
1029

1130
// ToNetListenerAddress converts a gRPC URL to a network+address consumable by
@@ -65,3 +84,20 @@ func Listen(address string) (net.Listener, error) {
6584

6685
return net.Listen(n, a)
6786
}
87+
88+
// NewGrpcServer creates new grpc.Server with custom message size
89+
func NewServer(opts ...grpc.ServerOption) *grpc.Server {
90+
opts = append(opts, grpc.MaxRecvMsgSize(maxMessageSize), grpc.MaxSendMsgSize(maxMessageSize))
91+
92+
return grpc.NewServer(opts...)
93+
}
94+
95+
// GrpcDialContext creates a client connection to the given target with custom message size
96+
func DialContext(ctx context.Context, target string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
97+
opts = append(opts, grpc.WithDefaultCallOptions(
98+
grpc.MaxCallRecvMsgSize(maxMessageSize),
99+
grpc.MaxCallSendMsgSize(maxMessageSize),
100+
))
101+
102+
return grpc.DialContext(ctx, target, opts...)
103+
}

0 commit comments

Comments
 (0)