Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Messaging: Avoid deadlocks related to 0 receiver behavior #10132

Merged
merged 16 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 34 additions & 18 deletions go/test/endtoend/messaging/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,43 @@ var (
userKeyspace = "user"
lookupKeyspace = "lookup"
createShardedMessage = `create table sharded_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked)
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
createUnshardedMessage = `create table unsharded_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked)
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
userVschema = `{
"sharded": true,
"vindexes": {
Expand Down
120 changes: 82 additions & 38 deletions go/test/endtoend/messaging/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,30 @@ import (
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
)

var createMessage = `create table vitess_message(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
message varchar(128),
var testMessage = "{\"message\":\"hello world\"}"
var testShardedMessagef = "{\"message\": \"hello world\", \"id\": %d}"

var createMessage = `
create table vitess_message(
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked))
comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'
`

func TestMessage(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -84,9 +97,12 @@ func TestMessage(t *testing.T) {
wantFields := []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "tenant_id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.VarChar,
Type: sqltypes.TypeJSON,
}}
gotFields, err := streamConn.Fields()
for i, field := range gotFields {
Expand All @@ -99,7 +115,7 @@ func TestMessage(t *testing.T) {
require.NoError(t, err)
cmp.MustMatch(t, wantFields, gotFields)

utils.Exec(t, conn, "insert into vitess_message(id, message) values(1, 'hello world')")
utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message(id, tenant_id, message) values(1, 1, '%s')", testMessage))

// account for jitter in timings, maxJitter uses the current hardcoded value for jitter in message_manager.go
jitter := int64(0)
Expand All @@ -112,7 +128,8 @@ func TestMessage(t *testing.T) {

want := []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewVarChar("hello world"),
sqltypes.NewInt64(1),
sqltypes.TestValue(sqltypes.TypeJSON, testMessage),
}
cmp.MustMatch(t, want, got)

Expand Down Expand Up @@ -163,18 +180,31 @@ func TestMessage(t *testing.T) {
assert.Equal(t, 0, len(qr.Rows))
}

var createThreeColMessage = `create table vitess_message3(
id bigint,
priority bigint default 0,
time_next bigint default 0,
epoch bigint,
time_acked bigint,
var createThreeColMessage = `
create table vitess_message3(
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# custom to this test
msg1 varchar(128),
msg2 bigint,

# required indexes
primary key(id),
index next_idx(priority, time_next desc),
index ack_idx(time_acked))
comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'`
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_ack_wait=1,vt_purge_after=3,vt_batch_size=2,vt_cache_size=10,vt_poller_interval=1'
`

func TestThreeColMessage(t *testing.T) {
ctx := context.Background()
Expand Down Expand Up @@ -202,6 +232,12 @@ func TestThreeColMessage(t *testing.T) {
wantFields := []*querypb.Field{{
Name: "id",
Type: sqltypes.Int64,
}, {
Name: "tenant_id",
Type: sqltypes.Int64,
}, {
Name: "message",
Type: sqltypes.TypeJSON,
}, {
Name: "msg1",
Type: sqltypes.VarChar,
Expand All @@ -220,12 +256,14 @@ func TestThreeColMessage(t *testing.T) {
require.NoError(t, err)
cmp.MustMatch(t, wantFields, gotFields)

utils.Exec(t, conn, "insert into vitess_message3(id, msg1, msg2) values(1, 'hello world', 3)")
utils.Exec(t, conn, fmt.Sprintf("insert into vitess_message3(id, tenant_id, message, msg1, msg2) values(1, 3, '%s', 'hello world', 3)", testMessage))

got, err := streamConn.FetchNext(nil)
require.NoError(t, err)
want := []sqltypes.Value{
sqltypes.NewInt64(1),
sqltypes.NewInt64(3),
sqltypes.TestValue(sqltypes.TypeJSON, testMessage),
sqltypes.NewVarChar("hello world"),
sqltypes.NewInt64(3),
}
Expand Down Expand Up @@ -292,7 +330,8 @@ func TestReparenting(t *testing.T) {
assertClientCount(t, 1, shard0Replica)
assertClientCount(t, 1, shard1Primary)
session := stream.Session("@primary", nil)
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (3,'hello world 3')")
msg3 := fmt.Sprintf(testShardedMessagef, 3)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (3,3,'%s')", msg3))

// validate that we have received inserted message
stream.Next()
Expand Down Expand Up @@ -353,8 +392,10 @@ func TestConnection(t *testing.T) {
// in message stream
session := stream.Session("@primary", nil)
// insert data in primary
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (2,'hello world 2')")
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into sharded_message (id, message) values (5,'hello world 5')")
msg2 := fmt.Sprintf(testShardedMessagef, 2)
msg5 := fmt.Sprintf(testShardedMessagef, 5)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (2,2,'%s')", msg2))
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into sharded_message (id, tenant_id, message) values (5,5,'%s')", msg5))
// validate in msg stream
_, err = stream.Next()
require.Nil(t, err)
Expand All @@ -380,15 +421,18 @@ func testMessaging(t *testing.T, name, ks string) {
defer stream.Close()

session := stream.Session("@primary", nil)
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (4,'hello world 4')")
cluster.ExecuteQueriesUsingVtgate(t, session, "insert into "+name+" (id, message) values (1,'hello world 1')")
msg4 := fmt.Sprintf(testShardedMessagef, 4)
msg1 := fmt.Sprintf(testShardedMessagef, 1)
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (4,4,'%s')", msg4))
cluster.ExecuteQueriesUsingVtgate(t, session, fmt.Sprintf("insert into "+name+" (id, tenant_id, message) values (1,1,'%s')", msg1))

// validate fields
res, err := stream.MessageStream(ks, "", nil, name)
require.Nil(t, err)
require.Equal(t, 2, len(res.Fields))
require.Equal(t, 3, len(res.Fields))
validateField(t, res.Fields[0], "id", query.Type_INT64)
validateField(t, res.Fields[1], "message", query.Type_VARCHAR)
validateField(t, res.Fields[1], "tenant_id", query.Type_INT64)
validateField(t, res.Fields[2], "message", query.Type_JSON)

// validate recieved msgs
resMap := make(map[string]string)
Expand All @@ -406,8 +450,8 @@ func testMessaging(t *testing.T, name, ks string) {
}
}

assert.Equal(t, "hello world 1", resMap["1"])
assert.Equal(t, "hello world 4", resMap["4"])
assert.Equal(t, "1", resMap["1"])
assert.Equal(t, "4", resMap["4"])

resMap = make(map[string]string)
stream.ClearMem()
Expand All @@ -422,7 +466,7 @@ func testMessaging(t *testing.T, name, ks string) {
}
}

assert.Equal(t, "hello world 1", resMap["1"])
assert.Equal(t, "1", resMap["1"])

// validate message ack with 1 and 4, only 1 should be ack
qr, err = session.Execute(context.Background(), "update "+name+" set time_acked = 1, time_next = null where id in (1, 4) and time_acked is null", nil)
Expand Down Expand Up @@ -519,18 +563,18 @@ func assertClientCount(t *testing.T, expected int, vttablet *cluster.Vttablet) {
}

func parseDebugVars(t *testing.T, output interface{}, vttablet *cluster.Vttablet) {
debugVarUrl := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort)
resp, err := http.Get(debugVarUrl)
debugVarURL := fmt.Sprintf("http://%s:%d/debug/vars", vttablet.VttabletProcess.TabletHostname, vttablet.HTTPPort)
resp, err := http.Get(debugVarURL)
if err != nil {
t.Fatalf("failed to fetch %q: %v", debugVarUrl, err)
t.Fatalf("failed to fetch %q: %v", debugVarURL, err)
}

respByte, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarUrl, respByte)
t.Fatalf("status code %d while fetching %q:\n%s", resp.StatusCode, debugVarURL, respByte)
}

if err := json.Unmarshal(respByte, output); err != nil {
t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarUrl, err)
t.Fatalf("failed to unmarshal JSON from %q: %v", debugVarURL, err)
}
}
37 changes: 22 additions & 15 deletions go/test/endtoend/vtgate/godriver/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ limitations under the License.
package godriver

import (
"database/sql"
"flag"
"fmt"
"os"
"strconv"
"testing"
Expand All @@ -42,18 +42,24 @@ var (
KeyspaceName = "customer"
SchemaSQL = `
create table my_message(
time_scheduled bigint,
id bigint,
time_next bigint,
epoch bigint,
time_created bigint,
time_acked bigint,
message varchar(128),
priority tinyint NOT NULL DEFAULT '0',
primary key(time_scheduled, id),
unique index id_idx(id),
index next_idx(priority, time_next)
) comment 'vitess_message,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30';
# required columns
id bigint NOT NULL COMMENT 'often an event id, can also be auto-increment or a sequence',
priority tinyint NOT NULL DEFAULT '50' COMMENT 'lower number priorities process first',
epoch bigint NOT NULL DEFAULT '0' COMMENT 'Vitess increments this each time it sends a message, and is used for incremental backoff doubling',
time_next bigint DEFAULT 0 COMMENT 'the earliest time the message will be sent in epoch nanoseconds. Must be null if time_acked is set',
time_acked bigint DEFAULT NULL COMMENT 'the time the message was acked in epoch nanoseconds. Must be null if time_next is set',

# add as many custom fields here as required
# optional - these are suggestions
tenant_id bigint,
message json,

# required indexes
primary key(id),
index poller_idx(time_acked, priority, time_next desc)

# add any secondary indexes or foreign keys - no restrictions
) comment 'vitess_message,vt_min_backoff=30,vt_max_backoff=3600,vt_ack_wait=30,vt_purge_after=86400,vt_batch_size=10,vt_cache_size=10000,vt_poller_interval=30'
`
VSchema = `
{
Expand All @@ -75,6 +81,8 @@ create table my_message(
}
}
`

testMessage = "{\"message\":\"hello world\"}"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -136,8 +144,7 @@ func TestStreamMessaging(t *testing.T) {
defer db.Close()

// Exec not allowed in streaming
timenow := time.Now().Add(time.Second * 60).UnixNano()
_, err = db.Exec("insert into my_message(id, message, time_scheduled) values(1, 'hello world', :curr_time)", sql.Named("curr_time", timenow))
_, err = db.Exec(fmt.Sprintf("insert into my_message(id, message) values(1, '%s')", testMessage))
require.NoError(t, err)

// for streaming messages
Expand Down
Loading