Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

relay log for oracle #1104

Merged
merged 4 commits into from
Dec 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arbiter/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ func syncBinlogs(ctx context.Context, source <-chan *reader.Message, ld loader.L
}
receivedTs = msg.Binlog.CommitTs

txn, err := loader.SecondaryBinlogToTxn(msg.Binlog)
txn, err := loader.SecondaryBinlogToTxn(msg.Binlog, nil, false)
if err != nil {
log.Error("transfer binlog failed, program will stop handling data from loader", zap.Error(err))
return err
Expand Down
32 changes: 26 additions & 6 deletions drainer/relay.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package drainer

import (
"database/sql"

router "github.com/pingcap/tidb-tools/pkg/table-router"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
Expand Down Expand Up @@ -42,8 +46,12 @@ func feedByRelayLogIfNeed(cfg *Config) error {
if err != nil {
return errors.Annotate(err, "failed to create reader")
}

db, err := loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
var db *sql.DB
if cfg.SyncerCfg.DestDBType == "oracle" {
db, err = loader.CreateOracleDB(cfg.SyncerCfg.To.User, cfg.SyncerCfg.To.Password, scfg.To.Host, scfg.To.Port, cfg.SyncerCfg.To.OracleServiceName, cfg.SyncerCfg.To.OracleConnectString)
} else {
db, err = loader.CreateDBWithSQLMode(scfg.To.User, scfg.To.Password, scfg.To.Host, scfg.To.Port, scfg.To.TLS, scfg.StrSQLMode, scfg.To.Params, scfg.To.ReadTimeout)
}
if err != nil {
return errors.Annotate(err, "failed to create SQL db")
}
Expand All @@ -56,7 +64,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
return errors.Annotate(err, "failed to create loader")
}

err = feedByRelayLog(reader, ld, cp)
err = feedByRelayLog(reader, ld, cp, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -65,7 +73,7 @@ func feedByRelayLogIfNeed(cfg *Config) error {
}

// feedByRelayLog will take over the `ld loader.Loader`.
func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint) error {
func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint, cfg *Config) error {
checkpointTS := cp.TS()
lastSuccessTS := checkpointTS
r.Run()
Expand All @@ -88,6 +96,17 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)

loaderClosed := false

var tableRouter *router.Table = nil
upperColName := false
var routerErr error
if cfg.SyncerCfg.DestDBType == "oracle" {
upperColName = true
tableRouter, _, routerErr = genRouterAndBinlogEvent(cfg.SyncerCfg)
if routerErr != nil {
return errors.Annotate(routerErr, "when feed by relay log, gen router and filter failed")
}
}

for {
// when reader is drained and all txn has been push into loader
// we close cloader.
Expand All @@ -112,8 +131,9 @@ func feedByRelayLog(r relay.Reader, ld loader.Loader, cp checkpoint.CheckPoint)
if sbinlog.CommitTs <= checkpointTS {
continue
}

txn, err := loader.SecondaryBinlogToTxn(sbinlog)
var txn *loader.Txn
var err error
txn, err = loader.SecondaryBinlogToTxn(sbinlog, tableRouter, upperColName)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion drainer/relay/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *testReaderSuite) readBinlogAndCheck(c *C, dir string, expectedNumber in
number := 0
for txn := range relayReader.Binlogs() {
number++
loaderTxn, err := loader.SecondaryBinlogToTxn(txn)
loaderTxn, err := loader.SecondaryBinlogToTxn(txn, nil, false)
c.Assert(err, IsNil)
lastTxn = loaderTxn
}
Expand Down
70 changes: 69 additions & 1 deletion drainer/relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pingcap/check"
"github.com/pingcap/tidb-binlog/drainer/checkpoint"
"github.com/pingcap/tidb-binlog/drainer/relay"
dsync "github.com/pingcap/tidb-binlog/drainer/sync"
"github.com/pingcap/tidb-binlog/drainer/translator"
"github.com/pingcap/tidb-binlog/pkg/binlogfile"
"github.com/pingcap/tidb-binlog/pkg/loader"
Expand Down Expand Up @@ -100,7 +101,74 @@ func (s *relaySuite) TestFeedByRealyLog(c *check.C) {
reader, err := relay.NewReader(relayDir, 1)
c.Assert(err, check.IsNil)

err = feedByRelayLog(reader, ld, cp)
cfg := &Config{}
cfg.SyncerCfg = &SyncerConfig{
DestDBType: "tidb",
To: &dsync.DBConfig{
Checkpoint: dsync.CheckpointConfig{
Type: "oracle",
Host: "host-1",
User: "user-1",
Password: "password-1",
OracleServiceName: "oracle-service-name-1",
OracleConnectString: "oracle-connect-string-1",
Table: "new_checkpoint_table",
},
},
}
err = feedByRelayLog(reader, ld, cp, cfg)
c.Assert(err, check.IsNil)

ts := cp.TS()
c.Assert(ts, check.Equals, int64(90) /* latest commit ts */)
c.Assert(cp.IsConsistent(), check.Equals, true)
}

func (s *relaySuite) TestFeedByRealyLogForOracle(c *check.C) {
cp, err := checkpoint.NewFile(0 /* initialCommitTS */, c.MkDir()+"/checkpoint")
c.Assert(err, check.IsNil)
err = cp.Save(0, 0, false, 0)
c.Assert(err, check.IsNil)
c.Assert(cp.IsConsistent(), check.Equals, false)

ld := newNoOpLoader()

// write some relay log
gen := &translator.BinlogGenerator{}
relayDir := c.MkDir()
relayer, err := relay.NewRelayer(relayDir, binlogfile.SegmentSizeBytes, gen)
c.Assert(err, check.IsNil)

for i := 0; i < 10; i++ {
gen.SetInsert(c)
gen.TiBinlog.StartTs = int64(i)
gen.TiBinlog.CommitTs = int64(i) * 10
_, err = relayer.WriteBinlog(gen.Schema, gen.Table, gen.TiBinlog, gen.PV)
c.Assert(err, check.IsNil)
}

relayer.Close()
c.Assert(err, check.IsNil)

reader, err := relay.NewReader(relayDir, 1)
c.Assert(err, check.IsNil)

cfg := &Config{}
cfg.SyncerCfg = &SyncerConfig{
DestDBType: "oracle",
To: &dsync.DBConfig{
Checkpoint: dsync.CheckpointConfig{
Type: "oracle",
Host: "host-1",
User: "user-1",
Password: "password-1",
OracleServiceName: "oracle-service-name-1",
OracleConnectString: "oracle-connect-string-1",
Table: "new_checkpoint_table",
},
},
}
err = feedByRelayLog(reader, ld, cp, cfg)
lichunzhu marked this conversation as resolved.
Show resolved Hide resolved
c.Assert(err, check.IsNil)

ts := cp.TS()
Expand Down
2 changes: 2 additions & 0 deletions drainer/translator/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func genTable(schema string, tableInfo *model.TableInfo) (table *obinlog.Table)
info := new(obinlog.ColumnInfo)
info.Name = col.Name.O
info.MysqlType = types.TypeToStr(col.Tp, col.Charset)
info.Flen = int32(col.Flen)
info.Decimal = int32(col.Decimal)
info.IsPrimaryKey = mysql.HasPriKeyFlag(col.Flag)
columnInfos = append(columnInfos, info)
}
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ require (
github.com/kami-zh/go-capturer v0.0.0-20171211120116-e492ea43421d
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20210425183316-da1aaba5fb63
github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 // indirect
github.com/pingcap/kvproto v0.0.0-20211011042309-a4518fcacbc8
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/tidb v1.1.0-beta.0.20211026030648-c497d5c06348
github.com/pingcap/tidb v1.1.0-beta.0.20211224065547-93d28b9d3ffe
github.com/pingcap/tidb-tools v5.2.3-0.20211101071251-40e8f0cfcb1d+incompatible
github.com/pingcap/tidb/parser v0.0.0-20211026030648-c497d5c06348
github.com/pingcap/tipb v0.0.0-20211026080602-ec68283c1735
github.com/pingcap/tidb/parser v0.0.0-20211224065547-93d28b9d3ffe
github.com/pingcap/tipb v0.0.0-20211201080053-bd104bb270ba
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/client_model v0.2.0
github.com/prometheus/common v0.26.0 // indirect
Expand All @@ -40,8 +40,8 @@ require (
github.com/soheilhy/cmux v0.1.5
github.com/spf13/cobra v1.2.1 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211028082558-c4250227823e
github.com/tikv/pd v1.1.0-beta.0.20211027071649-433d4f2847be
github.com/tikv/client-go/v2 v2.0.0-rc.0.20211223062159-300275dee63e
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/unrolled/render v1.0.1
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/zap v1.19.1
Expand Down
Loading