Skip to content

Commit

Permalink
Address PR reviews
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <andres@planetscale.com>
  • Loading branch information
systay committed Jun 3, 2020
1 parent 3f85942 commit e7887c4
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 45 deletions.
62 changes: 44 additions & 18 deletions go/test/endtoend/tabletgateway/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,20 @@ func verifyVtgateVariables(t *testing.T, url string) {
assert.True(t, isMasterTabletPresent(healthCheckConnection), "Atleast one master tablet needs to be present")
}

/*
-begin on replica should explicitly say read only
-tabletserver planner should stop dml (if easy and reasonable)
-vtgate planbuilder should not send dml to replicas
*/
func retryNTimes(t *testing.T, maxRetries int, f func() bool) {
i := 0
for {
res := f()
if res {
return
}
if i > maxRetries {
t.Fatalf("retried %d times and failed", maxRetries)
return
}
i++
}
}

func TestReplicaTransactions(t *testing.T) {
// TODO(deepthi): this test seems to depend on previous test. Fix tearDown so that tests are independent
Expand All @@ -89,39 +98,56 @@ func TestReplicaTransactions(t *testing.T) {
ctx := context.Background()
masterConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer masterConn.Close()

replicaConn, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer masterConn.Close()
defer replicaConn.Close()

replicaConn2, err := mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
defer replicaConn2.Close()

fetchAllCustomers := "select id, email from customer"
checkCustomerRows := func(expectedRows int) func() bool {
return func() bool {
result := exec(t, replicaConn2, fetchAllCustomers, "")
return len(result.Rows) == expectedRows
}
}

// point the replica connections to the replica target
exec(t, replicaConn, "use @replica", "")
exec(t, replicaConn2, "use @replica", "")

// insert a row using master
exec(t, masterConn, "insert into customer(id, email) values(1,'email1')", "")
time.Sleep(1 * time.Second) // we sleep for a bit to make sure that the replication catches up

// we'll run this query a number of times, and then give up if the row count never reaches this value
retryNTimes(t, 10 /*maxRetries*/, checkCustomerRows(1))

// after a short pause, SELECT the data inside a tx on a replica
exec(t, replicaConn, "use @replica", "")
// begin transaction on replica
exec(t, replicaConn, "begin", "")
qr := exec(t, replicaConn, "select id, email from customer", "")
qr := exec(t, replicaConn, fetchAllCustomers, "")
assert.Equal(t, `[[INT64(1) VARCHAR("email1")]]`, fmt.Sprintf("%v", qr.Rows), "select returned wrong result")

// insert more data on master using a transaction
exec(t, masterConn, "begin", "")
exec(t, masterConn, "insert into customer(id, email) values(2,'email2')", "")
exec(t, masterConn, "commit", "")
time.Sleep(1 * time.Second)

retryNTimes(t, 10 /*maxRetries*/, checkCustomerRows(2))

// replica doesn't see new row because it is in a transaction
qr2 := exec(t, replicaConn, "select id, email from customer", "")
qr2 := exec(t, replicaConn, fetchAllCustomers, "")
assert.Equal(t, qr.Rows, qr2.Rows)

// replica should see new row after closing the transaction
exec(t, replicaConn, "commit", "")

qr3 := exec(t, replicaConn, "select id, email from customer", "")
qr3 := exec(t, replicaConn, fetchAllCustomers, "")
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr3.Rows), "we are not seeing the updates after closing the replica transaction")
// since we can't do INSERT/DELETE/UPDATE, commit and rollback both just close the transaction
exec(t, replicaConn, "rollback", "")

// begin transaction on replica
exec(t, replicaConn, "begin", "")
Expand All @@ -144,25 +170,25 @@ func TestReplicaTransactions(t *testing.T) {

// start another transaction
exec(t, replicaConn, "begin", "")
exec(t, replicaConn, "select id, email from customer", "")
exec(t, replicaConn, fetchAllCustomers, "")
// bring down the tablet and try to select again
replicaTablet := clusterInstance.Keyspaces[0].Shards[0].Replica()
// this gives us a "signal: killed" error, ignore it
_ = replicaTablet.VttabletProcess.TearDown()
// Healthcheck interval on tablet is set to 1s, so sleep for 2s
time.Sleep(2 * time.Second)
exec(t, replicaConn, "select id, email from customer", "is either down or nonexistent")
exec(t, replicaConn, fetchAllCustomers, "is either down or nonexistent")

// bring up tablet again
// query using same transaction will fail
_ = replicaTablet.VttabletProcess.Setup()
exec(t, replicaConn, "select id, email from customer", "not found")
exec(t, replicaConn, fetchAllCustomers, "not found")
exec(t, replicaConn, "commit", "")
// create a new connection, should be able to query again
replicaConn, err = mysql.Connect(ctx, &vtParams)
require.NoError(t, err)
exec(t, replicaConn, "begin", "")
qr4 := exec(t, replicaConn, "select id, email from customer", "")
qr4 := exec(t, replicaConn, fetchAllCustomers, "")
assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/srvtopo/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type ResolvedShard struct {
// Target describes the target shard.
Target *querypb.Target

// QueryService is the actual way to execute the query.
// Gateway is the way to execute a query on this shard
Gateway Gateway
}

Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"sync"
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/vt/vttablet/queryservice"
Expand Down Expand Up @@ -173,6 +174,10 @@ func (stc *ScatterConn) Execute(
default:
var qs queryservice.QueryService
_, usingLegacy := rs.Gateway.(*DiscoveryGateway)
if transactionID != 0 && usingLegacy && rs.Target.TabletType != topodatapb.TabletType_MASTER {
return 0, nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "replica transactions not supported using the legacy healthcheck")
}

if usingLegacy || transactionID == 0 {
qs = rs.Gateway
} else {
Expand Down
45 changes: 20 additions & 25 deletions go/vt/vtgate/scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"

"golang.org/x/net/context"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -250,16 +252,13 @@ func TestMaxMemoryRows(t *testing.T) {
res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa")
rss, _, err := res.ResolveDestinations(ctx, "TestMaxMemoryRows", topodatapb.TabletType_REPLICA, nil,
[]key.Destination{key.DestinationShard("0"), key.DestinationShard("1")})
if err != nil {
t.Fatalf("ResolveDestination(0) failed: %v", err)
}
require.NoError(t, err)

session := NewSafeSession(&vtgatepb.Session{InTransaction: true})

_, err = sc.Execute(ctx, "query1", nil, rss, session, true, nil, false)
want := "in-memory row count exceeded allowed limit of 3"
if err == nil || err.Error() != want {
t.Errorf("Execute(): %v, want %v", err, want)
}
assert.EqualError(t, err, want)

queries := []*querypb.BoundQuery{{
Sql: "query1",
Expand All @@ -269,10 +268,7 @@ func TestMaxMemoryRows(t *testing.T) {
BindVariables: map[string]*querypb.BindVariable{},
}}
_, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false, false)
err = errs[0]
if err == nil || err.Error() != want {
t.Errorf("Execute(): %v, want %v", err, want)
}
assert.EqualError(t, errs[0], want)
}

func TestMultiExecs(t *testing.T) {
Expand Down Expand Up @@ -499,19 +495,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
sbc0 = hc.AddTestTablet("aa", "0", 1, "TestScatterConnQueryNotInTransaction", "0", topodatapb.TabletType_REPLICA, true, 1, nil)
sbc1 = hc.AddTestTablet("aa", "1", 1, "TestScatterConnQueryNotInTransaction", "1", topodatapb.TabletType_REPLICA, true, 1, nil)
session = NewSafeSession(&vtgatepb.Session{InTransaction: true})
noTxSession := NewSafeSession(&vtgatepb.Session{InTransaction: false})

res = srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa")
rss0, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0"))
if err != nil {
t.Fatalf("ResolveDestination(0) failed: %v", err)
}
require.NoError(t, err)
rss1, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShards([]string{"0", "1"}))
if err != nil {
t.Fatalf("ResolveDestination(1) failed: %v", err)
}
require.NoError(t, err)

sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false)
sc.Execute(ctx, "query1", nil, rss1, session, true, nil, false)
_, err = sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false)
require.NoError(t, err)
_, err = sc.Execute(ctx, "query1", nil, rss1, noTxSession, true, nil, false)
require.NoError(t, err)

wantSession = vtgatepb.Session{
InTransaction: true,
Expand All @@ -528,13 +523,13 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
if !proto.Equal(&wantSession, session.Session) {
t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session)
}
sc.txConn.Commit(ctx, session)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
if execCount0 != 2 || execCount1 != 1 {
t.Errorf("want 2/1, got %d/%d", execCount0, execCount1)
}
err = sc.txConn.Commit(ctx, session)
require.NoError(t, err)

execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
if execCount0 != 2 || execCount1 != 1 {
t.Errorf("want 2/1, got %d/%d", execCount0, execCount1)
}
if commitCount := sbc0.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1089,7 +1089,7 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
}
}

allowOnShutdown := (transactionID != 0)
allowOnShutdown := transactionID != 0
// TODO(sougou): Convert startRequest/endRequest pattern to use wrapper
// function tsv.execRequest() instead.
// Note that below we always return "err" right away and do not call
Expand Down

0 comments on commit e7887c4

Please sign in to comment.