Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce keyspace safepoint interface #6419

Merged
merged 50 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
5eb85dc
introduce keyspace safepoint interface
ystaticy May 8, 2023
f81e14d
introduce keyspace safepoint interface
ystaticy May 8, 2023
05a5d72
change resp
ystaticy May 17, 2023
367cb26
change resp
ystaticy May 18, 2023
5df0b4b
update kvproto
ystaticy May 18, 2023
73a99cb
remove unused code
ystaticy May 18, 2023
719a4e7
add comments
ystaticy May 18, 2023
c5d768a
add comments
ystaticy May 18, 2023
680a9f8
update go.mod
ystaticy May 18, 2023
82325a4
update go.mod
ystaticy May 18, 2023
114b362
update go.mod
ystaticy May 18, 2023
c8e3219
update storage_gc_test
ystaticy May 18, 2023
7fdaca3
add comments and update go.mod
ystaticy May 18, 2023
99b28c9
add ut
ystaticy May 20, 2023
18469c0
Merge branch 'master' into keyspace_safepoint
ystaticy May 20, 2023
aeb2946
fix comments
ystaticy May 22, 2023
1f5d86b
Merge branch 'keyspace_safepoint' of github.com:ystaticy/pd into keys…
ystaticy May 22, 2023
4e267f1
fix comments
ystaticy May 22, 2023
4c99646
fix comments
ystaticy May 22, 2023
1a563b7
Merge branch 'master' into keyspace_safepoint
ystaticy May 22, 2023
7a738f0
fix comments
ystaticy May 22, 2023
dac2646
fix comments
ystaticy May 22, 2023
52a8996
fix comments
ystaticy May 22, 2023
a93570e
fix comments
ystaticy May 22, 2023
ee6f887
fix comments
ystaticy May 22, 2023
5a28d5f
fix comments
ystaticy May 22, 2023
f241113
fix comments
ystaticy May 22, 2023
bd47d38
fix comments
ystaticy May 22, 2023
89d575a
fix comments
ystaticy May 22, 2023
5cc88f8
add revision for WatchGCSafePointV2 req and reps
ystaticy May 23, 2023
7282bd1
add revision for WatchGCSafePointV2 req and reps
ystaticy May 23, 2023
d9c1c02
add revision for WatchGCSafePointV2 req and reps
ystaticy May 23, 2023
1857851
add license
ystaticy May 23, 2023
1010ef6
merge master
ystaticy May 23, 2023
10da08b
rename file
ystaticy May 23, 2023
03d20a2
fix client nil
ystaticy May 23, 2023
d0d0153
remove unused code
ystaticy May 23, 2023
1cbe95f
add err
ystaticy May 23, 2023
209072b
format code
ystaticy May 23, 2023
d5fe0fe
format code
ystaticy May 23, 2023
9341de3
format code
ystaticy May 23, 2023
7bc803e
format code
ystaticy May 23, 2023
3c92a2b
fix comments
ystaticy May 24, 2023
2cd03a3
fix comments
ystaticy May 24, 2023
109557b
fix comments
ystaticy May 24, 2023
4848ce4
add license
AmoebaProtozoa May 24, 2023
28175ca
refactor test
AmoebaProtozoa May 24, 2023
316e889
Merge branch 'master' into keyspace_safepoint
AmoebaProtozoa May 24, 2023
3755d01
sort import
AmoebaProtozoa May 25, 2023
7d98b8b
Merge branch 'master' into keyspace_safepoint
AmoebaProtozoa May 25, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type Client interface {
MetaStorageClient
// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// GCClient manages gcSafePointV2 and serviceSafePointV2
GCClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// Close closes the client.
Expand Down
29 changes: 15 additions & 14 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,21 @@ const (

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
Expand Down
137 changes: 137 additions & 0 deletions client/gc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// GCClient is a client for doing GC
type GCClient interface {
ystaticy marked this conversation as resolved.
Show resolved Hide resolved
UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error)
UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error)
WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error)
}

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
}

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
ServiceId: []byte(serviceID),
SafePoint: safePoint,
Ttl: ttl,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}

// WatchGCSafePointV2 watch gc safe point change.
func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) {
SafePointEventsChan := make(chan []*pdpb.SafePointEvent)
req := &pdpb.WatchGCSafePointV2Request{
Header: c.requestHeader(),
Revision: revision,
}

protoClient := c.getClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
stream, err := protoClient.WatchGCSafePointV2(ctx, req)
if err != nil {
close(SafePointEventsChan)
return nil, err
}
go func() {
defer func() {
close(SafePointEventsChan)
if r := recover(); r != nil {
log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err))
return
ystaticy marked this conversation as resolved.
Show resolved Hide resolved
ystaticy marked this conversation as resolved.
Show resolved Hide resolved
}
SafePointEventsChan <- resp.GetEvents()
}
}
}()
return SafePointEventsChan, err
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 8 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ var (
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
cmdDurationUpdateServiceSafePointV2 prometheus.Observer

cmdFailDurationGetRegion prometheus.Observer
cmdFailDurationTSO prometheus.Observer
Expand All @@ -157,6 +159,8 @@ var (
requestDurationTSO prometheus.Observer
cmdFailedDurationGet prometheus.Observer
cmdFailedDurationPut prometheus.Observer
cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
)

func initCmdDurations() {
Expand All @@ -182,6 +186,8 @@ func initCmdDurations() {
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -198,6 +204,8 @@ func initCmdDurations() {
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
}

func registerMetrics() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
File renamed without changes.
File renamed without changes.
Loading