Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

将存放比对过程中的临时结果的数据库从sqlite替换到目标redis #93

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
格式化
  • Loading branch information
tancehao authored and tancehao committed Jun 15, 2020
commit c54293a737fc25023ca68542a1ca46af58a495bf
47 changes: 23 additions & 24 deletions src/full_check/full_check/conflict.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"full_check/client"
"full_check/common"
conf "full_check/configure"
redigoredis "github.com/garyburd/redigo/redis"
"log"
"math/rand"
"os"
redigoredis "github.com/garyburd/redigo/redis"
"sync/atomic"
"time"
)
Expand All @@ -20,17 +20,17 @@ var (
)

type keyInfo struct {
Key string `json:"k"`
Type string `json:"t"`
Key string `json:"k"`
Type string `json:"t"`
ConflictType string `json:"ct"`
Db int32 `json:"db"`
SourceLen int64 `json:"sl"`
TargetLen int64 `json:"tl"`
Db int32 `json:"db"`
SourceLen int64 `json:"sl"`
TargetLen int64 `json:"tl"`
}

type fieldInfo struct {
Key string `json:"k"`
Field string `json:"f"`
Key string `json:"k"`
Field string `json:"f"`
ConflictType string `json:"ct"`
}

Expand All @@ -57,32 +57,32 @@ func (p *FullCheck) WriteConflictKey(conflictKey <-chan *common.Key) {
c := 0
for oneKeyInfo := range conflictKey {
info := &keyInfo{
Key : string(oneKeyInfo.Key),
Type: oneKeyInfo.Tp.Name,
Key: string(oneKeyInfo.Key),
Type: oneKeyInfo.Tp.Name,
ConflictType: oneKeyInfo.ConflictType.String(),
Db: p.currentDB,
SourceLen: oneKeyInfo.SourceAttr.ItemCount,
TargetLen: oneKeyInfo.TargetAttr.ItemCount,
Db: p.currentDB,
SourceLen: oneKeyInfo.SourceAttr.ItemCount,
TargetLen: oneKeyInfo.TargetAttr.ItemCount,
}
infoJson, _ := json.Marshal(info)
total := atomic.AddUint64(&(p.conflictBytesUsed), uint64(len(infoJson)))
if total > uint64(conf.Opts.ResultBytesLimit) {
if total > uint64(conf.Opts.ResultBytesLimit) {
panic(common.Logger.Errorf("too many conflicts!"))
}
_, err := rc.Do("RPUSH", keyList, string(infoJson))
if err != nil {
panic(common.Logger.Errorf("failed to exec rpush command: ", err))
}
if c == 0 {
rc.Do("EXPIRE", keyList, 3600 * 4)
c ++
rc.Do("EXPIRE", keyList, 3600*4)
c++
}
if len(oneKeyInfo.Field) != 0 {
keyFields := []*fieldInfo{}
for i := 0; i < len(oneKeyInfo.Field); i ++ {
for i := 0; i < len(oneKeyInfo.Field); i++ {
keyFields = append(keyFields, &fieldInfo{
Key: info.Key,
Field: string(oneKeyInfo.Field[i].Field),
Key: info.Key,
Field: string(oneKeyInfo.Field[i].Field),
ConflictType: oneKeyInfo.Field[i].ConflictType.String(),
})
if p.times == p.CompareCount {
Expand All @@ -97,7 +97,7 @@ func (p *FullCheck) WriteConflictKey(conflictKey <-chan *common.Key) {
if total > uint64(conf.Opts.ResultBytesLimit) {
panic(common.Logger.Errorf("too many conflicts!"))
}
_, err = rc.Do("SET", fieldsList, string(fieldsInfo), "EX", 3600 * 4)
_, err = rc.Do("SET", fieldsList, string(fieldsInfo), "EX", 3600*4)
} else {
if p.times == p.CompareCount {
if len(conf.Opts.ResultFile) != 0 {
Expand Down Expand Up @@ -136,7 +136,6 @@ func byteSlices(reply interface{}, err error) ([][]byte, error) {
return nil, fmt.Errorf("redigo: unexpected type for ByteSlices, got type %T", reply)
}


func (p *FullCheck) ScanFromDB(allkeys chan<- []*common.Key) {
conflictKeyTableName, conflictFieldTableName := p.GetLastResultTable()
keyList := fmt.Sprintf("fullcheck:%d:%s:key", random, conflictKeyTableName)
Expand All @@ -152,7 +151,7 @@ func (p *FullCheck) ScanFromDB(allkeys chan<- []*common.Key) {
if err == redigoredis.ErrNil || len(result) == 0 || len(result[0]) == 0 {
if len(keyInfoBatch) > 0 {
p.IncrScanStat(len(keyInfoBatch))
allkeys <-keyInfoBatch
allkeys <- keyInfoBatch
}
close(allkeys)
rc.Do("DEL", keyList)
Expand Down Expand Up @@ -211,8 +210,8 @@ func (p *FullCheck) ScanFromDB(allkeys chan<- []*common.Key) {
keyInfoBatch = append(keyInfoBatch, oneKeyInfo)
if len(keyInfoBatch) == p.BatchCount {
p.IncrScanStat(len(keyInfoBatch))
allkeys <-keyInfoBatch
allkeys <- keyInfoBatch
keyInfoBatch = []*common.Key{}
}
}
}
}