Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica transactions #6244

Merged
merged 33 commits into from
Jun 5, 2020
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
99faad4
replica transactions: initial implementation
deepthi May 22, 2020
b97066b
replica transactions: do not allow if using discoverygateway
deepthi May 23, 2020
65a8a5c
replica transactions: fixed endtoend test
systay May 24, 2020
2e647fc
replica transactions: tabletHealthCheck no longer has a mu
deepthi May 25, 2020
3c14906
Cleaned up test
systay May 26, 2020
8438053
Renamed fake test methods to include legacy
systay May 26, 2020
dccd6b6
Made TabletGateWay use an interface for the HealthCheck
systay May 26, 2020
031527c
Clean up of test
systay May 26, 2020
0b7bc39
replica transaction: fixed tests
systay May 26, 2020
77a274f
replica transactions: protect tabletHealthCheck.Conn with mutex
deepthi May 27, 2020
abf30b7
Merge remote-tracking branch 'upstream/master' into ds-replica-transa…
systay May 27, 2020
72ae94d
replica transactions: commit and rollback should be executed on the c…
deepthi May 28, 2020
be11233
Make tx_engine use read only transactions when in replica mode
systay May 28, 2020
c5cffda
Test cleanup
systay May 28, 2020
ab38d4a
Update test assertions
systay May 28, 2020
c14319a
replica transactions: tx_conn.commitNormal should rollback successful…
deepthi May 28, 2020
015849a
replica transactions: allow with prepared statements, fix commit2PC
deepthi May 28, 2020
a8915b1
replica transactions: disallow DMLs on non-master tablets
deepthi May 29, 2020
7900711
replica transactions: test cleanup
systay May 29, 2020
f13b506
replica transactions: refactor check so it's done in one place
systay May 29, 2020
e149589
replica-transactions: updated tests to work with the new master checking
systay May 29, 2020
a79abb3
test framework: rename funcs, replica tablet type should be set
deepthi May 29, 2020
bb7ac34
replica transaction: test tablet down and up produces correct error a…
deepthi May 29, 2020
bc2d19b
replica transactions: move test up one level since it is no longer ju…
deepthi May 29, 2020
8caab43
add more assertions to commit order tests
deepthi May 29, 2020
935af38
replica transactions: prepared statement test
deepthi May 29, 2020
79e068d
Merge remote-tracking branch 'upstream/master' into ds-replica-transa…
systay May 31, 2020
c123179
fix vtexplain breakage due to replica transaction changes
deepthi Jun 1, 2020
3f85942
Merge remote-tracking branch 'upstream/master' into ds-replica-transa…
systay Jun 3, 2020
e7887c4
Address PR reviews
systay Jun 3, 2020
96d0d0f
replica transactions: add TabletAlias to the grpc Request/Response. g…
deepthi Jun 3, 2020
96aad32
replica transactions: commitNormal should attempt rollback even if co…
deepthi Jun 3, 2020
a7cb8e2
Merge remote-tracking branch 'upstream/master' into ds-replica-transa…
systay Jun 4, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/cmd/vtgate/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ func addStatusParts(vtg *vtgate.VTGate) {
servenv.AddStatusPart("Gateway Status", vtgate.StatusTemplate, func() interface{} {
return vtg.GetGatewayCacheStatus()
})
if *vtgate.GatewayImplementation == vtgate.GatewayImplementationDiscovery {
if vtgate.UsingLegacyGateway() {
servenv.AddStatusPart("Health Check Cache", discovery.LegacyHealthCheckTemplate, func() interface{} {
return legacyHealthCheck.CacheStatus()
})
} else {
servenv.AddStatusPart("Health Check Cache", discovery.HealthCheckTemplate, func() interface{} {
return vtg.Gateway().HealthCheck().CacheStatus()
return vtg.Gateway().TabletsCacheStatus()
})
}
}
5 changes: 5 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,11 @@ func (db *DB) QueryLog() string {
return strings.Join(db.querylog, ";")
}

//ResetQueryLog resets the query log
func (db *DB) ResetQueryLog() {
db.querylog = nil
}

// EnableConnFail makes connection to this fake DB fail.
func (db *DB) EnableConnFail() {
db.mu.Lock()
Expand Down
10 changes: 6 additions & 4 deletions go/test/endtoend/backup/transform/backup_transform_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ var (
"-serving_state_grace_period", "1s"}
)

// TestMainSetup sets up the basic test cluster
func TestMainSetup(m *testing.M, useMysqlctld bool) {
defer cluster.PanicHandler(nil)
flag.Parse()
Expand Down Expand Up @@ -100,8 +101,8 @@ func TestMainSetup(m *testing.M, useMysqlctld bool) {
var mysqlProcs []*exec.Cmd
for i := 0; i < 3; i++ {
tabletType := "replica"
tablet := localCluster.GetVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.GetVtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
tablet.VttabletProcess.ExtraArgs = commonTabletArg
tablet.VttabletProcess.SupportsBackup = true
Expand Down Expand Up @@ -182,6 +183,7 @@ var vtInsertTest = `create table vt_insert_test (
primary key (id)
) Engine=InnoDB`

// TestBackupTransformImpl tests backups with transform hooks
func TestBackupTransformImpl(t *testing.T) {
// insert data in master, validate same in slave
defer cluster.PanicHandler(t)
Expand Down Expand Up @@ -266,8 +268,8 @@ func TestBackupTransformImpl(t *testing.T) {

}

// TestBackupTransformErrorImpl validate backup with test_backup_error
// backup_storage_hook, which should fail.
// TestBackupTransformErrorImpl validates backup behavior with transform hook
// when the hook encounters an error
func TestBackupTransformErrorImpl(t *testing.T) {
// restart the replica with transform hook parameter
defer cluster.PanicHandler(t)
Expand Down
9 changes: 5 additions & 4 deletions go/test/endtoend/backup/vtbackup/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,15 +96,15 @@ func TestMain(m *testing.M) {
extraArgs := []string{"-db-credentials-file", dbCredentialFile}
commonTabletArg = append(commonTabletArg, "-db-credentials-file", dbCredentialFile)

master = localCluster.GetVttabletInstance("replica", 0, "")
replica1 = localCluster.GetVttabletInstance("replica", 0, "")
replica2 = localCluster.GetVttabletInstance("replica", 0, "")
master = localCluster.NewVttabletInstance("replica", 0, "")
replica1 = localCluster.NewVttabletInstance("replica", 0, "")
replica2 = localCluster.NewVttabletInstance("replica", 0, "")
shard.Vttablets = []*cluster.Vttablet{master, replica1, replica2}

// Start MySql processes
var mysqlProcs []*exec.Cmd
for _, tablet := range shard.Vttablets {
tablet.VttabletProcess = localCluster.GetVtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
tablet.VttabletProcess.ExtraArgs = commonTabletArg
tablet.VttabletProcess.SupportsBackup = true
Expand All @@ -116,6 +116,7 @@ func TestMain(m *testing.M) {
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
return 1, err
} else {
// ignore golint warning, we need the else block to use proc
mysqlProcs = append(mysqlProcs, proc)
}
}
Expand Down
25 changes: 14 additions & 11 deletions go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,20 @@ import (
)

const (
ExtraBackup = iota
XtraBackup = iota
Backup
Mysqlctld
)

var (
master *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
cell = cluster.DefaultCell
master *cluster.Vttablet
replica1 *cluster.Vttablet
replica2 *cluster.Vttablet
localCluster *cluster.LocalProcessCluster
newInitDBFile string
useXtrabackup bool
cell = cluster.DefaultCell

hostname = "localhost"
keyspaceName = "ks"
dbPassword = "VtDbaPass"
Expand Down Expand Up @@ -114,7 +115,7 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
commonTabletArg = append(commonTabletArg, "-db-credentials-file", dbCredentialFile)

// Update arguments for xtrabackup
if setupType == ExtraBackup {
if setupType == XtraBackup {
useXtrabackup = true

xtrabackupArgs := []string{
Expand All @@ -139,8 +140,8 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
if i == 0 {
tabletType = "master"
}
tablet := localCluster.GetVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.GetVtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet := localCluster.NewVttabletInstance(tabletType, 0, cell)
tablet.VttabletProcess = localCluster.VtprocessInstanceFromVttablet(tablet, shard.Name, keyspaceName)
tablet.VttabletProcess.DbPassword = dbPassword
tablet.VttabletProcess.ExtraArgs = commonTabletArg
tablet.VttabletProcess.SupportsBackup = true
Expand Down Expand Up @@ -200,10 +201,12 @@ func LaunchCluster(setupType int, streamMode string, stripes int) (int, error) {
return 0, nil
}

// TearDownCluster shuts down all cluster processes
func TearDownCluster() {
localCluster.Teardown()
}

// TestBackup runs all the backup tests
func TestBackup(t *testing.T, setupType int, streamMode string, stripes int) {

testMethods := []struct {
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/backup/xtrabackup/xtrabackup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ import (

// TestXtraBackup - tests the backup using xtrabackup
func TestXtrabackup(t *testing.T) {
backup.TestBackup(t, backup.ExtraBackup, "tar", 0)
backup.TestBackup(t, backup.XtraBackup, "tar", 0)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ import (

// TestXtrabackupStream - tests the backup using xtrabackup with xbstream mode
func TestXtrabackupStream(t *testing.T) {
backup.TestBackup(t, backup.ExtraBackup, "xbstream", 8)
backup.TestBackup(t, backup.XtraBackup, "xbstream", 8)
}
16 changes: 8 additions & 8 deletions go/test/endtoend/cellalias/cell_alias_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ func TestMain(m *testing.M) {
return 1, err
}

shard1Master = localCluster.GetVttabletInstance("master", 0, "")
shard1Replica = localCluster.GetVttabletInstance("replica", 0, cell2)
shard1Rdonly = localCluster.GetVttabletInstance("rdonly", 0, cell2)
shard1Master = localCluster.NewVttabletInstance("master", 0, "")
shard1Replica = localCluster.NewVttabletInstance("replica", 0, cell2)
shard1Rdonly = localCluster.NewVttabletInstance("rdonly", 0, cell2)

shard2Master = localCluster.GetVttabletInstance("master", 0, "")
shard2Replica = localCluster.GetVttabletInstance("replica", 0, cell2)
shard2Rdonly = localCluster.GetVttabletInstance("rdonly", 0, cell2)
shard2Master = localCluster.NewVttabletInstance("master", 0, "")
shard2Replica = localCluster.NewVttabletInstance("replica", 0, cell2)
shard2Rdonly = localCluster.NewVttabletInstance("rdonly", 0, cell2)

var mysqlProcs []*exec.Cmd
for _, tablet := range []*cluster.Vttablet{shard1Master, shard1Replica, shard1Rdonly, shard2Master, shard2Replica, shard2Rdonly} {
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestAlias(t *testing.T) {
"region_east_coast")
require.Nil(t, err)

vtgateInstance := localCluster.GetVtgateInstance()
vtgateInstance := localCluster.NewVtgateInstance()
vtgateInstance.CellsToWatch = allCells
vtgateInstance.TabletTypesToWait = "MASTER,REPLICA"
err = vtgateInstance.Setup()
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestAddAliasWhileVtgateUp(t *testing.T) {
sharding.CheckSrvKeyspace(t, cell1, keyspaceName, "", 0, expectedPartitions, *localCluster)
sharding.CheckSrvKeyspace(t, cell2, keyspaceName, "", 0, expectedPartitions, *localCluster)

vtgateInstance := localCluster.GetVtgateInstance()
vtgateInstance := localCluster.NewVtgateInstance()
vtgateInstance.CellsToWatch = allCells
vtgateInstance.TabletTypesToWait = "MASTER,REPLICA,RDONLY"
err = vtgateInstance.Setup()
Expand Down
15 changes: 8 additions & 7 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ func (cluster *LocalProcessCluster) StartKeyspace(keyspace Keyspace, shardNames
tabletUID := cluster.GetAndReserveTabletUID()
tablet := &Vttablet{
TabletUID: tabletUID,
Type: "replica",
HTTPPort: cluster.GetAndReservePort(),
GrpcPort: cluster.GetAndReservePort(),
MySQLPort: cluster.GetAndReservePort(),
Expand Down Expand Up @@ -407,15 +408,15 @@ func (cluster *LocalProcessCluster) LaunchCluster(keyspace *Keyspace, shards []S

// StartVtgate starts vtgate
func (cluster *LocalProcessCluster) StartVtgate() (err error) {
vtgateInstance := *cluster.GetVtgateInstance()
vtgateInstance := *cluster.NewVtgateInstance()
cluster.VtgateProcess = vtgateInstance
log.Infof("Starting vtgate on port %d", vtgateInstance.Port)
log.Infof("Vtgate started, connect to mysql using : mysql -h 127.0.0.1 -P %d", cluster.VtgateMySQLPort)
return cluster.VtgateProcess.Setup()
}

// GetVtgateInstance returns an instance of vtgateprocess
func (cluster *LocalProcessCluster) GetVtgateInstance() *VtgateProcess {
// NewVtgateInstance returns an instance of vtgateprocess
func (cluster *LocalProcessCluster) NewVtgateInstance() *VtgateProcess {
vtgateHTTPPort := cluster.GetAndReservePort()
vtgateGrpcPort := cluster.GetAndReservePort()
cluster.VtgateMySQLPort = cluster.GetAndReservePort()
Expand Down Expand Up @@ -647,8 +648,8 @@ func getVtStartPort() int {
return DefaultStartPort
}

// GetVttabletInstance creates a new vttablet object
func (cluster *LocalProcessCluster) GetVttabletInstance(tabletType string, UID int, cell string) *Vttablet {
// NewVttabletInstance creates a new vttablet object
func (cluster *LocalProcessCluster) NewVttabletInstance(tabletType string, UID int, cell string) *Vttablet {
if UID == 0 {
UID = cluster.GetAndReserveTabletUID()
}
Expand All @@ -666,8 +667,8 @@ func (cluster *LocalProcessCluster) GetVttabletInstance(tabletType string, UID i
}
}

// GetVtprocessInstanceFromVttablet creates a new vttablet object
func (cluster *LocalProcessCluster) GetVtprocessInstanceFromVttablet(tablet *Vttablet, shardName string, ksName string) *VttabletProcess {
// VtprocessInstanceFromVttablet creates a new vttablet object
func (cluster *LocalProcessCluster) VtprocessInstanceFromVttablet(tablet *Vttablet, shardName string, ksName string) *VttabletProcess {
return VttabletProcessInstance(tablet.HTTPPort,
tablet.GrpcPort,
tablet.TabletUID,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type VttabletProcess struct {
exit chan error
}

// Setup starts vtctld process with required arguements
// Setup starts vttablet process with required arguements
func (vttablet *VttabletProcess) Setup() (err error) {

vttablet.proc = exec.Command(
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/clustertest/vtgate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func TestVtgateProcess(t *testing.T) {
defer conn.Close()

exec(t, conn, "insert into customer(id, email) values(1,'email1')")

_ = exec(t, conn, "begin")
qr := exec(t, conn, "select id, email from customer")
if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("email1")]]`; got != want {
t.Errorf("select:\n%v want\n%v", got, want)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,14 @@ func initializeCluster(t *testing.T) (int, error) {
for i := 0; i < 2; i++ {
// instantiate vttablet object with reserved ports
tabletUID := clusterInstance.GetAndReserveTabletUID()
tablet := clusterInstance.GetVttabletInstance("replica", tabletUID, cell)
tablet := clusterInstance.NewVttabletInstance("replica", tabletUID, cell)

// Start Mysqlctl process
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory)
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
return 1, err
} else {
// ignore golint warning, we need the else block to use proc
mysqlProcesses = append(mysqlProcesses, proc)
}
// start vttablet process
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,14 @@ func clusterSetUp(t *testing.T) (int, error) {
}
for i := 0; i < 2; i++ {
// instantiate vttablet object with reserved ports
tablet := clusterInstance.GetVttabletInstance("replica", 0, cell)
tablet := clusterInstance.NewVttabletInstance("replica", 0, cell)

// Start Mysqlctl process
tablet.MysqlctlProcess = *cluster.MysqlCtlProcessInstance(tablet.TabletUID, tablet.MySQLPort, clusterInstance.TmpDirectory)
if proc, err := tablet.MysqlctlProcess.StartProcess(); err != nil {
return 1, err
} else {
// ignore golint warning, we need the else block to use proc
mysqlProcesses = append(mysqlProcesses, proc)
}
// start vttablet process
Expand Down
45 changes: 36 additions & 9 deletions go/test/endtoend/preparestmt/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,6 @@ func TestMain(m *testing.M) {
return 1, err
}

// add extra arguments
clusterInstance.VtGateExtraArgs = []string{
"-mysql_auth_server_impl", "static",
"-mysql_server_query_timeout", "1s",
"-mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic,
"-mysql_server_version", "8.0.16-7",
}

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
Expand All @@ -202,10 +194,23 @@ func TestMain(m *testing.M) {
return 1, err
}

vtgateInstance := clusterInstance.NewVtgateInstance()
// set the gateway and other params we want to use
vtgateInstance.GatewayImplementation = "tabletgateway"
vtgateInstance.MySQLAuthServerImpl = "static"
// add extra arguments
vtgateInstance.ExtraArgs = []string{
"-mysql_server_query_timeout", "1s",
"-mysql_auth_server_static_file", clusterInstance.TmpDirectory + "/" + mysqlAuthServerStatic,
"-mysql_server_version", "8.0.16-7",
}

// Start vtgate
if err := clusterInstance.StartVtgate(); err != nil {
if err := vtgateInstance.Setup(); err != nil {
return 1, err
}
// ensure it is torn down during cluster TearDown
clusterInstance.VtgateProcess = *vtgateInstance

dbInfo.Host = clusterInstance.Hostname
dbInfo.Port = uint(clusterInstance.VtgateMySQLPort)
Expand Down Expand Up @@ -296,3 +301,25 @@ func selectWhere(t *testing.T, dbo *sql.DB, where string, params ...interface{})
}
return out
}

// selectWhereWithTx select the row corresponding to the where condition.
func selectWhereWithTx(t *testing.T, tx *sql.Tx, where string, params ...interface{}) []tableData {
var out []tableData
// prepare query
qry := "SELECT msg, data, text_col, t_datetime, t_datetime_micros FROM " + tableName
if where != "" {
qry += " WHERE (" + where + ")"
}

// execute query
r, err := tx.Query(qry, params...)
require.Nil(t, err)

// prepare result
for r.Next() {
var t tableData
r.Scan(&t.Msg, &t.Data, &t.TextCol, &t.DateTime, &t.DateTimeMicros)
out = append(out, t)
}
return out
}
Loading