fix leader changed bug #137

Merged · 3 commits · Jun 13, 2022
Changes from all commits
15 changes: 15 additions & 0 deletions pkg/controller/component/storaged_scaler.go
@@ -170,6 +170,20 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep
log.Info("drop hosts successfully")
}

if len(spaces) > 0 && nc.Status.Storaged.BalancedSpaces == nil {
nc.Status.Storaged.BalancedSpaces = make([]int32, 0, len(spaces))
}

for _, space := range spaces {
if contains(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) {
Contributor:

Why not check the spaces every time?
Is it possible that the time between the two executions of this function is relatively long?

Contributor (Author):

A balanced space does not need to be balanced again (a minimal sketch of the assumed contains check follows this file's diff).

continue
}
if err := metaClient.BalanceLeader(*space.Id.SpaceID); err != nil {
return err
}
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, nc.Status.Storaged.LastBalanceJob.SpaceID)
}

if err := PvcMark(ss.clientSet.PVC(), nc.StoragedComponent(), oldReplicas, newReplicas); err != nil {
return err
}
@@ -191,6 +205,7 @@ func (ss *storageScaler) ScaleIn(nc *v1alpha1.NebulaCluster, oldReplicas, newRep

if nc.StoragedComponent().IsReady() {
log.Info("all used pvcs were reclaimed", "storage", nc.StoragedComponent().GetName())
nc.Status.Storaged.BalancedSpaces = nil
nc.Status.Storaged.LastBalanceJob = nil
nc.Status.Storaged.Phase = v1alpha1.RunningPhase
}
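
The skip in the hunk above relies on a membership check over the already-balanced space IDs. The contains helper is defined elsewhere in the package, so the following is only an assumed sketch of its behavior:

// Assumed sketch: the real contains helper lives elsewhere in
// pkg/controller/component. It reports whether id is already recorded
// in BalancedSpaces, so a balanced space is not balanced twice.
func contains(list []int32, id int32) bool {
	for _, v := range list {
		if v == id {
			return true
		}
	}
	return false
}

Because BalancedSpaces is reset to nil only once the storaged component is ready again, repeated reconcile runs of ScaleIn skip spaces that were already balanced.
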
216 changes: 119 additions & 97 deletions pkg/nebula/meta_client.go
@@ -34,7 +34,7 @@ var ErrNoAvailableMetadEndpoints = errors.New("metadclient: no available endpoin
var _ MetaInterface = (*metaClient)(nil)

type (
ExecFn func(req interface{}) (*meta.ExecResp, error)
Fn func(req interface{}) (interface{}, error)

MetaInterface interface {
GetSpace(spaceName []byte) (*meta.SpaceItem, error)
@@ -59,7 +59,6 @@ type (
}
)

// TODO: capture ErrorCode_E_LEADER_CHANGED error for all methods
func NewMetaClient(endpoints []string, options ...Option) (MetaInterface, error) {
if len(endpoints) == 0 {
return nil, ErrNoAvailableMetadEndpoints
@@ -84,99 +83,104 @@ func newMetaConnection(endpoint string, options ...Option) (*metaClient, error)
return mc, nil
}

func (m *metaClient) reconnect(endpoint string, options ...Option) error {
m.mutex.Lock()
defer m.mutex.Unlock()
if err := m.disconnect(); err != nil {
func (m *metaClient) reconnect(endpoint string) error {
if err := m.client.Close(); err != nil {
return err
}
if _, err := newMetaConnection(endpoint, options...); err != nil {

transport, pf, err := buildClientTransport(endpoint)
if err != nil {
return err
}

metaServiceClient := meta.NewMetaServiceClientFactory(transport, pf)
if err := metaServiceClient.Open(); err != nil {
return err
}

if !metaServiceClient.IsOpen() {
return errors.Errorf("transport is not open")
}
m.client = metaServiceClient

return nil
}

func (m *metaClient) connect() error {
log := getLog()
if err := m.client.Open(); err != nil {
log.Error(err, "open transport failed")
return err
}
log.Info("metad connection opened", "isOpen", m.client.IsOpen())
return nil
}

func (m *metaClient) disconnect() error {
if err := m.client.Close(); err != nil {
getLog().Error(err, "close transport failed")
if !m.client.IsOpen() {
return errors.Errorf("transport is not open")
}
return nil
}

func (m *metaClient) Disconnect() error {
return m.disconnect()
return m.client.Close()
}

func (m *metaClient) GetSpace(spaceName []byte) (*meta.SpaceItem, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
req := &meta.GetSpaceReq{SpaceName: spaceName}
resp, err := m.client.GetSpace(req)
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.GetSpace(req.(*meta.GetSpaceReq))
return resp, err
})
if err != nil {
return nil, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return nil, errors.Errorf("GetSpace code %d", resp.Code)
}
return resp.Item, nil
return resp.(*meta.GetSpaceResp).Item, nil
}

func (m *metaClient) ListSpaces() ([]*meta.IdName, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
req := &meta.ListSpacesReq{}
resp, err := m.client.ListSpaces(req)
resp, err := m.retryOnError(meta.NewListSpacesReq(), func(req interface{}) (interface{}, error) {
resp, err := m.client.ListSpaces(req.(*meta.ListSpacesReq))
return resp, err
})
if err != nil {
return nil, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return nil, errors.Errorf("ListSpaces code %d", resp.Code)
}
return resp.Spaces, nil
return resp.(*meta.ListSpacesResp).Spaces, nil
}

func (m *metaClient) AddHosts(hosts []*nebula.HostAddr) error {
req := &meta.AddHostsReq{
Hosts: hosts,
}
return m.retryOnError(req, func(req interface{}) (*meta.ExecResp, error) {
_, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.AddHosts(req.(*meta.AddHostsReq))
return resp, err
})
return err
}

func (m *metaClient) DropHosts(hosts []*nebula.HostAddr) error {
req := &meta.DropHostsReq{
Hosts: hosts,
}
return m.retryOnError(req, func(req interface{}) (*meta.ExecResp, error) {
_, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.DropHosts(req.(*meta.DropHostsReq))
return resp, err
})
return err
}

func (m *metaClient) ListHosts(hostType meta.ListHostType) ([]*meta.HostItem, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
req := &meta.ListHostsReq{Type: hostType}
resp, err := m.client.ListHosts(req)
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.ListHosts(req.(*meta.ListHostsReq))
return resp, err
})
if err != nil {
return nil, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return nil, errors.Errorf("ListHosts code %d", resp.Code)
}
return resp.Hosts, nil
return resp.(*meta.ListHostsResp).Hosts, nil
}

func (m *metaClient) ListParts(spaceID nebula.GraphSpaceID, partIDs []nebula.PartitionID) ([]*meta.PartItem, error) {
@@ -186,14 +190,14 @@ func (m *metaClient) ListParts(spaceID nebula.GraphSpaceID, partIDs []nebula.Par
SpaceID: spaceID,
PartIds: partIDs,
}
resp, err := m.client.ListParts(req)
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.ListParts(req.(*meta.ListPartsReq))
return resp, err
})
if err != nil {
return nil, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return nil, errors.Errorf("ListParts code %d", resp.Code)
}
return resp.Parts, nil
return resp.(*meta.ListPartsResp).Parts, nil
}

func (m *metaClient) GetSpaceParts() (map[nebula.GraphSpaceID][]*meta.PartItem, error) {
@@ -277,61 +281,30 @@ func (m *metaClient) BalanceLeader(spaceID nebula.GraphSpaceID) error {
Op: meta.JobOp_ADD,
Type: meta.JobType_LEADER_BALANCE,
}
resp, err := m.client.RunAdminJob(req)
_, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
return resp, err
})
if err != nil {
return err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
if resp.Code == nebula.ErrorCode_E_LEADER_CHANGED {
log.Info("request leader changed", "host", resp.Leader.Host, "port", resp.Leader.Port)
leader := fmt.Sprintf("%v:%v", resp.Leader.Host, resp.Leader.Port)
if err := m.reconnect(leader); err != nil {
return errors.Errorf("update client failed: %v", err)
}
resp, err := m.client.RunAdminJob(req)
if err != nil {
return err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return errors.Errorf("retry balance leader code %d", resp.Code)
}
return nil
}
return errors.Errorf("BalanceLeader code %d", resp.Code)
}
log.Info("balance leader successfully")
return nil
}

func (m *metaClient) balance(req *meta.AdminJobReq) (int32, error) {
log := getLog()
log.Info("start balance job")
resp, err := m.client.RunAdminJob(req)
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
return resp, err
})
if err != nil {
log.Info("balance failed")
return 0, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
if resp.Code == nebula.ErrorCode_E_LEADER_CHANGED {
log.Info("request leader changed", "host", resp.Leader.Host, "port", resp.Leader.Port)
leader := fmt.Sprintf("%v:%v", resp.Leader.Host, resp.Leader.Port)
if err := m.reconnect(leader); err != nil {
return 0, errors.Errorf("update client failed: %v", err)
}
resp, err := m.client.RunAdminJob(req)
if err != nil {
return 0, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return 0, errors.Errorf("retry balance code %d", resp.Code)
}
log.Info("balance job running now")
return resp.GetResult_().GetJobID(), utilerrors.ReconcileErrorf("waiting for balance job %d finished", resp.GetResult_().GetJobID())
}
return 0, errors.Errorf("balance code %d", resp.Code)
}
log.Info("balance job running now")
return resp.GetResult_().GetJobID(), utilerrors.ReconcileErrorf("waiting for balance job %d finished", resp.GetResult_().GetJobID())
jobID := resp.(*meta.AdminJobResp).GetResult_().GetJobID()
return jobID, utilerrors.ReconcileErrorf("waiting for balance job %d finished", jobID)
}

func (m *metaClient) BalanceData(spaceID nebula.GraphSpaceID) (int32, error) {
@@ -368,41 +341,90 @@ func (m *metaClient) BalanceStatus(jobID int32, spaceID nebula.GraphSpaceID) err
Type: meta.JobType_STATS,
Paras: [][]byte{[]byte(strconv.FormatInt(int64(jobID), 10))},
}
resp, err := m.client.RunAdminJob(req)
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
return resp, err
})
if err != nil {
return err
}
if len(resp.GetResult_().JobDesc) > 0 {
if resp.GetResult_().JobDesc[0].Status == meta.JobStatus_FINISHED {
jobDesc := resp.(*meta.AdminJobResp).GetResult_().JobDesc
if len(jobDesc) > 0 {
if jobDesc[0].Status == meta.JobStatus_FINISHED {
return nil
}
}
log.Info("Balance job in progress")
return &utilerrors.ReconcileError{Msg: fmt.Sprintf("Balance job %d still in progress", jobID)}
}

func (m *metaClient) retryOnError(req interface{}, fn ExecFn) error {
func (m *metaClient) retryOnError(req interface{}, fn Fn) (interface{}, error) {
Contributor:

How about this?

type (
    ExecResp interface {
        GetErrorCode() nebula.ErrorCode 
        GetChangedLeader() *nebula.HostAddr
    }
    ExecFn func(req interface{}) (ExecResp, error)
)
func (m *metaClient) retryOnError(req interface{}, fn ExecFn) error {
    ...
}
You could then use the `resp` returned by `ExecFn` without type conversions.
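
For illustration, one such wrapper might look like the sketch below; the names are hypothetical and not part of the PR:

// Hypothetical wrapper for a single response type; every meta response
// (GetSpaceResp, ListSpacesResp, AdminJobResp, ...) would need its own.
type getSpaceRespWrapper struct {
	*meta.GetSpaceResp
}

func (w getSpaceRespWrapper) GetErrorCode() nebula.ErrorCode     { return w.Code }
func (w getSpaceRespWrapper) GetChangedLeader() *nebula.HostAddr { return w.Leader }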

Contributor (Author):

It's tedious; each method would have to define a wrapper to implement ExecResp. I prefer using reflect to get the response value (a reflection sketch follows at the end of this diff).

resp, err := fn(req)
if err != nil {
return err
return resp, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
if resp.Code == nebula.ErrorCode_E_LEADER_CHANGED {
leader := fmt.Sprintf("%v:%v", resp.Leader.Host, resp.Leader.Port)
if err := m.reconnect(leader); err != nil {
return errors.Errorf("update client failed: %v", err)

code := getResponseCode(resp)
if code != nebula.ErrorCode_SUCCEEDED {
if code == nebula.ErrorCode_E_LEADER_CHANGED {
leader := getResponseLeader(resp)
if leader == nil {
return nil, fmt.Errorf("get changed leader failed")
}
newLeader := fmt.Sprintf("%v:%v", leader.Host, leader.Port)
// update leader info
if err := m.reconnect(newLeader); err != nil {
return nil, err
}
resp, err := fn(req)
if err != nil {
return err
return nil, err
}
if resp.Code != nebula.ErrorCode_SUCCEEDED {
return errors.Errorf("retry response code %d", resp.Code)
code := getResponseCode(resp)
if code != nebula.ErrorCode_SUCCEEDED {
return nil, fmt.Errorf("retry response code %d", code)
} else if code == nebula.ErrorCode_E_EXISTED {
return resp, nil
}
} else if resp.Code == nebula.ErrorCode_E_EXISTED {
return nil
return resp, nil
} else if code == nebula.ErrorCode_E_EXISTED {
return resp, nil
}
return errors.Errorf("response code %d", resp.Code)
return nil, fmt.Errorf("response code %d", code)
}
return resp, nil
}

func getResponseCode(resp interface{}) nebula.ErrorCode {
if r, ok := resp.(*meta.ExecResp); ok {
return r.Code
} else if r, ok := resp.(*meta.GetSpaceResp); ok {
return r.Code
} else if r, ok := resp.(*meta.ListSpacesResp); ok {
return r.Code
} else if r, ok := resp.(*meta.ListHostsResp); ok {
return r.Code
} else if r, ok := resp.(*meta.ListPartsResp); ok {
return r.Code
} else if r, ok := resp.(*meta.AdminJobResp); ok {
return r.Code
}
return nebula.ErrorCode_E_UNKNOWN
}

func getResponseLeader(resp interface{}) *nebula.HostAddr {
if r, ok := resp.(*meta.ExecResp); ok {
return r.Leader
} else if r, ok := resp.(*meta.GetSpaceResp); ok {
return r.Leader
} else if r, ok := resp.(*meta.ListSpacesResp); ok {
return r.Leader
} else if r, ok := resp.(*meta.ListHostsResp); ok {
return r.Leader
} else if r, ok := resp.(*meta.ListPartsResp); ok {
return r.Leader
} else if r, ok := resp.(*meta.AdminJobResp); ok {
return r.Leader
}
return nil
}
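
The merged code resolves Code and Leader with the explicit type switches above. For illustration, a minimal sketch of the reflection alternative the author mentions in the thread; the import path and function names here are assumptions, not part of the PR:

package nebula

import (
	"reflect"

	nebulago "github.com/vesoft-inc/nebula-go/v3/nebula" // assumed import path
)

// getResponseCodeByReflect reads the Code field from any generated meta
// response struct, falling back to E_UNKNOWN when the field is missing.
func getResponseCodeByReflect(resp interface{}) nebulago.ErrorCode {
	v := reflect.Indirect(reflect.ValueOf(resp))
	if v.Kind() != reflect.Struct {
		return nebulago.ErrorCode_E_UNKNOWN
	}
	f := v.FieldByName("Code")
	codeType := reflect.TypeOf(nebulago.ErrorCode_SUCCEEDED)
	if !f.IsValid() || !f.Type().ConvertibleTo(codeType) {
		return nebulago.ErrorCode_E_UNKNOWN
	}
	return f.Convert(codeType).Interface().(nebulago.ErrorCode)
}

// getResponseLeaderByReflect reads the Leader field, which metad sets when it
// returns ErrorCode_E_LEADER_CHANGED.
func getResponseLeaderByReflect(resp interface{}) *nebulago.HostAddr {
	v := reflect.Indirect(reflect.ValueOf(resp))
	if v.Kind() != reflect.Struct {
		return nil
	}
	f := v.FieldByName("Leader")
	if !f.IsValid() {
		return nil
	}
	leader, _ := f.Interface().(*nebulago.HostAddr)
	return leader
}

Either variant avoids a per-type branch for every response, at the cost of reflection overhead on each retry check; the type-switch version merged here keeps the lookup explicit and allocation-free.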