From 502f884e552999c9a8f16f82ef0aba2fb3551d9e Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Tue, 26 Nov 2013 20:49:57 -0800 Subject: [PATCH] The grand purge: delete old binlog code --- .gitignore | 2 - Makefile | 2 - go/cmd/vt_binlog_player/vt_binlog_player.go | 101 --- go/cmd/vt_binlog_server/vt_binlog_server.go | 52 -- go/vt/mysqlctl/binlog_decoder.go | 143 ---- go/vt/mysqlctl/binlog_parser.go | 661 ----------------- go/vt/mysqlctl/binlog_reader.go | 235 ------ go/vt/mysqlctl/binlog_server.go | 769 -------------------- go/vt/mysqlctl/event_streamer.go | 2 +- go/vt/mysqlctl/proto/binlog.go | 115 --- go/vt/mysqlctl/proto/binlog_server.go | 87 --- go/vt/mysqlctl/updatestreamctl.go | 11 +- go/vt/tabletserver/rowcache_invalidator.go | 4 +- go/vt/vttablet/agent.go | 8 - 14 files changed, 8 insertions(+), 2184 deletions(-) delete mode 100644 go/cmd/vt_binlog_player/vt_binlog_player.go delete mode 100644 go/cmd/vt_binlog_server/vt_binlog_server.go delete mode 100644 go/vt/mysqlctl/binlog_decoder.go delete mode 100644 go/vt/mysqlctl/binlog_parser.go delete mode 100644 go/vt/mysqlctl/binlog_reader.go delete mode 100644 go/vt/mysqlctl/binlog_server.go delete mode 100644 go/vt/mysqlctl/proto/binlog.go delete mode 100644 go/vt/mysqlctl/proto/binlog_server.go diff --git a/.gitignore b/.gitignore index 26174d341d0..c0e0d49c4bf 100644 --- a/.gitignore +++ b/.gitignore @@ -19,8 +19,6 @@ third_party/mysql go/cmd/mysqlctl/mysqlctl go/cmd/normalizer/normalizer go/cmd/topo2topo/topo2topo -go/cmd/vt_binlog_player/vt_binlog_player -go/cmd/vt_binlog_server/vt_binlog_server go/cmd/vtaction/vtaction go/cmd/vtgate/vtgate go/cmd/vtclient2/vtclient2 diff --git a/Makefile b/Makefile index b0d08344289..82f224ce891 100644 --- a/Makefile +++ b/Makefile @@ -10,8 +10,6 @@ build: cd go/cmd/mysqlctl; go build cd go/cmd/normalizer; go build cd go/cmd/topo2topo; go build - cd go/cmd/vt_binlog_player; go build - cd go/cmd/vt_binlog_server; go build cd go/cmd/vtaction; go build cd go/cmd/vtgate; go build cd go/cmd/vtclient2; go build diff --git a/go/cmd/vt_binlog_player/vt_binlog_player.go b/go/cmd/vt_binlog_player/vt_binlog_player.go deleted file mode 100644 index 49bc9ec58f7..00000000000 --- a/go/cmd/vt_binlog_player/vt_binlog_player.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -/* -The vt_binlog_player reads data from the a remote host via vt_binlog_server. -This is mostly intended for online data migrations. -This program reads the current status from blp_recovery (by uid), -and updates it. -*/ -package main - -import ( - "encoding/json" - "flag" - "fmt" - "io/ioutil" - _ "net/http/pprof" - "os" - "os/signal" - "syscall" - - log "github.com/golang/glog" - "github.com/youtube/vitess/go/mysql" - "github.com/youtube/vitess/go/vt/key" - "github.com/youtube/vitess/go/vt/mysqlctl" - "github.com/youtube/vitess/go/vt/servenv" -) - -var ( - uid = flag.Uint("uid", 0, "id of the blp_checkpoint row") - start = flag.String("start", "", "keyrange start to use in hex") - end = flag.String("end", "", "keyrange end to use in hex") - port = flag.Int("port", 0, "port for the server") - dbConfigFile = flag.String("db-config-file", "", "json file for db credentials") - debug = flag.Bool("debug", true, "run a debug version - prints the sql statements rather than executing them") -) - -func readDbConfig(dbConfigFile string) (*mysql.ConnectionParams, error) { - dbConfigData, err := ioutil.ReadFile(dbConfigFile) - if err != nil { - return nil, fmt.Errorf("Error %s in reading db-config-file %s", err, dbConfigFile) - } - log.Infof("dbConfigData %v", string(dbConfigData)) - - dbConfig := new(mysql.ConnectionParams) - err = json.Unmarshal(dbConfigData, dbConfig) - if err != nil { - return nil, fmt.Errorf("error in unmarshaling dbconfig data, err '%v'", err) - } - return dbConfig, nil -} - -// TODO: Either write a test for this tool or delete it. -func main() { - flag.Parse() - servenv.Init() - defer servenv.Close() - - keyRange, err := key.ParseKeyRangeParts(*start, *end) - if err != nil { - log.Fatalf("Invalid key range: %v", err) - } - - if *dbConfigFile == "" { - log.Fatalf("Cannot start without db-config-file") - } - dbConfig, err := readDbConfig(*dbConfigFile) - if err != nil { - log.Fatalf("Cannot read db config file: %v", err) - } - - interrupted := make(chan struct{}) - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGTERM) - go func() { - for _ = range c { - close(interrupted) - } - }() - - var vtClient mysqlctl.VtClient - vtClient = mysqlctl.NewDbClient(dbConfig) - err = vtClient.Connect() - if err != nil { - log.Fatalf("error in initializing dbClient: %v", err) - } - brs, err := mysqlctl.ReadStartPosition(vtClient, uint32(*uid)) - if err != nil { - log.Fatalf("Cannot read start position from db: %v", err) - } - if *debug { - vtClient = mysqlctl.NewDummyVtClient() - } - blp := mysqlctl.NewBinlogPlayer(vtClient, fmt.Sprintf("localhost:%d", *port), keyRange, brs) - err = blp.ApplyBinlogEvents(interrupted) - if err != nil { - log.Errorf("Error in applying binlog events, err %v", err) - } - log.Infof("vt_binlog_player done") -} diff --git a/go/cmd/vt_binlog_server/vt_binlog_server.go b/go/cmd/vt_binlog_server/vt_binlog_server.go deleted file mode 100644 index 9d7cf9a04fb..00000000000 --- a/go/cmd/vt_binlog_server/vt_binlog_server.go +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// vt binlog server: Serves binlog for out of band replication. -package main - -import ( - "flag" - - log "github.com/golang/glog" - "github.com/youtube/vitess/go/rpcwrap" - "github.com/youtube/vitess/go/vt/dbconfigs" - "github.com/youtube/vitess/go/vt/mysqlctl" - "github.com/youtube/vitess/go/vt/mysqlctl/proto" - "github.com/youtube/vitess/go/vt/servenv" -) - -var ( - port = flag.Int("port", 6614, "port for the server") - dbname = flag.String("dbname", "", "database name") - mycnfFile = flag.String("mycnf-file", "", "path of mycnf file") -) - -func main() { - dbCredentialsFile := dbconfigs.RegisterCommonFlags() - flag.Parse() - servenv.Init() - - if *mycnfFile == "" { - log.Fatalf("Please specify the path for mycnf file.") - } - mycnf, err := mysqlctl.ReadMycnf(*mycnfFile) - if err != nil { - log.Fatalf("Error reading mycnf file %v", *mycnfFile) - } - dbcfgs, err := dbconfigs.Init(mycnf.SocketFile, *dbCredentialsFile) - if err != nil { - log.Warning(err) - } - mysqld := mysqlctl.NewMysqld(mycnf, dbcfgs.Dba, dbcfgs.Repl) - - binlogServer := mysqlctl.NewBinlogServer(mysqld) - mysqlctl.EnableBinlogServerService(binlogServer, *dbname) - - proto.RegisterBinlogServer(binlogServer) - rpcwrap.RegisterAuthenticated(binlogServer) - servenv.OnClose(func() { - mysqlctl.DisableBinlogServerService(binlogServer) - }) - servenv.Run(*port) -} diff --git a/go/vt/mysqlctl/binlog_decoder.go b/go/vt/mysqlctl/binlog_decoder.go deleted file mode 100644 index 4c32b6e49a8..00000000000 --- a/go/vt/mysqlctl/binlog_decoder.go +++ /dev/null @@ -1,143 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package mysqlctl - -import ( - "fmt" - "io" - "os" - "path" - - log "github.com/golang/glog" - vtenv "github.com/youtube/vitess/go/vt/env" -) - -/* - -d, --database=name List entries for just this database (local log only). - -D, --disable-log-bin - Disable binary log. This is useful, if you enabled - --to-last-log and are sending the output to the same - MySQL server. This way you could avoid an endless loop. - You would also like to use it when restoring after a - crash to avoid duplication of the statements you already - have. NOTE: you will need a SUPER privilege to use this - option. - -F, --force-if-open Force if binlog was not closed properly. - -f, --force-read Force reading unknown binlog events. - -h, --host=name Get the binlog from server. - -l, --local-load=name - Prepare local temporary files for LOAD DATA INFILE in the - specified directory. - -o, --offset=# Skip the first N entries. - -p, --password[=name] - Password to connect to remote server. - -P, --port=# Port number to use for connection or 0 for default to, in - order of preference, my.cnf, $MYSQL_TCP_PORT, - /etc/services, built-in default (3306). - --protocol=name The protocol to use for connection (tcp, socket, pipe, - memory). - -R, --read-from-remote-server - Read binary logs from a MySQL server. - -r, --result-file=name - Direct output to a given file. - --server-id=# Extract only binlog entries created by the server having - the given id. - --set-charset=name Add 'SET NAMES character_set' to the output. - -S, --socket=name The socket file to use for connection. - -j, --start-position=# - Start reading the binlog at position N. Applies to the - first binlog passed on the command line. - --stop-position=# Stop reading the binlog at position N. Applies to the - last binlog passed on the command line. - -t, --to-last-log Requires -R. Will not stop at the end of the requested - binlog but rather continue printing until the end of the - last binlog of the MySQL server. If you send the output - to the same MySQL server, that may lead to an endless - loop. - -u, --user=name Connect to the remote server as username. - -*/ - -type BinlogDecoder struct { - process *os.Process -} - -// findVtMysqlbinlogDir finds the directory that contains vt_mysqlbinlog: -// could be with the mysql distribution, or with the vt distribution -func findVtMysqlbinlogDir() (string, error) { - // first look in VtRoot - dir, err := vtenv.VtRoot() - if err == nil { - if _, err = os.Stat(path.Join(dir, "bin/vt_mysqlbinlog")); err == nil { - return dir, nil - } - } - - // then look in VtMysqlRoot - dir, err = vtenv.VtMysqlRoot() - if err == nil { - if _, err = os.Stat(path.Join(dir, "bin/vt_mysqlbinlog")); err == nil { - return dir, nil - } - } - - // then try current directory + bin/vt_mysqlbinlog - if _, err = os.Stat("bin/vt_mysqlbinlog"); err == nil { - return "", nil - } - - return "", fmt.Errorf("Cannot find vt_mysqlbinlog binary") -} - -// return a Reader from which the decoded binlog can be read -func (decoder *BinlogDecoder) DecodeMysqlBinlog(binlog *os.File) (io.Reader, error) { - dir, err := findVtMysqlbinlogDir() - if err != nil { - return nil, err - } - dir = path.Join(dir, "bin") - name := "vt_mysqlbinlog" - arg := []string{"vt_mysqlbinlog", "-"} - - dataRdFile, dataWrFile, pipeErr := os.Pipe() - if pipeErr != nil { - log.Errorf("DecodeMysqlBinlog: error in creating pipe %v", pipeErr) - return nil, pipeErr - } - // let the caller close the read file - defer dataWrFile.Close() - - fds := []*os.File{ - binlog, - dataWrFile, - os.Stderr, - } - - attrs := &os.ProcAttr{Dir: dir, Files: fds} - - process, err := os.StartProcess(name, arg, attrs) - if err != nil { - log.Errorf("DecodeMysqlBinlog: error in decoding binlog %v", err) - return nil, err - } - decoder.process = process - - go func() { - // just make sure we don't spawn zombies - waitMsg, err := decoder.process.Wait() - if err != nil { - log.Errorf("vt_mysqlbinlog exited: %v err: %v", waitMsg, err) - } else { - log.Infof("vt_mysqlbinlog exited: %v err: %v", waitMsg, err) - } - }() - - return dataRdFile, nil -} - -func (decoder *BinlogDecoder) Kill() error { - //log.Infof("Killing vt_mysqlbinlog pid %v", decoder.process.Pid) - return decoder.process.Kill() -} diff --git a/go/vt/mysqlctl/binlog_parser.go b/go/vt/mysqlctl/binlog_parser.go deleted file mode 100644 index b8023b379bd..00000000000 --- a/go/vt/mysqlctl/binlog_parser.go +++ /dev/null @@ -1,661 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package mysqlctl - -import ( - "bufio" - "bytes" - "encoding/json" - "fmt" - "io" - "os" - "path" - "strconv" - "strings" - "time" - - log "github.com/golang/glog" - "github.com/youtube/vitess/go/vt/mysqlctl/proto" - parser "github.com/youtube/vitess/go/vt/sqlparser" -) - -// Functionality to parse a binlog log and send event streams to clients. - -const ( - MAX_TXN_BATCH = 1024 - DML = "DML" - DDL = "DDL" - BEGIN = "BEGIN" - COMMIT = "COMMIT" - USE = "use" - EOF = "EOF" -) - -var SqlKwMap = map[string]string{ - "create": DDL, - "alter": DDL, - "drop": DDL, - "truncate": DDL, - "rename": DDL, - "insert": DML, - "update": DML, - "delete": DML, - "begin": BEGIN, - "commit": COMMIT, -} - -var ( - BINLOG_DELIMITER = []byte("/*!*/;") - BINLOG_POSITION_PREFIX = []byte("# at ") - BINLOG_ROTATE_TO = []byte("Rotate to ") - BINLOG_BEGIN = []byte("BEGIN") - BINLOG_COMMIT = []byte("COMMIT") - BINLOG_ROLLBACK = []byte("ROLLBACK") - BINLOG_END_LOG_POS = []byte("end_log_pos ") - BINLOG_XID = []byte("Xid = ") - BINLOG_GROUP_ID = []byte("group_id ") - BINLOG_START = []byte("Start: binlog") - BINLOG_DB_CHANGE = []byte(USE) - POS = []byte(" pos: ") - COMMENT = []byte("/*!") - SET_SESSION_VAR = []byte("SET @@session") - DELIMITER = []byte("DELIMITER ") - BINLOG = []byte("BINLOG ") - SEMICOLON_BYTE = []byte(";") -) - -// Api Interface -type UpdateResponse struct { - Coord proto.BinlogPosition - Data EventData -} - -type EventData struct { - SqlType string - TableName string - Sql string - PkColNames []string - PkValues [][]interface{} -} - -// Raw event buffer used to gather data during parsing. -type eventBuffer struct { - Coord proto.BinlogPosition - LogLine []byte - firstKw string -} - -func NewEventBuffer(pos *proto.BinlogPosition, line []byte) *eventBuffer { - buf := &eventBuffer{} - buf.LogLine = make([]byte, len(line)) - written := copy(buf.LogLine, line) - if written < len(line) { - log.Warningf("Logline not properly copied while creating new event written: %v len: %v", written, len(line)) - } - buf.Coord.Position = pos.Position - buf.Coord.Timestamp = pos.Timestamp - return buf -} - -type blpStats struct { - TimeStarted time.Time - RotateEventCount uint16 - TxnCount uint64 - DmlCount uint64 - DdlCount uint64 - LineCount uint64 - BigLineCount uint64 - BufferFullErrors uint64 - LinesPerSecond float64 - TxnsPerSecond float64 -} - -func (stats blpStats) DebugJsonString() string { - elapsed := float64(time.Now().Sub(stats.TimeStarted)) / 1e9 - stats.LinesPerSecond = float64(stats.LineCount) / elapsed - stats.TxnsPerSecond = float64(stats.TxnCount) / elapsed - data, err := json.MarshalIndent(stats, " ", " ") - if err != nil { - return err.Error() - } - return string(data) -} - -type Blp struct { - nextStmtPosition uint64 - inTxn bool - txnLineBuffer []*eventBuffer - responseStream []*UpdateResponse - initialSeek bool - startPosition *proto.ReplicationCoordinates - currentPosition *proto.BinlogPosition - globalState *UpdateStream - blpStats -} - -func NewBlp(startCoordinates *proto.ReplicationCoordinates, updateStream *UpdateStream) *Blp { - blp := &Blp{} - blp.startPosition = startCoordinates - blp.currentPosition = &proto.BinlogPosition{Position: *startCoordinates} - blp.inTxn = false - blp.initialSeek = true - blp.txnLineBuffer = make([]*eventBuffer, 0, MAX_TXN_BATCH) - blp.responseStream = make([]*UpdateResponse, 0, MAX_TXN_BATCH) - blp.globalState = updateStream - return blp -} - -// Error types for update stream -const ( - FATAL = "Fatal" - SERVICE_ERROR = "Service Error" - EVENT_ERROR = "Event Error" - CODE_ERROR = "Code Error" - REPLICATION_ERROR = "Replication Error" - CONNECTION_ERROR = "Connection Error" -) - -type BinlogParseError struct { - errType string - msg string -} - -func NewBinlogParseError(errType, msg string) *BinlogParseError { - return &BinlogParseError{errType: errType, msg: msg} -} - -func (err BinlogParseError) Error() string { - errStr := fmt.Sprintf("%v: %v", err.errType, err.msg) - if err.IsFatal() { - return FATAL + " " + errStr - } - return errStr -} - -func (err *BinlogParseError) IsFatal() bool { - return (err.errType != EVENT_ERROR) -} - -func (err *BinlogParseError) IsEOF() bool { - return err.msg == EOF -} - -type SendUpdateStreamResponse func(response interface{}) error - -func (blp *Blp) ComputeBacklog() int64 { - rp, replErr := blp.globalState.getReplicationPosition() - if replErr != nil { - return -1 - } - pos := int64(blp.currentPosition.Position.MasterPosition) - if rp.MasterFilename != blp.currentPosition.Position.MasterFilename { - pos = 0 - } - return int64(rp.MasterPosition) - pos -} - -// Main entry function for reading and parsing the binlog. -func (blp *Blp) StreamBinlog(sendReply SendUpdateStreamResponse, binlogPrefix string) (err error) { - var lastRun time.Time - count := 0 - for { - count++ - lastRun = time.Now() - err = blp.streamBinlog(sendReply, binlogPrefix) - if err != nil { - log.Errorf("StreamBinlog error @ %v, error: %v", blp.currentPosition.String(), err.Error()) - return err - } - diff := time.Now().Sub(lastRun) - if diff < (100 * time.Millisecond) { - time.Sleep(100*time.Millisecond - diff) - } - *blp.startPosition = blp.currentPosition.Position - } -} - -func (blp *Blp) handleError(err *error) { - reqIdentifier := blp.currentPosition.String() - if x := recover(); x != nil { - serr, ok := x.(*BinlogParseError) - if !ok { - log.Errorf("Uncaught panic for stream @ %v", reqIdentifier) - panic(x) - } - *err = NewBinlogParseError(serr.errType, serr.msg) - } -} - -func (blp *Blp) streamBinlog(sendReply SendUpdateStreamResponse, binlogPrefix string) (err error) { - if blp.globalState.dbname == "" { - return NewBinlogParseError(SERVICE_ERROR, "db name was not specified") - } - mbl := &MysqlBinlog{} - reader, err := mbl.Launch( - blp.globalState.dbname, - path.Join(path.Dir(binlogPrefix), blp.startPosition.MasterFilename), - int64(blp.startPosition.MasterPosition), - ) - if err != nil { - return err - } - defer reader.Close() - err = blp.parseBinlogEvents(sendReply, reader) - if err != nil { - mbl.Kill() - } else { - err = mbl.Wait() - } - return err -} - -// Main parse loop -func (blp *Blp) parseBinlogEvents(sendReply SendUpdateStreamResponse, binlogReader io.Reader) (err error) { - defer blp.handleError(&err) - // read over the stream and buffer up the transactions - var line []byte - bigLine := make([]byte, 0, BINLOG_BLOCK_SIZE) - lineReader := bufio.NewReaderSize(binlogReader, BINLOG_BLOCK_SIZE) - readAhead := false - var event *eventBuffer - - for { - if !blp.globalState.isEnabled() { - panic(NewBinlogParseError(SERVICE_ERROR, "Disconnecting because the Update Stream service has been disabled")) - } - line = line[:0] - bigLine = bigLine[:0] - line, err = blp.readBlpLine(lineReader, bigLine) - if err != nil { - if err == io.EOF { - // end of stream - return nil - } - panic(NewBinlogParseError(REPLICATION_ERROR, fmt.Sprintf("ReadLine err: , %v", err))) - } - if len(line) == 0 { - continue - } - blp.LineCount++ - - if line[0] == '#' { - // parse positional data - line = bytes.TrimSpace(line) - blp.parsePositionData(line) - } else { - // parse event data - - // readAhead is used for reading a log line that is spread across multiple lines and has newlines within it. - if readAhead { - event.LogLine = append(event.LogLine, line...) - } else { - event = NewEventBuffer(blp.currentPosition, line) - } - - if bytes.HasSuffix(event.LogLine, BINLOG_DELIMITER) { - event.LogLine = event.LogLine[:len(event.LogLine)-len(BINLOG_DELIMITER)] - readAhead = false - } else { - readAhead = true - continue - } - - event.LogLine = bytes.TrimSpace(event.LogLine) - event.firstKw = string(bytes.ToLower(bytes.SplitN(event.LogLine, SPACE, 2)[0])) - - // processes statements only for the dbname that it is subscribed to. - blp.parseEventData(sendReply, event) - } - } - return nil -} - -func (blp *Blp) parsePositionData(line []byte) { - if bytes.HasPrefix(line, BINLOG_POSITION_PREFIX) { - // Master Position - if blp.nextStmtPosition == 0 { - return - } - } else if bytes.Index(line, BINLOG_ROTATE_TO) != -1 { - blp.parseRotateEvent(line) - } else if bytes.Index(line, BINLOG_END_LOG_POS) != -1 { - // Ignore the position data that appears at the start line of binlog. - if bytes.Index(line, BINLOG_START) != -1 { - return - } - blp.parseMasterPosition(line) - if blp.nextStmtPosition != 0 { - blp.currentPosition.Position.MasterPosition = blp.nextStmtPosition - } - } - if bytes.Index(line, BINLOG_XID) != -1 { - blp.parseXid(line) - } -} - -func (blp *Blp) parseEventData(sendReply SendUpdateStreamResponse, event *eventBuffer) { - if bytes.HasPrefix(event.LogLine, BINLOG_SET_TIMESTAMP) { - blp.extractEventTimestamp(event) - blp.initialSeek = false - if blp.inTxn { - blp.txnLineBuffer = append(blp.txnLineBuffer, event) - } - } else if bytes.HasPrefix(event.LogLine, BINLOG_SET_INSERT) { - if blp.inTxn { - blp.txnLineBuffer = append(blp.txnLineBuffer, event) - } else { - panic(NewBinlogParseError(EVENT_ERROR, fmt.Sprintf("SET INSERT_ID outside a txn - len %v, dml '%v'", len(blp.txnLineBuffer), string(event.LogLine)))) - } - } else if bytes.HasPrefix(event.LogLine, BINLOG_BEGIN) { - blp.handleBeginEvent(event) - } else if bytes.HasPrefix(event.LogLine, BINLOG_COMMIT) { - blp.handleCommitEvent(sendReply, event) - blp.inTxn = false - blp.txnLineBuffer = blp.txnLineBuffer[0:0] - } else if bytes.HasPrefix(event.LogLine, BINLOG_ROLLBACK) { - blp.inTxn = false - blp.txnLineBuffer = blp.txnLineBuffer[0:0] - } else if len(event.LogLine) > 0 { - sqlType := GetSqlType(event.firstKw) - if blp.inTxn && IsTxnStatement(event.LogLine, event.firstKw) { - blp.txnLineBuffer = append(blp.txnLineBuffer, event) - blp.DmlCount++ - } else if sqlType == DDL { - blp.handleDdlEvent(sendReply, event) - } else { - if sqlType == DML { - panic(NewBinlogParseError(EVENT_ERROR, fmt.Sprintf("DML outside a txn - len %v, dml '%v'", len(blp.txnLineBuffer), string(event.LogLine)))) - } - // Ignore these often occuring statement types. - if !IgnoredStatement(event.LogLine) { - log.Warningf("Unknown statement '%v'", string(event.LogLine)) - } - } - } -} - -// Position Parsing Functions. - -func (blp *Blp) parseMasterPosition(line []byte) { - var err error - rem := bytes.Split(line, BINLOG_END_LOG_POS) - masterPosStr := string(bytes.Split(rem[1], SPACE)[0]) - blp.nextStmtPosition, err = strconv.ParseUint(masterPosStr, 10, 64) - if err != nil { - panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting master position %v", err))) - } -} - -func (blp *Blp) parseXid(line []byte) { - rem := bytes.Split(line, BINLOG_XID) - xid, err := strconv.ParseUint(string(rem[1]), 10, 64) - if err != nil { - panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting Xid position %v", err))) - } - blp.currentPosition.Xid = xid -} - -func (blp *Blp) extractEventTimestamp(event *eventBuffer) { - line := event.LogLine - currentTimestamp, err := strconv.ParseInt(string(line[len(BINLOG_SET_TIMESTAMP):]), 10, 64) - if err != nil { - panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting timestamp %v", err))) - } - blp.currentPosition.Timestamp = currentTimestamp - event.Coord.Timestamp = currentTimestamp -} - -func (blp *Blp) parseRotateEvent(line []byte) { - blp.RotateEventCount++ - rem := bytes.Split(line, BINLOG_ROTATE_TO) - rem2 := bytes.Split(rem[1], POS) - rotateFilename := strings.TrimSpace(string(rem2[0])) - rotatePos, err := strconv.ParseUint(string(rem2[1]), 10, 64) - if err != nil { - panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting rotate pos %v from line %s", err, string(line)))) - } - - // If the file being parsed is a binlog, - // then the rotate events only correspond to itself. - blp.currentPosition.Position.MasterFilename = rotateFilename - blp.currentPosition.Position.MasterPosition = rotatePos -} - -// Data event parsing and handling functions. - -func (blp *Blp) handleBeginEvent(event *eventBuffer) { - if len(blp.txnLineBuffer) != 0 || blp.inTxn { - panic(NewBinlogParseError(EVENT_ERROR, fmt.Sprintf("BEGIN encountered with non-empty trxn buffer, len: %d", len(blp.txnLineBuffer)))) - } - blp.txnLineBuffer = blp.txnLineBuffer[:0] - blp.inTxn = true - blp.txnLineBuffer = append(blp.txnLineBuffer, event) -} - -func (blp *Blp) handleDdlEvent(sendReply SendUpdateStreamResponse, event *eventBuffer) { - ddlStream := createDdlStream(event) - buf := []*UpdateResponse{ddlStream} - sendStream(sendReply, buf) - blp.DdlCount++ -} - -func (blp *Blp) handleCommitEvent(sendReply SendUpdateStreamResponse, commitEvent *eventBuffer) { - commitEvent.Coord.Xid = blp.currentPosition.Xid - blp.txnLineBuffer = append(blp.txnLineBuffer, commitEvent) - // txn block for DMLs, parse it and send events for a txn - var dmlCount uint - blp.responseStream, dmlCount = buildTxnResponse(blp.txnLineBuffer) - - // No dml events - no point in sending this. - if dmlCount == 0 { - return - } - - sendStream(sendReply, blp.responseStream) - blp.TxnCount += 1 -} - -// Other utility functions. - -// This reads a binlog log line. -func (blp *Blp) readBlpLine(lineReader *bufio.Reader, bigLine []byte) (line []byte, err error) { - for { - tempLine, tempErr := lineReader.ReadSlice('\n') - if tempErr == bufio.ErrBufferFull { - blp.BufferFullErrors++ - bigLine = append(bigLine, tempLine...) - continue - } else if tempErr != nil { - err = tempErr - } else if len(bigLine) > 0 { - if len(tempLine) > 0 { - bigLine = append(bigLine, tempLine...) - } - line = bigLine[:len(bigLine)-1] - blp.BigLineCount++ - } else { - line = tempLine[:len(tempLine)-1] - } - break - } - return line, err -} - -// This function streams the raw binlog. -func (blp *Blp) getBinlogStream(writer *os.File, blr *BinlogReader, readErrChan chan error) { - defer func() { - if err := recover(); err != nil { - log.Errorf("getBinlogStream failed: %v", err) - readErrChan <- err.(error) - } - }() - blr.ServeData(writer, blp.startPosition.MasterFilename, int64(blp.startPosition.MasterPosition)) - readErrChan <- nil -} - -// This builds UpdateResponse for each transaction. -func buildTxnResponse(trxnLineBuffer []*eventBuffer) (txnResponseList []*UpdateResponse, dmlCount uint) { - var err error - var line []byte - var dmlType string - var eventNodeTree *parser.Node - var autoincId uint64 - dmlBuffer := make([][]byte, 0, 10) - - for _, event := range trxnLineBuffer { - line = event.LogLine - if bytes.HasPrefix(line, BINLOG_BEGIN) { - streamBuf := new(UpdateResponse) - streamBuf.Coord = event.Coord - streamBuf.Data.SqlType = BEGIN - txnResponseList = append(txnResponseList, streamBuf) - continue - } - if bytes.HasPrefix(line, BINLOG_COMMIT) { - commitEvent := createCommitEvent(event) - txnResponseList = append(txnResponseList, commitEvent) - continue - } - if bytes.HasPrefix(line, BINLOG_SET_INSERT) { - autoincId, err = strconv.ParseUint(string(line[len(BINLOG_SET_INSERT):]), 10, 64) - if err != nil { - panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting AutoInc Id %v", err))) - } - continue - } - - sqlType := GetSqlType(event.firstKw) - if sqlType == DML { - // valid dml - commentIndex := bytes.Index(line, STREAM_COMMENT_START) - // stream comment not found. - if commentIndex == -1 { - if event.firstKw != "insert" { - log.Warningf("Invalid DML - doesn't have a valid stream comment : %v", string(line)) - } - dmlBuffer = dmlBuffer[:0] - continue - // FIXME: track such cases and potentially change them to errors at a later point. - // panic(NewBinlogParseError(fmt.Sprintf("Invalid DML, doesn't have a valid stream comment. Sql: %v", string(line)))) - } - dmlBuffer = append(dmlBuffer, line) - streamComment := string(line[commentIndex+len(STREAM_COMMENT_START):]) - eventNodeTree, err = parseStreamComment(streamComment) - if err != nil { - panic(err) - } - dmlType = GetDmlType(event.firstKw) - response, _ := createUpdateResponse(eventNodeTree, dmlType, event.Coord, autoincId) - txnResponseList = append(txnResponseList, response) - autoincId = 0 - dmlBuffer = dmlBuffer[:0] - dmlCount += 1 - } else { - // add as prefixes to the DML from last DML. - // start a new dml buffer and keep adding to it. - dmlBuffer = append(dmlBuffer, line) - } - } - - return -} - -// This builds UpdateResponse from the parsed tree, also handles a multi-row update. -func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos proto.BinlogPosition, autoincId uint64) (response *UpdateResponse, newid uint64) { - if eventTree.Len() < 3 { - panic(NewBinlogParseError(EVENT_ERROR, fmt.Sprintf("Invalid comment structure, len of tree %v", eventTree.Len()))) - } - - tableName := string(eventTree.At(0).Value) - pkColNamesNode := eventTree.At(1) - pkColNames := make([]string, 0, pkColNamesNode.Len()) - for _, pkCol := range pkColNamesNode.Sub { - if pkCol.Type != parser.ID { - panic(NewBinlogParseError(EVENT_ERROR, "Error in the stream comment, illegal type for column name.")) - } - pkColNames = append(pkColNames, string(pkCol.Value)) - } - pkColLen := pkColNamesNode.Len() - - response = new(UpdateResponse) - response.Coord = blpPos - response.Data.SqlType = dmlType - response.Data.TableName = tableName - response.Data.PkColNames = pkColNames - response.Data.PkValues = make([][]interface{}, 0, len(eventTree.Sub[2:])) - - rowPk := make([]interface{}, pkColLen) - for _, node := range eventTree.Sub[2:] { - rowPk = rowPk[:0] - if node.Len() != pkColLen { - panic(NewBinlogParseError(EVENT_ERROR, "Error in the stream comment, length of pk values doesn't match column names.")) - } - rowPk, _, err := encodePkValues(node.Sub, int64(autoincId)) - if err != nil { - panic(err) - } - response.Data.PkValues = append(response.Data.PkValues, rowPk) - } - return response, autoincId -} - -// This sends the stream to the client. -func sendStream(sendReply SendUpdateStreamResponse, responseBuf []*UpdateResponse) { - for _, event := range responseBuf { - if err := sendReply(event); err != nil { - panic(NewBinlogParseError(CONNECTION_ERROR, fmt.Sprintf("Error in sending event to client %v", err))) - } - } -} - -// This creates the response for COMMIT event. -func createCommitEvent(eventBuf *eventBuffer) (streamBuf *UpdateResponse) { - streamBuf = new(UpdateResponse) - streamBuf.Coord = eventBuf.Coord - streamBuf.Data.SqlType = COMMIT - return -} - -// This creates the response for DDL event. -func createDdlStream(lineBuffer *eventBuffer) (ddlStream *UpdateResponse) { - ddlStream = new(UpdateResponse) - ddlStream.Coord = lineBuffer.Coord - ddlStream.Data.SqlType = DDL - ddlStream.Data.Sql = string(lineBuffer.LogLine) - return ddlStream -} - -func GetSqlType(firstKw string) string { - sqlType, _ := SqlKwMap[firstKw] - return sqlType -} - -func GetDmlType(firstKw string) string { - sqlType, ok := SqlKwMap[firstKw] - if ok && sqlType == DML { - return firstKw - } - return "" -} - -func IgnoredStatement(line []byte) bool { - if bytes.HasPrefix(line, COMMENT) || - bytes.HasPrefix(line, SET_SESSION_VAR) || - bytes.HasPrefix(line, DELIMITER) || - bytes.HasPrefix(line, BINLOG) || - bytes.HasPrefix(line, BINLOG_DB_CHANGE) { - return true - } - return false -} - -func IsTxnStatement(line []byte, firstKw string) bool { - if GetSqlType(firstKw) == DML || - bytes.HasPrefix(line, BINLOG_SET_TIMESTAMP) || - bytes.HasPrefix(line, BINLOG_SET_INSERT) { - return true - } - return false -} diff --git a/go/vt/mysqlctl/binlog_reader.go b/go/vt/mysqlctl/binlog_reader.go deleted file mode 100644 index 567f96b928b..00000000000 --- a/go/vt/mysqlctl/binlog_reader.go +++ /dev/null @@ -1,235 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package mysqlctl - -/* -the binlogreader is intended to "tail -f" a binlog, but be smart enough -to stop tailing it when mysql is done writing to that binlog. The stop -condition is if EOF is reached *and* the next file has appeared. -*/ - -import ( - "encoding/binary" - "fmt" - log "github.com/golang/glog" - "io" - "os" - "path" - "strconv" - "strings" - "time" -) - -const ( - BINLOG_HEADER_SIZE = 4 // copied from mysqlbinlog.cc for mysql 5.0.33 - EVENT_HEADER_SIZE = 19 // 4.0 and above, can be larger in 5.x - BINLOG_BLOCK_SIZE = 16 * 1024 - MYSQLBINLOG_CHUNK = 64 * 1024 - MAX_WAIT_TIMEOUT = 30 * time.Second - LOG_WAIT_TIMEOUT = time.Second / 2 -) - -type brStats struct { - Reads int64 - Bytes int64 - Sleeps int64 - StartTime time.Time -} - -type BinlogReader struct { - binLogPrefix string - - // these parameters will have reasonable default values but can be tuned - BinlogBlockSize int64 - MaxWaitTimeout time.Duration - LogWaitTimeout time.Duration -} - -func (blr *BinlogReader) binLogPathForId(fileId int) string { - return fmt.Sprintf("%v.%06d", blr.binLogPrefix, fileId) -} - -func NewBinlogReader(binLogPrefix string) *BinlogReader { - return &BinlogReader{binLogPrefix: binLogPrefix, - BinlogBlockSize: BINLOG_BLOCK_SIZE, - MaxWaitTimeout: MAX_WAIT_TIMEOUT, - LogWaitTimeout: LOG_WAIT_TIMEOUT} -} - -/* - based on http://forge.mysql.com/wiki/MySQL_Internals_Binary_Log - +=====================================+ - | event | timestamp 0 : 4 | - | header +----------------------------+ - | | type_code 4 : 1 | - | +----------------------------+ - | | server_id 5 : 4 | - | +----------------------------+ - | | event_length 9 : 4 | - | +----------------------------+ - | | next_position 13 : 4 | - | +----------------------------+ - | | flags 17 : 2 | - +=====================================+ - | event | fixed part 19 : y | - | data +----------------------------+ - | | variable part | - +=====================================+ -*/ -func readFirstEventSize(binlog io.ReadSeeker) uint32 { - pos, _ := binlog.Seek(0, 1) - defer binlog.Seek(pos, 0) - - if _, err := binlog.Seek(BINLOG_HEADER_SIZE+9, 0); err != nil { - panic("failed binlog seek: " + err.Error()) - } - - var eventLength uint32 - if err := binary.Read(binlog, binary.LittleEndian, &eventLength); err != nil { - panic("failed binlog read: " + err.Error()) - } - return eventLength -} - -// return open log file and the name of the next log path to watch -func (blr *BinlogReader) open(name string) (*os.File, string) { - ext := path.Ext(name) - fileId, err := strconv.Atoi(ext[1:]) - if err != nil { - panic(fmt.Errorf("bad binlog name: %v", name)) - } - logPath := blr.binLogPathForId(fileId) - if !strings.HasSuffix(logPath, name) { - panic(fmt.Errorf("binlog name mismatch: %v vs %v", logPath, name)) - } - file, err := os.Open(logPath) - if err != nil { - panic(err) - } - nextLog := blr.binLogPathForId(fileId + 1) - return file, nextLog -} - -func (blr *BinlogReader) ServeData(writer io.Writer, filename string, startPosition int64) { - brStats := brStats{StartTime: time.Now()} - - binlogFile, nextLog := blr.open(filename) - defer binlogFile.Close() - positionWaitStart := make(map[int64]time.Time) - - if startPosition > 0 { - size, err := binlogFile.Seek(0, 2) - if err != nil { - panic(fmt.Errorf("BinlogReader.ServeData seek err: %v", err)) - } - if startPosition > size { - panic(fmt.Errorf("BinlogReader.ServeData: start position %v greater than size %v", startPosition, size)) - } - - // inject the header again to fool mysqlbinlog - // FIXME(msolomon) experimentally determine the header size. - // 5.1.50 is 106, 5.0.24 is 98 - firstEventSize := readFirstEventSize(binlogFile) - prefixSize := int64(BINLOG_HEADER_SIZE + firstEventSize) - - position, err := binlogFile.Seek(0, 0) - if err == nil { - _, err = io.CopyN(writer, binlogFile, prefixSize) - } - if err != nil { - panic(fmt.Errorf("BinlogReader.ServeData err: %v", err)) - } - position, err = binlogFile.Seek(startPosition, 0) - if err != nil { - panic(fmt.Errorf("Failed BinlogReader seek to startPosition %v @ %v:%v", startPosition, binlogFile.Name(), position)) - } - } - - buf := make([]byte, blr.BinlogBlockSize) - for { - buf = buf[:0] - position, _ := binlogFile.Seek(0, 1) - written, err := copyBufN(writer, binlogFile, blr.BinlogBlockSize, buf) - if err != nil && err != io.EOF { - panic(fmt.Errorf("BinlogReader.serve err: %v", err)) - } - //log.Infof("BinlogReader copy @ %v:%v,%v", binlogFile.Name(), position, written) - - brStats.Reads++ - brStats.Bytes += written - - //EOF is implicit in this case - either we have reached end of current file - //and are waiting for more data or rotating to next log. - if written != blr.BinlogBlockSize { - if _, statErr := os.Stat(nextLog); statErr == nil { - // swap to next log file - size, _ := binlogFile.Seek(0, 2) - if size == position+written { - //log.Infof("BinlogReader swap log file: %v", nextLog) - binlogFile.Close() - binlogFile, nextLog = blr.open(nextLog) - positionWaitStart = make(map[int64]time.Time) - binlogFile.Seek(BINLOG_HEADER_SIZE, 0) - } - } else { - position, _ := binlogFile.Seek(0, 1) - //log.Infof("BinlogReader wait for more data: %v:%v %v", binlogFile.Name(), position, blr.LogWaitTimeout) - // wait for more data - time.Sleep(blr.LogWaitTimeout) - brStats.Sleeps++ - now := time.Now() - if lastSlept, ok := positionWaitStart[position]; ok { - if now.Sub(lastSlept) > blr.MaxWaitTimeout { - //vt_mysqlbinlog reads in chunks of 64k bytes, the code below pads null bytes so the remaining data - //in the buffer can be flushed before closing this stream. This manifests itself as end of log file, - //and would make the upstream code flow exit gracefully. - nullPadLen := MYSQLBINLOG_CHUNK - written - emptyBuf := make([]byte, MYSQLBINLOG_CHUNK) - _, err = writer.Write(emptyBuf[0:nullPadLen]) - if err != nil { - //log.Warningf("Error in writing pad bytes to vt_mysqlbinlog %v", err) - panic(fmt.Errorf("Error in writing pad bytes to vt_mysqlbinlog %v", err)) - } - log.Infof("MAX_WAIT_TIMEOUT %v exceeded, closing connection", blr.MaxWaitTimeout) - return - //panic(fmt.Errorf("MAX_WAIT_TIMEOUT %v exceeded, closing connection", blr.MaxWaitTimeout)) - } - } else { - positionWaitStart[position] = now - } - } - } - } -} - -func copyBufN(dst io.Writer, src io.Reader, totalLen int64, buf []byte) (written int64, err error) { - for written < totalLen { - toBeRead := totalLen - if diffLen := totalLen - written; diffLen < toBeRead { - toBeRead = diffLen - } - nr, er := src.Read(buf[0:toBeRead]) - if nr > 0 { - nw, ew := dst.Write(buf[0:nr]) - if nw > 0 { - written += int64(nw) - } - if ew != nil { - err = ew - break - } - if nr != nw { - log.Warningf("Short write to dst") - err = io.ErrShortWrite - break - } - } - if er != nil { - err = er - break - } - } - return written, err -} diff --git a/go/vt/mysqlctl/binlog_server.go b/go/vt/mysqlctl/binlog_server.go deleted file mode 100644 index 3cb72544189..00000000000 --- a/go/vt/mysqlctl/binlog_server.go +++ /dev/null @@ -1,769 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// vt binlog server: Serves binlog for out of band replication. -package mysqlctl - -import ( - "bufio" - "bytes" - "fmt" - "io" - "os" - "path" - "strconv" - "strings" - "time" - - log "github.com/golang/glog" - "github.com/youtube/vitess/go/rpcwrap" - "github.com/youtube/vitess/go/stats" - "github.com/youtube/vitess/go/sync2" - "github.com/youtube/vitess/go/vt/key" - "github.com/youtube/vitess/go/vt/mysqlctl/proto" -) - -const ( - BINLOG_SERVER_DISABLED int64 = iota - BINLOG_SERVER_ENABLED -) - -var blsStateNames = map[int64]string{ - BINLOG_SERVER_DISABLED: "Disabled", - BINLOG_SERVER_ENABLED: "Enabled", -} - -var ( - USER_ID = []byte("user_id") - END_COMMENT = []byte("*/") - _VT = []byte("_vt.") - HEARTBEAT = []byte("heartbeat") - ADMIN = []byte("admin") -) - -type blsStats struct { - parseStats *stats.Counters - dmlCount *stats.Counters - txnCount *stats.Counters - queriesPerSec *stats.Rates - txnsPerSec *stats.Rates -} - -func newBlsStats() *blsStats { - bs := &blsStats{} - bs.parseStats = stats.NewCounters("BinlogServerParseEvent") - bs.txnCount = stats.NewCounters("BinlogServerTxnCount") - bs.dmlCount = stats.NewCounters("BinlogServerDmlCount") - bs.queriesPerSec = stats.NewRates("BinlogServerQPS", bs.dmlCount, 15, 60e9) - bs.txnsPerSec = stats.NewRates("BinlogServerTPS", bs.txnCount, 15, 60e9) - return bs -} - -type BinlogServer struct { - dbname string - mysqld *Mysqld - blsStats *blsStats - state sync2.AtomicInt64 - - // interrupted is used to stop the serving clients when the service - // gets interrupted. It is created when the service is enabled. - // It is closed when the service is disabled. - interrupted chan struct{} -} - -//Raw event buffer used to gather data during parsing. -type blsEventBuffer struct { - proto.BinlogPosition - LogLine []byte - firstKw string -} - -func newBlsEventBuffer(pos *proto.BinlogPosition, line []byte) *blsEventBuffer { - buf := &blsEventBuffer{} - buf.LogLine = make([]byte, len(line)) - //buf.LogLine = append(buf.LogLine, line...) - written := copy(buf.LogLine, line) - if written < len(line) { - log.Warningf("Problem in copying logline to new buffer, written %v, len %v", written, len(line)) - } - buf.BinlogPosition = *pos - buf.BinlogPosition.Timestamp = pos.Timestamp - return buf -} - -type Bls struct { - nextStmtPosition uint64 - inTxn bool - txnLineBuffer []*blsEventBuffer - responseStream []*proto.BinlogResponse - initialSeek bool - startPosition *proto.ReplicationCoordinates - currentPosition *proto.BinlogPosition - dbmatch bool - keyRange key.KeyRange - keyrangeTag string - globalState *BinlogServer - binlogPrefix string - //FIXME: this is for debug, remove it. - currentLine string - blsStats *blsStats -} - -func newBls(startCoordinates *proto.ReplicationCoordinates, blServer *BinlogServer, keyRange *key.KeyRange) *Bls { - blp := &Bls{} - blp.startPosition = startCoordinates - blp.keyRange = *keyRange - blp.currentPosition = &proto.BinlogPosition{} - blp.currentPosition.Position = *startCoordinates - blp.inTxn = false - blp.initialSeek = true - blp.txnLineBuffer = make([]*blsEventBuffer, 0, MAX_TXN_BATCH) - blp.responseStream = make([]*proto.BinlogResponse, 0, MAX_TXN_BATCH) - blp.globalState = blServer - //by default assume that the db matches. - blp.dbmatch = true - blp.keyrangeTag = string(keyRange.Start.Hex()) + "-" + string(keyRange.End.Hex()) - return blp -} - -type BinlogServerError struct { - Msg string -} - -func newBinlogServerError(msg string) *BinlogServerError { - return &BinlogServerError{Msg: msg} -} - -func (err BinlogServerError) Error() string { - return err.Msg -} - -func (blp *Bls) streamBinlog(sendReply proto.SendBinlogResponse, interrupted chan struct{}) { - var readErr error - defer func() { - reqIdentifier := fmt.Sprintf("%v, line: '%v'", blp.currentPosition.Position.String(), blp.currentLine) - if x := recover(); x != nil { - serr, ok := x.(*BinlogServerError) - if !ok { - log.Errorf("[%v:%v] Uncaught panic for stream @ %v, err: %v ", blp.keyRange.Start.Hex(), blp.keyRange.End.Hex(), reqIdentifier, x) - panic(x) - } - err := *serr - if readErr != nil { - log.Errorf("[%v:%v] StreamBinlog error @ %v, error: %v, readErr %v", blp.keyRange.Start.Hex(), blp.keyRange.End.Hex(), reqIdentifier, err, readErr) - err = BinlogServerError{Msg: fmt.Sprintf("%v, readErr: %v", err, readErr)} - } else { - log.Errorf("[%v:%v] StreamBinlog error @ %v, error: %v", blp.keyRange.Start.Hex(), blp.keyRange.End.Hex(), reqIdentifier, err) - } - sendError(sendReply, reqIdentifier, err, blp.currentPosition) - } - }() - - blr := NewBinlogReader(blp.binlogPrefix) - blr.MaxWaitTimeout = 10 * time.Second - - var binlogReader io.Reader - var blrReader, blrWriter *os.File - var err, pipeErr error - - blrReader, blrWriter, pipeErr = os.Pipe() - if pipeErr != nil { - panic(newBinlogServerError(pipeErr.Error())) - } - defer blrWriter.Close() - defer blrReader.Close() - - readErrChan := make(chan error, 1) - //This reads the binlogs - read end of data pipeline. - go blp.getBinlogStream(blrWriter, blr, readErrChan) - - //Decode end of the data pipeline. - binlogDecoder := new(BinlogDecoder) - binlogReader, err = binlogDecoder.DecodeMysqlBinlog(blrReader) - if err != nil { - panic(newBinlogServerError(err.Error())) - } - - //This function monitors the exit of read data pipeline. - go func(readErr *error, readErrChan chan error, binlogDecoder *BinlogDecoder) { - select { - case *readErr = <-readErrChan: - //log.Infof("Read data-pipeline returned readErr: '%v'", *readErr) - if *readErr != nil { - binlogDecoder.Kill() - } - case <-interrupted: - *readErr = fmt.Errorf("BinlogServer service disabled") - binlogDecoder.Kill() - } - }(&readErr, readErrChan, binlogDecoder) - - blp.parseBinlogEvents(sendReply, binlogReader) -} - -func (blp *Bls) getBinlogStream(writer *os.File, blr *BinlogReader, readErrChan chan error) { - defer func() { - if err := recover(); err != nil { - log.Errorf("getBinlogStream failed: %v", err) - readErrChan <- err.(error) - } - }() - blr.ServeData(writer, blp.startPosition.MasterFilename, int64(blp.startPosition.MasterPosition)) - readErrChan <- nil -} - -//Main parse loop -func (blp *Bls) parseBinlogEvents(sendReply proto.SendBinlogResponse, binlogReader io.Reader) { - // read over the stream and buffer up the transactions - var err error - var line []byte - bigLine := make([]byte, 0, BINLOG_BLOCK_SIZE) - lineReader := bufio.NewReaderSize(binlogReader, BINLOG_BLOCK_SIZE) - readAhead := false - var event *blsEventBuffer - var delimIndex int - - for { - line = line[:0] - bigLine = bigLine[:0] - line, err = blp.readBlsLine(lineReader, bigLine) - if err != nil { - if err == io.EOF { - //end of stream - blp.globalState.blsStats.parseStats.Add("EOFErrors."+blp.keyrangeTag, 1) - panic(newBinlogServerError(fmt.Sprintf("EOF"))) - } - panic(newBinlogServerError(fmt.Sprintf("ReadLine err: , %v", err))) - } - if len(line) == 0 { - continue - } - - if line[0] == '#' { - //parse positional data - line = bytes.TrimSpace(line) - blp.currentLine = string(line) - blp.parsePositionData(line) - } else { - //parse event data - - if readAhead { - event.LogLine = append(event.LogLine, line...) - } else { - event = newBlsEventBuffer(blp.currentPosition, line) - } - - delimIndex = bytes.LastIndex(event.LogLine, BINLOG_DELIMITER) - if delimIndex != -1 { - event.LogLine = event.LogLine[:delimIndex] - readAhead = false - } else { - readAhead = true - continue - } - - event.LogLine = bytes.TrimSpace(event.LogLine) - event.firstKw = string(bytes.ToLower(bytes.SplitN(event.LogLine, SPACE, 2)[0])) - - blp.currentLine = string(event.LogLine) - - //processes statements only for the dbname that it is subscribed to. - blp.parseDbChange(event) - blp.parseEventData(sendReply, event) - } - } -} - -//This reads a binlog log line. -func (blp *Bls) readBlsLine(lineReader *bufio.Reader, bigLine []byte) (line []byte, err error) { - for { - tempLine, tempErr := lineReader.ReadSlice('\n') - if tempErr == bufio.ErrBufferFull { - bigLine = append(bigLine, tempLine...) - blp.globalState.blsStats.parseStats.Add("BufferFullErrors."+blp.keyrangeTag, 1) - continue - } else if tempErr != nil { - log.Errorf("[%v:%v] Error in reading %v, data read %v", blp.keyRange.Start.Hex(), blp.keyRange.End.Hex(), tempErr, string(tempLine)) - err = tempErr - } else if len(bigLine) > 0 { - if len(tempLine) > 0 { - bigLine = append(bigLine, tempLine...) - } - line = bigLine[:len(bigLine)-1] - blp.globalState.blsStats.parseStats.Add("BigLineCount."+blp.keyrangeTag, 1) - } else { - line = tempLine[:len(tempLine)-1] - } - break - } - return line, err -} - -//Function to set the dbmatch variable, this parses the "Use " statement. -func (blp *Bls) parseDbChange(event *blsEventBuffer) { - if event.firstKw != proto.USE { - return - } - if blp.globalState.dbname == "" { - log.Warningf("dbname is not set, will match all database names") - return - } - blp.globalState.blsStats.parseStats.Add("DBChange."+blp.keyrangeTag, 1) - - new_db := string(bytes.TrimSpace(bytes.SplitN(event.LogLine, BINLOG_DB_CHANGE, 2)[1])) - if new_db != blp.globalState.dbname { - blp.dbmatch = false - } else { - blp.dbmatch = true - } -} - -func (blp *Bls) parsePositionData(line []byte) { - if bytes.HasPrefix(line, BINLOG_POSITION_PREFIX) { - //Master Position - if blp.nextStmtPosition == 0 { - return - } - } else if bytes.Index(line, BINLOG_ROTATE_TO) != -1 { - blp.parseRotateEvent(line) - } else if bytes.Index(line, BINLOG_END_LOG_POS) != -1 { - //Ignore the position data that appears at the start line of binlog. - if bytes.Index(line, BINLOG_START) != -1 { - return - } - blp.parseMasterPosition(line) - if blp.nextStmtPosition != 0 { - blp.currentPosition.Position.MasterPosition = blp.nextStmtPosition - } - } - if bytes.Index(line, BINLOG_XID) != -1 { - blp.parseXid(line) - } - // FIXME(shrutip): group_id is most relevant for commit events - // check how group_id is set for ddls and possibly move this block - // in parseXid - if bytes.Index(line, BINLOG_GROUP_ID) != -1 { - blp.parseGroupId(line) - } -} - -func (blp *Bls) parseEventData(sendReply proto.SendBinlogResponse, event *blsEventBuffer) { - if bytes.HasPrefix(event.LogLine, BINLOG_SET_TIMESTAMP) { - blp.extractEventTimestamp(event) - blp.initialSeek = false - if blp.inTxn { - blp.txnLineBuffer = append(blp.txnLineBuffer, event) - } - } else if bytes.HasPrefix(event.LogLine, BINLOG_BEGIN) { - blp.handleBeginEvent(event) - } else if bytes.HasPrefix(event.LogLine, BINLOG_ROLLBACK) { - blp.inTxn = false - blp.txnLineBuffer = blp.txnLineBuffer[:0] - } else if bytes.HasPrefix(event.LogLine, BINLOG_COMMIT) { - blp.handleCommitEvent(sendReply, event) - blp.inTxn = false - blp.txnLineBuffer = blp.txnLineBuffer[:0] - } else if len(event.LogLine) > 0 { - if blp.inTxn && IsTxnStatement(event.LogLine, event.firstKw) { - blp.txnLineBuffer = append(blp.txnLineBuffer, event) - } else { - sqlType := proto.GetSqlType(event.firstKw) - switch sqlType { - case proto.DDL: - blp.handleDdlEvent(sendReply, event) - case proto.DML: - lineBuf := make([][]byte, 0, 10) - for _, dml := range blp.txnLineBuffer { - lineBuf = append(lineBuf, dml.LogLine) - } - // FIXME(alainjobart) in these cases, we - // probably want to skip that event and keep - // going. Or at least offer the option to do - // so somewhere. - panic(newBinlogServerError(fmt.Sprintf("DML outside a txn - len %v, dml '%v', txn buffer '%v'", len(blp.txnLineBuffer), string(event.LogLine), string(bytes.Join(lineBuf, SEMICOLON_BYTE))))) - default: - //Ignore these often occuring statement types. - if !IgnoredStatement(event.LogLine) { - log.Warningf("Unknown statement '%v'", string(event.LogLine)) - } - } - } - } -} - -/* -Position Parsing Functions. -*/ - -func (blp *Bls) parseMasterPosition(line []byte) { - var err error - rem := bytes.SplitN(line, BINLOG_END_LOG_POS, 2) - masterPosStr := string(bytes.SplitN(rem[1], SPACE, 2)[0]) - blp.nextStmtPosition, err = strconv.ParseUint(masterPosStr, 10, 64) - if err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in extracting master position, %v, sql %v, pos string %v", err, string(line), masterPosStr))) - } -} - -func (blp *Bls) parseXid(line []byte) { - rem := bytes.SplitN(line, BINLOG_XID, 2) - xid, err := strconv.ParseUint(string(rem[1]), 10, 64) - if err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in extracting Xid position %v, sql %v", err, string(line)))) - } - blp.currentPosition.Xid = xid -} - -func (blp *Bls) parseGroupId(line []byte) { - rem := bytes.SplitN(line, BINLOG_GROUP_ID, 2) - rem2 := bytes.SplitN(rem[1], SPACE, 2) - groupId := strings.TrimSpace(string(rem2[0])) - blp.currentPosition.Position.GroupId = groupId -} - -func (blp *Bls) extractEventTimestamp(event *blsEventBuffer) { - line := event.LogLine - timestampStr := string(line[len(BINLOG_SET_TIMESTAMP):]) - if timestampStr == "" { - panic(newBinlogServerError(fmt.Sprintf("Invalid timestamp line %v", string(line)))) - } - currentTimestamp, err := strconv.ParseInt(timestampStr, 10, 64) - if err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in extracting timestamp %v, sql %v", err, string(line)))) - } - blp.currentPosition.Timestamp = currentTimestamp - event.BinlogPosition.Timestamp = currentTimestamp -} - -func (blp *Bls) parseRotateEvent(line []byte) { - rem := bytes.SplitN(line, BINLOG_ROTATE_TO, 2) - rem2 := bytes.SplitN(rem[1], POS, 2) - rotateFilename := strings.TrimSpace(string(rem2[0])) - rotatePos, err := strconv.ParseUint(string(rem2[1]), 10, 64) - if err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in extracting rotate pos %v from line %s", err, string(line)))) - } - - //If the file being parsed is a binlog, - //then the rotate events only correspond to itself. - blp.currentPosition.Position.MasterFilename = rotateFilename - blp.currentPosition.Position.MasterPosition = rotatePos - blp.globalState.blsStats.parseStats.Add("BinlogRotate."+blp.keyrangeTag, 1) -} - -/* -Data event parsing and handling functions. -*/ - -func (blp *Bls) handleBeginEvent(event *blsEventBuffer) { - if len(blp.txnLineBuffer) > 0 { - if blp.inTxn { - lineBuf := make([][]byte, 0, 10) - for _, event := range blp.txnLineBuffer { - lineBuf = append(lineBuf, event.LogLine) - } - panic(newBinlogServerError(fmt.Sprintf("BEGIN encountered with non-empty trxn buffer, len: %d, buf %v", len(blp.txnLineBuffer), string(bytes.Join(lineBuf, SEMICOLON_BYTE))))) - } else { - log.Warningf("Non-zero txn buffer, while inTxn false") - } - } - blp.txnLineBuffer = blp.txnLineBuffer[:0] - blp.inTxn = true - blp.txnLineBuffer = append(blp.txnLineBuffer, event) -} - -//This creates the response for DDL event. -func blsCreateDdlStream(lineBuffer *blsEventBuffer) (ddlStream *proto.BinlogResponse) { - ddlStream = new(proto.BinlogResponse) - ddlStream.Position = lineBuffer.BinlogPosition - ddlStream.Data.SqlType = proto.DDL - ddlStream.Data.Sql = make([]string, 0, 1) - ddlStream.Data.Sql = append(ddlStream.Data.Sql, string(lineBuffer.LogLine)) - return ddlStream -} - -func (blp *Bls) handleDdlEvent(sendReply proto.SendBinlogResponse, event *blsEventBuffer) { - ddlStream := blsCreateDdlStream(event) - buf := []*proto.BinlogResponse{ddlStream} - if err := blsSendStream(sendReply, buf); err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in sending event to client %v", err))) - } - blp.globalState.blsStats.parseStats.Add("DdlCount."+blp.keyrangeTag, 1) -} - -func (blp *Bls) handleCommitEvent(sendReply proto.SendBinlogResponse, commitEvent *blsEventBuffer) { - if !blp.dbmatch { - return - } - - commitEvent.BinlogPosition.Xid = blp.currentPosition.Xid - commitEvent.BinlogPosition.Position.GroupId = blp.currentPosition.Position.GroupId - blp.txnLineBuffer = append(blp.txnLineBuffer, commitEvent) - //txn block for DMLs, parse it and send events for a txn - var dmlCount int64 - //This filters the dmls for keyrange supplied by the client. - blp.responseStream, dmlCount = blp.buildTxnResponse() - - //No dmls matched the keyspace id so return - if dmlCount == 0 { - return - } - - if err := blsSendStream(sendReply, blp.responseStream); err != nil { - panic(newBinlogServerError(fmt.Sprintf("Error in sending event to client %v", err))) - } - - blp.globalState.blsStats.dmlCount.Add("DmlCount."+blp.keyrangeTag, dmlCount) - blp.globalState.blsStats.txnCount.Add("TxnCount."+blp.keyrangeTag, 1) -} - -//This builds BinlogResponse for each transaction. -func (blp *Bls) buildTxnResponse() (txnResponseList []*proto.BinlogResponse, dmlCount int64) { - var line []byte - var keyspaceIdStr string - var keyspaceId key.KeyspaceId - - dmlBuffer := make([]string, 0, 10) - - for _, event := range blp.txnLineBuffer { - line = event.LogLine - if bytes.HasPrefix(line, BINLOG_BEGIN) { - streamBuf := new(proto.BinlogResponse) - streamBuf.Position = event.BinlogPosition - streamBuf.Data.SqlType = proto.BEGIN - txnResponseList = append(txnResponseList, streamBuf) - continue - } - if bytes.HasPrefix(line, BINLOG_COMMIT) { - commitEvent := blsCreateCommitEvent(event) - txnResponseList = append(txnResponseList, commitEvent) - continue - } - sqlType := proto.GetSqlType(event.firstKw) - if sqlType == proto.DML { - keyspaceIdStr, keyspaceId = parseKeyspaceId(line) - if keyspaceIdStr == "" { - continue - } - if !blp.keyRange.Contains(keyspaceId) { - dmlBuffer = dmlBuffer[:0] - continue - } - dmlCount += 1 - dmlBuffer = append(dmlBuffer, string(line)) - dmlEvent := blp.createDmlEvent(event, keyspaceIdStr) - dmlEvent.Data.Sql = make([]string, len(dmlBuffer)) - dmlLines := copy(dmlEvent.Data.Sql, dmlBuffer) - if dmlLines < len(dmlBuffer) { - log.Warningf("The entire dml buffer was not properly copied") - } - txnResponseList = append(txnResponseList, dmlEvent) - dmlBuffer = dmlBuffer[:0] - } else { - //add as prefixes to the DML from last DML. - //start a new dml buffer and keep adding to it. - dmlBuffer = append(dmlBuffer, string(line)) - } - } - return txnResponseList, dmlCount -} - -func (blp *Bls) createDmlEvent(eventBuf *blsEventBuffer, keyspaceId string) (dmlEvent *proto.BinlogResponse) { - //parse keyspace id - //for inserts check for index comments - dmlEvent = new(proto.BinlogResponse) - dmlEvent.Position = eventBuf.BinlogPosition - dmlEvent.Data.SqlType = proto.DML - dmlEvent.Data.KeyspaceId = keyspaceId - return dmlEvent -} - -func controlDbStatement(sql []byte) bool { - sql = bytes.ToLower(sql) - if bytes.Contains(sql, _VT) || (bytes.Contains(sql, ADMIN) && bytes.Contains(sql, HEARTBEAT)) { - return true - } - return false -} - -func parseKeyspaceId(sql []byte) (keyspaceIdStr string, keyspaceId key.KeyspaceId) { - keyspaceIndex := bytes.Index(sql, KEYSPACE_ID_COMMENT) - if keyspaceIndex == -1 { - if controlDbStatement(sql) { - log.Warningf("Ignoring no keyspace id, control db stmt %v", string(sql)) - return - } - panic(newBinlogServerError(fmt.Sprintf("Invalid Sql, doesn't contain keyspace id, sql: %v", string(sql)))) - } - seekIndex := keyspaceIndex + len(KEYSPACE_ID_COMMENT) - keyspaceIdComment := sql[seekIndex:] - keyspaceIdStr = string(bytes.TrimSpace(bytes.SplitN(keyspaceIdComment, USER_ID, 2)[0])) - if keyspaceIdStr == "" { - panic(newBinlogServerError(fmt.Sprintf("Invalid keyspace id, sql %v", string(sql)))) - } - keyspaceIdUint, err := strconv.ParseUint(keyspaceIdStr, 10, 64) - if err != nil { - panic(newBinlogServerError(fmt.Sprintf("Invalid keyspaceid, error converting it, sql %v", string(sql)))) - } - keyspaceId = key.Uint64Key(keyspaceIdUint).KeyspaceId() - return keyspaceIdStr, keyspaceId -} - -//This creates the response for COMMIT event. -func blsCreateCommitEvent(eventBuf *blsEventBuffer) (streamBuf *proto.BinlogResponse) { - streamBuf = new(proto.BinlogResponse) - streamBuf.Position = eventBuf.BinlogPosition - streamBuf.Data.SqlType = proto.COMMIT - return -} - -//This sends the stream to the client. -func blsSendStream(sendReply proto.SendBinlogResponse, responseBuf []*proto.BinlogResponse) (err error) { - for _, event := range responseBuf { - err = sendReply(event) - if err != nil { - return newBinlogServerError(fmt.Sprintf("Error in sending reply to client, %v", err)) - } - } - return nil -} - -//This sends the error to the client. -func sendError(sendReply proto.SendBinlogResponse, reqIdentifier string, inputErr error, blpPos *proto.BinlogPosition) { - var err error - streamBuf := new(proto.BinlogResponse) - streamBuf.Error = inputErr.Error() - if blpPos != nil { - streamBuf.Position = *blpPos - } - buf := []*proto.BinlogResponse{streamBuf} - err = blsSendStream(sendReply, buf) - if err != nil { - log.Errorf("Error in communicating message %v with the client: %v", inputErr, err) - } -} - -// fillAndCheckMasterPosition validates the master position sent. -// If only group_id is set, we try to resolve it. -func (blServer *BinlogServer) fillAndCheckMasterPosition(startCoordinates *proto.ReplicationCoordinates) error { - if startCoordinates.MasterFilename == "" || startCoordinates.MasterPosition <= 0 { - if startCoordinates.GroupId == "" { - return fmt.Errorf("Not a valid StartPosition") - } - - // we only have GroupId, resolve it - qr, err := blServer.mysqld.fetchSuperQuery(fmt.Sprintf("SHOW BINLOG INFO FOR %v", startCoordinates.GroupId)) - if err != nil { - return err - } - if len(qr.Rows) != 1 { - return fmt.Errorf("SHOW BINLOG INFO FOR %v failed with %d rows", startCoordinates.GroupId, len(qr.Rows)) - } - // row has Log_name, Pos, Server_ID - startCoordinates.MasterFilename = qr.Rows[0][0].String() - startCoordinates.MasterPosition, err = qr.Rows[0][1].ParseUint64() - if err != nil { - return fmt.Errorf("SHOW BINLOG INFO FOR %v returned an error parsing Pos: %v", startCoordinates.GroupId, err) - } - log.Infof("Resolved binlog position from GroupId %v to %v:%v", startCoordinates.GroupId, startCoordinates.MasterFilename, startCoordinates.MasterPosition) - } - return nil -} - -func (blServer *BinlogServer) ServeBinlog(req *proto.BinlogServerRequest, sendReply proto.SendBinlogResponse) error { - defer func() { - if x := recover(); x != nil { - // Send the error to the client: - // - If it is a BinlogServerError it's a normal error - // condition, just send it. - _, ok := x.(*BinlogServerError) - if ok { - sendError(sendReply, req.StartPosition.String(), x.(error), nil) - return - } - - // - See if it is a regular error - _, ok = x.(error) - if ok { - log.Errorf("Uncaught panic from error at top-most level: '%v'", x) - sendError(sendReply, req.StartPosition.String(), x.(error), nil) - return - } - - // - This is a panic(xxx) with xxx not an error, send - // it to the client as error - log.Errorf("Uncaught panic at top-most level: '%v'", x) - sendError(sendReply, req.StartPosition.String(), fmt.Errorf("Uncaught panic at top-most level: '%v'", x), nil) - } - }() - - log.Infof("received req: %v %v-%v", req.StartPosition.String(), req.KeyRange.Start.Hex(), req.KeyRange.End.Hex()) - if !blServer.isServiceEnabled() { - return newBinlogServerError("Binlog Server is disabled") - } - - binlogPrefix := blServer.mysqld.config.BinLogPath - logsDir := path.Dir(binlogPrefix) - if err := blServer.fillAndCheckMasterPosition(&req.StartPosition); err != nil { - return newBinlogServerError(fmt.Sprintf("Invalid start position %v, cannot serve the stream, cannot locate start position: %v", req.StartPosition, err)) - } - - blp := newBls(&req.StartPosition, blServer, &req.KeyRange) - blp.binlogPrefix = binlogPrefix - - log.Infof("blp.binlogPrefix %v logsDir %v", blp.binlogPrefix, logsDir) - blp.streamBinlog(sendReply, blServer.interrupted) - return nil -} - -func (blServer *BinlogServer) isServiceEnabled() bool { - return blServer.state.Get() == BINLOG_SERVER_ENABLED -} - -func (blServer *BinlogServer) setState(state int64) { - blServer.state.Set(state) -} - -func NewBinlogServer(mysqld *Mysqld) *BinlogServer { - binlogServer := new(BinlogServer) - binlogServer.mysqld = mysqld - binlogServer.blsStats = newBlsStats() - stats.Publish("BinlogServerState", stats.StringFunc(func() string { - return blsStateNames[binlogServer.state.Get()] - })) - return binlogServer -} - -// RegisterBinlogServerService registers the service for serving and stats. -func RegisterBinlogServerService(blServer *BinlogServer) { - rpcwrap.RegisterAuthenticated(blServer) -} - -// EnableBinlogServerService enabled the service for serving. -func EnableBinlogServerService(blServer *BinlogServer, dbname string) { - if blServer.isServiceEnabled() { - log.Warningf("Binlog Server service is already enabled") - return - } - - blServer.dbname = dbname - blServer.interrupted = make(chan struct{}, 1) - blServer.setState(BINLOG_SERVER_ENABLED) - log.Infof("Binlog Server enabled") -} - -// DisableBinlogServerService disables the service for serving. -func DisableBinlogServerService(blServer *BinlogServer) { - //If the service is already disabled, just return - if !blServer.isServiceEnabled() { - return - } - blServer.setState(BINLOG_SERVER_DISABLED) - close(blServer.interrupted) - log.Infof("Binlog Server Disabled") -} - -func IsBinlogServerEnabled(blServer *BinlogServer) bool { - return blServer.isServiceEnabled() -} diff --git a/go/vt/mysqlctl/event_streamer.go b/go/vt/mysqlctl/event_streamer.go index ef118af008d..dd4b87bbf88 100644 --- a/go/vt/mysqlctl/event_streamer.go +++ b/go/vt/mysqlctl/event_streamer.go @@ -6,11 +6,11 @@ package mysqlctl import ( "bytes" + "encoding/base64" "fmt" "strconv" "time" - "encoding/base64" "github.com/youtube/vitess/go/vt/sqlparser" ) diff --git a/go/vt/mysqlctl/proto/binlog.go b/go/vt/mysqlctl/proto/binlog.go deleted file mode 100644 index 38b79d94289..00000000000 --- a/go/vt/mysqlctl/proto/binlog.go +++ /dev/null @@ -1,115 +0,0 @@ -// Copyright 2012, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package proto - -import ( - "bytes" - "fmt" - - "github.com/youtube/vitess/go/bson" - "github.com/youtube/vitess/go/bytes2" -) - -// ReplicationCoordinates keeps track of a server position. -type ReplicationCoordinates struct { - MasterFilename string - MasterPosition uint64 - GroupId string -} - -func NewReplicationCoordinates(masterFile string, masterPos uint64, groupId string) *ReplicationCoordinates { - return &ReplicationCoordinates{ - MasterFilename: masterFile, - MasterPosition: masterPos, - GroupId: groupId, - } -} - -func (repl *ReplicationCoordinates) String() string { - return fmt.Sprintf("Master %v:%v GroupId %v", repl.MasterFilename, repl.MasterPosition, repl.GroupId) -} - -func (repl *ReplicationCoordinates) MarshalBson(buf *bytes2.ChunkedWriter) { - lenWriter := bson.NewLenWriter(buf) - - bson.EncodeString(buf, "MasterFilename", repl.MasterFilename) - bson.EncodeUint64(buf, "MasterPosition", repl.MasterPosition) - bson.EncodeString(buf, "GroupId", repl.GroupId) - - buf.WriteByte(0) - lenWriter.RecordLen() -} - -func (repl *ReplicationCoordinates) UnmarshalBson(buf *bytes.Buffer) { - bson.Next(buf, 4) - - kind := bson.NextByte(buf) - for kind != bson.EOO { - key := bson.ReadCString(buf) - switch key { - case "MasterFilename": - repl.MasterFilename = bson.DecodeString(buf, kind) - case "MasterPosition": - repl.MasterPosition = bson.DecodeUint64(buf, kind) - case "GroupId": - repl.GroupId = bson.DecodeString(buf, kind) - default: - panic(bson.NewBsonError("Unrecognized tag %s", key)) - } - kind = bson.NextByte(buf) - } -} - -// BinlogPosition keeps track of a server binlog position -type BinlogPosition struct { - Position ReplicationCoordinates - Timestamp int64 - Xid uint64 -} - -func (pos *BinlogPosition) String() string { - return fmt.Sprintf("%v:%v", pos.Position.MasterFilename, pos.Position.MasterPosition) -} - -func (pos *BinlogPosition) Valid() bool { - if pos.Position.MasterFilename == "" || pos.Position.MasterPosition == 0 { - return false - } - return true -} - -func (pos *BinlogPosition) MarshalBson(buf *bytes2.ChunkedWriter) { - lenWriter := bson.NewLenWriter(buf) - - bson.EncodePrefix(buf, bson.Object, "Position") - pos.Position.MarshalBson(buf) - - bson.EncodeInt64(buf, "Timestamp", pos.Timestamp) - bson.EncodeUint64(buf, "Xid", pos.Xid) - - buf.WriteByte(0) - lenWriter.RecordLen() -} - -func (pos *BinlogPosition) UnmarshalBson(buf *bytes.Buffer) { - bson.Next(buf, 4) - - kind := bson.NextByte(buf) - for kind != bson.EOO { - key := bson.ReadCString(buf) - switch key { - case "Position": - pos.Position = ReplicationCoordinates{} - pos.Position.UnmarshalBson(buf) - case "Timestamp": - pos.Timestamp = bson.DecodeInt64(buf, kind) - case "Xid": - pos.Xid = bson.DecodeUint64(buf, kind) - default: - panic(bson.NewBsonError("Unrecognized tag %s", key)) - } - kind = bson.NextByte(buf) - } -} diff --git a/go/vt/mysqlctl/proto/binlog_server.go b/go/vt/mysqlctl/proto/binlog_server.go deleted file mode 100644 index 9dca9a89526..00000000000 --- a/go/vt/mysqlctl/proto/binlog_server.go +++ /dev/null @@ -1,87 +0,0 @@ -// Copyright 2013, Google Inc. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -package proto - -import ( - rpc "github.com/youtube/vitess/go/rpcplus" - "github.com/youtube/vitess/go/vt/key" -) - -// Possible values for SqlType (all lower case) -const ( - // contains all the statements of the given types - DDL = "ddl" - DML = "dml" - - // transation commands - BEGIN = "begin" - COMMIT = "commit" - - // database selection - USE = "use" -) - -var sqlKwMap = map[string]string{ - "alter": DDL, - "create": DDL, - "drop": DDL, - "rename": DDL, - "truncate": DDL, - - "insert": DML, - "update": DML, - "delete": DML, - - "begin": BEGIN, - "commit": COMMIT, - - "use": USE, -} - -// GetSqlType returns one of the possible values for SqlType, or -// "" if it cannot be determined. -// firstKeyword has to be normalized to lower case first. -func GetSqlType(firstKeyword string) string { - return sqlKwMap[firstKeyword] -} - -// BinlogServerRequest represents a request to the BinlogServer service. -type BinlogServerRequest struct { - StartPosition ReplicationCoordinates - KeyRange key.KeyRange -} - -// BinlogResponse is the response from the BinlogServer service. -type BinlogResponse struct { - Error string - Position BinlogPosition - Data BinlogData -} - -// BinlogData is the payload for BinlogResponse -type BinlogData struct { - // SqlType is one of the possible constants defined earlier - SqlType string - - // Sql is the list of statements executed - Sql []string - - // KeyspaceId is used for routing of events - KeyspaceId string -} - -// SendBinlogResponse makes it easier to define this interface -type SendBinlogResponse func(response interface{}) error - -// BinlogServer interface is used to validate the RPC syntax -type BinlogServer interface { - // ServeBinlog is the streaming API entry point. - ServeBinlog(req *BinlogServerRequest, sendReply SendBinlogResponse) error -} - -// RegisterBinlogServer makes sure the server implements the right API -func RegisterBinlogServer(server BinlogServer) { - rpc.Register(server) -} diff --git a/go/vt/mysqlctl/updatestreamctl.go b/go/vt/mysqlctl/updatestreamctl.go index 66a7553fec1..b9d2a378b9f 100644 --- a/go/vt/mysqlctl/updatestreamctl.go +++ b/go/vt/mysqlctl/updatestreamctl.go @@ -13,7 +13,6 @@ import ( "github.com/youtube/vitess/go/sync2" "github.com/youtube/vitess/go/vt/dbconfigs" "github.com/youtube/vitess/go/vt/key" - "github.com/youtube/vitess/go/vt/mysqlctl/proto" ) /* API and config for UpdateStream Service */ @@ -117,7 +116,7 @@ func IsUpdateStreamEnabled() bool { return UpdateStreamRpcService.isEnabled() } -func GetReplicationPosition() (*proto.ReplicationCoordinates, error) { +func GetReplicationPosition() (string, error) { return UpdateStreamRpcService.getReplicationPosition() } @@ -238,16 +237,16 @@ func (updateStream *UpdateStream) StreamKeyrange(req *KeyrangeRequest, sendReply return bls.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), f) } -func (updateStream *UpdateStream) getReplicationPosition() (*proto.ReplicationCoordinates, error) { +func (updateStream *UpdateStream) getReplicationPosition() (string, error) { updateStream.actionLock.Lock() defer updateStream.actionLock.Unlock() if !updateStream.isEnabled() { - return nil, fmt.Errorf("update stream service is not enabled") + return "", fmt.Errorf("update stream service is not enabled") } rp, err := updateStream.mysqld.MasterStatus() if err != nil { - return nil, err + return "", err } - return proto.NewReplicationCoordinates(rp.MasterLogFile, uint64(rp.MasterLogPosition), rp.MasterLogGroupId), nil + return rp.MasterLogGroupId, nil } diff --git a/go/vt/tabletserver/rowcache_invalidator.go b/go/vt/tabletserver/rowcache_invalidator.go index 8d5158e733d..83616f4874c 100644 --- a/go/vt/tabletserver/rowcache_invalidator.go +++ b/go/vt/tabletserver/rowcache_invalidator.go @@ -75,14 +75,14 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() { DisallowQueries() }() - replPos, err := mysqlctl.GetReplicationPosition() + groupId, err := mysqlctl.GetReplicationPosition() if err != nil { log.Errorf("Rowcache invalidator could not start: cannot determine replication position: %v", err) return } log.Infof("Starting rowcache invalidator") - req := &mysqlctl.UpdateStreamRequest{GroupId: replPos.GroupId} + req := &mysqlctl.UpdateStreamRequest{GroupId: groupId} err = mysqlctl.ServeUpdateStream(req, func(reply interface{}) error { return rowCache.processEvent(reply.(*mysqlctl.StreamEvent)) }) diff --git a/go/vt/vttablet/agent.go b/go/vt/vttablet/agent.go index f0cf0d0d9f2..2890cb69aea 100644 --- a/go/vt/vttablet/agent.go +++ b/go/vt/vttablet/agent.go @@ -23,7 +23,6 @@ import ( var ( agent *tm.ActionAgent - binlogServer *mysqlctl.BinlogServer binlogPlayerMap *BinlogPlayerMap ) @@ -54,10 +53,6 @@ func InitAgent( topoServer := topo.GetServer() mysqld := mysqlctl.NewMysqld(mycnf, dbcfgs.Dba, dbcfgs.Repl) - // Start the binlog server service, disabled at start. - binlogServer = mysqlctl.NewBinlogServer(mysqld) - mysqlctl.RegisterBinlogServerService(binlogServer) - // Start the binlog player services, not playing at start. binlogPlayerMap = NewBinlogPlayerMap(topoServer, dbcfgs.App.MysqlParams(), mysqld) RegisterBinlogPlayerMap(binlogPlayerMap) @@ -151,9 +146,6 @@ func CloseAgent() { if agent != nil { agent.Stop() } - if binlogServer != nil { - mysqlctl.DisableBinlogServerService(binlogServer) - } if binlogPlayerMap != nil { binlogPlayerMap.StopAllPlayers() }