Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner: add lock control in owner table info writer #154

Merged
merged 7 commits into from
Dec 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cdc/kv/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func GetSubChangeFeedInfos(ctx context.Context, client *clientv3.Client, changef
if err != nil {
return nil, err
}
info.ModRevision = rawKv.ModRevision
pinfo[captureID] = info
}
return pinfo, nil
Expand All @@ -183,7 +184,7 @@ func GetSubChangeFeedInfo(
return 0, nil, errors.Trace(err)
}
if resp.Count == 0 {
return 0, nil, errors.Errorf("subchangefeed info %s.%s not exists", changefeedID, captureID)
return 0, nil, errors.Annotatef(model.ErrSubChangeFeedInfoNotExists, "changefeed: %s, capture: %s", changefeedID, captureID)
}
info := &model.SubChangeFeedInfo{}
err = info.Unmarshal(resp.Kvs[0].Value)
Expand Down
7 changes: 5 additions & 2 deletions cdc/model/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ import (

// common errors
var (
ErrWriteTsConflict = errors.New("write ts conflict")
ErrChangeFeedNotExists = errors.New("changefeed not exists")
ErrWriteTsConflict = errors.New("write ts conflict")
ErrChangeFeedNotExists = errors.New("changefeed not exists")
ErrSubChangeFeedInfoNotExists = errors.New("subchangefeedinfo not exists")
ErrWriteSubChangeFeedInfoConlict = errors.New("write subchangefeedinfo conflict")
ErrFindPLockNotCommit = errors.New("subchangefeedinfo has p-lock not commited")
)
17 changes: 14 additions & 3 deletions cdc/model/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ type TableLock struct {
CheckpointTs uint64 `json:"checkpoint-ts"`
}

// TableLockStatus for the table lock in SubChangeFeedInfo
type TableLockStatus int

// Table lock status
const (
TableNoLock TableLockStatus = iota + 1
TablePLock
TablePLockCommited
)

// SubChangeFeedInfo records the process information of a capture
type SubChangeFeedInfo struct {
// The maximum event CommitTs that has been synchronized. This is updated by corresponding processor.
Expand All @@ -48,9 +58,10 @@ type SubChangeFeedInfo struct {
ResolvedTs uint64 `json:"resolved-ts"`
// Table information list, containing tables that processor should process, updated by ownrer, processor is read only.
// TODO change to be a map for easy update.
TableInfos []*ProcessTableInfo `json:"table-infos"`
TablePLock *TableLock `json:"table-p-lock"`
TableCLock *TableLock `json:"table-c-lock"`
TableInfos []*ProcessTableInfo `json:"table-infos"`
TablePLock *TableLock `json:"table-p-lock"`
TableCLock *TableLock `json:"table-c-lock"`
ModRevision int64 `json:"-"`
}

// String implements fmt.Stringer interface.
Expand Down
73 changes: 48 additions & 25 deletions cdc/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pingcap/log"
pmodel "github.com/pingcap/parser/model"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/roles"
"github.com/pingcap/ticdc/cdc/roles/storage"
Expand Down Expand Up @@ -72,6 +71,7 @@ type changeFeedInfo struct {
tables map[uint64]schema.TableName
orphanTables map[uint64]model.ProcessTableInfo
toCleanTables map[uint64]struct{}
infoWriter *storage.OwnerSubCFInfoEtcdWriter
}

// String implements fmt.Stringer interface.
Expand Down Expand Up @@ -161,9 +161,14 @@ func (c *changeFeedInfo) tryBalance(ctx context.Context, captures map[string]*mo
c.banlanceOrphanTables(ctx, captures)
}

func (c *changeFeedInfo) restoreTableInfos(infoSnapshot *model.SubChangeFeedInfo, captureID string) {
c.ProcessorInfos[captureID].TableInfos = infoSnapshot.TableInfos
}

func (c *changeFeedInfo) cleanTables(ctx context.Context) {
var cleanIDs []uint64

cleanLoop:
for id := range c.toCleanTables {
captureID, subInfo, ok := findSubChangefeedWithTable(c.ProcessorInfos, id)
if !ok {
Expand All @@ -172,21 +177,30 @@ func (c *changeFeedInfo) cleanTables(ctx context.Context) {
continue
}

removedTable, _ := subInfo.RemoveTable(id)
err := kv.PutSubChangeFeedInfo(ctx, c.client, c.ID, captureID, subInfo)
if err != nil {
subInfo.TableInfos = append(subInfo.TableInfos, removedTable)
infoClone := subInfo.Clone()
subInfo.RemoveTable(id)

newInfo, err := c.infoWriter.Write(ctx, c.ID, captureID, subInfo, true)
if err == nil {
c.ProcessorInfos[captureID] = newInfo
}
switch errors.Cause(err) {
case model.ErrFindPLockNotCommit:
c.restoreTableInfos(infoClone, captureID)
log.Info("write table info delay, wait plock resolve",
zap.String("changefeed", c.ID),
zap.String("capture", captureID))
case nil:
log.Info("cleanup table success",
zap.Uint64("table id", id),
zap.String("capture id", captureID))
log.Debug("after remove", zap.Stringer("subchangefeed info", subInfo))
cleanIDs = append(cleanIDs, id)
default:
c.restoreTableInfos(infoClone, captureID)
log.Error("fail to put sub changefeed info", zap.Error(err))
break
break cleanLoop
}

log.Info("cleanup table success",
zap.Uint64("table id", id),
zap.String("capture id", captureID))

log.Debug("after remove", zap.Stringer("subchangefeed info", subInfo))

cleanIDs = append(cleanIDs, id)
}

for _, id := range cleanIDs {
Expand Down Expand Up @@ -221,25 +235,33 @@ func (c *changeFeedInfo) banlanceOrphanTables(ctx context.Context, captures map[
if info == nil {
info = new(model.SubChangeFeedInfo)
}
info.CheckPointTs = 0
info.ResolvedTs = 0
infoClone := info.Clone()
info.TableInfos = append(info.TableInfos, &model.ProcessTableInfo{
ID: tableID,
StartTs: orphan.StartTs,
})

err := kv.PutSubChangeFeedInfo(ctx, c.client, c.ID, captureID, info)
if err != nil {
newInfo, err := c.infoWriter.Write(ctx, c.ID, captureID, info, false)
if err == nil {
c.ProcessorInfos[captureID] = newInfo
}
switch errors.Cause(err) {
case model.ErrFindPLockNotCommit:
c.restoreTableInfos(infoClone, captureID)
log.Info("write table info delay, wait plock resolve",
zap.String("changefeed", c.ID),
zap.String("capture", captureID))
case nil:
log.Info("dispatch table success",
zap.Uint64("table id", tableID),
zap.Uint64("start ts", orphan.StartTs),
zap.String("capture", captureID))
delete(c.orphanTables, tableID)
default:
c.restoreTableInfos(infoClone, captureID)
log.Error("fail to put sub changefeed info", zap.Error(err))
return
}

log.Info("dispatch table success",
zap.Uint64("table id", tableID),
zap.Uint64("start ts", orphan.StartTs),
zap.String("capture", captureID))
c.ProcessorInfos[captureID] = info
delete(c.orphanTables, tableID)
}
}

Expand Down Expand Up @@ -427,6 +449,7 @@ func (o *ownerImpl) loadChangeFeedInfos(ctx context.Context) error {
TargetTs: targetTs,
ProcessorInfos: etcdChangeFeedInfo,
DDLCurrentIndex: 0,
infoWriter: storage.NewOwnerSubCFInfoEtcdWriter(o.etcdClient),
}
}

Expand Down
143 changes: 143 additions & 0 deletions cdc/roles/storage/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@ package storage

import (
"context"
"time"

"github.com/cenkalti/backoff"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/embed"
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/kv"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/retry"
"github.com/pingcap/ticdc/pkg/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -218,3 +223,141 @@ func (rw *ProcessorTsEtcdRWriter) ReadGlobalResolvedTs(ctx context.Context) (uin
func (rw *ProcessorTsEtcdRWriter) GetSubChangeFeedInfo() *model.SubChangeFeedInfo {
return rw.info
}

// OwnerSubCFInfoEtcdWriter encapsulates SubChangeFeedInfo write operation
type OwnerSubCFInfoEtcdWriter struct {
etcdClient *clientv3.Client
}

// NewOwnerSubCFInfoEtcdWriter returns a new `*OwnerSubCFInfoEtcdWriter` instance
func NewOwnerSubCFInfoEtcdWriter(cli *clientv3.Client) *OwnerSubCFInfoEtcdWriter {
return &OwnerSubCFInfoEtcdWriter{
etcdClient: cli,
}
}

// updateInfo updates the local SubChangeFeedInfo with etcd value, except for TableInfos and TablePLock
func (ow *OwnerSubCFInfoEtcdWriter) updateInfo(
ctx context.Context, changefeedID, captureID string, oldInfo *model.SubChangeFeedInfo,
) (newInfo *model.SubChangeFeedInfo, err error) {
modRevision, info, err := kv.GetSubChangeFeedInfo(ctx, ow.etcdClient, changefeedID, captureID)
if err != nil {
return
}

// TableInfos and TablePLock is updated by owner only
tableInfos := oldInfo.TableInfos
pLock := oldInfo.TablePLock
newInfo = info
newInfo.TableInfos = tableInfos
newInfo.TablePLock = pLock
newInfo.ModRevision = modRevision

if newInfo.TablePLock != nil {
if newInfo.TableCLock == nil {
err = model.ErrFindPLockNotCommit
} else {
// clean lock
newInfo.TablePLock = nil
newInfo.TableCLock = nil
}
}
return
}

// checkLock checks whether there exists p-lock or whether p-lock is committed if it exists
func (ow *OwnerSubCFInfoEtcdWriter) checkLock(
ctx context.Context, changefeedID, captureID string,
) (status model.TableLockStatus, err error) {
_, info, err := kv.GetSubChangeFeedInfo(ctx, ow.etcdClient, changefeedID, captureID)
if err != nil {
if errors.Cause(err) == model.ErrSubChangeFeedInfoNotExists {
return model.TableNoLock, nil
}
return
}

// in most cases there is no p-lock
if info.TablePLock == nil {
status = model.TableNoLock
return
}

if info.TableCLock != nil {
status = model.TablePLockCommited
} else {
status = model.TablePLock
}

return
}

// Write persists given `SubChangeFeedInfo` into etcd.
// If returned err is not nil, don't use the returned newInfo as it may be not a reasonable value.
func (ow *OwnerSubCFInfoEtcdWriter) Write(
ctx context.Context,
changefeedID, captureID string,
info *model.SubChangeFeedInfo,
writePLock bool,
) (newInfo *model.SubChangeFeedInfo, err error) {

// check p-lock not exists or is already resolved
lockStatus, err := ow.checkLock(ctx, changefeedID, captureID)
if err != nil {
return
}
newInfo = info
switch lockStatus {
case model.TableNoLock:
case model.TablePLockCommited:
newInfo.TablePLock = nil
newInfo.TableCLock = nil
case model.TablePLock:
err = errors.Trace(model.ErrFindPLockNotCommit)
return
}

if writePLock {
newInfo.TablePLock = &model.TableLock{
Ts: oracle.EncodeTSO(time.Now().UnixNano() / int64(time.Millisecond)),
CreatorID: util.CaptureIDFromCtx(ctx),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will prefer explicitly param here, but ok with this too now.

}
}

key := kv.GetEtcdKeySubChangeFeed(changefeedID, captureID)
err = retry.Run(func() error {
value, err := newInfo.Marshal()
if err != nil {
return err
}

resp, err := ow.etcdClient.KV.Txn(ctx).If(
clientv3.Compare(clientv3.ModRevision(key), "=", newInfo.ModRevision),
).Then(
clientv3.OpPut(key, value),
).Commit()

if err != nil {
return errors.Trace(err)
}

if !resp.Succeeded {
log.Info("outdated table infos, update table and retry")
newInfo, err = ow.updateInfo(ctx, changefeedID, captureID, info)
switch errors.Cause(err) {
case model.ErrFindPLockNotCommit:
return backoff.Permanent(err)
case nil:
return model.ErrWriteSubChangeFeedInfoConlict
default:
return err
}
}

newInfo.ModRevision = resp.Header.Revision

return nil
}, 5)

return
}
Loading