Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: further adaptation to PD HTTP client #48606

Merged
merged 5 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix TestIssue44369
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Nov 17, 2023
commit 933b1c788336a54d409de610e7858a765a2b9443
24 changes: 18 additions & 6 deletions pkg/executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1217,7 +1217,11 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx context.Context, sctx ses
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
storesStat, err := tikvHelper.PDHTTPClient().GetStores(ctx)
pdCli, err := tikvHelper.TryGetPDHTTPClient()
if err != nil {
return err
}
storesStat, err := pdCli.GetStores(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1648,7 +1652,11 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx context.Context, sctx
}
}
if !requestByTableRange {
allRegionsInfo, err = tikvHelper.PDHTTPClient().GetRegions(ctx)
pdCli, err := tikvHelper.TryGetPDHTTPClient()
if err != nil {
return err
}
allRegionsInfo, err = pdCli.GetRegions(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -1701,12 +1709,16 @@ func (e *memtableRetriever) getRegionsInfoForTable(ctx context.Context, h *helpe
}

func (*memtableRetriever) getRegionsInfoForSingleTable(ctx context.Context, helper *helper.Helper, tableID int64) (*pd.RegionsInfo, error) {
pdCli, err := helper.TryGetPDHTTPClient()
if err != nil {
return nil, err
}
sk, ek := tablecodec.GetTableHandleKeyRange(tableID)
sRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, sk))
sRegion, err := pdCli.GetRegionByKey(ctx, codec.EncodeBytes(nil, sk))
if err != nil {
return nil, err
}
eRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, ek))
eRegion, err := pdCli.GetRegionByKey(ctx, codec.EncodeBytes(nil, ek))
if err != nil {
return nil, err
}
Expand All @@ -1718,7 +1730,7 @@ func (*memtableRetriever) getRegionsInfoForSingleTable(ctx context.Context, help
if err != nil {
return nil, err
}
return helper.PDHTTPClient().GetRegionsByKey(ctx, sk, ek, -1)
return pdCli.GetRegionsByKey(ctx, sk, ek, -1)
}

func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *pd.RegionInfo, table *helper.TableInfo) {
Expand Down Expand Up @@ -2167,7 +2179,7 @@ func getRemainDurationForAnalyzeStatusHelper(
}
}
if tid > 0 && totalCnt == 0 {
totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(ctx,sctx, tid, dbName, tableName, partitionName)
totalCnt, _ = pdhelper.GlobalPDHelper.GetApproximateTableCountFromStorage(ctx, sctx, tid, dbName, tableName, partitionName)
}
remainingDuration, percentage = calRemainInfoForAnalyzeStatus(ctx, int64(totalCnt), processedRows, duration)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/executor/memtable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,13 +885,17 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx
Store: tikvStore,
RegionCache: tikvStore.GetRegionCache(),
}
pdCli, err := tikvHelper.TryGetPDHTTPClient()
if err != nil {
return nil, err
}

var regionsInfo, regionsInfoByStoreID []pd.RegionInfo
regionMap := make(map[int64]*pd.RegionInfo)
storeMap := make(map[int64]struct{})

if len(e.extractor.StoreIDs) == 0 && len(e.extractor.RegionIDs) == 0 {
regionsInfo, err := tikvHelper.PDHTTPClient().GetRegions(ctx)
regionsInfo, err := pdCli.GetRegions(ctx)
if err != nil {
return nil, err
}
Expand All @@ -902,7 +906,7 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx
// if a region_id located in 1, 4, 7 store we will get all of them when request any store_id,
// storeMap is used to filter peers on unexpected stores.
storeMap[int64(storeID)] = struct{}{}
storeRegionsInfo, err := tikvHelper.PDHTTPClient().GetRegionsByStoreID(ctx, storeID)
storeRegionsInfo, err := pdCli.GetRegionsByStoreID(ctx, storeID)
if err != nil {
return nil, err
}
Expand All @@ -925,7 +929,7 @@ func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx
// if there is storeIDs, target region_id is fetched by storeIDs,
// otherwise we need to fetch it from PD.
if len(e.extractor.StoreIDs) == 0 {
regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(ctx, regionID)
regionInfo, err := pdCli.GetRegionByID(ctx, regionID)
if err != nil {
return nil, err
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/split.go
Original file line number Diff line number Diff line change
Expand Up @@ -823,8 +823,12 @@ func getRegionInfo(store helper.Storage, regions []regionMeta) ([]regionMeta, er
Store: store,
RegionCache: store.GetRegionCache(),
}
pdCli, err := tikvHelper.TryGetPDHTTPClient()
if err != nil {
return regions, err
}
for i := range regions {
regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(context.TODO(), regions[i].region.Id)
regionInfo, err := pdCli.GetRegionByID(context.TODO(), regions[i].region.Id)
if err != nil {
return nil, err
}
Expand Down
30 changes: 20 additions & 10 deletions pkg/store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,13 @@ func NewHelper(store Storage) *Helper {
}
}

// PDHTTPClient returns the PD HTTP client.
func (h *Helper) PDHTTPClient() pd.Client {
return h.Store.GetPDHTTPClient()
// TryGetPDHTTPClient tries to get a PD HTTP client if it's available.
func (h *Helper) TryGetPDHTTPClient() (pd.Client, error) {
cli := h.Store.GetPDHTTPClient()
if cli == nil {
return nil, errors.New("pd http client unavailable")
}
return cli, nil
}

// MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms.
Expand Down Expand Up @@ -300,15 +304,16 @@ func (h *Helper) ScrapeHotInfo(ctx context.Context, rw string, allSchemas []*mod

// FetchHotRegion fetches the hot region information from PD's http api.
func (h *Helper) FetchHotRegion(ctx context.Context, rw string) (map[uint64]RegionMetric, error) {
var (
regionResp *pd.StoreHotPeersInfos
err error
)
pdCli, err := h.TryGetPDHTTPClient()
if err != nil {
return nil, err
}
var regionResp *pd.StoreHotPeersInfos
switch rw {
case HotRead:
regionResp, err = h.PDHTTPClient().GetHotReadRegions(ctx)
regionResp, err = pdCli.GetHotReadRegions(ctx)
case HotWrite:
regionResp, err = h.PDHTTPClient().GetHotWriteRegions(ctx)
regionResp, err = pdCli.GetHotWriteRegions(ctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -785,6 +790,11 @@ func (h *Helper) GetPDAddr() ([]string, error) {

// GetPDRegionStats get the RegionStats by tableID from PD by HTTP API.
func (h *Helper) GetPDRegionStats(ctx context.Context, tableID int64, noIndexStats bool) (*pd.RegionStats, error) {
pdCli, err := h.TryGetPDHTTPClient()
if err != nil {
return nil, err
}

var startKey, endKey []byte
if noIndexStats {
startKey = tablecodec.GenTableRecordPrefix(tableID)
Expand All @@ -796,7 +806,7 @@ func (h *Helper) GetPDRegionStats(ctx context.Context, tableID int64, noIndexSta
startKey = codec.EncodeBytes([]byte{}, startKey)
endKey = codec.EncodeBytes([]byte{}, endKey)

return h.PDHTTPClient().GetRegionStatusByKey(ctx, startKey, endKey)
return pdCli.GetRegionStatusByKey(ctx, startKey, endKey)
}

// GetTiFlashTableIDFromEndKey computes tableID from pd rule's endKey.
Expand Down
9 changes: 7 additions & 2 deletions pkg/store/helper/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func TestTiKVRegionsInfo(t *testing.T) {
Store: store,
RegionCache: store.GetRegionCache(),
}
regionsInfo, err := h.PDHTTPClient().GetRegions(context.Background())
pdCli, err := h.TryGetPDHTTPClient()
require.NoError(t, err)
regionsInfo, err := pdCli.GetRegions(context.Background())
require.NoError(t, err)
require.Equal(t, getMockTiKVRegionsInfo(), regionsInfo)
}
Expand All @@ -104,7 +106,10 @@ func TestTiKVStoresStat(t *testing.T) {
RegionCache: store.GetRegionCache(),
}

stat, err := h.PDHTTPClient().GetStores(context.Background())
pdCli, err := h.TryGetPDHTTPClient()
require.NoError(t, err)

stat, err := pdCli.GetStores(context.Background())
require.NoError(t, err)

data, err := json.Marshal(stat)
Expand Down