diff --git a/go/vt/vttablet/tabletserver/messager/message_manager.go b/go/vt/vttablet/tabletserver/messager/message_manager.go index f418d8557e9..c3bb38f65b1 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager.go @@ -165,6 +165,8 @@ type messageManager struct { fieldResult *sqltypes.Result ackWaitTime time.Duration purgeAfter time.Duration + minBackoff time.Duration + maxBackoff time.Duration batchSize int pollerTicks *timer.Timer purgeTicks *timer.Timer @@ -223,6 +225,8 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos }, ackWaitTime: table.MessageInfo.AckWaitDuration, purgeAfter: table.MessageInfo.PurgeAfterDuration, + minBackoff: table.MessageInfo.MinBackoff, + maxBackoff: table.MessageInfo.MaxBackoff, batchSize: table.MessageInfo.BatchSize, cache: newCache(table.MessageInfo.CacheSize), pollerTicks: timer.NewTimer(table.MessageInfo.PollInterval), @@ -246,11 +250,19 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos mm.ackQuery = sqlparser.BuildParsedQuery( "update %v set time_acked = %a, time_next = null where id in %a and time_acked is null", mm.name, ":time_acked", "::ids") - mm.postponeQuery = sqlparser.BuildParsedQuery( - "update %v set time_next = %a+(%a< 0 { + mm.postponeQuery = sqlparser.BuildParsedQuery( + "update %v set time_next = %a+if(%a< %a, %a, %a< 0 { + bvs["max_backoff"] = sqltypes.Int64BindVariable(int64(mm.maxBackoff)) } + + return mm.postponeQuery.Query, bvs } // GeneratePurgeQuery returns the query and bind vars for purging messages. diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index 1cee1b698f5..c7b41a3e632 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -65,6 +65,24 @@ func newMMTable() *schema.Table { Fields: testFields, AckWaitDuration: 1 * time.Second, PurgeAfterDuration: 3 * time.Second, + MinBackoff: 1 * time.Second, + BatchSize: 1, + CacheSize: 10, + PollInterval: 1 * time.Second, + }, + } +} + +func newMMTableWithBackoff() *schema.Table { + return &schema.Table{ + Name: sqlparser.NewTableIdent("foo"), + Type: schema.Message, + MessageInfo: &schema.MessageInfo{ + Fields: testFields, + AckWaitDuration: 10 * time.Second, + PurgeAfterDuration: 3 * time.Second, + MinBackoff: 1 * time.Second, + MaxBackoff: 4 * time.Second, BatchSize: 1, CacheSize: 10, PollInterval: 1 * time.Second, @@ -732,7 +750,7 @@ func TestMMGenerate(t *testing.T) { } query, bv = mm.GeneratePostponeQuery([]string{"1", "2"}) - wantQuery = "update foo set time_next = :time_now+(:wait_time<