Skip to content

Commit

Permalink
Introduce keyspace safepoint interface (#6419)
Browse files Browse the repository at this point in the history
ref #6487

Signed-off-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com>

Co-authored-by: AmoebaProtozoa <8039876+AmoebaProtozoa@users.noreply.github.com>
Co-authored-by: David <8039876+AmoebaProtozoa@users.noreply.github.com>
  • Loading branch information
ystaticy and AmoebaProtozoa authored May 25, 2023
1 parent f8ca1e8 commit ccb0bba
Show file tree
Hide file tree
Showing 31 changed files with 1,154 additions and 438 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type Client interface {
MetaStorageClient
// KeyspaceClient manages keyspace metadata.
KeyspaceClient
// GCClient manages gcSafePointV2 and serviceSafePointV2
GCClient
// ResourceManagerClient manages resource group metadata and token assignment.
ResourceManagerClient
// Close closes the client.
Expand Down
29 changes: 15 additions & 14 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,21 @@ const (

// client errors
var (
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientGetProtoClient = errors.Normalize("failed to get proto client", errors.RFCCodeText("PD:client:ErrClientGetProtoClient"))
ErrClientCreateTSOStream = errors.Normalize("create TSO stream failed, %s", errors.RFCCodeText("PD:client:ErrClientCreateTSOStream"))
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
ErrClientFindGroupByKeyspaceID = errors.Normalize("can't find keyspace group by keyspace id", errors.RFCCodeText("PD:client:ErrClientFindGroupByKeyspaceID"))
ErrClientWatchGCSafePointV2Stream = errors.Normalize("watch gc safe point v2 stream failed, %s", errors.RFCCodeText("PD:client:ErrClientWatchGCSafePointV2Stream"))
)

// grpcutil errors
Expand Down
137 changes: 137 additions & 0 deletions client/gc_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pd

import (
"context"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"go.uber.org/zap"
)

// GCClient is a client for doing GC
type GCClient interface {
UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error)
UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error)
WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error)
}

// UpdateGCSafePointV2 update gc safe point for the given keyspace.
func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateGCSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateGCSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
SafePoint: safePoint,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePointV2(ctx, req)
cancel()

if err = c.respForErr(cmdFailedDurationUpdateGCSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetNewSafePoint(), nil
}

// UpdateServiceSafePointV2 update service safe point for the given keyspace.
func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if span := opentracing.SpanFromContext(ctx); span != nil {
span = opentracing.StartSpan("pdclient.UpdateServiceSafePointV2", opentracing.ChildOf(span.Context()))
defer span.Finish()
}
start := time.Now()
defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }()

ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.UpdateServiceSafePointV2Request{
Header: c.requestHeader(),
KeyspaceId: keyspaceID,
ServiceId: []byte(serviceID),
SafePoint: safePoint,
Ttl: ttl,
}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
protoClient := c.getClient()
if protoClient == nil {
cancel()
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceSafePointV2(ctx, req)
cancel()
if err = c.respForErr(cmdFailedDurationUpdateServiceSafePointV2, start, err, resp.GetHeader()); err != nil {
return 0, err
}
return resp.GetMinSafePoint(), nil
}

// WatchGCSafePointV2 watch gc safe point change.
func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan []*pdpb.SafePointEvent, error) {
SafePointEventsChan := make(chan []*pdpb.SafePointEvent)
req := &pdpb.WatchGCSafePointV2Request{
Header: c.requestHeader(),
Revision: revision,
}

protoClient := c.getClient()
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
stream, err := protoClient.WatchGCSafePointV2(ctx, req)
if err != nil {
close(SafePointEventsChan)
return nil, err
}
go func() {
defer func() {
close(SafePointEventsChan)
if r := recover(); r != nil {
log.Error("[pd] panic in gc client `WatchGCSafePointV2`", zap.Any("error", r))
return
}
}()
for {
select {
case <-ctx.Done():
return
default:
resp, err := stream.Recv()
if err != nil {
log.Error("watch gc safe point v2 error", errs.ZapError(errs.ErrClientWatchGCSafePointV2Stream, err))
return
}
SafePointEventsChan <- resp.GetEvents()
}
}
}()
return SafePointEventsChan, err
}
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 8 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ var (
cmdDurationUpdateKeyspaceState prometheus.Observer
cmdDurationGet prometheus.Observer
cmdDurationPut prometheus.Observer
cmdDurationUpdateGCSafePointV2 prometheus.Observer
cmdDurationUpdateServiceSafePointV2 prometheus.Observer

cmdFailDurationGetRegion prometheus.Observer
cmdFailDurationTSO prometheus.Observer
Expand All @@ -157,6 +159,8 @@ var (
requestDurationTSO prometheus.Observer
cmdFailedDurationGet prometheus.Observer
cmdFailedDurationPut prometheus.Observer
cmdFailedDurationUpdateGCSafePointV2 prometheus.Observer
cmdFailedDurationUpdateServiceSafePointV2 prometheus.Observer
)

func initCmdDurations() {
Expand All @@ -182,6 +186,8 @@ func initCmdDurations() {
cmdDurationUpdateKeyspaceState = cmdDuration.WithLabelValues("update_keyspace_state")
cmdDurationGet = cmdDuration.WithLabelValues("get")
cmdDurationPut = cmdDuration.WithLabelValues("put")
cmdDurationUpdateGCSafePointV2 = cmdDuration.WithLabelValues("update_gc_safe_point_v2")
cmdDurationUpdateServiceSafePointV2 = cmdDuration.WithLabelValues("update_service_safe_point_v2")

cmdFailDurationGetRegion = cmdFailedDuration.WithLabelValues("get_region")
cmdFailDurationTSO = cmdFailedDuration.WithLabelValues("tso")
Expand All @@ -198,6 +204,8 @@ func initCmdDurations() {
requestDurationTSO = requestDuration.WithLabelValues("tso")
cmdFailedDurationGet = cmdFailedDuration.WithLabelValues("get")
cmdFailedDurationPut = cmdFailedDuration.WithLabelValues("put")
cmdFailedDurationUpdateGCSafePointV2 = cmdFailedDuration.WithLabelValues("update_gc_safe_point_v2")
cmdFailedDurationUpdateServiceSafePointV2 = cmdFailedDuration.WithLabelValues("update_service_safe_point_v2")
}

func registerMetrics() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
github.com/pingcap/tidb-dashboard v0.0.0-20230508075335-d6e0218addd5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -422,8 +422,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e h1:IYZyu8k7Su+QIUUcx0EPOnt3o1S5o+uh2gY6MEzUHwc=
github.com/pingcap/kvproto v0.0.0-20230519091446-c8493bee862e/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd h1:0StWSJkXtcxtPSADRz4+SEWTimuD9VMY+D71IdLKzkA=
github.com/pingcap/kvproto v0.0.0-20230522110703-23ba55f281bd/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit ccb0bba

Please sign in to comment.