Skip to content
This repository has been archived by the owner on Sep 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#6114 from derekperkins/messages-ackWait
Browse files Browse the repository at this point in the history
messager: use ackWaitTimeout in postpone query
  • Loading branch information
sougou authored Apr 27, 2020
2 parents 95f8082 + 06ab145 commit 66386cc
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
42 changes: 23 additions & 19 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"fmt"
"io"
"math/rand"
"sync"
"time"

Expand Down Expand Up @@ -264,36 +265,37 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos
func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.Duration) *sqlparser.ParsedQuery {
var args []interface{}

buf := bytes.NewBufferString("update %v set time_next = ")
args = append(args, name)
// since messages are immediately postponed upon sending, we need to add exponential backoff on top
// of the ackWaitTime, otherwise messages will be resent too quickly.
buf := bytes.NewBufferString("update %v set time_next = %a + %a + ")
args = append(args, name, ":time_now", ":wait_time")

// have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages
// whenever this is injected, append (:min_backoff, :time_now)
jitteredBackoff := "FLOOR((%a<<ifnull(epoch, 0))*(.666666 + (RAND(%a) * .666666)))"
// have backoff be +/- 33%, whenever this is injected, append (:min_backoff, :jitter)
jitteredBackoff := "FLOOR((%a<<ifnull(epoch, 0)) * %a)"

//
// if the jittered backoff is less than min_backoff, just set it to time_now + min_backoff
// if the jittered backoff is less than min_backoff, just set it to :min_backoff
//
buf.WriteString(fmt.Sprintf("IF(%s < %%a, %%a + %%a, ", jitteredBackoff))
buf.WriteString(fmt.Sprintf("IF(%s < %%a, %%a, ", jitteredBackoff))
// jitteredBackoff < :min_backoff
args = append(args, ":min_backoff", ":time_now", ":min_backoff")
// if it is less, then use :time_now + :min_backoff
args = append(args, ":time_now", ":min_backoff")
args = append(args, ":min_backoff", ":jitter", ":min_backoff")
// if it is less, then use :min_backoff
args = append(args, ":min_backoff")

// now we are setting the false case on the above IF statement
if maxBackoff == 0 {
// if there is no max_backoff, just use :time_now + jitteredBackoff
buf.WriteString(fmt.Sprintf("%%a + %s", jitteredBackoff))
args = append(args, ":time_now", ":min_backoff", ":time_now")
// if there is no max_backoff, just use jitteredBackoff
buf.WriteString(jitteredBackoff)
args = append(args, ":min_backoff", ":jitter")
} else {
// make sure that it doesn't exceed max_backoff
buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff))
buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a, %s)", jitteredBackoff, jitteredBackoff))
// jitteredBackoff > :max_backoff
args = append(args, ":min_backoff", ":time_now", ":max_backoff")
// if it is greater, then use :time_now + :max_backoff
args = append(args, ":time_now", ":max_backoff")
// otherwise just use :time_now + jitteredBackoff
args = append(args, ":time_now", ":min_backoff", ":time_now")
args = append(args, ":min_backoff", ":jitter", ":max_backoff")
// if it is greater, then use :max_backoff
args = append(args, ":max_backoff")
// otherwise just use jitteredBackoff
args = append(args, ":min_backoff", ":jitter")
}

// close the if statement
Expand Down Expand Up @@ -847,7 +849,9 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin

bvs := map[string]*querypb.BindVariable{
"time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()),
"wait_time": sqltypes.Int64BindVariable(int64(mm.ackWaitTime)),
"min_backoff": sqltypes.Int64BindVariable(int64(mm.minBackoff)),
"jitter": sqltypes.Float64BindVariable(.666666 + rand.Float64()*.666666),
"ids": idbvs,
}

Expand Down
18 changes: 16 additions & 2 deletions go/vt/vttablet/tabletserver/messager/message_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) {
utils.MustMatch(t, wantids, gotids, "did not match")

query, bv = mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery = "update foo set time_next = IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) < :min_backoff, :time_now + :min_backoff, :time_now + FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666)))), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
wantQuery = "update foo set time_next = :time_now + :wait_time + IF(FLOOR((:min_backoff<<ifnull(epoch, 0)) * :jitter) < :min_backoff, :min_backoff, FLOOR((:min_backoff<<ifnull(epoch, 0)) * :jitter)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
Expand All @@ -763,7 +763,14 @@ func TestMMGenerate(t *testing.T) {
// time_now cannot be compared.
delete(bv, "time_now")
}
if _, ok := bv["jitter"]; !ok {
t.Errorf("jitter is absent in %v", bv)
} else {
// jitter cannot be compared.
delete(bv, "jitter")
}
wantbv := map[string]*querypb.BindVariable{
"wait_time": sqltypes.Int64BindVariable(1e9),
"min_backoff": sqltypes.Int64BindVariable(1e9),
"ids": wantids,
}
Expand All @@ -790,7 +797,7 @@ func TestMMGenerateWithBackoff(t *testing.T) {
wantids := sqltypes.TestBindVariable([]interface{}{"1", "2"})

query, bv := mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery := "update foo set time_next = IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) < :min_backoff, :time_now + :min_backoff, IF(FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))) > :max_backoff, :time_now + :max_backoff, :time_now + FLOOR((:min_backoff<<ifnull(epoch, 0))*(.666666 + (RAND(:time_now) * .666666))))), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
wantQuery := "update foo set time_next = :time_now + :wait_time + IF(FLOOR((:min_backoff<<ifnull(epoch, 0)) * :jitter) < :min_backoff, :min_backoff, IF(FLOOR((:min_backoff<<ifnull(epoch, 0)) * :jitter) > :max_backoff, :max_backoff, FLOOR((:min_backoff<<ifnull(epoch, 0)) * :jitter))), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
Expand All @@ -800,7 +807,14 @@ func TestMMGenerateWithBackoff(t *testing.T) {
// time_now cannot be compared.
delete(bv, "time_now")
}
if _, ok := bv["jitter"]; !ok {
t.Errorf("jitter is absent in %v", bv)
} else {
// jitter cannot be compared.
delete(bv, "jitter")
}
wantbv := map[string]*querypb.BindVariable{
"wait_time": sqltypes.Int64BindVariable(1e10),
"min_backoff": sqltypes.Int64BindVariable(1e9),
"max_backoff": sqltypes.Int64BindVariable(4e9),
"ids": wantids,
Expand Down

0 comments on commit 66386cc

Please sign in to comment.