Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions go-client/pegasus/table_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,16 @@ func (p *pegasusTableConnector) updateConf(ctx context.Context) error {
return nil
}

func isPartitionValid(oldCount int, respCount int) bool {
return oldCount == 0 || oldCount == respCount || oldCount*2 == respCount || oldCount == respCount*2
// isPartitionValid reports whether respCount is an acceptable partition
// count for a table. During traffic switching (e.g. a table rebuilt on
// another cluster behind metaproxy) the new partition count has no direct
// relationship to the old one, so validity depends only on respCount
// itself: it must be a power of two and no smaller than 4.
func isPartitionValid(respCount int) bool {
	if respCount < 4 {
		return false
	}
	// A power of two has exactly one bit set, so n&(n-1) == 0.
	return respCount&(respCount-1) == 0
}

func (p *pegasusTableConnector) handleQueryConfigResp(resp *replication.QueryCfgResponse) error {
if resp.Err.Errno != base.ERR_OK.String() {
return errors.New(resp.Err.Errno)
}
if resp.PartitionCount == 0 || len(resp.Partitions) != int(resp.PartitionCount) || !isPartitionValid(len(p.parts), int(resp.PartitionCount)) {
if resp.PartitionCount == 0 || len(resp.Partitions) != int(resp.PartitionCount) || !isPartitionValid(int(resp.PartitionCount)) {
return fmt.Errorf("invalid table configuration: response [%v]", resp)
}

Expand Down
44 changes: 44 additions & 0 deletions go-client/pegasus/table_connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,50 @@ func TestPegasusTableConnector_QueryConfigRespWhileCancelSplit(t *testing.T) {
ptb.Close()
}

// TestPegasusTableConnector_QueryConfigRespWhileTablePartitionResize verifies
// that handleQueryConfigResp accepts a partition count unrelated to the
// current one (here 8x), as long as the new count is itself valid.
func TestPegasusTableConnector_QueryConfigRespWhileTablePartitionResize(t *testing.T) {
// In certain scenarios, when dealing with partitions that consume excessive disk space,
// we may split a single partition into multiple ones (more than double the original number).
// To achieve this, we typically use an offline approach involving multiple splits,
// followed by reconstruction on another cluster. Meanwhile, we utilize metaproxy to ensure
// seamless traffic switching. As a result, significant changes in the partition count
// are possible (e.g., 8x or 1/8x the original count). Therefore, when updating the configuration,
// we do not require the new partition count to be related to the previous one.
// It only needs to be a valid number.

// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()

client := NewClient(testingCfg)
defer client.Close()

tb, err := client.OpenTable(context.Background(), "temp")
assert.Nil(t, err)
defer tb.Close()
ptb, _ := tb.(*pegasusTableConnector)

// Build a fake query-config response whose partition count is 8x the
// connector's current count.
partitionCount := len(ptb.parts)
resp := replication.NewQueryCfgResponse()
resp.Err = &base.ErrorCode{Errno: "ERR_OK"}
resp.AppID = ptb.appID
resp.PartitionCount = int32(partitionCount * 8)
resp.Partitions = make([]*replication.PartitionConfiguration, partitionCount*8)
for i := 0; i < partitionCount*8; i++ {
if i < partitionCount {
// Reuse the connector's existing configs for the original partitions.
resp.Partitions[i] = ptb.parts[i].pconf
} else {
// Placeholder configs for the new partitions. Ballot -1 presumably
// marks a partition with no elected primary yet — TODO confirm.
conf := replication.NewPartitionConfiguration()
conf.Ballot = -1
// NOTE(review): unkeyed composite literal; `go vet` prefers named
// fields here (e.g. base.Gpid{Appid: ..., PartitionIndex: ...}) —
// confirm the field names against the base package.
conf.Pid = &base.Gpid{ptb.appID, int32(i)}
resp.Partitions[i] = conf
}
}

// The connector must accept the resized table and grow its partition list.
err = ptb.handleQueryConfigResp(resp)
assert.Nil(t, err)
assert.Equal(t, partitionCount*8, len(ptb.parts))
ptb.Close()
}

func TestPegasusTableConnector_Close(t *testing.T) {
// Ensure loopForAutoUpdate will be closed.
defer leaktest.Check(t)()
Expand Down
Loading