From a7cf20ab7d71fa96a6055483650f0b1c904acc6c Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Thu, 26 Mar 2020 22:18:16 -0600 Subject: [PATCH 1/3] messaging: support min/max backoff Signed-off-by: Derek Perkins --- .../tabletserver/messager/message_manager.go | 33 +++++++++--- .../messager/message_manager_test.go | 52 +++++++++++++++++-- .../tabletserver/schema/load_table.go | 10 ++++ go/vt/vttablet/tabletserver/schema/schema.go | 8 +++ 4 files changed, 93 insertions(+), 10 deletions(-) diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index f418d8557e9..c3bb38f65b1 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -165,6 +165,8 @@ type messageManager struct { fieldResult *sqltypes.Result ackWaitTime time.Duration purgeAfter time.Duration + minBackoff time.Duration + maxBackoff time.Duration batchSize int pollerTicks *timer.Timer purgeTicks *timer.Timer @@ -223,6 +225,8 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos }, ackWaitTime: table.MessageInfo.AckWaitDuration, purgeAfter: table.MessageInfo.PurgeAfterDuration, + minBackoff: table.MessageInfo.MinBackoff, + maxBackoff: table.MessageInfo.MaxBackoff, batchSize: table.MessageInfo.BatchSize, cache: newCache(table.MessageInfo.CacheSize), pollerTicks: timer.NewTimer(table.MessageInfo.PollInterval), @@ -246,11 +250,19 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos mm.ackQuery = sqlparser.BuildParsedQuery( "update %v set time_acked = %a, time_next = null where id in %a and time_acked is null", mm.name, ":time_acked", "::ids") - mm.postponeQuery = sqlparser.BuildParsedQuery( - "update %v set time_next = %a+(%a< 0 { + mm.postponeQuery = sqlparser.BuildParsedQuery( + "update %v set time_next = %a+if(%a< %a, %a, %a< 0 { + bvs["max_backoff"] = sqltypes.Int64BindVariable(int64(mm.maxBackoff)) } + + return mm.postponeQuery.Query, bvs } // GeneratePurgeQuery returns the query and bind vars for purging messages. diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 1cee1b698f5..c7b41a3e632 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -65,6 +65,24 @@ func newMMTable() *schema.Table { Fields: testFields, AckWaitDuration: 1 * time.Second, PurgeAfterDuration: 3 * time.Second, + MinBackoff: 1 * time.Second, + BatchSize: 1, + CacheSize: 10, + PollInterval: 1 * time.Second, + }, + } +} + +func newMMTableWithBackoff() *schema.Table { + return &schema.Table{ + Name: sqlparser.NewTableIdent("foo"), + Type: schema.Message, + MessageInfo: &schema.MessageInfo{ + Fields: testFields, + AckWaitDuration: 10 * time.Second, + PurgeAfterDuration: 3 * time.Second, + MinBackoff: 1 * time.Second, + MaxBackoff: 4 * time.Second, BatchSize: 1, CacheSize: 10, PollInterval: 1 * time.Second, @@ -732,7 +750,7 @@ func TestMMGenerate(t *testing.T) { } query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = :time_now+(:wait_time< Date: Thu, 26 Mar 2020 23:17:34 -0600 Subject: [PATCH 2/3] messaging: fix load table test Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/schema/load_table_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index 02c9af95e18..0624479e29c 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -123,6 +123,7 @@ func TestLoadTableMessage(t *testing.T) { }}, AckWaitDuration: 30 * time.Second, PurgeAfterDuration: 120 * time.Second, + MinBackoff: 30 * time.Second, BatchSize: 1, CacheSize: 10, PollInterval: 30 * time.Second, @@ -130,6 +131,12 @@ func TestLoadTableMessage(t *testing.T) { } assert.Equal(t, want, table) + // Test loading min/max backoff + table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + want.MessageInfo.MinBackoff = 10 * time.Second + want.MessageInfo.MaxBackoff = 100 * time.Second + assert.Equal(t, want, table) + // Missing property _, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30", db) wanterr := "not specified for message table" From b0f362acd214e7d5d48108fc786012e426bbbc33 Mon Sep 17 00:00:00 2001 From: Derek Perkins Date: Fri, 27 Mar 2020 07:42:19 -0600 Subject: [PATCH 3/3] messaging: fix engine test Signed-off-by: Derek Perkins --- go/vt/vttablet/tabletserver/schema/engine_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 2af7a9dc2a3..62269d76891 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -402,6 +402,7 @@ func initialSchema() map[string]*Table { }}, AckWaitDuration: 30 * time.Second, PurgeAfterDuration: 120 * time.Second, + MinBackoff: 30 * time.Second, BatchSize: 1, CacheSize: 10, PollInterval: 30 * time.Second,