Skip to content
This repository has been archived by the owner on Sep 21, 2022. It is now read-only.

Commit

Permalink
Merge pull request vitessio#5979 from derekperkins/min-max-backoff
Browse files Browse the repository at this point in the history
messaging: support min/max backoff
  • Loading branch information
sougou authored Mar 31, 2020
2 parents 1735698 + b0f362a commit bcb293a
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 10 deletions.
33 changes: 26 additions & 7 deletions go/vt/vttablet/tabletserver/messager/message_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ type messageManager struct {
fieldResult *sqltypes.Result
ackWaitTime time.Duration
purgeAfter time.Duration
minBackoff time.Duration
maxBackoff time.Duration
batchSize int
pollerTicks *timer.Timer
purgeTicks *timer.Timer
Expand Down Expand Up @@ -223,6 +225,8 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos
},
ackWaitTime: table.MessageInfo.AckWaitDuration,
purgeAfter: table.MessageInfo.PurgeAfterDuration,
minBackoff: table.MessageInfo.MinBackoff,
maxBackoff: table.MessageInfo.MaxBackoff,
batchSize: table.MessageInfo.BatchSize,
cache: newCache(table.MessageInfo.CacheSize),
pollerTicks: timer.NewTimer(table.MessageInfo.PollInterval),
Expand All @@ -246,11 +250,19 @@ func newMessageManager(tsv TabletService, vs VStreamer, table *schema.Table, pos
mm.ackQuery = sqlparser.BuildParsedQuery(
"update %v set time_acked = %a, time_next = null where id in %a and time_acked is null",
mm.name, ":time_acked", "::ids")
mm.postponeQuery = sqlparser.BuildParsedQuery(
"update %v set time_next = %a+(%a<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null",
mm.name, ":time_now", ":wait_time", "::ids")
mm.purgeQuery = sqlparser.BuildParsedQuery(
"delete from %v where time_acked < %a limit 500", mm.name, ":time_acked")

// if a maxBackoff is set, incorporate it into the update statement
if mm.maxBackoff > 0 {
mm.postponeQuery = sqlparser.BuildParsedQuery(
"update %v set time_next = %a+if(%a<<ifnull(epoch, 0) > %a, %a, %a<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null",
mm.name, ":time_now", ":min_backoff", ":max_backoff", ":max_backoff", ":min_backoff", "::ids")
} else {
mm.postponeQuery = sqlparser.BuildParsedQuery(
"update %v set time_next = %a+(%a<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in %a and time_acked is null",
mm.name, ":time_now", ":min_backoff", "::ids")
}
return mm
}

Expand Down Expand Up @@ -792,11 +804,18 @@ func (mm *messageManager) GeneratePostponeQuery(ids []string) (string, map[strin
Value: []byte(id),
})
}
return mm.postponeQuery.Query, map[string]*querypb.BindVariable{
"time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()),
"wait_time": sqltypes.Int64BindVariable(int64(mm.ackWaitTime)),
"ids": idbvs,

bvs := map[string]*querypb.BindVariable{
"time_now": sqltypes.Int64BindVariable(time.Now().UnixNano()),
"min_backoff": sqltypes.Int64BindVariable(int64(mm.minBackoff)),
"ids": idbvs,
}

if mm.maxBackoff > 0 {
bvs["max_backoff"] = sqltypes.Int64BindVariable(int64(mm.maxBackoff))
}

return mm.postponeQuery.Query, bvs
}

// GeneratePurgeQuery returns the query and bind vars for purging messages.
Expand Down
52 changes: 49 additions & 3 deletions go/vt/vttablet/tabletserver/messager/message_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,24 @@ func newMMTable() *schema.Table {
Fields: testFields,
AckWaitDuration: 1 * time.Second,
PurgeAfterDuration: 3 * time.Second,
MinBackoff: 1 * time.Second,
BatchSize: 1,
CacheSize: 10,
PollInterval: 1 * time.Second,
},
}
}

func newMMTableWithBackoff() *schema.Table {
return &schema.Table{
Name: sqlparser.NewTableIdent("foo"),
Type: schema.Message,
MessageInfo: &schema.MessageInfo{
Fields: testFields,
AckWaitDuration: 10 * time.Second,
PurgeAfterDuration: 3 * time.Second,
MinBackoff: 1 * time.Second,
MaxBackoff: 4 * time.Second,
BatchSize: 1,
CacheSize: 10,
PollInterval: 1 * time.Second,
Expand Down Expand Up @@ -732,7 +750,7 @@ func TestMMGenerate(t *testing.T) {
}

query, bv = mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery = "update foo set time_next = :time_now+(:wait_time<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
wantQuery = "update foo set time_next = :time_now+(:min_backoff<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
Expand All @@ -743,8 +761,8 @@ func TestMMGenerate(t *testing.T) {
delete(bv, "time_now")
}
wantbv := map[string]*querypb.BindVariable{
"wait_time": sqltypes.Int64BindVariable(1e9),
"ids": wantids,
"min_backoff": sqltypes.Int64BindVariable(1e9),
"ids": wantids,
}
if !reflect.DeepEqual(bv, wantbv) {
t.Errorf("gotid: %v, want %v", bv, wantbv)
Expand All @@ -763,6 +781,34 @@ func TestMMGenerate(t *testing.T) {
}
}

func TestMMGenerateWithBackoff(t *testing.T) {
mm := newMessageManager(newFakeTabletServer(), newFakeVStreamer(), newMMTableWithBackoff(), sync2.NewSemaphore(1, 0))
mm.Open()
defer mm.Close()

wantids := sqltypes.TestBindVariable([]interface{}{"1", "2"})

query, bv := mm.GeneratePostponeQuery([]string{"1", "2"})
wantQuery := "update foo set time_next = :time_now+if(:min_backoff<<ifnull(epoch, 0) > :max_backoff, :max_backoff, :min_backoff<<ifnull(epoch, 0)), epoch = ifnull(epoch, 0)+1 where id in ::ids and time_acked is null"
if query != wantQuery {
t.Errorf("GeneratePostponeQuery query: %s, want %s", query, wantQuery)
}
if _, ok := bv["time_now"]; !ok {
t.Errorf("time_now is absent in %v", bv)
} else {
// time_now cannot be compared.
delete(bv, "time_now")
}
wantbv := map[string]*querypb.BindVariable{
"min_backoff": sqltypes.Int64BindVariable(1e9),
"max_backoff": sqltypes.Int64BindVariable(4e9),
"ids": wantids,
}
if !reflect.DeepEqual(bv, wantbv) {
t.Errorf("gotid: %v, want %v", bv, wantbv)
}
}

type fakeTabletServer struct {
postponeCount sync2.AtomicInt64
purgeCount sync2.AtomicInt64
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/schema/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ func initialSchema() map[string]*Table {
}},
AckWaitDuration: 30 * time.Second,
PurgeAfterDuration: 120 * time.Second,
MinBackoff: 30 * time.Second,
BatchSize: 1,
CacheSize: 10,
PollInterval: 30 * time.Second,
Expand Down
10 changes: 10 additions & 0 deletions go/vt/vttablet/tabletserver/schema/load_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ func loadMessageInfo(ta *Table, comment string) error {
if ta.MessageInfo.PollInterval, err = getDuration(keyvals, "vt_poller_interval"); err != nil {
return err
}

// errors are ignored because these fields are optional and 0 is the default value
ta.MessageInfo.MinBackoff, _ = getDuration(keyvals, "vt_min_backoff")
// the original default minimum backoff was based on ack wait timeout, so this preserves that
if ta.MessageInfo.MinBackoff == 0 {
ta.MessageInfo.MinBackoff = ta.MessageInfo.AckWaitDuration
}

ta.MessageInfo.MaxBackoff, _ = getDuration(keyvals, "vt_max_backoff")

for _, col := range requiredCols {
num := ta.FindColumn(sqlparser.NewColIdent(col))
if num == -1 {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vttablet/tabletserver/schema/load_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,20 @@ func TestLoadTableMessage(t *testing.T) {
}},
AckWaitDuration: 30 * time.Second,
PurgeAfterDuration: 120 * time.Second,
MinBackoff: 30 * time.Second,
BatchSize: 1,
CacheSize: 10,
PollInterval: 30 * time.Second,
},
}
assert.Equal(t, want, table)

// Test loading min/max backoff
table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db)
want.MessageInfo.MinBackoff = 10 * time.Second
want.MessageInfo.MaxBackoff = 100 * time.Second
assert.Equal(t, want, table)

// Missing property
_, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30", db)
wanterr := "not specified for message table"
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletserver/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ type MessageInfo struct {
// PollInterval specifies the polling frequency to
// look for messages to be sent.
PollInterval time.Duration

// MinBackoff specifies the shortest duration message manager
// should wait before rescheduling a message
MinBackoff time.Duration

// MaxBackoff specifies the longest duration message manager
// should wait before rescheduling a message
MaxBackoff time.Duration
}

// NewTable creates a new Table.
Expand Down

0 comments on commit bcb293a

Please sign in to comment.