Skip to content

Commit 44ca396

Browse files
author
Anthony Romano
authored
Merge pull request #8003 from vitalyisaev2/UCS-1381
rafthttp: configurable stream reader retry timeout
2 parents c578ac4 + 0a9092a commit 44ca396

File tree

4 files changed

+58
-29
lines changed

4 files changed

+58
-29
lines changed

rafthttp/peer.go

+4
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/coreos/etcd/raft/raftpb"
2525
"github.com/coreos/etcd/snap"
2626
"golang.org/x/net/context"
27+
"golang.org/x/time/rate"
2728
)
2829

2930
const (
@@ -188,6 +189,7 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
188189
status: status,
189190
recvc: p.recvc,
190191
propc: p.propc,
192+
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
191193
}
192194
p.msgAppReader = &streamReader{
193195
peerID: peerID,
@@ -197,7 +199,9 @@ func startPeer(transport *Transport, urls types.URLs, peerID types.ID, fs *stats
197199
status: status,
198200
recvc: p.recvc,
199201
propc: p.propc,
202+
rl: rate.NewLimiter(transport.DialRetryFrequency, 1),
200203
}
204+
201205
p.msgAppV2Reader.start()
202206
p.msgAppReader.start()
203207

rafthttp/stream.go

+32-27
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"sync"
2626
"time"
2727

28+
"golang.org/x/time/rate"
29+
2830
"github.com/coreos/etcd/etcdserver/stats"
2931
"github.com/coreos/etcd/pkg/httputil"
3032
"github.com/coreos/etcd/pkg/transport"
@@ -243,7 +245,9 @@ func (cw *streamWriter) closeUnlocked() bool {
243245
if !cw.working {
244246
return false
245247
}
246-
cw.closer.Close()
248+
if err := cw.closer.Close(); err != nil {
249+
plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
250+
}
247251
if len(cw.msgc) > 0 {
248252
cw.r.ReportUnreachable(uint64(cw.peerID))
249253
}
@@ -278,25 +282,28 @@ type streamReader struct {
278282
recvc chan<- raftpb.Message
279283
propc chan<- raftpb.Message
280284

285+
rl *rate.Limiter // alters the frequency of dial retrial attempts
286+
281287
errorc chan<- error
282288

283289
mu sync.Mutex
284290
paused bool
285-
cancel func()
286291
closer io.Closer
287292

288-
stopc chan struct{}
289-
done chan struct{}
293+
ctx context.Context
294+
cancel context.CancelFunc
295+
done chan struct{}
290296
}
291297

292-
func (r *streamReader) start() {
293-
r.stopc = make(chan struct{})
294-
r.done = make(chan struct{})
295-
if r.errorc == nil {
296-
r.errorc = r.tr.ErrorC
298+
func (cr *streamReader) start() {
299+
cr.done = make(chan struct{})
300+
if cr.errorc == nil {
301+
cr.errorc = cr.tr.ErrorC
297302
}
298-
299-
go r.run()
303+
if cr.ctx == nil {
304+
cr.ctx, cr.cancel = context.WithCancel(context.Background())
305+
}
306+
go cr.run()
300307
}
301308

302309
func (cr *streamReader) run() {
@@ -311,7 +318,7 @@ func (cr *streamReader) run() {
311318
} else {
312319
cr.status.activate()
313320
plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
314-
err := cr.decodeLoop(rc, t)
321+
err = cr.decodeLoop(rc, t)
315322
plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
316323
switch {
317324
// all data is read out
@@ -322,15 +329,16 @@ func (cr *streamReader) run() {
322329
cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
323330
}
324331
}
325-
select {
326-
// Wait 100ms to create a new stream, so it doesn't bring too much
327-
// overhead when retry.
328-
case <-time.After(100 * time.Millisecond):
329-
case <-cr.stopc:
332+
// Wait for a while before new dial attempt
333+
err = cr.rl.Wait(cr.ctx)
334+
if cr.ctx.Err() != nil {
330335
plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
331336
close(cr.done)
332337
return
333338
}
339+
if err != nil {
340+
plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
341+
}
334342
}
335343
}
336344

@@ -346,7 +354,7 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
346354
plog.Panicf("unhandled stream type %s", t)
347355
}
348356
select {
349-
case <-cr.stopc:
357+
case <-cr.ctx.Done():
350358
cr.mu.Unlock()
351359
if err := rc.Close(); err != nil {
352360
return err
@@ -401,11 +409,8 @@ func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
401409
}
402410

403411
func (cr *streamReader) stop() {
404-
close(cr.stopc)
405412
cr.mu.Lock()
406-
if cr.cancel != nil {
407-
cr.cancel()
408-
}
413+
cr.cancel()
409414
cr.close()
410415
cr.mu.Unlock()
411416
<-cr.done
@@ -429,13 +434,11 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
429434

430435
setPeerURLsHeader(req, cr.tr.URLs)
431436

432-
ctx, cancel := context.WithCancel(context.Background())
433-
req = req.WithContext(ctx)
437+
req = req.WithContext(cr.ctx)
434438

435439
cr.mu.Lock()
436-
cr.cancel = cancel
437440
select {
438-
case <-cr.stopc:
441+
case <-cr.ctx.Done():
439442
cr.mu.Unlock()
440443
return nil, fmt.Errorf("stream reader is stopped")
441444
default:
@@ -497,7 +500,9 @@ func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
497500

498501
func (cr *streamReader) close() {
499502
if cr.closer != nil {
500-
cr.closer.Close()
503+
if err := cr.closer.Close(); err != nil {
504+
plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
505+
}
501506
}
502507
cr.closer = nil
503508
}

rafthttp/stream_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package rafthttp
1616

1717
import (
18+
"context"
1819
"errors"
1920
"fmt"
2021
"io"
@@ -25,6 +26,8 @@ import (
2526
"testing"
2627
"time"
2728

29+
"golang.org/x/time/rate"
30+
2831
"github.com/coreos/etcd/etcdserver/stats"
2932
"github.com/coreos/etcd/pkg/testutil"
3033
"github.com/coreos/etcd/pkg/types"
@@ -113,6 +116,7 @@ func TestStreamReaderDialRequest(t *testing.T) {
113116
peerID: types.ID(2),
114117
tr: &Transport{streamRt: tr, ClusterID: types.ID(1), ID: types.ID(1)},
115118
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
119+
ctx: context.Background(),
116120
}
117121
sr.dial(tt)
118122

@@ -167,6 +171,7 @@ func TestStreamReaderDialResult(t *testing.T) {
167171
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
168172
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
169173
errorc: make(chan error, 1),
174+
ctx: context.Background(),
170175
}
171176

172177
_, err := sr.dial(streamTypeMessage)
@@ -192,6 +197,7 @@ func TestStreamReaderStopOnDial(t *testing.T) {
192197
errorc: make(chan error, 1),
193198
typ: streamTypeMessage,
194199
status: newPeerStatus(types.ID(2)),
200+
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
195201
}
196202
tr.onResp = func() {
197203
// stop() waits for the run() goroutine to exit, but that exit
@@ -246,6 +252,7 @@ func TestStreamReaderDialDetectUnsupport(t *testing.T) {
246252
peerID: types.ID(2),
247253
tr: &Transport{streamRt: tr, ClusterID: types.ID(1)},
248254
picker: mustNewURLPicker(t, []string{"http://localhost:2380"}),
255+
ctx: context.Background(),
249256
}
250257

251258
_, err := sr.dial(typ)
@@ -311,6 +318,7 @@ func TestStream(t *testing.T) {
311318
status: newPeerStatus(types.ID(2)),
312319
recvc: recvc,
313320
propc: propc,
321+
rl: rate.NewLimiter(rate.Every(100*time.Millisecond), 1),
314322
}
315323
sr.start()
316324

rafthttp/transport.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/coreos/pkg/capnslog"
3030
"github.com/xiang90/probing"
3131
"golang.org/x/net/context"
32+
"golang.org/x/time/rate"
3233
)
3334

3435
var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp"))
@@ -94,8 +95,12 @@ type Transporter interface {
9495
// User needs to call Start before calling other functions, and call
9596
// Stop when the Transport is no longer used.
9697
type Transport struct {
97-
DialTimeout time.Duration // maximum duration before timing out dial of the request
98-
TLSInfo transport.TLSInfo // TLS information used when creating connection
98+
DialTimeout time.Duration // maximum duration before timing out dial of the request
99+
// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
100+
// a distinct rate limiter is created per every peer (default value: 10 events/sec)
101+
DialRetryFrequency rate.Limit
102+
103+
TLSInfo transport.TLSInfo // TLS information used when creating connection
99104

100105
ID types.ID // local member ID
101106
URLs types.URLs // local peer URLs
@@ -135,6 +140,13 @@ func (t *Transport) Start() error {
135140
t.remotes = make(map[types.ID]*remote)
136141
t.peers = make(map[types.ID]Peer)
137142
t.prober = probing.NewProber(t.pipelineRt)
143+
144+
// If client didn't provide dial retry frequence, use the default
145+
// (100ms backoff between attempts to create a new stream),
146+
// so it doesn't bring too much overhead when retry.
147+
if t.DialRetryFrequency == 0 {
148+
t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
149+
}
138150
return nil
139151
}
140152

0 commit comments

Comments
 (0)