Skip to content

Commit

Permalink
Merge branch 'master' into upgrade-kvproto
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 16, 2023
2 parents 15715e2 + 0f75293 commit 95ae9a5
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 52 deletions.
48 changes: 24 additions & 24 deletions pkg/storage/endpoint/gc_key_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,33 +29,33 @@ import (
"go.uber.org/zap"
)

// KeySpaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeySpaceGCSafePoint struct {
// KeyspaceGCSafePoint is gcWorker's safepoint for specific key-space
type KeyspaceGCSafePoint struct {
SpaceID string `json:"space_id"`
SafePoint uint64 `json:"safe_point,omitempty"`
}

// KeySpaceGCSafePointStorage defines the storage operations on KeySpaces' safe points
type KeySpaceGCSafePointStorage interface {
// KeyspaceGCSafePointStorage defines the storage operations on Keyspaces' safe points
type KeyspaceGCSafePointStorage interface {
// Service safe point interfaces.
SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error
LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error)
LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error)
RemoveServiceSafePoint(spaceID, serviceID string) error
// GC safe point interfaces.
SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeySpaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error)
SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error
LoadKeyspaceGCSafePoint(spaceID string) (uint64, error)
LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error)
}

var _ KeySpaceGCSafePointStorage = (*StorageEndpoint)(nil)
var _ KeyspaceGCSafePointStorage = (*StorageEndpoint)(nil)

// SaveServiceSafePoint saves service safe point under given key-space.
func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafePoint) error {
if ssp.ServiceID == "" {
return errors.New("service id of service safepoint cannot be empty")
}
key := KeySpaceServiceSafePointPath(spaceID, ssp.ServiceID)
key := KeyspaceServiceSafePointPath(spaceID, ssp.ServiceID)
value, err := json.Marshal(ssp)
if err != nil {
return err
Expand All @@ -66,7 +66,7 @@ func (se *StorageEndpoint) SaveServiceSafePoint(spaceID string, ssp *ServiceSafe
// LoadServiceSafePoint reads ServiceSafePoint for the given key-space ID and service name.
// Return nil if no safepoint exist for given service or just expired.
func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*ServiceSafePoint, error) {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
key := KeyspaceServiceSafePointPath(spaceID, serviceID)
value, err := se.Load(key)
if err != nil || value == "" {
return nil, err
Expand All @@ -90,7 +90,7 @@ func (se *StorageEndpoint) LoadServiceSafePoint(spaceID, serviceID string) (*Ser
// Note that gc worker safe point are store separately.
// If no service safe point exist for the given key-space or all the service safe points just expired, return nil.
func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time) (*ServiceSafePoint, error) {
prefix := KeySpaceServiceSafePointPrefix(spaceID)
prefix := KeyspaceServiceSafePointPrefix(spaceID)
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
Expand Down Expand Up @@ -141,20 +141,20 @@ func (se *StorageEndpoint) LoadMinServiceSafePoint(spaceID string, now time.Time

// RemoveServiceSafePoint removes target ServiceSafePoint
func (se *StorageEndpoint) RemoveServiceSafePoint(spaceID, serviceID string) error {
key := KeySpaceServiceSafePointPath(spaceID, serviceID)
key := KeyspaceServiceSafePointPath(spaceID, serviceID)
return se.Remove(key)
}

// SaveKeySpaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeySpaceGCSafePoint(spaceID string, safePoint uint64) error {
// SaveKeyspaceGCSafePoint saves GCSafePoint to the given key-space.
func (se *StorageEndpoint) SaveKeyspaceGCSafePoint(spaceID string, safePoint uint64) error {
value := strconv.FormatUint(safePoint, 16)
return se.Save(KeySpaceGCSafePointPath(spaceID), value)
return se.Save(KeyspaceGCSafePointPath(spaceID), value)
}

// LoadKeySpaceGCSafePoint reads GCSafePoint for the given key-space.
// LoadKeyspaceGCSafePoint reads GCSafePoint for the given key-space.
// Returns 0 if target safepoint not exist.
func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, error) {
value, err := se.Load(KeySpaceGCSafePointPath(spaceID))
func (se *StorageEndpoint) LoadKeyspaceGCSafePoint(spaceID string) (uint64, error) {
value, err := se.Load(KeyspaceGCSafePointPath(spaceID))
if err != nil || value == "" {
return 0, err
}
Expand All @@ -165,23 +165,23 @@ func (se *StorageEndpoint) LoadKeySpaceGCSafePoint(spaceID string) (uint64, erro
return safePoint, nil
}

// LoadAllKeySpaceGCSafePoints returns slice of KeySpaceGCSafePoint.
// LoadAllKeyspaceGCSafePoints returns slice of KeyspaceGCSafePoint.
// If withGCSafePoint set to false, returned safePoints will be 0.
func (se *StorageEndpoint) LoadAllKeySpaceGCSafePoints(withGCSafePoint bool) ([]*KeySpaceGCSafePoint, error) {
prefix := KeySpaceSafePointPrefix()
func (se *StorageEndpoint) LoadAllKeyspaceGCSafePoints(withGCSafePoint bool) ([]*KeyspaceGCSafePoint, error) {
prefix := KeyspaceSafePointPrefix()
prefixEnd := clientv3.GetPrefixRangeEnd(prefix)
suffix := KeySpaceGCSafePointSuffix()
suffix := KeyspaceGCSafePointSuffix()
keys, values, err := se.LoadRange(prefix, prefixEnd, 0)
if err != nil {
return nil, err
}
safePoints := make([]*KeySpaceGCSafePoint, 0, len(values))
safePoints := make([]*KeyspaceGCSafePoint, 0, len(values))
for i := range keys {
// skip non gc safe points
if !strings.HasSuffix(keys[i], suffix) {
continue
}
safePoint := &KeySpaceGCSafePoint{}
safePoint := &KeyspaceGCSafePoint{}
spaceID := strings.TrimPrefix(keys[i], prefix)
spaceID = strings.TrimSuffix(spaceID, suffix)
safePoint.SpaceID = spaceID
Expand Down
22 changes: 11 additions & 11 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,33 +148,33 @@ func ExternalTimestampPath() string {
return path.Join(clusterPath, externalTimeStamp)
}

// KeySpaceServiceSafePointPrefix returns the prefix of given service's service safe point.
// KeyspaceServiceSafePointPrefix returns the prefix of given service's service safe point.
// Prefix: /keyspaces/gc_safepoint/{space_id}/service/
func KeySpaceServiceSafePointPrefix(spaceID string) string {
func KeyspaceServiceSafePointPrefix(spaceID string) string {
return path.Join(keyspaceSafePointPrefix, spaceID, "service") + "/"
}

// KeySpaceGCSafePointPath returns the gc safe point's path of the given key-space.
// KeyspaceGCSafePointPath returns the gc safe point's path of the given key-space.
// Path: /keyspaces/gc_safepoint/{space_id}/gc
func KeySpaceGCSafePointPath(spaceID string) string {
func KeyspaceGCSafePointPath(spaceID string) string {
return path.Join(keyspaceSafePointPrefix, spaceID, keyspaceGCSafePointSuffix)
}

// KeySpaceServiceSafePointPath returns the path of given service's service safe point.
// KeyspaceServiceSafePointPath returns the path of given service's service safe point.
// Path: /keyspaces/gc_safepoint/{space_id}/service/{service_id}
func KeySpaceServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(KeySpaceServiceSafePointPrefix(spaceID), serviceID)
func KeyspaceServiceSafePointPath(spaceID, serviceID string) string {
return path.Join(KeyspaceServiceSafePointPrefix(spaceID), serviceID)
}

// KeySpaceSafePointPrefix returns prefix for all key-spaces' safe points.
// KeyspaceSafePointPrefix returns prefix for all key-spaces' safe points.
// Path: /keyspaces/gc_safepoint/
func KeySpaceSafePointPrefix() string {
func KeyspaceSafePointPrefix() string {
return keyspaceSafePointPrefix + "/"
}

// KeySpaceGCSafePointSuffix returns the suffix for any gc safepoint.
// KeyspaceGCSafePointSuffix returns the suffix for any gc safepoint.
// Postfix: /gc
func KeySpaceGCSafePointSuffix() string {
func KeyspaceGCSafePointSuffix() string {
return "/" + keyspaceGCSafePointSuffix
}

Expand Down
2 changes: 1 addition & 1 deletion server/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Storage interface {
endpoint.GCSafePointStorage
endpoint.MinResolvedTSStorage
endpoint.ExternalTSStorage
endpoint.KeySpaceGCSafePointStorage
endpoint.KeyspaceGCSafePointStorage
endpoint.KeyspaceStorage
endpoint.ResourceGroupStorage
}
Expand Down
32 changes: 16 additions & 16 deletions server/storage/storage_gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,28 +97,28 @@ func TestLoadMinServiceSafePoint(t *testing.T) {
{ServiceID: "2", ExpiredAt: expireAt3, SafePoint: 300},
}

testKeySpace := "test"
testKeyspace := "test"
for _, serviceSafePoint := range serviceSafePoints {
re.NoError(storage.SaveServiceSafePoint(testKeySpace, serviceSafePoint))
re.NoError(storage.SaveServiceSafePoint(testKeyspace, serviceSafePoint))
}
// enabling failpoint to make expired key removal immediately observable
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys", "return(true)"))
minSafePoint, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime)
minSafePoint, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime)
re.NoError(err)
re.Equal(serviceSafePoints[0], minSafePoint)

// the safePoint with ServiceID 0 should be removed due to expiration
minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(150*time.Second))
minSafePoint2, err := storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(150*time.Second))
re.NoError(err)
re.Equal(serviceSafePoints[1], minSafePoint2)

// verify that service safe point with ServiceID 0 has been removed
ssp, err := storage.LoadServiceSafePoint(testKeySpace, "0")
ssp, err := storage.LoadServiceSafePoint(testKeyspace, "0")
re.NoError(err)
re.Nil(ssp)

// all remaining service safePoints should be removed due to expiration
ssp, err = storage.LoadMinServiceSafePoint(testKeySpace, currentTime.Add(500*time.Second))
ssp, err = storage.LoadMinServiceSafePoint(testKeyspace, currentTime.Add(500*time.Second))
re.NoError(err)
re.Nil(ssp)
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/storage/endpoint/removeExpiredKeys"))
Expand Down Expand Up @@ -151,23 +151,23 @@ func TestSaveLoadGCSafePoint(t *testing.T) {
for i := range testSpaceIDs {
testSpaceID := testSpaceIDs[i]
testSafePoint := testSafePoints[i]
err := storage.SaveKeySpaceGCSafePoint(testSpaceID, testSafePoint)
err := storage.SaveKeyspaceGCSafePoint(testSpaceID, testSafePoint)
re.NoError(err)
loaded, err := storage.LoadKeySpaceGCSafePoint(testSpaceID)
loaded, err := storage.LoadKeyspaceGCSafePoint(testSpaceID)
re.NoError(err)
re.Equal(testSafePoint, loaded)
}
}

func TestLoadAllKeySpaceGCSafePoints(t *testing.T) {
func TestLoadAllKeyspaceGCSafePoints(t *testing.T) {
re := require.New(t)
storage := NewStorageWithMemoryBackend()
testSpaceIDs, testSafePoints := testGCSafePoints()
for i := range testSpaceIDs {
err := storage.SaveKeySpaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])
err := storage.SaveKeyspaceGCSafePoint(testSpaceIDs[i], testSafePoints[i])
re.NoError(err)
}
loadedSafePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
loadedSafePoints, err := storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
Expand All @@ -181,15 +181,15 @@ func TestLoadAllKeySpaceGCSafePoints(t *testing.T) {
}

// verify that service safe points do not interfere with gc safe points.
loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(true)
loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
re.Equal(testSafePoints[i], loadedSafePoints[i].SafePoint)
}

// verify that when withGCSafePoint set to false, returned safePoints is 0
loadedSafePoints, err = storage.LoadAllKeySpaceGCSafePoints(false)
loadedSafePoints, err = storage.LoadAllKeyspaceGCSafePoints(false)
re.NoError(err)
for i := range loadedSafePoints {
re.Equal(testSpaceIDs[i], loadedSafePoints[i].SpaceID)
Expand All @@ -202,17 +202,17 @@ func TestLoadEmpty(t *testing.T) {
storage := NewStorageWithMemoryBackend()

// loading non-existing GC safepoint should return 0
gcSafePoint, err := storage.LoadKeySpaceGCSafePoint("testKeySpace")
gcSafePoint, err := storage.LoadKeyspaceGCSafePoint("testKeyspace")
re.NoError(err)
re.Equal(uint64(0), gcSafePoint)

// loading non-existing service safepoint should return nil
serviceSafePoint, err := storage.LoadServiceSafePoint("testKeySpace", "testService")
serviceSafePoint, err := storage.LoadServiceSafePoint("testKeyspace", "testService")
re.NoError(err)
re.Nil(serviceSafePoint)

// loading empty key spaces should return empty slices
safePoints, err := storage.LoadAllKeySpaceGCSafePoints(true)
safePoints, err := storage.LoadAllKeyspaceGCSafePoints(true)
re.NoError(err)
re.Empty(safePoints)
}

0 comments on commit 95ae9a5

Please sign in to comment.