Skip to content

Commit

Permalink
storage.go: Make query tikv for ddl too (#757) (#761)
Browse files Browse the repository at this point in the history
* storage.go: Make query tikv for ddl too (#757)

Co-Authored-By: Ian <ArGregoryIan@gmail.com>
  • Loading branch information
july2993 and IANTHEREAL authored Oct 10, 2019
1 parent fe336ff commit de7bbbe
Showing 1 changed file with 36 additions and 39 deletions.
75 changes: 36 additions & 39 deletions pump/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,52 +405,49 @@ func (a *Append) resolve(startTS int64) bool {

log.Warnf("unknown commit stats start ts: %d", startTS)

if pbinlog.GetDdlJobId() == 0 {
tikvQueryCount.Add(1.0)
primaryKey := pbinlog.GetPrewriteKey()
status, err := a.tiLockResolver.GetTxnStatus(uint64(pbinlog.StartTs), primaryKey)
if err != nil {
log.Error(err)
return false
}
tikvQueryCount.Add(1.0)
primaryKey := pbinlog.GetPrewriteKey()
status, err := a.tiLockResolver.GetTxnStatus(uint64(pbinlog.StartTs), primaryKey)
if err != nil {
log.Error(err)
return false
}

// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts
if status.IsCommitted() {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = int64(status.CommitTS())

req := a.writeBinlog(cbinlog)
if req.err != nil {
log.Error(req.err)
return false
}
// Write a commit binlog myself if the status is committed,
// otherwise we can just ignore it, we will not get the commit binlog while iterator the kv by ts
if status.IsCommitted() {
// write the commit binlog myself
cbinlog := new(pb.Binlog)
cbinlog.Tp = pb.BinlogType_Commit
cbinlog.StartTs = pbinlog.StartTs
cbinlog.CommitTs = int64(status.CommitTS())

// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}
req := a.writeBinlog(cbinlog)
if req.err != nil {
log.Errorf("write missing committed binlog failed: %+v, start ts: %d commit ts: %d isDDL: %v",
req.err, startTS, status.CommitTS(), pbinlog.GetDdlJobId() > 0)
return false
}

err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
if err != nil {
log.Error(err)
return false
}
// when writeBinlog return success, the pointer will be write to kv async,
// but we need to make sure it has been write to kv when we return true in the func, then we can get this commit binlog when
// we update maxCommitTS
// write the ts -> pointer to KV here to make sure it.
pointer, err := req.valuePointer.MarshalBinary()
if err != nil {
panic(err)
}

log.Infof("known txn is committed from tikv, start ts: %d, commit ts: %d", startTS, status.CommitTS())
return true
err = a.metadata.Put(encodeTSKey(req.ts()), pointer, nil)
if err != nil {
log.Errorf("put missing committed binlog into metadata failed: %+v, start ts: %d commit ts: %d isDDL: %v",
err, startTS, status.CommitTS(), pbinlog.GetDdlJobId() > 0)
return false
}
}

log.Errorf("some prewrite DDL items remain single after waiting for a long time, startTs: %d", startTS)
return false
log.Infof("known txn is committed from tikv, start ts: %d, commit ts: %d, isDDL: %v", startTS, status.CommitTS(), pbinlog.GetDdlJobId() > 0)
return true
}

// GetBinlog gets binlog by ts
Expand Down

0 comments on commit de7bbbe

Please sign in to comment.