Skip to content

Commit

Permalink
api: fix panic when region doesn't have a leader (#7629) (#7650)
Browse files Browse the repository at this point in the history
close #7630

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-chi-bot and rleungx authored Feb 9, 2024
1 parent decd310 commit ae9db49
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 33 deletions.
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,8 @@ func (c *Cluster) collectClusterMetrics() {
func (c *Cluster) resetMetrics() {
statistics.Reset()

c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/schedule/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ func collectHotMetrics(cluster sche.ClusterInformer, stores []*core.StoreInfo, t
}

// ResetHotSpotMetrics resets hot spot metrics.
func (c *Coordinator) ResetHotSpotMetrics() {
func ResetHotSpotMetrics() {
hotSpotStatusGauge.Reset()
schedulers.HotPendingSum.Reset()
}
Expand Down
11 changes: 3 additions & 8 deletions pkg/schedule/schedulers/scheduler_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const maxScheduleRetries = 10

var (
denySchedulersByLabelerCounter = labeler.LabelerEventCounter.WithLabelValues("schedulers", "deny")
rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count")
groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count")
)

// Controller is used to manage all schedulers.
Expand Down Expand Up @@ -128,21 +126,18 @@ func (c *Controller) CollectSchedulerMetrics() {
}
ruleCnt := ruleMgr.GetRulesCount()
groupCnt := ruleMgr.GetGroupsCount()
rulesCntStatusGauge.Set(float64(ruleCnt))
groupsCntStatusGauge.Set(float64(groupCnt))
ruleStatusGauge.WithLabelValues("rule_count").Set(float64(ruleCnt))
ruleStatusGauge.WithLabelValues("group_count").Set(float64(groupCnt))
}

func (c *Controller) isSchedulingHalted() bool {
return c.cluster.GetSchedulerConfig().IsSchedulingHalted()
}

// ResetSchedulerMetrics resets metrics of all schedulers.
func (c *Controller) ResetSchedulerMetrics() {
func ResetSchedulerMetrics() {
schedulerStatusGauge.Reset()
ruleStatusGauge.Reset()
// create in map again
rulesCntStatusGauge = ruleStatusGauge.WithLabelValues("rule_count")
groupsCntStatusGauge = ruleStatusGauge.WithLabelValues("group_count")
}

// AddSchedulerHandler adds the HTTP handler for a scheduler.
Expand Down
66 changes: 50 additions & 16 deletions server/api/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,13 @@ func (h *regionHandler) GetRegionByID(w http.ResponseWriter, r *http.Request) {
}

regionInfo := rc.GetRegion(regionID)
h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo))
b, err := marshalRegionInfoJSON(r.Context(), regionInfo)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.Data(w, http.StatusOK, b)
}

// @Tags region
Expand Down Expand Up @@ -289,7 +295,13 @@ func (h *regionHandler) GetRegion(w http.ResponseWriter, r *http.Request) {
}

regionInfo := rc.GetRegionByKey([]byte(key))
h.rd.JSON(w, http.StatusOK, NewAPIRegionInfo(regionInfo))
b, err := marshalRegionInfoJSON(r.Context(), regionInfo)
if err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}

h.rd.Data(w, http.StatusOK, b)
}

// @Tags region
Expand Down Expand Up @@ -349,6 +361,24 @@ func newRegionsHandler(svr *server.Server, rd *render.Render) *regionsHandler {
}
}

// marshalRegionInfoJSON marshals region to bytes in `RegionInfo`'s JSON format.
// It is used to reduce the cost of JSON serialization.
func marshalRegionInfoJSON(ctx context.Context, r *core.RegionInfo) ([]byte, error) {
out := &jwriter.Writer{}

region := &RegionInfo{}
select {
case <-ctx.Done():
// Return early, avoid the unnecessary computation.
// See more details in https://github.com/tikv/pd/issues/6835
return nil, ctx.Err()
default:
}

covertAPIRegionInfo(r, region, out)
return out.Buffer.BuildBytes(), out.Error
}

// marshalRegionsInfoJSON marshals regions to bytes in `RegionsInfo`'s JSON format.
// It is used to reduce the cost of JSON serialization.
func marshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]byte, error) {
Expand All @@ -372,27 +402,31 @@ func marshalRegionsInfoJSON(ctx context.Context, regions []*core.RegionInfo) ([]
if i > 0 {
out.RawByte(',')
}
InitRegion(r, region)
// EasyJSON will not check anonymous struct pointer field and will panic if the field is nil.
// So we need to set the field to default value explicitly when the anonymous struct pointer is nil.
region.Leader.setDefaultIfNil()
for i := range region.Peers {
region.Peers[i].setDefaultIfNil()
}
for i := range region.PendingPeers {
region.PendingPeers[i].setDefaultIfNil()
}
for i := range region.DownPeers {
region.DownPeers[i].setDefaultIfNil()
}
region.MarshalEasyJSON(out)
covertAPIRegionInfo(r, region, out)
}
out.RawByte(']')

out.RawByte('}')
return out.Buffer.BuildBytes(), out.Error
}

func covertAPIRegionInfo(r *core.RegionInfo, region *RegionInfo, out *jwriter.Writer) {
InitRegion(r, region)
// EasyJSON will not check anonymous struct pointer field and will panic if the field is nil.
// So we need to set the field to default value explicitly when the anonymous struct pointer is nil.
region.Leader.setDefaultIfNil()
for i := range region.Peers {
region.Peers[i].setDefaultIfNil()
}
for i := range region.PendingPeers {
region.PendingPeers[i].setDefaultIfNil()
}
for i := range region.DownPeers {
region.DownPeers[i].setDefaultIfNil()
}
region.MarshalEasyJSON(out)
}

// @Tags region
// @Summary List all regions in the cluster.
// @Produce json
Expand Down
4 changes: 2 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2176,8 +2176,8 @@ func (c *RaftCluster) resetMetrics() {
statistics.Reset()

if !c.isAPIServiceMode {
c.coordinator.GetSchedulersController().ResetSchedulerMetrics()
c.coordinator.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
c.resetClusterMetrics()
}
c.resetHealthStatus()
Expand Down
8 changes: 4 additions & 4 deletions server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2504,8 +2504,8 @@ func TestCollectMetricsConcurrent(t *testing.T) {
controller.CollectSchedulerMetrics()
co.GetCluster().(*RaftCluster).collectClusterMetrics()
}
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetClusterMetrics()
wg.Wait()
}
Expand Down Expand Up @@ -2550,8 +2550,8 @@ func TestCollectMetrics(t *testing.T) {
s.Stats = nil
}
re.Equal(status1, status2)
co.ResetHotSpotMetrics()
controller.ResetSchedulerMetrics()
schedule.ResetHotSpotMetrics()
schedulers.ResetSchedulerMetrics()
co.GetCluster().(*RaftCluster).resetClusterMetrics()
}

Expand Down
53 changes: 53 additions & 0 deletions tests/pdctl/region/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,56 @@ func TestRegion(t *testing.T) {
{core.HexRegionKeyStr(r5.GetEndKey()), ""},
}, *rangeHoles)
}

func TestRegionNoLeader(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestCluster(ctx, 1)
re.NoError(err)
err = cluster.RunInitialServers()
re.NoError(err)
cluster.WaitLeader()
url := cluster.GetConfig().GetClientURL()
stores := []*metapb.Store{
{
Id: 1,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 2,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
{
Id: 3,
State: metapb.StoreState_Up,
LastHeartbeat: time.Now().UnixNano(),
},
}

leaderServer := cluster.GetLeaderServer()
re.NoError(leaderServer.BootstrapCluster())
for i := 0; i < len(stores); i++ {
tests.MustPutStore(re, cluster, stores[i])
}

metaRegion := &metapb.Region{
Id: 100,
StartKey: []byte(""),
EndKey: []byte(""),
Peers: []*metapb.Peer{
{Id: 1, StoreId: 1},
{Id: 5, StoreId: 2},
{Id: 6, StoreId: 3}},
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}
r := core.NewRegionInfo(metaRegion, nil)

cluster.GetLeaderServer().GetRaftCluster().GetBasicCluster().SetRegion(r)

cmd := pdctlCmd.GetRootCmd()
_, err = pdctl.ExecuteCommand(cmd, "-u", url, "region", "100")
re.NoError(err)
}

0 comments on commit ae9db49

Please sign in to comment.