Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions pkg/cluster/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"strconv"
"strings"
"time"

appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -459,8 +460,12 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
// 1. we have not observed a new master pod when re-creating former replicas
// 2. we know possible switchover targets even when no replicas were recreated
if newMasterPod == nil && len(replicas) > 0 {
if err := c.Switchover(masterPod, masterCandidate(replicas)); err != nil {
c.logger.Warningf("could not perform switch over: %v", err)
masterCandidate, err := c.getSwitchoverCandidate(masterPod)
if err != nil {
return fmt.Errorf("skipping switchover: %v", err)
}
if err := c.Switchover(masterPod, masterCandidate); err != nil {
return fmt.Errorf("could not perform switch over: %v", err)
}
} else if newMasterPod == nil && len(replicas) == 0 {
c.logger.Warningf("cannot perform switch over before re-creating the pod: no replicas")
Expand All @@ -475,6 +480,57 @@ func (c *Cluster) recreatePods(pods []v1.Pod, switchoverCandidates []spec.Namesp
return nil
}

func (c *Cluster) getSwitchoverCandidate(master *v1.Pod) (spec.NamespacedName, error) {

var members []patroni.ClusterMember
candidates := make([]spec.NamespacedName, 0)
syncCandidates := make([]spec.NamespacedName, 0)
skipReasons := make([]string, 0)

err := retryutil.Retry(1*time.Second, 5*time.Second,
func() (bool, error) {
var err error
members, err = c.patroni.GetClusterMembers(master)

if err != nil {
return false, err
}
return true, nil
},
)
if err != nil {
return spec.NamespacedName{}, fmt.Errorf("failed to get Patroni cluster members: %s", err)
}

for _, member := range members {
if member.LagInMB > 0 {
skipReasons = append(skipReasons, fmt.Sprintf("%s lags behind by %d MB", member.Name, member.LagInMB))
continue
}
if PostgresRole(member.Role) != Leader && PostgresRole(member.Role) != StandbyLeader && member.State == "running" {
candidates = append(candidates, spec.NamespacedName{Namespace: master.Namespace, Name: member.Name})
if PostgresRole(member.Role) != SyncStandby {
syncCandidates = append(syncCandidates, spec.NamespacedName{Namespace: master.Namespace, Name: member.Name})
}
}
}

if len(syncCandidates) > 0 {
return candidates[rand.Intn(len(syncCandidates))], nil
}
if len(candidates) > 0 {
return candidates[rand.Intn(len(candidates))], nil
}

if len(skipReasons) > 0 {
err = fmt.Errorf("no replica suitable for switchover: %s", strings.Join(skipReasons, `','`))
} else {
err = fmt.Errorf("no replica running")
}

return spec.NamespacedName{}, err
}

func (c *Cluster) podIsEndOfLife(pod *v1.Pod) (bool, error) {
node, err := c.KubeClient.Nodes().Get(context.TODO(), pod.Spec.NodeName, metav1.GetOptions{})
if err != nil {
Expand Down
80 changes: 80 additions & 0 deletions pkg/cluster/pod_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package cluster

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"testing"

"github.com/golang/mock/gomock"
"github.com/zalando/postgres-operator/mocks"
acidv1 "github.com/zalando/postgres-operator/pkg/apis/acid.zalan.do/v1"
"github.com/zalando/postgres-operator/pkg/spec"
"github.com/zalando/postgres-operator/pkg/util/k8sutil"
"github.com/zalando/postgres-operator/pkg/util/patroni"
)

func TestGetSwitchoverCandidate(t *testing.T) {
testName := "test getting right switchover candidate"
namespace := "default"

ctrl := gomock.NewController(t)
defer ctrl.Finish()

var cluster = New(Config{}, k8sutil.KubernetesClient{}, acidv1.Postgresql{}, logger, eventRecorder)

// simulate different member scenarios
tests := []struct {
subtest string
clusterJson string
expectedCandidate spec.NamespacedName
expectedError error
}{
{
subtest: "choose sync_standby over replica",
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`,
expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-1"},
expectedError: nil,
},
{
subtest: "choose replica without a lag",
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`,
expectedCandidate: spec.NamespacedName{Namespace: namespace, Name: "acid-test-cluster-2"},
expectedError: nil,
},
{
subtest: "no suitable replica available",
clusterJson: `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "replica", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 5}]}`,
expectedCandidate: spec.NamespacedName{},
expectedError: fmt.Errorf("no replica suitable for switchover: acid-test-cluster-1 lags behind by 5 MB"),
},
}

for _, tt := range tests {
// mocking cluster members
r := ioutil.NopCloser(bytes.NewReader([]byte(tt.clusterJson)))

response := http.Response{
StatusCode: 200,
Body: r,
}

mockClient := mocks.NewMockHTTPClient(ctrl)
mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil).AnyTimes()

p := patroni.New(patroniLogger, mockClient)
cluster.patroni = p
mockMasterPod := newMockPod("192.168.100.1")
mockMasterPod.Namespace = namespace

candidate, err := cluster.getSwitchoverCandidate(mockMasterPod)
if err != nil && err.Error() != tt.expectedError.Error() {
t.Errorf("%s - %s: unexpected error, %v", testName, tt.subtest, err)
}

if candidate != tt.expectedCandidate {
t.Errorf("%s - %s: unexpect switchover candidate, got %s, expected %s", testName, tt.subtest, candidate, tt.expectedCandidate)
}
}
}
10 changes: 5 additions & 5 deletions pkg/cluster/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
type PostgresRole string

const (
// Master role
Master PostgresRole = "master"

// Replica role
Replica PostgresRole = "replica"
Master PostgresRole = "master"
Replica PostgresRole = "replica"
Leader PostgresRole = "leader"
StandbyLeader PostgresRole = "standby_leader"
SyncStandby PostgresRole = "sync_standby"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel there is a comment missing here:

  1. how does operator treat differently Master, StandbyLeader on the one hand and Leader on the other ? Both Master and StandbyLeader are leaders.
  2. SyncStanbdy is still a Replica

While it is still possible to figure it out from the code, a short comment about why these are not completely independent states is worthy.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

master and replica are Spilo roles. The other ones role names returned by Patroni's cluster endpoint. Added comments

)

// PodEventType represents the type of a pod-related event
Expand Down
5 changes: 0 additions & 5 deletions pkg/cluster/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"sort"
"strings"
Expand Down Expand Up @@ -525,10 +524,6 @@ func (c *Cluster) credentialSecretNameForCluster(username string, clusterName st
"tprgroup", acidzalando.GroupName)
}

func masterCandidate(replicas []spec.NamespacedName) spec.NamespacedName {
return replicas[rand.Intn(len(replicas))]
}

func cloneSpec(from *acidv1.Postgresql) (*acidv1.Postgresql, error) {
var (
buf bytes.Buffer
Expand Down
37 changes: 37 additions & 0 deletions pkg/util/patroni/patroni.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
const (
failoverPath = "/failover"
configPath = "/config"
clusterPath = "/cluster"
statusPath = "/patroni"
restartPath = "/restart"
apiPort = 8008
Expand All @@ -29,6 +30,7 @@ const (

// Interface describe patroni methods
type Interface interface {
GetClusterMembers(master *v1.Pod) ([]ClusterMember, error)
Switchover(master *v1.Pod, candidate string) error
SetPostgresParameters(server *v1.Pod, options map[string]string) error
GetMemberData(server *v1.Pod) (MemberData, error)
Expand Down Expand Up @@ -175,6 +177,20 @@ func (p *Patroni) SetConfig(server *v1.Pod, config map[string]interface{}) error
return p.httpPostOrPatch(http.MethodPatch, apiURLString+configPath, buf)
}

// ClusterMembers array of cluster members from Patroni API
type ClusterMembers struct {
Members []ClusterMember `json:"members"`
}

// ClusterMember cluster member data from Patroni API
type ClusterMember struct {
Name string `json:"name"`
Role string `json:"role"`
State string `json:"state"`
Timeline int `json:"timeline"`
LagInMB int `json:"lag"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does Patroni always return lag in MB and not simply in bytes ?
for example, the patronictl converts bytes to MB

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

renamed to just Lag

Copy link
Member Author

@FxKu FxKu Dec 10, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hm maybe lag is reserved for something else:

cannot unmarshal string into Go struct field ClusterMember.members.lag of type int

renamed it back, since I think it's always in MB.

}

// MemberDataPatroni child element
type MemberDataPatroni struct {
Version string `json:"version"`
Expand Down Expand Up @@ -246,6 +262,27 @@ func (p *Patroni) Restart(server *v1.Pod) error {
return nil
}

// GetClusterMembers read cluster data from patroni API
func (p *Patroni) GetClusterMembers(server *v1.Pod) ([]ClusterMember, error) {

apiURLString, err := apiURL(server)
if err != nil {
return []ClusterMember{}, err
}
body, err := p.httpGet(apiURLString + clusterPath)
if err != nil {
return []ClusterMember{}, err
}

data := ClusterMembers{}
err = json.Unmarshal([]byte(body), &data)
if err != nil {
return []ClusterMember{}, err
}

return data.Members, nil
}

// GetMemberData read member data from patroni API
func (p *Patroni) GetMemberData(server *v1.Pod) (MemberData, error) {

Expand Down
49 changes: 49 additions & 0 deletions pkg/util/patroni/patroni_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,55 @@ func TestApiURL(t *testing.T) {
}
}

func TestGetClusterMembers(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

expectedClusterMemberData := []ClusterMember{
{
Name: "acid-test-cluster-0",
Role: "leader",
State: "running",
Timeline: 1,
LagInMB: 0,
}, {
Name: "acid-test-cluster-1",
Role: "sync_standby",
State: "running",
Timeline: 1,
LagInMB: 0,
}, {
Name: "acid-test-cluster-2",
Role: "replica",
State: "running",
Timeline: 1,
LagInMB: 0,
}}

json := `{"members": [{"name": "acid-test-cluster-0", "role": "leader", "state": "running", "api_url": "http://192.168.100.1:8008/patroni", "host": "192.168.100.1", "port": 5432, "timeline": 1}, {"name": "acid-test-cluster-1", "role": "sync_standby", "state": "running", "api_url": "http://192.168.100.2:8008/patroni", "host": "192.168.100.2", "port": 5432, "timeline": 1, "lag": 0}, {"name": "acid-test-cluster-2", "role": "replica", "state": "running", "api_url": "http://192.168.100.3:8008/patroni", "host": "192.168.100.3", "port": 5432, "timeline": 1, "lag": 0}]}`
r := ioutil.NopCloser(bytes.NewReader([]byte(json)))

response := http.Response{
StatusCode: 200,
Body: r,
}

mockClient := mocks.NewMockHTTPClient(ctrl)
mockClient.EXPECT().Get(gomock.Any()).Return(&response, nil)

p := New(logger, mockClient)

clusterMemberData, err := p.GetClusterMembers(newMockPod("192.168.100.1"))

if !reflect.DeepEqual(expectedClusterMemberData, clusterMemberData) {
t.Errorf("Patroni cluster members differ: expected: %#v, got: %#v", expectedClusterMemberData, clusterMemberData)
}

if err != nil {
t.Errorf("Could not read Patroni data: %v", err)
}
}

func TestGetMemberData(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down