Skip to content

Commit a76da0d

Browse files
authored
Merge pull request #1202 from uptrace/add-config-with-conn-buffer-size
feat(pg): allow user config buffer size of pg's connect
2 parents f3bb177 + e2f2650 commit a76da0d

File tree

5 files changed

+36
-4
lines changed

5 files changed

+36
-4
lines changed

driver/pgdriver/config.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ type Config struct {
4848

4949
// Enable tracing
5050
EnableTracing bool
51+
52+
// size of reader buffer, default is 4096
53+
BufferSize int
5154
}
5255

5356
func newDefaultConfig() *Config {
@@ -67,6 +70,8 @@ func newDefaultConfig() *Config {
6770
WriteTimeout: 5 * time.Second,
6871

6972
EnableTracing: true,
73+
74+
BufferSize: 4096,
7075
}
7176

7277
conf.Dialer = func(ctx context.Context, network, addr string) (net.Conn, error) {
@@ -90,6 +95,12 @@ func WithOptions(opts ...Option) Option {
9095
}
9196
}
9297

98+
func WithConfig(after *Config) Option {
99+
return func(before *Config) {
100+
*before = *after
101+
}
102+
}
103+
93104
func WithNetwork(network string) Option {
94105
if network == "" {
95106
panic("network is empty")
@@ -186,6 +197,12 @@ func WithWriteTimeout(writeTimeout time.Duration) Option {
186197
}
187198
}
188199

200+
func WithBufferSize(size int) Option {
201+
return func(conf *Config) {
202+
conf.BufferSize = size
203+
}
204+
}
205+
189206
// WithResetSessionFunc configures a function that is called prior to executing
190207
// a query on a connection that has been used before.
191208
// If the func returns driver.ErrBadConn, the connection is discarded.

driver/pgdriver/config_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func TestParseDSN(t *testing.T) {
2828
ReadTimeout: 10 * time.Second,
2929
WriteTimeout: 5 * time.Second,
3030
EnableTracing: true,
31+
BufferSize: 4096,
3132
},
3233
},
3334
{
@@ -42,6 +43,7 @@ func TestParseDSN(t *testing.T) {
4243
ReadTimeout: 2 * time.Second,
4344
WriteTimeout: 3 * time.Second,
4445
EnableTracing: true,
46+
BufferSize: 4096,
4547
},
4648
},
4749
{
@@ -59,6 +61,7 @@ func TestParseDSN(t *testing.T) {
5961
ReadTimeout: 10 * time.Second,
6062
WriteTimeout: 5 * time.Second,
6163
EnableTracing: true,
64+
BufferSize: 4096,
6265
},
6366
},
6467
{
@@ -73,6 +76,7 @@ func TestParseDSN(t *testing.T) {
7376
ReadTimeout: 10 * time.Second,
7477
WriteTimeout: 5 * time.Second,
7578
EnableTracing: true,
79+
BufferSize: 4096,
7680
},
7781
},
7882
{
@@ -87,6 +91,7 @@ func TestParseDSN(t *testing.T) {
8791
ReadTimeout: 10 * time.Second,
8892
WriteTimeout: 5 * time.Second,
8993
EnableTracing: true,
94+
BufferSize: 4096,
9095
},
9196
},
9297
{
@@ -101,6 +106,7 @@ func TestParseDSN(t *testing.T) {
101106
ReadTimeout: 10 * time.Second,
102107
WriteTimeout: 5 * time.Second,
103108
EnableTracing: true,
109+
BufferSize: 4096,
104110
},
105111
},
106112
{
@@ -115,6 +121,7 @@ func TestParseDSN(t *testing.T) {
115121
ReadTimeout: 10 * time.Second,
116122
WriteTimeout: 5 * time.Second,
117123
EnableTracing: true,
124+
BufferSize: 4096,
118125
},
119126
},
120127
}

driver/pgdriver/driver.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ func newConn(ctx context.Context, conf *Config) (*Conn, error) {
123123
cn := &Conn{
124124
conf: conf,
125125
netConn: netConn,
126-
rd: newReader(netConn),
126+
rd: newReader(netConn, conf.BufferSize),
127127
}
128128

129129
if conf.TLSConfig != nil {

driver/pgdriver/listener.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,19 @@ type Listener struct {
3636
}
3737

3838
func NewListener(db *bun.DB) *Listener {
39-
return &Listener{
39+
ln := &Listener{
4040
db: db,
4141
driver: db.Driver().(Driver).connector,
4242
exit: make(chan struct{}),
4343
}
44+
if conf := ln.driver.Config(); conf.BufferSize < 8000 {
45+
// https://github.com/uptrace/bun/issues/1201
46+
// listener's payloads can be up to 8000 bytes
47+
newConf := *conf
48+
newConf.BufferSize = 8192
49+
ln.driver = NewConnector(WithConfig(&newConf))
50+
}
51+
return ln
4452
}
4553

4654
// Close closes the listener, releasing any open resources.

driver/pgdriver/proto.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ type reader struct {
8484
buf []byte
8585
}
8686

87-
func newReader(r io.Reader) *reader {
87+
func newReader(r io.Reader, size int) *reader {
8888
return &reader{
89-
Reader: bufio.NewReader(r),
89+
Reader: bufio.NewReaderSize(r, size),
9090
buf: make([]byte, 128),
9191
}
9292
}

0 commit comments

Comments
 (0)