Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tool to generate savepoint file #5

Merged
merged 9 commits into from
Nov 13, 2017
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
modify code
  • Loading branch information
WangXiangUSTC committed Nov 13, 2017
commit ee3380dee0c2769d771ff7516b41a6a891b866b8
22 changes: 22 additions & 0 deletions generate_binlog_position/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## gen_meta

gen_meta is a tool to generate fake drainer savepoint file.

## How to use

```
Usage of gen_meta:
--pd-urls string
a comma separated list of PD endpoints
--data-dir string
binlog position data directory path (default "binlog_position")
```

## Example
```
./bin/generate_binlog_position --pd-urls="http://127.0.0.1:2375" --data-dir="data.example"
>cat data.example/savePoint
commitTS = 395986424387338242

[suffixs]
```
81 changes: 81 additions & 0 deletions generate_binlog_position/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"flag"
"os"

"github.com/juju/errors"
)

const (
defaultEtcdURLs = "http://127.0.0.1:2379"
defaultDataDir = "binlog_position"
)

// Config holds the configuration of drainer
type Config struct {
*flag.FlagSet
DataDir string `toml:"data-dir" json:"data-dir"`
EtcdURLs string `toml:"pd-urls" json:"pd-urls"`
}

// NewConfig return an instance of configuration
func NewConfig() *Config {
cfg := &Config{}
cfg.FlagSet = flag.NewFlagSet("generate_binlog_position", flag.ContinueOnError)
fs := cfg.FlagSet
fs.StringVar(&cfg.DataDir, "data-dir", defaultDataDir, "binlog position data directory path (default data.drainer)")
fs.StringVar(&cfg.EtcdURLs, "pd-urls", defaultEtcdURLs, "a comma separated list of PD endpoints")
return cfg
}

// Parse parses all config from command-line flags, environment vars or the configuration file
func (cfg *Config) Parse(args []string) error {
// parse first to get config file
perr := cfg.FlagSet.Parse(args)
switch perr {
case nil:
case flag.ErrHelp:
os.Exit(0)
default:
os.Exit(2)
}

// parse command line options
cfg.FlagSet.Parse(args)
if len(cfg.FlagSet.Args()) > 0 {
return errors.Errorf("'%s' is not a valid flag", cfg.FlagSet.Arg(0))
}
// adjust configuration
adjustString(&cfg.DataDir, defaultDataDir)
return cfg.validate()
}

func adjustString(v *string, defValue string) {
if len(*v) == 0 {
*v = defValue
}
}

// validate checks whether the configuration is valid
func (cfg *Config) validate() error {
// check EtcdEndpoints
_, err := NewURLsValue(cfg.EtcdURLs)
if err != nil {
return errors.Errorf("parse EtcdURLs error: %s, %v", cfg.EtcdURLs, err)
}
return nil
}
99 changes: 99 additions & 0 deletions generate_binlog_position/generate_savepoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"fmt"
"os"
"path"
"strings"
"time"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tipb/go-binlog"
"golang.org/x/net/context"
)

const physicalShiftBits = 18
const slowDist = 30 * time.Millisecond

// GenSavepointInfo generates drainer meta from pd
func GenSavepointInfo(cfg *Config) error {
if err := os.MkdirAll(cfg.DataDir, 0700); err != nil {
return errors.Trace(err)
}

urlv, err := NewURLsValue(cfg.EtcdURLs)
if err != nil {
return errors.Trace(err)
}
tidb.RegisterStore("tikv", tikv.Driver{})
tiPath := fmt.Sprintf("tikv://%s?disableGC=true", urlv.HostString())
tiStore, err := tidb.NewStore(tiPath)
if err != nil {
return errors.Trace(err)
}
defer tiStore.Close()

binlogPos := make(map[string]binlog.Pos)
// get newest ts from pd
commitTS, err := getTSO(cfg)
if err != nil {
log.Errorf("get tso failed: %s", err)
return errors.Trace(err)
}

// generate meta infomation
meta := NewLocalMeta(path.Join(cfg.DataDir, "savePoint"))
err = meta.Save(commitTS, binlogPos)
return errors.Trace(err)
}

func getTSO(cfg *Config) (int64, error) {
now := time.Now()

urlv, err := NewURLsValue(cfg.EtcdURLs)
if err != nil {
return 0, errors.Trace(err)
}

pdCli, err := pd.NewClient(urlv.StringSlice())
physical, logical, err := pdCli.GetTS(context.Background())
if err != nil {
return 0, errors.Trace(err)
}
dist := time.Since(now)
if dist > slowDist {
log.Warnf("get timestamp too slow: %s", dist)
}

return int64(composeTS(physical, logical)), nil
}

func composeTS(physical, logical int64) uint64 {
return uint64((physical << physicalShiftBits) + logical)
}

func parseBinlogName(str string) (index uint64, err error) {
if !strings.HasPrefix(str, "binlog-") {
return 0, errors.Errorf("invalid binlog name %s", str)
}

_, err = fmt.Sscanf(str, "binlog-%016d", &index)
return
}
33 changes: 33 additions & 0 deletions generate_binlog_position/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"os"

"github.com/juju/errors"
"github.com/ngaut/log"
)

func main() {
cfg := NewConfig()
if err := cfg.Parse(os.Args[1:]); err != nil {
log.Infof("verifying flags error, See 'drainer --help'. %s", errors.ErrorStack(err))
}

if err := GenSavepointInfo(cfg); err != nil {
log.Infof("fail to generate savepoint error %v", err)
}
os.Exit(0)
}
147 changes: 147 additions & 0 deletions generate_binlog_position/meta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"bytes"
"fmt"
"os"
"sync"
"time"

"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tipb/go-binlog"
"github.com/siddontang/go/ioutil2"
)

var (
maxSaveTime = 30 * time.Second
)

// Meta is savepoint meta interface
type Meta interface {
// Load loads meta information.
Load() error

// Save saves meta information.
Save(int64, map[string]binlog.Pos) error

// Check checks whether we should save meta.
Check() bool

// Pos gets position information.
Pos() (int64, map[string]binlog.Pos)
}

// LocalMeta is local meta struct.
type localMeta struct {
sync.RWMutex

name string
saveTime time.Time

CommitTS int64 `toml:"commitTS" json:"commitTS"`
// drainer only stores the binlog file suffix
Suffixs map[string]uint64 `toml:"suffixs" json:"suffixs"`
}

// NewLocalMeta creates a new LocalMeta.
func NewLocalMeta(name string) Meta {
return &localMeta{name: name, Suffixs: make(map[string]uint64)}
}

// Load implements Meta.Load interface.
func (lm *localMeta) Load() error {
file, err := os.Open(lm.name)
if err != nil && !os.IsNotExist(errors.Cause(err)) {
return errors.Trace(err)
}
if os.IsNotExist(errors.Cause(err)) {
return nil
}
defer file.Close()

_, err = toml.DecodeReader(file, lm)
return errors.Trace(err)
}

// Save implements Meta.Save interface.
func (lm *localMeta) Save(ts int64, poss map[string]binlog.Pos) error {
log.Infof("local meta save")
lm.Lock()
defer lm.Unlock()

for nodeID, pos := range poss {
// for safe restart, we should forward two binlog files
// make sure drainer can get binlogs larger than commitTS
// this is a simple way , if meet problem we would replace by an accurate algorithm
lm.Suffixs[nodeID] = 0
if pos.Suffix > 2 {
lm.Suffixs[nodeID] = pos.Suffix - 2
}
}

lm.CommitTS = ts

var buf bytes.Buffer
e := toml.NewEncoder(&buf)
err := e.Encode(lm)
if err != nil {
log.Errorf("syncer save meta info to file %s err %v", lm.name, errors.ErrorStack(err))
return errors.Trace(err)
}

err = ioutil2.WriteFileAtomic(lm.name, buf.Bytes(), 0644)
if err != nil {
log.Errorf("syncer save meta info to file %s err %v", lm.name, errors.ErrorStack(err))
return errors.Trace(err)
}

lm.saveTime = time.Now()
return nil
}

// Check implements Meta.Check interface.
func (lm *localMeta) Check() bool {
lm.RLock()
defer lm.RUnlock()

if time.Since(lm.saveTime) >= maxSaveTime {
return true
}

return false
}

// Pos implements Meta.Pos interface.
func (lm *localMeta) Pos() (int64, map[string]binlog.Pos) {
lm.RLock()
defer lm.RUnlock()

poss := make(map[string]binlog.Pos)
for nodeID, suffix := range lm.Suffixs {
poss[nodeID] = binlog.Pos{
Suffix: suffix,
Offset: 0,
}
}
return lm.CommitTS, poss
}

func (lm *localMeta) String() string {
ts, poss := lm.Pos()
return fmt.Sprintf("binlog %s commitTS = %d positions = %+v", lm.name, ts, poss)
}
Loading