Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
*: parse the data source directly into data and skip the KV encoder
Browse files Browse the repository at this point in the history
This skips the more complex pingcap/parser, and speeds up parsing speed
by 50%.
  • Loading branch information
kennytm committed Mar 14, 2019
1 parent e15468e commit ce043f9
Show file tree
Hide file tree
Showing 18 changed files with 4,302 additions and 2,852 deletions.
7 changes: 6 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@ require (
github.com/coreos/go-semver v0.2.0
github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142 // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/cznic/golex v0.0.0-20181122101858-9c343928389c // indirect
github.com/cznic/mathutil v0.0.0-20181021201202-eba54fb065b7
github.com/cznic/parser v0.0.0-20181122101858-d773202d5b1f
github.com/cznic/sortutil v0.0.0-20150617083342-4c7342852e65
github.com/cznic/strutil v0.0.0-20181122101858-275e90344537
github.com/cznic/y v0.0.0-20181122101901-b05e8c2e8d7b
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/etcd-io/gofail v0.0.0-20180808172546-51ce9a71510a
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.2.0
github.com/gorilla/context v1.1.1 // indirect
Expand All @@ -23,6 +27,7 @@ require (
github.com/pingcap/parser v0.0.0-20190305073013-4f60445a0550
github.com/pingcap/tidb v0.0.0-20190309032432-ea9970968c73
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20180910045846-371b48b15d93
github.com/prometheus/client_golang v0.9.2
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910
github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446 // indirect
Expand Down
45 changes: 45 additions & 0 deletions go.sum

Large diffs are not rendered by default.

13 changes: 11 additions & 2 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/BurntSushi/toml"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-tools/pkg/filter"
)
Expand All @@ -41,9 +42,11 @@ type DBStore struct {
Psw string `toml:"password" json:"-"`
StatusPort int `toml:"status-port" json:"status-port"`
PdAddr string `toml:"pd-addr" json:"pd-addr"`
SQLMode string `toml:"sql-mode" json:"sql-mode"`
StrSQLMode string `toml:"sql-mode" json:"sql-mode"`
LogLevel string `toml:"log-level" json:"log-level"`

SQLMode mysql.SQLMode `json:"-"`

DistSQLScanConcurrency int `toml:"distsql-scan-concurrency" json:"distsql-scan-concurrency"`
BuildStatsConcurrency int `toml:"build-stats-concurrency" json:"build-stats-concurrency"`
IndexSerialScanConcurrency int `toml:"index-serial-scan-concurrency" json:"index-serial-scan-concurrency"`
Expand Down Expand Up @@ -161,7 +164,8 @@ func NewConfig() *Config {
CheckRequirements: true,
},
TiDB: DBStore{
SQLMode: "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION",
StrSQLMode: "STRICT_TRANS_TABLES,NO_ENGINE_SUBSTITUTION",
SQLMode: mysql.ModeStrictTransTables | mysql.ModeNoEngineSubstitution,
BuildStatsConcurrency: 20,
DistSQLScanConcurrency: 100,
IndexSerialScanConcurrency: 20,
Expand Down Expand Up @@ -226,6 +230,11 @@ func (cfg *Config) Load() error {
return errors.New("invalid config: `mydumper.csv.delimiter` must be one byte long or empty")
}

cfg.TiDB.SQLMode, err = mysql.GetSQLMode(cfg.TiDB.StrSQLMode)
if err != nil {
return errors.New("invalid config: `mydumper.tidb.sql_mode` must be a valid SQL_MODE")
}

// handle mydumper
if cfg.Mydumper.BatchSize <= 0 {
cfg.Mydumper.BatchSize = 100 * _G
Expand Down
225 changes: 225 additions & 0 deletions lightning/kv/session.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package kv

import (
"context"
"fmt"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
kvec "github.com/pingcap/tidb/util/kvencoder"
binlog "github.com/pingcap/tipb/go-binlog"
)

// transaction is a trimmed down Transaction type which only supports adding a
// new KV pair.
type transaction struct {
kvPairs []kvec.KvPair
}

func (t *transaction) Get(k kv.Key) ([]byte, error) {
panic("unexpected Get() call")
}

func (t *transaction) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
panic("unexpected Iter() call")
}

func (t *transaction) IterReverse(k kv.Key) (kv.Iterator, error) {
panic("unexpected IterReverse() call")
}

func (t *transaction) Set(k kv.Key, v []byte) error {
t.kvPairs = append(t.kvPairs, kvec.KvPair{
Key: k.Clone(),
Val: append([]byte{}, v...),
})
return nil
}

func (t *transaction) Delete(k kv.Key) error {
panic("unexpected Delete() call")
}

func (t *transaction) Size() int {
panic("unexpected Size() call")
}

func (t *transaction) Len() int {
panic("unexpected Len() call")
}

func (t *transaction) Reset() {
panic("unexpected Reset() call")
}

func (t *transaction) SetCap(cap int) {
panic("unexpected SetCap() call")
}

func (t *transaction) Commit(context.Context) error {
panic("unexpected Commit() call")
}

func (t *transaction) Rollback() error {
panic("unexpected Rollback() call")
}

func (t *transaction) String() string {
panic("unexpected String() call")
}

func (t *transaction) LockKeys(keys ...kv.Key) error {
panic("unexpected LockKeys() call")
}

func (t *transaction) SetOption(opt kv.Option, val interface{}) {}

func (t *transaction) DelOption(kv.Option) {}

func (t *transaction) IsReadOnly() bool {
panic("unexpected IsReadOnly() call")
}

func (t *transaction) StartTS() uint64 {
panic("unexpected StartTS() call")
}

func (t *transaction) Valid() bool {
panic("unexpected Valid() call")
}

func (t *transaction) GetMemBuffer() kv.MemBuffer {
panic("unexpected GetMemBuffer() call")
}

func (t *transaction) SetVars(vars *kv.Variables) {
panic("unexpected SetVars() call")
}

func (t *transaction) BatchGet(keys []kv.Key) (map[string][]byte, error) {
panic("unexpected BatchGet() call")
}

func (t *transaction) GetSnapshot() kv.Snapshot {
panic("unexpected GetSnapshot() call")
}

//------------------------------------------------------------------------------

// transaction is a trimmed down Transaction type which only supports adding a
// new KV pair.
type session struct {
txn transaction
vars *variable.SessionVars
}

func newSession(sqlMode mysql.SQLMode) *session {
vars := variable.NewSessionVars()
vars.LightningMode = true
vars.SkipUTF8Check = true
vars.StmtCtx.InInsertStmt = true
vars.StmtCtx.BadNullAsWarning = !sqlMode.HasStrictMode()
vars.StmtCtx.TruncateAsWarning = !sqlMode.HasStrictMode()
vars.StmtCtx.IgnoreZeroInDate = !sqlMode.HasStrictMode()
vars.StmtCtx.TimeZone = vars.Location()
return &session{
txn: transaction{},
vars: vars,
}
}

func (se *session) takeKvPairs() []kvec.KvPair {
pairs := se.txn.kvPairs
se.txn.kvPairs = make([]kvec.KvPair, 0, len(pairs))
return pairs
}

func (se *session) NewTxn() error {
panic("unexpected NewTxn() call")
}

func (se *session) Txn(active bool) (kv.Transaction, error) {
return &se.txn, nil
}

func (se *session) GetClient() kv.Client {
panic("unexpected GetClient() call")
}

func (se *session) SetValue(key fmt.Stringer, value interface{}) {
panic("unexpected SetValue() call")
}

func (se *session) Value(key fmt.Stringer) interface{} {
panic("unexpected Value() call")
}

func (se *session) ClearValue(key fmt.Stringer) {
panic("unexpected ClearValue() call")
}

func (se *session) GetSessionVars() *variable.SessionVars {
return se.vars
}

func (se *session) GetSessionManager() util.SessionManager {
panic("unexpected GetSessionManager() call")
}

func (se *session) RefreshTxnCtx(context.Context) error {
panic("unexpected RefreshTxnCtx() call")
}

func (se *session) InitTxnWithStartTS(startTS uint64) error {
panic("unexpected InitTxnWithStartTS() call")
}

func (se *session) GetStore() kv.Storage {
panic("unexpected GetStore() call")
}

func (se *session) PreparedPlanCache() *kvcache.SimpleLRUCache {
panic("unexpected PreparedPlanCache() call")
}

func (se *session) StoreQueryFeedback(feedback interface{}) {
panic("unexpected StoreQueryFeedback() call")
}

func (se *session) StmtCommit() error {
panic("unexpected StmtCommit() call")
}

func (se *session) StmtRollback() {
panic("unexpected StmtRollback() call")
}

func (se *session) StmtGetMutation(int64) *binlog.TableMutation {
panic("unexpected StmtGetMutation() call")
}

func (se *session) StmtAddDirtyTableOP(op int, physicalID int64, handle int64, row []types.Datum) {
panic("unexpected StmtAddDirtyTableOP() call")
}

func (se *session) DDLOwnerChecker() owner.DDLOwnerChecker {
panic("unexpected DDLOwnerChecker() call")
}
Loading

0 comments on commit ce043f9

Please sign in to comment.