Skip to content

Commit

Permalink
Add a separate write timeout
Browse files Browse the repository at this point in the history
Using the same write timeout and read timeout means that the query
can take cfg.Timeout * 2 time. We want a shorter write timeout.
  • Loading branch information
martin-sucha committed Mar 18, 2022
1 parent 830d6d0 commit 1268384
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
1 change: 1 addition & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type ClusterConfig struct {
ProtoVersion int
Timeout time.Duration // connection timeout (default: 600ms)
ConnectTimeout time.Duration // initial connection timeout, used during initial dial to server (default: 600ms)
WriteTimeout time.Duration // timeout for writing a query. defaults to Timeout if not specified.
Port int // port (default: 9042)
Keyspace string // initial keyspace (optional)
NumConns int // number of connections per host (default: 2)
Expand Down
12 changes: 10 additions & 2 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ type ConnConfig struct {
ProtoVersion int
CQLVersion string
Timeout time.Duration
WriteTimeout time.Duration
ConnectTimeout time.Duration
Dialer Dialer
Compressor Compressor
Expand Down Expand Up @@ -166,6 +167,7 @@ type Conn struct {
w contextWriter

timeout time.Duration
writeTimeout time.Duration
cfg *ConnConfig
frameObserver FrameHeaderObserver
streamObserver StreamObserver
Expand Down Expand Up @@ -272,6 +274,11 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
conn = tconn
}

writeTimeout := cfg.Timeout
if cfg.WriteTimeout > 0 {
writeTimeout = cfg.WriteTimeout
}

ctx, cancel := context.WithCancel(ctx)
c := &Conn{
conn: conn,
Expand All @@ -288,14 +295,15 @@ func (s *Session) dialWithoutObserver(ctx context.Context, host *HostInfo, cfg *
frameObserver: s.frameObserver,
w: &deadlineContextWriter{
w: conn,
timeout: cfg.Timeout,
timeout: writeTimeout,
semaphore: make(chan struct{}, 1),
quit: make(chan struct{}),
},
ctx: ctx,
cancel: cancel,
logger: cfg.logger(),
streamObserver: s.streamObserver,
writeTimeout: writeTimeout,
}

if err := c.init(ctx); err != nil {
Expand Down Expand Up @@ -332,7 +340,7 @@ func (c *Conn) init(ctx context.Context) error {

// dont coalesce startup frames
if c.session.cfg.WriteCoalesceWaitTime > 0 && !c.cfg.disableCoalesce {
c.w = newWriteCoalescer(c.conn, c.timeout, c.session.cfg.WriteCoalesceWaitTime, ctx.Done())
c.w = newWriteCoalescer(c.conn, c.writeTimeout, c.session.cfg.WriteCoalesceWaitTime, ctx.Done())
}

go c.serve(ctx)
Expand Down
1 change: 1 addition & 0 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func connConfig(cfg *ClusterConfig) (*ConnConfig, error) {
ProtoVersion: cfg.ProtoVersion,
CQLVersion: cfg.CQLVersion,
Timeout: cfg.Timeout,
WriteTimeout: cfg.WriteTimeout,
ConnectTimeout: cfg.ConnectTimeout,
Dialer: cfg.Dialer,
Compressor: cfg.Compressor,
Expand Down

0 comments on commit 1268384

Please sign in to comment.