Read index retry #12780

Merged (2 commits, Mar 23, 2021)
166 changes: 104 additions & 62 deletions server/etcdserver/v3_server.go
@@ -44,6 +44,7 @@ const (
 	// We should stop accepting new proposals if the gap growing to a certain point.
 	maxGapBetweenApplyAndCommitIndex = 5000
 	traceThreshold                   = 100 * time.Millisecond
+	readIndexRetryTime               = 500 * time.Millisecond
 )
 
 type RaftKV interface {
@@ -706,12 +707,8 @@ func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.In
 func (s *EtcdServer) Watchable() mvcc.WatchableKV { return s.KV() }
 
 func (s *EtcdServer) linearizableReadLoop() {
-	var rs raft.ReadState
-
 	for {
-		ctxToSend := make([]byte, 8)
-		id1 := s.reqIDGen.Next()
-		binary.BigEndian.PutUint64(ctxToSend, id1)
+		requestId := s.reqIDGen.Next()
 		leaderChangedNotifier := s.LeaderChangedNotify()
 		select {
 		case <-leaderChangedNotifier:
@@ -724,89 +721,134 @@ func (s *EtcdServer) linearizableReadLoop() {
 		// as a single loop is can unlock multiple reads, it is not very useful
 		// to propagate the trace from Txn or Range.
 		trace := traceutil.New("linearizableReadLoop", s.Logger())
-		nextnr := newNotifier()
 
+		nextnr := newNotifier()
 		s.readMu.Lock()
 		nr := s.readNotifier
 		s.readNotifier = nextnr
 		s.readMu.Unlock()
 
-		lg := s.Logger()
-		cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
-		if err := s.r.ReadIndex(cctx, ctxToSend); err != nil {
-			cancel()
-			if err == raft.ErrStopped {
-				return
-			}
-			lg.Warn("failed to get read index from Raft", zap.Error(err))
-			readIndexFailed.Inc()
+		confirmedIndex, err := s.requestCurrentIndex(leaderChangedNotifier, requestId)
+		if isStopped(err) {
+			return
+		}
+		if err != nil {
 			nr.notify(err)
 			continue
 		}
-		cancel()
 
-		var (
-			timeout bool
-			done    bool
-		)
-		for !timeout && !done {
-			select {
-			case rs = <-s.r.readStateC:
-				done = bytes.Equal(rs.RequestCtx, ctxToSend)
-				if !done {
-					// a previous request might time out. now we should ignore the response of it and
-					// continue waiting for the response of the current requests.
-					id2 := uint64(0)
-					if len(rs.RequestCtx) == 8 {
-						id2 = binary.BigEndian.Uint64(rs.RequestCtx)
-					}
-					lg.Warn(
-						"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
-						zap.Uint64("sent-request-id", id1),
-						zap.Uint64("received-request-id", id2),
-					)
-					slowReadIndex.Inc()
-				}
-			case <-leaderChangedNotifier:
-				timeout = true
-				readIndexFailed.Inc()
-				// return a retryable error.
-				nr.notify(ErrLeaderChanged)
-			case <-time.After(s.Cfg.ReqTimeout()):
-				lg.Warn("timed out waiting for read index response (local node might have slow network)", zap.Duration("timeout", s.Cfg.ReqTimeout()))
-				nr.notify(ErrTimeout)
-				timeout = true
-				slowReadIndex.Inc()
-			case <-s.stopping:
-				return
-			}
-		}
-		if !done {
-			continue
-		}
 		trace.Step("read index received")
 
-		index := rs.Index
-		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: index})
+		trace.AddField(traceutil.Field{Key: "readStateIndex", Value: confirmedIndex})
 
-		ai := s.getAppliedIndex()
-		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(ai, 10)})
+		appliedIndex := s.getAppliedIndex()
+		trace.AddField(traceutil.Field{Key: "appliedIndex", Value: strconv.FormatUint(appliedIndex, 10)})
 
-		if ai < index {
+		if appliedIndex < confirmedIndex {
 			select {
-			case <-s.applyWait.Wait(index):
+			case <-s.applyWait.Wait(confirmedIndex):
 			case <-s.stopping:
 				return
 			}
 		}
-		// unblock all l-reads requested at indices before rs.Index
+		// unblock all l-reads requested at indices before confirmedIndex
 		nr.notify(nil)
 		trace.Step("applied index is now lower than readState.Index")
 
 		trace.LogAllStepsIfLong(traceThreshold)
 	}
 }
 
+func isStopped(err error) bool {
+	return err == raft.ErrStopped || err == ErrStopped
+}
+
+func (s *EtcdServer) requestCurrentIndex(leaderChangedNotifier <-chan struct{}, requestId uint64) (uint64, error) {
+	err := s.sendReadIndex(requestId)
+	if err != nil {
+		return 0, err
+	}
+
+	lg := s.Logger()
+	errorTimer := time.NewTimer(s.Cfg.ReqTimeout())
Contributor:
I don't understand it...

Would the following approach work:

  • retryTimer instead of errorTimer.
  • both selects merged into a single select?

Contributor Author:

First, I decided to use two selects instead of one because of this article. In short: it guards against a case being picked at random when multiple channels are unblocked (see the sketch after this comment). It seemed reasonable when writing the code, but now I can't give any good reason to keep it this way, so I'll merge them into a single select.

The second point is "retryTimer instead of errorTimer". I'm not sure I understand you correctly, but if you are suggesting keeping retryTimer (the one that measures 500ms) outside the for loop and putting errorTimer (the one that measures 7s) in <-time.After(...), then it would not work, as we need a single errorTimer spanning potentially multiple retryTimer rounds.
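A minimal standalone sketch of the behaviour in question (illustrative only, not part of the PR): when several cases of a select are ready at the same time, the Go runtime picks one of them via uniform pseudo-random selection.

package main

import "fmt"

func main() {
	a := make(chan struct{}, 1)
	b := make(chan struct{}, 1)
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		a <- struct{}{}
		b <- struct{}{}
		// Both cases are ready here; the runtime chooses one of them
		// pseudo-randomly rather than in source order.
		select {
		case <-a:
			counts["a"]++
			<-b // drain the other channel before the next round
		case <-b:
			counts["b"]++
			<-a
		}
	}
	fmt.Println(counts) // roughly even, e.g. map[a:496 b:504]
}

Splitting the wait into two consecutive selects gives the first select's cases strict priority, which is the guard the two-select version was providing.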

Contributor:

Ad 1: I see. But I don't think we really care which branch is taken if multiple channels happen to become ready at exactly the same time, so I would go for merging.

Ad 2: You are right. Alternatively, you could use a top-level ticker (time.NewTicker) to get a periodic notification for refreshing the request. The benefit is that it will automatically cancel the goroutine.
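A hypothetical sketch of that Ticker variant (requestWithTicker, readStateC, and sendRequest are illustrative stand-ins, not the PR's code; as the diff below shows, the PR ultimately kept a single Timer refreshed with Reset):

package main

import (
	"errors"
	"fmt"
	"time"
)

// requestWithTicker re-sends the request on every tick until a response
// arrives, the overall deadline fires, or a send fails.
func requestWithTicker(readStateC <-chan uint64, sendRequest func() error,
	reqTimeout, retryEvery time.Duration) (uint64, error) {
	if err := sendRequest(); err != nil {
		return 0, err
	}
	retryTicker := time.NewTicker(retryEvery)
	defer retryTicker.Stop() // releases the ticker's resources on exit
	deadline := time.NewTimer(reqTimeout)
	defer deadline.Stop()
	for {
		select {
		case index := <-readStateC:
			return index, nil
		case <-retryTicker.C:
			// Periodic notification: refresh the outstanding request.
			if err := sendRequest(); err != nil {
				return 0, err
			}
		case <-deadline.C:
			return 0, errors.New("timed out waiting for read index response")
		}
	}
}

func main() {
	readStateC := make(chan uint64, 1)
	send := func() error {
		readStateC <- 42 // pretend a response arrives immediately
		return nil
	}
	fmt.Println(requestWithTicker(readStateC, send, 7*time.Second, 500*time.Millisecond)) // 42 <nil>
}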

Contributor Author:
Ad 2: I find a Timer more intuitive than a Ticker here, as "ping me after 500ms" describes the concept of a timeout better than "ping me every 500ms" does. However, I don't understand this part:

The benefit is that it will automatically cancel the goroutine

Could you elaborate?

Contributor:
<-time.After(readIndexRetryTime)

under the covers starts a goroutine that sleeps for some time and then populates the channel.

If the select exits for another reason, that goroutine still lives on for up to 500ms to populate a channel that no one is waiting for.
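A minimal sketch of that concern (strictly, time.After allocates a runtime timer rather than a visible goroutine, but the lifetime issue is the same), assuming a hypothetical worker channel workC:

package sketch

import "time"

// consume handles values until the channel closes or 500ms pass with no
// traffic. The catch: each iteration calls time.After, allocating a new
// timer. Whenever the workC case wins, that iteration's timer keeps
// running for up to 500ms, firing into a channel nobody reads.
func consume(workC <-chan int) {
	for {
		select {
		case v, ok := <-workC:
			if !ok {
				return
			}
			_ = v // handle the value
		case <-time.After(500 * time.Millisecond):
			return // timed out waiting for work
		}
	}
}

Reusing a single Timer and refreshing it with Reset, as the final version of this PR does, avoids that per-iteration allocation.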

Contributor Author:
Good point. I've changed it to a single timer initialised outside of the loop, and I use retryTimer.Reset(readIndexRetryTime) to refresh it.

+	defer errorTimer.Stop()
+	retryTimer := time.NewTimer(readIndexRetryTime)
+	defer retryTimer.Stop()
+
+	for {
+		select {
+		case rs := <-s.r.readStateC:
+			requestIdBytes := uint64ToBigEndianBytes(requestId)
+			gotOwnResponse := bytes.Equal(rs.RequestCtx, requestIdBytes)
+			if !gotOwnResponse {
+				// a previous request might time out. now we should ignore the response of it and
+				// continue waiting for the response of the current requests.
+				responseId := uint64(0)
+				if len(rs.RequestCtx) == 8 {
+					responseId = binary.BigEndian.Uint64(rs.RequestCtx)
+				}
+				lg.Warn(
+					"ignored out-of-date read index response; local node read indexes queueing up and waiting to be in sync with leader",
+					zap.Uint64("sent-request-id", requestId),
+					zap.Uint64("received-request-id", responseId),
+				)
+				slowReadIndex.Inc()
+				continue
+			}
+			return rs.Index, nil
+		case <-leaderChangedNotifier:
+			readIndexFailed.Inc()
+			// return a retryable error.
+			return 0, ErrLeaderChanged
+		case <-retryTimer.C:
+			lg.Warn(
+				"waiting for ReadIndex response took too long, retrying",
+				zap.Uint64("sent-request-id", requestId),
+				zap.Duration("retry-timeout", readIndexRetryTime),
+			)
+			err := s.sendReadIndex(requestId)
+			if err != nil {
+				return 0, err
+			}
+			retryTimer.Reset(readIndexRetryTime)
+			continue
+		case <-errorTimer.C:
+			lg.Warn(
+				"timed out waiting for read index response (local node might have slow network)",
+				zap.Duration("timeout", s.Cfg.ReqTimeout()),
+			)
+			slowReadIndex.Inc()
+			return 0, ErrTimeout
+		case <-s.stopping:
+			return 0, ErrStopped
+		}
+	}
+}
+
+func uint64ToBigEndianBytes(number uint64) []byte {
+	byteResult := make([]byte, 8)
+	binary.BigEndian.PutUint64(byteResult, number)
+	return byteResult
+}
+
+func (s *EtcdServer) sendReadIndex(requestIndex uint64) error {
+	ctxToSend := uint64ToBigEndianBytes(requestIndex)
+
+	cctx, cancel := context.WithTimeout(context.Background(), s.Cfg.ReqTimeout())
+	err := s.r.ReadIndex(cctx, ctxToSend)
+	cancel()
+	if err == raft.ErrStopped {
+		return err
+	}
+	if err != nil {
+		lg := s.Logger()
+		lg.Warn("failed to get read index from Raft", zap.Error(err))
+		readIndexFailed.Inc()
+		return err
+	}
+	return nil
+}
+
 func (s *EtcdServer) LinearizableReadNotify(ctx context.Context) error {
 	return s.linearizableReadNotify(ctx)
 }