Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM/mariadb: sync the gtid executed in slave during master-slave replication #10753

Merged
merged 38 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
411a0f9
add upstream_switch_mariadb
okJiang Feb 7, 2024
f2f9475
save
okJiang Mar 6, 2024
9c77ce0
Merge branch 'master' of github.com:pingcap/tiflow into test-mariadb
okJiang Mar 6, 2024
76e83e2
Add upstream-database-switch-mariadb job to workflows***
okJiang Mar 6, 2024
228a68f
Update dm_upstream_switch.yaml workflow dispatch inputs
okJiang Mar 6, 2024
9a3217c
Commented out upstream-database-switch workflow
okJiang Mar 6, 2024
ed1618b
Commented out code for checking out code by workflow dispatch
okJiang Mar 6, 2024
58d3d2a
Fix upstream-database-switch-mariadb job indentation
okJiang Mar 6, 2024
8c03de1
Update MySQL Docker-compose configurations to enable GTID strict mode
okJiang Mar 6, 2024
18cce21
Add push trigger for test branches in Upstream Database Switch workflow
okJiang Mar 6, 2024
94cbd9a
Update TEST_NAME and add DM_MASTER_EXTRA_ARG
okJiang Mar 6, 2024
a5bd4e8
Update MySQL configuration for upstream switch
okJiang Mar 6, 2024
0289253
test
okJiang Mar 7, 2024
6279d4e
Refactor GitHub workflow for upstream database switch
okJiang Mar 7, 2024
8c1c75f
test
okJiang Mar 7, 2024
100b6e8
test
okJiang Mar 7, 2024
579d3c7
test 10.1
okJiang Mar 7, 2024
0977cd3
test no gtid-strict-mode
okJiang Mar 7, 2024
1a3f661
test 10.2
okJiang Mar 7, 2024
ba30788
test 10.2 gtid-strict-mode=ON
okJiang Mar 7, 2024
448c3e3
test 10.1.9 with gtid-strict-mode=off
okJiang Mar 8, 2024
469effb
Update MySQL version in docker-compose.yml
okJiang Mar 8, 2024
aa425fa
add mariadb_master_down_and_up
okJiang Mar 8, 2024
7bc1ffb
add test_master_down_and_up case
okJiang Mar 11, 2024
c9916e0
update go-mysql&&fix update
okJiang Mar 11, 2024
d7e5610
revert dm_upstream_switch
okJiang Mar 11, 2024
325fe65
Fix installation of sync_diff in mariadb_master_down_and_up test case
okJiang Mar 11, 2024
2344aad
fix ci
okJiang Mar 11, 2024
db9b07a
fix ci
okJiang Mar 11, 2024
5630ff8
Update dm/tests/mariadb_master_down_and_up/docker-compose.yml
okJiang Mar 11, 2024
e03fa9d
fix lint
okJiang Mar 12, 2024
805fcba
Merge branch 'test-mariadb-gtid-switch' of github.com:okJiang/tiflow …
okJiang Mar 12, 2024
4c9cda5
update go mod
okJiang Mar 14, 2024
b277c74
Merge branch 'master' of github.com:pingcap/tiflow into test-mariadb-…
okJiang Mar 14, 2024
13bce4c
add relay test case
okJiang Mar 15, 2024
b0cd482
remove comment
okJiang Mar 15, 2024
8b7b6ad
go mod tidy
okJiang Mar 15, 2024
3b094a0
fix comment
okJiang Mar 15, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions .github/workflows/dm_mariadb_master_down_and_up.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
# Builds the DM integration-test binaries, starts the containers described by
# dm/tests/mariadb_master_down_and_up/docker-compose.yml, runs that directory's
# case.sh, and uploads the DM master/worker logs as an artifact.
name: Mariadb Master Down and Up

# Triggers: pushes to test-* branches, an hourly schedule window, and manual dispatch.
on:
push:
branches:
- test-*
schedule:
- cron: '0 17-23 * * *' # run at minute 0 every hour from 01:00 ~ 07:00 UTC+8
workflow_dispatch:

jobs:
mariadb-master-down-and-up:
name: mariadb-master-down-and-up
runs-on: ubuntu-20.04

steps:
- name: Set up Go env
uses: actions/setup-go@v3
with:
go-version: '1.21'

- name: Check out code
uses: actions/checkout@v2

# Cache downloaded Go modules, keyed on the repository's go.sum.
- name: Cache go modules
uses: actions/cache@v2
with:
path: ~/go/pkg/mod
key: ${{ runner.os }}-ticdc-${{ hashFiles('go.sum') }}

# Cache the tools/bin directory, keyed on tools/check/go.sum.
- name: Cache Tools
id: cache-tools
uses: actions/cache@v2
with:
path: tools/bin
key: ${{ runner.os }}-ticdc-tools-${{ hashFiles('tools/check/go.sum') }}

- name: Build DM binary
run: make dm_integration_test_build

- name: Setup containers
run: |
docker-compose -f ./dm/tests/mariadb_master_down_and_up/docker-compose.yml up -d

- name: Run test cases
run: |
bash ./dm/tests/mariadb_master_down_and_up/case.sh

# Runs even when earlier steps fail. Copies the master and worker1 log
# directories into ./logs with sudo and hands ownership to the runner user,
# presumably so the upload step below can read them.
- name: Copy logs to hack permission
if: ${{ always() }}
run: |
mkdir ./logs
sudo cp -r -L /tmp/dm_test/mariadb_master_down_and_up/master/log ./logs/master
sudo cp -r -L /tmp/dm_test/mariadb_master_down_and_up/worker1/log ./logs/worker1
sudo chown -R runner ./logs

# Uploading logs as an artifact does not seem stable, so we set `continue-on-error: true` here.
# NOTE(review): the artifact name below looks copied from the upstream-switch workflow;
# confirm whether it should be specific to this job.
- name: Upload logs
continue-on-error: true
uses: actions/upload-artifact@v2
if: ${{ always() }}
with:
name: upstream-switch-logs
path: |
./logs
18 changes: 13 additions & 5 deletions dm/pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,11 @@ func GTIDIncrease(flavor string, gSet gmysql.GTIDSet) (gmysql.GTIDSet, error) {
mariaGTID := singleGTID.(*gmysql.MariadbGTID)
mariaGTID.SequenceNumber++
gtidSet := new(gmysql.MariadbGTIDSet)
gtidSet.Sets = map[uint32]*gmysql.MariadbGTID{mariaGTID.DomainID: mariaGTID}
gtidSet.Sets = map[uint32]map[uint32]*gmysql.MariadbGTID{
mariaGTID.DomainID: {
mariaGTID.ServerID: mariaGTID,
},
}
clone = gtidSet
default:
err = terror.ErrBinlogGTIDSetNotValid.Generate(gSet, flavor)
Expand Down Expand Up @@ -203,11 +207,15 @@ func verifySingleGTID(flavor string, gSet gmysql.GTIDSet) (interface{}, error) {
if !ok {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(gSet)
}
if len(mariaGTIDs.Sets) != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(len(mariaGTIDs.Sets), gSet)
}
gtidCount := 0
var mariaGTID *gmysql.MariadbGTID
for _, mariaGTID = range mariaGTIDs.Sets {
for _, set := range mariaGTIDs.Sets {
gtidCount += len(set)
for _, mariaGTID = range set {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check len is 1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can check len(set) before the for loop

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That can't get the right gtid numbers in the error message

}
}
if gtidCount != 1 {
return nil, terror.ErrBinlogOnlyOneGTIDSupport.Generate(gtidCount, gSet)
}
return mariaGTID, nil
default:
Expand Down
2 changes: 1 addition & 1 deletion dm/pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestGenCommonFileHeader(t *testing.T) {

// MariaDB
flavor = gmysql.MariaDBFlavor
gSetStr = "1-2-12,2-2-3,3-3-8,4-4-4"
gSetStr = "1-2-12,2-2-3,3-3-8,3-4-9"

gSet, err = gtid.ParserGTID(flavor, gSetStr)
require.Nil(t, err)
Expand Down
37 changes: 21 additions & 16 deletions dm/pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,27 +730,32 @@
payload := new(bytes.Buffer)

// Number of GTIDs, 4 bytes
numOfGTIDs := uint32(len(mariaDBGSet.Sets))
numOfGTIDs := uint32(0)
for _, set := range mariaDBGSet.Sets {
numOfGTIDs += uint32(len(set))
}
err := binary.Write(payload, binary.LittleEndian, numOfGTIDs)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Number of GTIDs %d", numOfGTIDs)
}

for _, mGTID := range mariaDBGSet.Sets {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
for _, set := range mariaDBGSet.Sets {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
for _, mGTID := range set {
// Replication Domain ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.DomainID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Replication Domain ID %d", mGTID.DomainID)
}

Check warning on line 748 in dm/pkg/binlog/event/event.go

View check run for this annotation

Codecov / codecov/patch

dm/pkg/binlog/event/event.go#L747-L748

Added lines #L747 - L748 were not covered by tests
// Server_ID, 4 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.ServerID)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write Server_ID %d", mGTID.ServerID)
}

Check warning on line 753 in dm/pkg/binlog/event/event.go

View check run for this annotation

Codecov / codecov/patch

dm/pkg/binlog/event/event.go#L752-L753

Added lines #L752 - L753 were not covered by tests
// GTID sequence, 8 bytes
err = binary.Write(payload, binary.LittleEndian, mGTID.SequenceNumber)
if err != nil {
return nil, terror.ErrBinlogWriteBinaryData.AnnotateDelegate(err, "write GTID sequence %d", mGTID.SequenceNumber)
}

Check warning on line 758 in dm/pkg/binlog/event/event.go

View check run for this annotation

Codecov / codecov/patch

dm/pkg/binlog/event/event.go#L757-L758

Added lines #L757 - L758 were not covered by tests
}
}

Expand Down
9 changes: 5 additions & 4 deletions dm/pkg/binlog/event/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,6 @@ func TestGenRowsEvent(t *testing.T) {
require.Equal(t, tableID, rowsEvBody.TableID)
require.Equal(t, uint64(len(rows[0])), rowsEvBody.ColumnCount)
require.Equal(t, 0, rowsEvBody.Version) // WRITE_ROWS_EVENTv0
require.Nil(t, rowsEvBody.ExtraData)
require.Equal(t, rows, rowsEvBody.Rows)

// multi rows, with different length, invalid
Expand Down Expand Up @@ -664,10 +663,10 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.True(t, ok)
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 1)
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID], gtidListEvBody.GTIDs[0])
require.Equal(t, *mGSet.Sets[gtidListEvBody.GTIDs[0].DomainID][gtidListEvBody.GTIDs[0].ServerID], gtidListEvBody.GTIDs[0])

// valid gSet with multi GTIDs
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,4-4-4")
gSet, err = gtid.ParserGTID(gmysql.MariaDBFlavor, "1-2-12,2-2-3,3-3-8,3-4-4")
require.Nil(t, err)
require.NotNil(t, gSet)
mGSet, ok = gSet.(*gmysql.MariadbGTIDSet)
Expand All @@ -684,7 +683,9 @@ func TestGenMariaDBGTIDListEvent(t *testing.T) {
require.NotNil(t, gtidListEvBody)
require.Len(t, gtidListEvBody.GTIDs, 4)
for _, mGTID := range gtidListEvBody.GTIDs {
mGTID2, ok := mGSet.Sets[mGTID.DomainID]
set, ok := mGSet.Sets[mGTID.DomainID]
require.True(t, ok)
mGTID2, ok := set[mGTID.ServerID]
require.True(t, ok)
require.Equal(t, *mGTID2, mGTID)
}
Expand Down
6 changes: 5 additions & 1 deletion dm/pkg/binlog/event/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ func newGenerator(flavor, version string, serverID uint32, latestPos uint32, lat
if !ok || prevGSet == nil {
return nil, terror.ErrBinlogGTIDMariaDBNotValid.Generate(previousGTIDs)
}
prevGTID, ok := prevGSet.Sets[mariaGTID.DomainID]
set, ok := prevGSet.Sets[mariaGTID.DomainID]
if !ok {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
prevGTID, ok := set[mariaGTID.ServerID]
if !ok || prevGTID.ServerID != mariaGTID.ServerID || prevGTID.SequenceNumber != mariaGTID.SequenceNumber {
return nil, terror.ErrBinlogLatestGTIDNotInPrev.Generate(latestGTID, previousGTIDs)
}
Expand Down
16 changes: 1 addition & 15 deletions dm/relay/local_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,21 +548,7 @@ func (r *BinlogReader) parseFile(
Pos: uint32(ev.Position),
}
r.tctx.L().Info("rotate binlog", zap.Stringer("position", currentPos))
case *replication.GTIDEvent:
if r.prevGset == nil {
state.latestPos = int64(e.Header.LogPos)
break
}
gtidStr, err2 := event.GetGTIDStr(e)
if err2 != nil {
return errors.Trace(err2)
}
state.skipGTID, err = r.advanceCurrentGtidSet(gtidStr)
if err != nil {
return errors.Trace(err)
}
state.latestPos = int64(e.Header.LogPos)
case *replication.MariadbGTIDEvent:
case *replication.GTIDEvent, *replication.MariadbGTIDEvent:
if r.prevGset == nil {
state.latestPos = int64(e.Header.LogPos)
break
Expand Down
150 changes: 150 additions & 0 deletions dm/tests/mariadb_master_down_and_up/case.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
#!/bin/bash
#
# Integration case: exercises DM against a MariaDB master/slave pair while the
# master container is paused and later resumed (see test_master_down_and_up).
# Helper functions and shared variables are loaded from lib.sh next to this
# script.

# -e: abort on the first failing command; -x: trace commands; -u: unset
# variables are errors.
set -exu

# Absolute directory of this script; configs and docker-compose.yml are
# resolved relative to it.
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
PATH=$CUR/../_utils:$PATH # for sync_diff_inspector

# Quoted so a checkout path containing whitespace does not break the source.
# shellcheck source=/dev/null
source "$CUR/lib.sh"

# Reset upstream replication state and drop every test database on the master,
# the slave and TiDB, then remove the /tmp/dm_test working directory.
# Globals (read): slave_port, master_port, tidb_port, db
#   (expected to come from lib.sh; TODO confirm)
# Uses: exec_sql, exec_tidb (defined outside this file)
function clean_data() {
	echo "-------clean_data--------"

	exec_sql "$slave_port" "stop slave;"
	exec_sql "$slave_port" "reset master;"

	local schema
	for schema in db1 db2 "${db}"; do
		exec_sql "$master_port" "drop database if exists ${schema};"
	done
	for schema in db1 db2 "${db}"; do
		exec_sql "$slave_port" "drop database if exists ${schema};"
	done
	# Reset again after the drops above were executed on the slave.
	exec_sql "$slave_port" "reset master;"
	for schema in db1 db2 "${db}"; do
		exec_tidb "$tidb_port" "drop database if exists ${schema};"
	done
	rm -rf /tmp/dm_test
}

# Send SIGHUP to any leftover DM test binaries (worker, master, syncer).
# A missing process is not an error: pkill's stderr is discarded and its
# failure status is ignored.
function cleanup_process() {
	echo "-------cleanup_process--------"
	local component
	for component in worker master syncer; do
		pkill -hup "dm-${component}.test" 2>/dev/null || true
	done
}

# Point the slave at the master's current binlog position using GTID.
# Converts the master's (file, position) into a GTID with binlog_gtid_pos(),
# stores it as the slave's gtid_slave_pos, then calls change_master_to_gtid.
# Globals (read): master_port, slave_port
# Uses: get_master_status, exec_sql, change_master_to_gtid (defined outside this file)
function setup_replica() {
	echo "-------setup_replica--------"

	local -a master_status
	local master_gtid

	# Word splitting is intentional: element 0 is used as the binlog file name
	# and element 1 as the position in the query below.
	# shellcheck disable=SC2207
	master_status=($(get_master_status))
	# Keep only the second output line (presumably the first is the column header).
	master_gtid=$(exec_sql "$master_port" "select binlog_gtid_pos('${master_status[0]}', ${master_status[1]})" | awk 'NR==2')
	exec_sql "$slave_port" "set global gtid_slave_pos = '$master_gtid';"

	# master --> slave
	change_master_to_gtid "$slave_port" "$master_port"
}

# Start a fresh dm-master and dm-worker, then register the upstream source.
# Arguments:
#   $1 - "relay" selects conf/source1_relay.yaml; any other value selects
#        conf/source1.yaml.
# Globals (read): WORK_DIR, MASTER_PORT, WORKER1_PORT, CUR
# Uses: run_dm_master, run_dm_worker, run_dm_ctl, run_dm_ctl_with_retry
#   (defined outside this file)
function run_dm_components_and_create_source() {
	echo "-------run_dm_components--------"

	# Kill any DM processes left over from a previous run; none running is fine.
	pkill -9 dm-master || true
	pkill -9 dm-worker || true

	run_dm_master "$WORK_DIR/master" "$MASTER_PORT" "$CUR/conf/dm-master.toml"
	run_dm_ctl_with_retry "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"list-member" \
		"alive" 1

	run_dm_worker "$WORK_DIR/worker1" "$WORKER1_PORT" "$CUR/conf/dm-worker1.toml"
	run_dm_ctl_with_retry "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"list-member" \
		"free" 1

	# Only the source config differs between the two modes.
	local source_conf="$CUR/conf/source1.yaml"
	if [ "$1" = "relay" ]; then
		source_conf="$CUR/conf/source1_relay.yaml"
	fi
	run_dm_ctl "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"operate-source create $source_conf" \
		"\"result\": true" 2

	run_dm_ctl_with_retry "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"list-member" \
		"alive" 1 \
		"bound" 1
}

# Create the test database and table on the master and insert rows
# (1,1) .. (100,100), one INSERT statement per exec_sql call.
# Globals (read): master_port, db, tb
# Uses: exec_sql (defined outside this file)
function gen_full_data() {
	echo "-------gen_full_data--------"

	exec_sql "$master_port" "create database ${db} collate latin1_bin;"
	exec_sql "$master_port" "create table ${db}.${tb}(id int primary key, a int);"

	local i
	for ((i = 1; i <= 100; i++)); do
		exec_sql "$master_port" "insert into ${db}.${tb} values($i,$i);"
	done
}

# Start the pessimistic task from conf/task-pessimistic.yaml with
# --remove-meta, passing the expected output pattern `"result": true` and the
# count 2 to run_dm_ctl.
# Globals (read): WORK_DIR, MASTER_PORT, CUR
# Uses: run_dm_ctl (defined outside this file)
function start_task() {
	run_dm_ctl "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"start-task $CUR/conf/task-pessimistic.yaml --remove-meta" \
		"\"result\": true" 2
}

# Run check_sync_diff with conf/diff_config.toml (presumably comparing
# upstream and downstream data; see that config).
# Globals (read): WORK_DIR, CUR
# Uses: check_sync_diff (defined outside this file)
function verify_result() {
	check_sync_diff "$WORK_DIR" "$CUR/conf/diff_config.toml"
}

# Stop the task_pessimistic task, then stop the mysql-replica-01 source.
# Each run_dm_ctl call is given the expected output pattern `"result": true`
# and the count 2.
# Globals (read): WORK_DIR, MASTER_PORT
# Uses: run_dm_ctl (defined outside this file)
function clean_task() {
	run_dm_ctl "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"stop-task task_pessimistic" \
		"\"result\": true" 2
	run_dm_ctl "$WORK_DIR" "127.0.0.1:$MASTER_PORT" \
		"operate-source stop mysql-replica-01" \
		"\"result\": true" 2
}

# Insert rows (i, i) for every i in [first, last] into ${db}.${tb}, issuing one
# INSERT statement per exec_sql call.
# Arguments: $1 - port passed to exec_sql, $2 - first id, $3 - last id
function _insert_rows() {
	local port=$1 first=$2 last=$3 i
	for ((i = first; i <= last; i++)); do
		exec_sql "$port" "insert into ${db}.${tb} values($i,$i);"
	done
}

# Full scenario: prepare a clean master/slave pair and DM cluster, load the
# initial data, then check that the data still verifies
#   1. after more writes on the master,
#   2. after writes on the slave while the master container is paused,
#   3. after more writes on the master once it is unpaused.
# Arguments:
#   $1 - mode forwarded to run_dm_components_and_create_source
#        ("relay" or anything else) and echoed in the final success line.
# Globals (read): master_port, slave_port, db, tb, CUR
function test_master_down_and_up() {
	cleanup_process
	clean_data
	install_sync_diff
	setup_replica
	gen_full_data
	run_dm_components_and_create_source "$1"
	start_task
	verify_result
	echo "-------start test--------"

	_insert_rows "$master_port" 201 250
	verify_result

	# make master down
	docker-compose -f "$CUR/docker-compose.yml" pause mariadb_master
	# execute sqls in slave
	_insert_rows "$slave_port" 401 450
	verify_result

	# make master up
	docker-compose -f "$CUR/docker-compose.yml" unpause mariadb_master
	_insert_rows "$master_port" 501 550

	verify_result

	clean_task
	echo "CASE=test_master_down_and_up $1 success"
}

# Wait for both MariaDB instances (ports 3306 and 3307), then run the scenario
# twice: first without relay, then with relay.
# NOTE(review): the second wait_mysql argument is opaque from this file;
# confirm its meaning in lib.sh.
function run() {
	wait_mysql 3306 1
	wait_mysql 3307 2

	local mode
	for mode in no_relay relay; do
		test_master_down_and_up "$mode"
	done
}

# Entry point: execute the whole case when the script is run.
run
Loading
Loading