-
Notifications
You must be signed in to change notification settings - Fork 252
client: support versioned coprocessor routing #1871
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -58,6 +58,7 @@ cmds=( | |
| PhysicalScanLock | ||
| Cop | ||
| BatchCop | ||
| VersionedCop | ||
| MvccGetByKey | ||
| MvccGetByStartTs | ||
| SplitRegion | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -109,6 +109,7 @@ const ( | |
| CmdMPPConn // TODO: These non TiKV RPCs should be moved out of TiKV client | ||
| CmdMPPCancel // TODO: These non TiKV RPCs should be moved out of TiKV client | ||
| CmdMPPAlive // TODO: These non TiKV RPCs should be moved out of TiKV client | ||
| CmdVersionedCop | ||
|
|
||
| CmdMvccGetByKey CmdType = 1024 + iota | ||
| CmdMvccGetByStartTs | ||
|
|
@@ -192,6 +193,8 @@ func (t CmdType) String() string { | |
| return "CopStream" | ||
| case CmdBatchCop: | ||
| return "BatchCop" | ||
| case CmdVersionedCop: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest putting |
||
| return "VersionedCop" | ||
| case CmdMPPTask: | ||
| return "DispatchMPPTask" | ||
| case CmdMPPConn: | ||
|
|
@@ -490,6 +493,11 @@ func (req *Request) Cop() *coprocessor.Request { | |
| return req.Req.(*coprocessor.Request) | ||
| } | ||
|
|
||
| // VersionedCop returns coprocessor request in request. | ||
| func (req *Request) VersionedCop() *coprocessor.Request { | ||
| return req.Req.(*coprocessor.Request) | ||
| } | ||
|
|
||
| // BatchCop returns BatchCop request in request. | ||
| func (req *Request) BatchCop() *coprocessor.BatchRequest { | ||
| return req.Req.(*coprocessor.BatchRequest) | ||
|
|
@@ -697,6 +705,8 @@ func (req *Request) GetSize() int { | |
| size = req.Scan().Size() | ||
| case CmdCop: | ||
| size = req.Cop().Size() | ||
| case CmdVersionedCop: | ||
| size = req.VersionedCop().Size() | ||
| case CmdPrewrite: | ||
| size = req.Prewrite().Size() | ||
| case CmdCommit: | ||
|
|
@@ -1005,7 +1015,7 @@ func GenRegionErrorResp(req *Request, e *errorpb.Error) (*Response, error) { | |
| p = &kvrpcpb.RawChecksumResponse{ | ||
| RegionError: e, | ||
| } | ||
| case CmdCop: | ||
| case CmdCop, CmdVersionedCop: | ||
| p = &coprocessor.Response{ | ||
| RegionError: e, | ||
| } | ||
|
|
@@ -1301,6 +1311,22 @@ func CallDebugRPC(ctx context.Context, client debugpb.DebugClient, req *Request) | |
| return resp, err | ||
| } | ||
|
|
||
| // CallVersionedKV launches a versioned kv rpc call. | ||
| func CallVersionedKV(ctx context.Context, client tikvpb.VersionedKvClient, req *Request) (*Response, error) { | ||
| resp := &Response{} | ||
| var err error | ||
| switch req.Type { | ||
| case CmdVersionedCop: | ||
| resp.Resp, err = client.VersionedCoprocessor(ctx, req.VersionedCop()) | ||
| default: | ||
| return nil, errors.Errorf("invalid request type: %v", req.Type) | ||
| } | ||
| if err != nil { | ||
| return nil, errors.WithStack(err) | ||
| } | ||
| return resp, nil | ||
| } | ||
|
|
||
| // Lease is used to implement grpc stream timeout. | ||
| type Lease struct { | ||
| Cancel context.CancelFunc | ||
|
|
@@ -1507,6 +1533,8 @@ func (req *Request) GetStartTS() uint64 { | |
| return req.Cop().GetStartTs() | ||
| case CmdBatchCop: | ||
| return req.BatchCop().GetStartTs() | ||
| case CmdVersionedCop: | ||
| return req.VersionedCop().GetStartTs() | ||
| case CmdMvccGetByStartTs: | ||
| return req.MvccGetByStartTs().GetStartTs() | ||
| default: | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BatchReqwould be used by default at L354, I suppose this may not be reached, have you executed E2E test for this PR?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"ToBatchCommandsRequest" can't converts CmdVersionedCop to BatchReq and return nil. So now CmdVersionedCop always reached this path.
And I have run some sqls in my local and it works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are some performance risks. How often is this CmdVersionedCop called? If the frequency is high, I think it's necessary to modify kvproto to include it in the BatchReq and BatchResp.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, this feature does not have high performance requirements, and RPC requests are not the performance bottleneck. We can optimize it later if performance becomes an issue.”