-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
owner: add lock control in owner table info writer #154
Merged
Merged
Changes from 4 commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
68a90d0
owner: add lock control in owner table info writer
amyangfei 8ad5c87
add unit test, fix some bug
amyangfei d4dcddc
refine lock status check
amyangfei 3700ec6
remove unused interface
amyangfei e1a9d9d
Merge branch 'master' into remove-table-using-lock
amyangfei f1ad3e4
address comment
amyangfei 11c2a55
refine usage of newInfo returned from infoWriter.Write
amyangfei File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,15 +16,18 @@ package storage | |
import ( | ||
"context" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pingcap/ticdc/pkg/retry" | ||
|
||
"github.com/cenkalti/backoff" | ||
"github.com/coreos/etcd/clientv3" | ||
"github.com/coreos/etcd/embed" | ||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"github.com/pingcap/ticdc/cdc/kv" | ||
"github.com/pingcap/ticdc/cdc/model" | ||
"github.com/pingcap/ticdc/pkg/retry" | ||
"github.com/pingcap/ticdc/pkg/util" | ||
"github.com/pingcap/tidb/store/tikv/oracle" | ||
"go.uber.org/zap" | ||
) | ||
|
||
|
@@ -273,3 +276,140 @@ func (rw *ProcessorTsEtcdRWriter) WriteTableCLock(ctx context.Context, checkpoin | |
// when P-lock is not paired, owner can't update TableInfos in SubChangeFeedInfo | ||
return rw.retryWriteData(ctx, func() { rw.info.TableCLock = lock }, rw.updateFullInfo) | ||
} | ||
|
||
// OwnerSubCFInfoEtcdWriter encapsulates SubChangeFeedInfo write operation | ||
type OwnerSubCFInfoEtcdWriter struct { | ||
etcdClient *clientv3.Client | ||
} | ||
|
||
// NewOwnerSubCFInfoEtcdWriter returns a new `*OwnerSubCFInfoEtcdWriter` instance | ||
func NewOwnerSubCFInfoEtcdWriter(cli *clientv3.Client) *OwnerSubCFInfoEtcdWriter { | ||
return &OwnerSubCFInfoEtcdWriter{ | ||
etcdClient: cli, | ||
} | ||
} | ||
|
||
// updateInfo updates the local SubChangeFeedInfo with etcd value, except for TableInfos and TablePLock | ||
func (ow *OwnerSubCFInfoEtcdWriter) updateInfo( | ||
ctx context.Context, changefeedID, captureID string, oldInfo *model.SubChangeFeedInfo, | ||
) (newInfo *model.SubChangeFeedInfo, err error) { | ||
modRevision, info, err := kv.GetSubChangeFeedInfo(ctx, ow.etcdClient, changefeedID, captureID) | ||
if err != nil { | ||
return | ||
} | ||
|
||
// TableInfos and TablePLock is updated by owner only | ||
tableInfos := oldInfo.TableInfos | ||
pLock := oldInfo.TablePLock | ||
newInfo = info | ||
newInfo.TableInfos = tableInfos | ||
newInfo.TablePLock = pLock | ||
newInfo.ModRevision = modRevision | ||
|
||
if newInfo.TablePLock != nil { | ||
if newInfo.TableCLock == nil { | ||
err = model.ErrFindPLockNotCommit | ||
} else { | ||
// clean lock | ||
newInfo.TablePLock = nil | ||
newInfo.TableCLock = nil | ||
} | ||
} | ||
return | ||
} | ||
|
||
// checkLock checks whether there exists p-lock or whether p-lock is committed if it exists | ||
func (ow *OwnerSubCFInfoEtcdWriter) checkLock( | ||
ctx context.Context, changefeedID, captureID string, | ||
) (status model.TableLockStatus, err error) { | ||
_, info, err := kv.GetSubChangeFeedInfo(ctx, ow.etcdClient, changefeedID, captureID) | ||
if err != nil { | ||
if errors.Cause(err) == model.ErrSubChangeFeedInfoNotExists { | ||
return model.TableNoLock, nil | ||
} | ||
return | ||
} | ||
|
||
// in most cases there is no p-lock | ||
if info.TablePLock == nil { | ||
status = model.TableNoLock | ||
return | ||
} | ||
|
||
if info.TableCLock != nil { | ||
status = model.TablePLockCommited | ||
} else { | ||
status = model.TablePLock | ||
} | ||
|
||
return | ||
} | ||
|
||
// Write persists given `SubChangeFeedInfo` into etcd | ||
func (ow *OwnerSubCFInfoEtcdWriter) Write( | ||
ctx context.Context, | ||
changefeedID, captureID string, | ||
info *model.SubChangeFeedInfo, | ||
writePLock bool, | ||
) (newInfo *model.SubChangeFeedInfo, err error) { | ||
|
||
// check p-lock not exists or is already resolved | ||
lockStatus, err := ow.checkLock(ctx, changefeedID, captureID) | ||
if err != nil { | ||
return | ||
} | ||
newInfo = info | ||
switch lockStatus { | ||
case model.TableNoLock: | ||
case model.TablePLockCommited: | ||
newInfo.TablePLock = nil | ||
newInfo.TableCLock = nil | ||
case model.TablePLock: | ||
err = errors.Trace(model.ErrFindPLockNotCommit) | ||
return | ||
} | ||
|
||
if writePLock { | ||
newInfo.TablePLock = &model.TableLock{ | ||
Ts: oracle.EncodeTSO(time.Now().UnixNano() / int64(time.Millisecond)), | ||
CreatorID: util.CaptureIDFromCtx(ctx), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I will prefer explicitly param here, but ok with this too now. |
||
} | ||
} | ||
|
||
key := kv.GetEtcdKeySubChangeFeed(changefeedID, captureID) | ||
err = retry.Run(func() error { | ||
value, err := newInfo.Marshal() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
resp, err := ow.etcdClient.KV.Txn(ctx).If( | ||
clientv3.Compare(clientv3.ModRevision(key), "=", newInfo.ModRevision), | ||
).Then( | ||
clientv3.OpPut(key, value), | ||
).Commit() | ||
|
||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
|
||
if !resp.Succeeded { | ||
log.Info("outdated table infos, update table and retry") | ||
newInfo, err = ow.updateInfo(ctx, changefeedID, captureID, info) | ||
switch errors.Cause(err) { | ||
case model.ErrFindPLockNotCommit: | ||
return backoff.Permanent(err) | ||
case nil: | ||
return model.ErrWriteSubChangeFeedInfoConlict | ||
default: | ||
return err | ||
} | ||
} | ||
|
||
newInfo.ModRevision = resp.Header.Revision | ||
|
||
return nil | ||
}, 5) | ||
|
||
return | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only set it when
err != nil
? IfnewInfo
isnil
on error, later calls torestoreTableInfos
may end in runtime error.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed, PTAL