From 6626ddcf70cbf80f9cf95879f25644bf380dcdd2 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Sun, 26 Apr 2020 08:42:14 -0600 Subject: [PATCH 1/2] messager: use ackWaitTimeout in postpone query without this, messages will be resent too quickly Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 31 ++++++++++--------- .../messager/message_manager_test.go | 6 ++-- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 5a5d889da90..5151ea8bc73 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -264,36 +264,38 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.Duration) *sqlparser.ParsedQuery { var args []interface{} - buf := bytes.NewBufferString("update %v set time_next = ") - args = append(args, name) + // since messages are immediately postponed upon sending, we need to add exponential backoff on top + // of the ackWaitTime, otherwise messages will be resent too quickly. + buf := bytes.NewBufferString("update %v set time_next = %a + %a + ") + args = append(args, name, ":time_now", ":wait_time") // have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages // whenever this is injected, append (:min_backoff, :time_now) jitteredBackoff := "FLOOR((%a< %%a, %%a + %%a, %%a + %s)", jitteredBackoff, jitteredBackoff)) + buf.WriteString(fmt.Sprintf("IF(%s > %%a, %%a, %s)", jitteredBackoff, jitteredBackoff)) // jitteredBackoff > :max_backoff args = append(args, ":min_backoff", ":time_now", ":max_backoff") - // if it is greater, then use :time_now + :max_backoff - args = append(args, ":time_now", ":max_backoff") - // otherwise just use :time_now + jitteredBackoff - args = append(args, ":time_now", ":min_backoff", ":time_now") + // if it is greater, then use :max_backoff + args = append(args, ":max_backoff") + // otherwise just use jitteredBackoff + args = append(args, ":min_backoff", ":time_now") } // close the if statement @@ -847,6 +849,7 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin bvs := map[string]*querypb.BindVariable{ "time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()), + "wait_time": sqltypes.Int64BindVariable(int64(mm.ackWaitTime)), "min_backoff": sqltypes.Int64BindVariable(int64(mm.minBackoff)), "ids": idbvs, } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 6d9ea391427..51a0f590044 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = IF(FLOOR((:min_backoff< Date: Sun, 26 Apr 2020 10:00:44 -0600 Subject: [PATCH 2/2] messager: move jitter calculation into vttablet this makes the postpone sql easier to follow, and it will make it easier to allow for user provided jitter % in the future Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 15 ++++++++------- .../messager/message_manager_test.go | 16 ++++++++++++++-- 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index 5151ea8bc73..88c949672e5 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -20,6 +20,7 @@ import ( "bytes" "fmt" "io" + "math/rand" "sync" "time" @@ -269,16 +270,15 @@ func buildPostponeQuery(name sqlparser.TableIdent, minBackoff, maxBackoff time.D buf := bytes.NewBufferString("update %v set time_next = %a + %a + ") args = append(args, name, ":time_now", ":wait_time") - // have backoff be +/- 33%, seeded with :time_now to be consistent in multiple usages - // whenever this is injected, append (:min_backoff, :time_now) - jitteredBackoff := "FLOOR((%a< %%a, %%a, %s)", jitteredBackoff, jitteredBackoff)) // jitteredBackoff > :max_backoff - args = append(args, ":min_backoff", ":time_now", ":max_backoff") + args = append(args, ":min_backoff", ":jitter", ":max_backoff") // if it is greater, then use :max_backoff args = append(args, ":max_backoff") // otherwise just use jitteredBackoff - args = append(args, ":min_backoff", ":time_now") + args = append(args, ":min_backoff", ":jitter") } // close the if statement @@ -851,6 +851,7 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin "time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()), "wait_time": sqltypes.Int64BindVariable(int64(mm.ackWaitTime)), "min_backoff": sqltypes.Int64BindVariable(int64(mm.minBackoff)), + "jitter": sqltypes.Float64BindVariable(.666666 + rand.Float64()*.666666), "ids": idbvs, } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 51a0f590044..b3aec44e4ef 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -753,7 +753,7 @@ func TestMMGenerate(t *testing.T) { utils.MustMatch(t, wantids, gotids, "did not match") query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = :time_now + :wait_time + IF(FLOOR((:min_backoff<