Skip to content

Commit

Permalink
Ensure region is made unavailable in clientDown
Browse files Browse the repository at this point in the history
The clients cache can get out of sync with which regions are actually
associated with a region server. A case has been seen where a region
points to a region client that doesn't exist in the clients map. When
this occurs, subsequent calls to clientDown are no-ops, and the bad
region continues to point to the incorrect region server.

clientDown will now ensure that the given region is made unavailable,
even if it is not found in the clients list of regions.

closes: #186
  • Loading branch information
aaronbee committed Aug 10, 2023
1 parent 4bda353 commit 5800f86
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 17 deletions.
25 changes: 18 additions & 7 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (c *client) handleResultError(err error, reg hrpc.RegionInfo, rc hrpc.Regio
go c.reestablishRegion(reg)
}
} else {
c.clientDown(rc)
c.clientDown(rc, reg)
}
}
}
Expand Down Expand Up @@ -391,13 +391,24 @@ func (c *client) sendRPCToRegionClient(ctx context.Context, rpc hrpc.Call, rc hr
return res.Msg, res.Error
}

// clientDown removes client from cache and marks
// all the regions sharing this region's
// client as unavailable, and start a goroutine
// clientDown removes client from cache, marks all the regions
// sharing this region's client as unavailable, and starts a goroutine
// to reconnect for each of them.
func (c *client) clientDown(client hrpc.RegionClient) {
//
// Due to races filling in the clients cache it may not be completely
// accurate. reg is the region we were trying to access when we saw an
// issue with the region client, so make sure it is marked unavailable
// even if it doesn't appear in the clients cache.
func (c *client) clientDown(client hrpc.RegionClient, reg hrpc.RegionInfo) {
downregions := c.clients.clientDown(client)
if reg.MarkUnavailable() {
reg.SetClient(nil)
go c.reestablishRegion(reg)
}
for downreg := range downregions {
if downreg == reg {
continue
}
if downreg.MarkUnavailable() {
downreg.SetClient(nil)
go c.reestablishRegion(downreg)
Expand Down Expand Up @@ -767,7 +778,7 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
return
} else if _, ok := err.(region.ServerError); ok {
// the client we got died
c.clientDown(client)
c.clientDown(client, reg)
}
} else if err == context.Canceled {
// region is dead
Expand All @@ -777,7 +788,7 @@ func (c *client) establishRegion(reg hrpc.RegionInfo, addr string) {
// otherwise Dial failed, purge the client and retry.
// note that it's safer to reestablish all regions for this client as well
// because they could have ended up settling for the same client.
c.clientDown(client)
c.clientDown(client, reg)
}

log.WithFields(log.Fields{
Expand Down
26 changes: 16 additions & 10 deletions rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,9 @@ func TestSendRPCToRegionClientDownDelayed(t *testing.T) {
ctrl := test.NewController(t)
defer ctrl.Finish()

// we don't expect any calls to zookeeper
c := newMockClient(nil)
zkClient := mockZk.NewMockClient(ctrl)
zkClient.EXPECT().LocateResource(zk.Meta).Return("regionserver:1", nil).AnyTimes()
c := newMockClient(zkClient)

// create region with mock client
origlReg := region.NewInfo(
Expand All @@ -440,7 +441,7 @@ func TestSendRPCToRegionClientDownDelayed(t *testing.T) {
c.regions.put(origlReg)
rc := mockRegion.NewMockRegionClient(ctrl)
rc.EXPECT().String().Return("mock region client").AnyTimes()
c.clients.put("regionserver:0", origlReg, func() hrpc.RegionClient {
c.clients.put("regionserver:1", origlReg, func() hrpc.RegionClient {
return rc
})
origlReg.SetClient(rc)
Expand All @@ -450,17 +451,15 @@ func TestSendRPCToRegionClientDownDelayed(t *testing.T) {
result := make(chan hrpc.RPCResult, 1)
mockCall.EXPECT().ResultChan().Return(result).Times(1)

rc2 := mockRegion.NewMockRegionClient(ctrl)
rc2.EXPECT().String().Return("mock region client").AnyTimes()
rc2 := newRegionClientFn("regionserver:1")()
rc.EXPECT().QueueRPC(mockCall).Times(1).Do(func(rpc hrpc.Call) {
// remove old client from clients cache
c.clients.clientDown(rc)
// replace client in region with new client
// this simulates another rpc toggling client reestablishment
c.regions.put(origlReg)
c.clients.put("regionserver:0", origlReg, func() hrpc.RegionClient {
return rc2
})
c.clients.put("regionserver:1", origlReg, func() hrpc.RegionClient { return rc2 })

origlReg.SetClient(rc2)

// return ServerError from QueueRPC, to emulate dead client
Expand All @@ -473,10 +472,17 @@ func TestSendRPCToRegionClientDownDelayed(t *testing.T) {
default:
t.Errorf("Got unexpected error: %v", err)
}

// Wait for establishRegion to complete
ch := origlReg.AvailabilityChan()
if ch != nil {
<-ch
}
// check that we did not down new client
if len(c.clients.regions) != 1 {
t.Errorf("There are %d cached clients", len(c.clients.regions))
t.Errorf("There are %d cached clients:", len(c.clients.regions))
for rc := range c.clients.regions {
t.Errorf("%s", rc.String())
}
}
_, ok := c.clients.regions[rc2]
if !ok {
Expand Down

0 comments on commit 5800f86

Please sign in to comment.