Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 9 additions & 13 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ func (this *Applier) ReadMigrationRangeValues() error {
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, expectedRowCount int64, err error) {
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
Expand All @@ -684,36 +684,32 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}

rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}
defer rows.Close()

iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len() + 1)
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}

expectedRowCount = (*iterationRangeMaxValues.ValuesPointers[len(iterationRangeMaxValues.ValuesPointers)-1].(*interface{})).(int64)
iterationRangeMaxValues = sql.ToColumnValues(iterationRangeMaxValues.AbstractValues()[:len(iterationRangeMaxValues.AbstractValues())-1])

hasFurtherRange = expectedRowCount > 0
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, expectedRowCount, err
return hasFurtherRange, err
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, expectedRowCount, nil
return hasFurtherRange, nil
}
}
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, expectedRowCount, nil
return hasFurtherRange, nil
}

// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
Expand Down
64 changes: 62 additions & 2 deletions go/logic/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,9 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

hasFurtherRange, expectedRangeSize, err := applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)
suite.Require().Equal(int64(1), expectedRangeSize)

_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Require().NoError(err)
Expand Down Expand Up @@ -597,6 +596,67 @@ func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQuerySuc
Equal(int64(0), migrationContext.RowsDeltaEstimate)
}

func (suite *ApplierTestSuite) TestPanicOnWarningsInApplyIterationInsertQueryFailsWithTruncationWarning() {
ctx := context.Background()

var err error

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test.testing (id int not null, name varchar(20), primary key(id))")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "CREATE TABLE test._testing_gho (id INT, name varchar(20), primary key(id));")
suite.Require().NoError(err)

_, err = suite.db.ExecContext(ctx, "INSERT INTO test.testing (id, name) VALUES (1, 'this string is long')")
suite.Require().NoError(err)

connectionConfig, err := GetConnectionConfig(ctx, suite.mysqlContainer)
suite.Require().NoError(err)

migrationContext := base.NewMigrationContext()
migrationContext.ApplierConnectionConfig = connectionConfig
migrationContext.DatabaseName = "test"
migrationContext.SkipPortValidation = true
migrationContext.OriginalTableName = "testing"
migrationContext.AlterStatementOptions = "modify column name varchar(10)"
migrationContext.PanicOnWarnings = true
migrationContext.SetConnectionConfig("innodb")

migrationContext.OriginalTableColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.SharedColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.MappedSharedColumns = sql.NewColumnList([]string{"id", "name"})
migrationContext.UniqueKey = &sql.UniqueKey{
Name: "PRIMARY",
NameInGhostTable: "PRIMARY",
Columns: *sql.NewColumnList([]string{"id"}),
}
applier := NewApplier(migrationContext)

err = applier.InitDBConnections()
suite.Require().NoError(err)

err = applier.CreateChangelogTable()
suite.Require().NoError(err)

err = applier.ReadMigrationRangeValues()
suite.Require().NoError(err)

err = applier.AlterGhost()
suite.Require().NoError(err)

hasFurtherRange, err := applier.CalculateNextIterationRangeEndValues()
suite.Require().NoError(err)
suite.Require().True(hasFurtherRange)

_, rowsAffected, _, err := applier.ApplyIterationInsertQuery()
suite.Equal(int64(1), rowsAffected)
suite.Require().NoError(err)

// Verify the warning was recorded and will cause the migrator to panic
suite.Require().NotEmpty(applier.migrationContext.MigrationLastInsertSQLWarnings)
suite.Require().Contains(applier.migrationContext.MigrationLastInsertSQLWarnings[0], "Warning: Data truncated for column 'name' at row 1")
}

func TestApplier(t *testing.T) {
suite.Run(t, new(ApplierTestSuite))
}
9 changes: 3 additions & 6 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1241,9 +1241,8 @@ func (this *Migrator) iterateChunks() error {
// When hasFurtherRange is false, original table might be write locked and CalculateNextIterationRangeEndValues would hangs forever

hasFurtherRange := false
expectedRangeSize := int64(0)
if err := this.retryOperation(func() (e error) {
hasFurtherRange, expectedRangeSize, e = this.applier.CalculateNextIterationRangeEndValues()
hasFurtherRange, e = this.applier.CalculateNextIterationRangeEndValues()
return e
}); err != nil {
return terminateRowIteration(err)
Expand Down Expand Up @@ -1275,10 +1274,8 @@ func (this *Migrator) iterateChunks() error {
for _, warning := range this.migrationContext.MigrationLastInsertSQLWarnings {
this.migrationContext.Log.Infof("ApplyIterationInsertQuery has SQL warnings! %s", warning)
}
if expectedRangeSize != rowsAffected {
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
}
joinedWarnings := strings.Join(this.migrationContext.MigrationLastInsertSQLWarnings, "; ")
terminateRowIteration(fmt.Errorf("ApplyIterationInsertQuery failed because of SQL warnings: [%s]", joinedWarnings))
}
}

Expand Down
63 changes: 12 additions & 51 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,46 +283,25 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
}
}
joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk)
from (
select
%s
from
%s.%s
where
%s and %s
limit
%d
) select_osc_chunk
%s
from
%s.%s
where
%s and %s
order by
%s
limit 1
offset %d`,
databaseName, tableName, hint,
joinedColumnNames, joinedColumnNames,
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
joinedColumnNames,
strings.Join(uniqueKeyColumnNames, ", "),
databaseName, tableName,
rangeStartComparison, rangeEndComparison, chunkSize,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
)
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
return result, explodedArgs, nil
}

func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
Expand Down Expand Up @@ -360,22 +339,8 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}

joinedColumnNames := strings.Join(uniqueKeyColumnNames, ", ")
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s,
(select count(*) from (
select
%s
from
%s.%s
where
%s and %s
order by
%s
limit %d
) select_osc_chunk)
select /* gh-ost %s.%s %s */ %s
from (
select
%s
Expand All @@ -390,17 +355,13 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
order by
%s
limit 1`,
databaseName, tableName, hint, joinedColumnNames,
joinedColumnNames, databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
joinedColumnNames, databaseName, tableName,
databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),
)
// 2x the explodedArgs for the subquery (CTE would be possible but not supported by MySQL 5)
return result, append(explodedArgs, explodedArgs...), nil
return result, explodedArgs, nil
}

func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKey *UniqueKey) (string, error) {
Expand Down
43 changes: 30 additions & 13 deletions go/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,34 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
}
}

func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
func TestBuildUniqueKeyRangeEndPreparedQueryViaOffset(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
var chunkSize int64 = 500
{
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}

query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
require.NoError(t, err)
expected := `
select /* gh-ost mydb.tbl test */
name, position
from
mydb.tbl
where
((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc
limit 1
offset 499`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
}
}

func TestBuildUniqueKeyRangeEndPreparedQueryViaTemptable(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
var chunkSize int64 = 500
Expand All @@ -338,17 +365,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
require.NoError(t, err)
expected := `
select /* gh-ost mydb.tbl test */
name, position,
(select count(*) from (
select
name, position
from
mydb.tbl
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc
limit 500
) select_osc_chunk)
name, position
from (
select
name, position
Expand All @@ -363,7 +380,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
name desc, position desc
limit 1`
require.Equal(t, normalizeQuery(expected), normalizeQuery(query))
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117, 3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
require.Equal(t, []interface{}{3, 3, 17, 103, 103, 117, 103, 117}, explodedArgs)
}
}

Expand Down