23 changes: 23 additions & 0 deletions controllers/change_coordinators.go
@@ -25,6 +25,7 @@ import (

"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/coordinator"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/internal/locality"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbstatus"
"github.com/go-logr/logr"

corev1 "k8s.io/api/core/v1"
@@ -79,6 +80,13 @@ func (c changeCoordinators) reconcile(
return &requeue{curError: err, delayedRequeue: true}
}

logger.Info(
"all valid",
"hasValidCoordinators",
hasValidCoordinators,
"allAddressesValid",
allAddressesValid,
)
if hasValidCoordinators {
return nil
}
@@ -94,6 +102,21 @@
return nil
}

// Perform safety checks before changing coordinators. The minimum uptime should reduce coordinator changes
// when a process is down only for a short amount of time, e.g. after a cluster-wide bounce.
err = fdbstatus.CanSafelyChangeCoordinators(
logger,
cluster,
status,
r.MinimumUptimeForCoordinatorChangeWithMissingProcess,
r.MinimumUptimeForCoordinatorChangeWithUndesiredProcess,
r.EnableRecoveryState,
)
if err != nil {
logger.Info("Deferring coordinator change due to safety check", "error", err.Error())
return &requeue{curError: err, delayedRequeue: true}
}

err = r.takeLock(logger, cluster, "changing coordinators")
if err != nil {
return &requeue{curError: err, delayedRequeue: true}
172 changes: 167 additions & 5 deletions controllers/change_coordinators_test.go
@@ -23,6 +23,7 @@ package controllers
import (
"context"
"math"
"time"

"k8s.io/utils/ptr"

@@ -50,10 +51,6 @@ var _ = Describe("Change coordinators", func() {
},
}
Expect(setupClusterForTest(cluster)).NotTo(HaveOccurred())

var err error
_, err = mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
})

Describe("reconcile", func() {
Expand All @@ -70,7 +67,7 @@ var _ = Describe("Change coordinators", func() {
clusterReconciler,
cluster,
nil,
globalControllerLogger,
testLogger,
)
})

@@ -164,5 +161,170 @@ var _ = Describe("Change coordinators", func() {
},
)
})

When("safety checks are enabled", func() {
BeforeEach(func() {
clusterReconciler.MinimumUptimeForCoordinatorChangeWithUndesiredProcess = 5 * time.Minute
clusterReconciler.MinimumUptimeForCoordinatorChangeWithMissingProcess = 10 * time.Minute
clusterReconciler.EnableRecoveryState = true
})

When("one coordinator is undesired", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

status, err := adminClient.GetStatus()
Expect(err).NotTo(HaveOccurred())

coordinators := map[string]fdbv1beta2.None{}
for _, coordinator := range status.Client.Coordinators.Coordinators {
coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
}

for _, process := range status.Cluster.Processes {
if _, ok := coordinators[process.Address.String()]; !ok {
continue
}
Expect(adminClient.ExcludeProcesses([]fdbv1beta2.ProcessAddress{
{
IPAddress: process.Address.IPAddress,
},
})).To(Succeed())
break
}
})

When("the cluster is up for long enough", func() {
It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
Contributor:
does this guarantee it is changing the coordinators? feels like there should be a more explicit check, esp wrt time passed

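One way to make the check more explicit, sketched here as a suggestion (it relies on originalConnectionString, which the other cases in this file already compare against after reconciliation):

// Hypothetical follow-up assertion: changing coordinators produces a new connection
// string, so it should no longer equal the value recorded before reconcile ran.
Expect(cluster.Status.ConnectionString).NotTo(Equal(originalConnectionString))
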
})
})

When("Too many active generations are present", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.ActiveGenerations = ptr.To(11)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.ActiveGenerations = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
Expect(
requeue.curError.Error(),
).To(ContainSubstring("cluster has 11 active generations, but only 10 active generations are allowed to safely change coordinators"))
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})

When("the cluster is only up for 10 seconds", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.SecondsSinceLastRecovered = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
Expect(
requeue.curError.Error(),
).To(Equal("cannot change coordinators: cluster is not up for long enough, cluster minimum uptime is 10.00 seconds but 300.00 seconds required for safe coordinator change"))
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})
})

When("one coordinator is missing", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())

status, err := adminClient.GetStatus()
Expect(err).NotTo(HaveOccurred())

coordinators := map[string]fdbv1beta2.None{}
for _, coordinator := range status.Client.Coordinators.Coordinators {
coordinators[coordinator.Address.String()] = fdbv1beta2.None{}
}

for _, process := range status.Cluster.Processes {
if _, ok := coordinators[process.Address.String()]; !ok {
continue
}
adminClient.MockMissingProcessGroup(
fdbv1beta2.ProcessGroupID(
process.Locality[fdbv1beta2.FDBLocalityInstanceIDKey],
),
true,
)
break
}
})

When("the cluster is up for long enough", func() {
It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
})
})

When("Multiple active generations are present", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.ActiveGenerations = ptr.To(11)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.ActiveGenerations = nil
})

It("should change the coordinators", func() {
Expect(requeue).To(BeNil())
})
})

When("the cluster is only up for 10 seconds", func() {
BeforeEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.SecondsSinceLastRecovered = ptr.To(10.0)
})

AfterEach(func() {
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
Expect(err).NotTo(HaveOccurred())
adminClient.SecondsSinceLastRecovered = nil
})

It("should defer coordinator change and requeue with delay", func() {
Expect(requeue).NotTo(BeNil())
Expect(requeue.delayedRequeue).To(BeTrue())
Expect(requeue.curError).To(HaveOccurred())
Expect(
requeue.curError.Error(),
).To(Equal("cannot change coordinators: cluster has 1 missing coordinators, cluster minimum uptime is 10.00 seconds but 600.00 seconds required for safe coordinator change"))
Expect(cluster.Status.ConnectionString).To(Equal(originalConnectionString))
})
})
})
})
})
})
7 changes: 7 additions & 0 deletions controllers/cluster_controller.go
@@ -126,6 +126,13 @@ type FoundationDBClusterReconciler struct {
// wait time will increase the chances that all updates are part of the list but will also delay the rollout of
// the change.
GlobalSynchronizationWaitDuration time.Duration
// MinimumUptimeForCoordinatorChangeWithMissingProcess defines the minimum uptime of the cluster before coordinator
// changes because of a missing coordinator are allowed.
MinimumUptimeForCoordinatorChangeWithMissingProcess time.Duration
// MinimumUptimeForCoordinatorChangeWithUndesiredProcess defines the minimum uptime of the cluster before coordinator
// changes because of an undesired coordinator are allowed.
Contributor:
Suggested change
// changes because of an undesired coordinator are allowed.
// changes because of an undesired (excluded) coordinator are allowed.
Is there a different meaning of undesired other than excluded here? If not, feels like we should just use "excluded"

MinimumUptimeForCoordinatorChangeWithUndesiredProcess time.Duration

// MinimumRecoveryTimeForInclusion defines the duration in seconds that a cluster must be up
// before new inclusions are allowed. The operator issuing frequent inclusions in a short time window
// could cause instability for the cluster as each inclusion will/can cause a recovery. Delaying the inclusion
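For reference, a minimal sketch of how the new thresholds can be set on the reconciler; the values mirror the ones used in the test above, and the surrounding wiring (flags, setup code) is assumed rather than part of this diff:

// Illustrative configuration only: other reconciler fields are omitted and keep their zero values.
reconciler := &FoundationDBClusterReconciler{
	MinimumUptimeForCoordinatorChangeWithMissingProcess:   10 * time.Minute,
	MinimumUptimeForCoordinatorChangeWithUndesiredProcess: 5 * time.Minute,
	EnableRecoveryState:                                    true,
}
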
6 changes: 4 additions & 2 deletions pkg/fdbadminclient/mock/admin_client_mock.go
@@ -70,11 +70,13 @@ type AdminClient struct {
localityInfo map[fdbv1beta2.ProcessGroupID]map[string]string
MaxZoneFailuresWithoutLosingData *int
MaxZoneFailuresWithoutLosingAvailability *int
ActiveGenerations *int
MaintenanceZone fdbv1beta2.FaultDomain
restoreURL string
maintenanceZoneStartTimestamp time.Time
MockAdditionTimeForGlobalCoordination time.Time
uptimeSecondsForMaintenanceZone float64
SecondsSinceLastRecovered *float64
TeamTracker []fdbv1beta2.FoundationDBStatusTeamTracker
Logs []fdbv1beta2.FoundationDBStatusLogInfo
mockError error
@@ -614,8 +616,8 @@ func (client *AdminClient) GetStatus() (*fdbv1beta2.FoundationDBStatus, error) {

status.Cluster.RecoveryState = fdbv1beta2.RecoveryState{
Name: "fully_recovered",
SecondsSinceLastRecovered: 600.0,
ActiveGenerations: 1,
SecondsSinceLastRecovered: ptr.Deref(client.SecondsSinceLastRecovered, 600.0),
ActiveGenerations: ptr.Deref(client.ActiveGenerations, 1),
}

return status, nil
85 changes: 85 additions & 0 deletions pkg/fdbstatus/status_checks.go
@@ -25,6 +25,7 @@ import (
"math"
"strconv"
"strings"
"time"

fdbv1beta2 "github.com/FoundationDB/fdb-kubernetes-operator/v2/api/v1beta2"
"github.com/FoundationDB/fdb-kubernetes-operator/v2/pkg/fdbadminclient"
@@ -840,3 +841,87 @@ func ClusterIsConfigured(
status.Client.DatabaseStatus.Available &&
status.Cluster.Layers.Error != "configurationMissing"
}

// CanSafelyChangeCoordinators returns nil when it is safe to change coordinators in the cluster, or an error
// explaining why it is not safe to change coordinators. This function differentiates between missing (down)
// processes and processes that are only excluded or undesired, applying different minimum uptime requirements for each case.
func CanSafelyChangeCoordinators(
logger logr.Logger,
cluster *fdbv1beta2.FoundationDBCluster,
status *fdbv1beta2.FoundationDBStatus,
minimumUptimeForMissing time.Duration,
minimumUptimeForExcluded time.Duration,
recoveryStateEnabled bool,
) error {
// TODO double check setting here + true
Contributor:
👀

currentMinimumUptime, _, err := GetMinimumUptimeAndAddressMap(
logger,
cluster,
status,
recoveryStateEnabled,
)
if err != nil {
return fmt.Errorf("failed to get minimum uptime: %w", err)
}

// Analyze current coordinators using the coordinator information from status
// This gives us the definitive list of coordinators regardless of whether processes are running
missingCoordinators := 0

// Create a map of coordinator addresses so we can check which coordinators are reported by a running process
coordinators := map[string]bool{}
for _, coordinator := range status.Client.Coordinators.Coordinators {
// TODO validate if logic will work with DNS names.
coordinators[coordinator.Address.String()] = false
}

for _, process := range status.Cluster.Processes {
processAddr := process.Address.String()
if _, ok := coordinators[processAddr]; !ok {
continue
}

coordinators[processAddr] = true
}

for _, isPresent := range coordinators {
if !isPresent {
missingCoordinators++
}
}

// Apply different uptime requirements based on the type of coordinator issues
var requiredUptime float64
var reason string

logger.V(1).
Info("Checking if it is safe to change coordinators", "missingCoordinators", missingCoordinators, "currentMinimumUptime", currentMinimumUptime)
if missingCoordinators > 0 {
// Missing coordinators indicate processes that are down, so apply the threshold for missing processes
requiredUptime = minimumUptimeForMissing.Seconds()
reason = fmt.Sprintf("cluster has %d missing coordinators", missingCoordinators)
} else {
requiredUptime = minimumUptimeForExcluded.Seconds()
reason = "cluster is not up for long enough"

// Perform the default safet checks in case of "normal" coordinator changes or if processes are exclude. If
Contributor:
Suggested change
// Perform the default safet checks in case of "normal" coordinator changes or if processes are exclude. If
// Perform the default safety checks in case of "normal" coordinator changes or if processes are excluded. If

// the cluster has missing coordinators, we should bypass those checks to ensure we recruit the new coordinators
// in a timely manner.
err = DefaultSafetyChecks(status, 10, "change coordinators")
if err != nil {
return err
}
}

// Check that the cluster has been stable for the required time
if currentMinimumUptime < requiredUptime {
Contributor:
edge case kinda but could this get thrown off by a crashlooping coordinator?
The case where a coordinator is missing but will come up, I think the currentMinimumUptime should be time since recovery (at least) and hopefully it should come up before requiredUptime, but if the coordinator (or something in tx) keeps crashing before requiredUptime couldn't we get stuck here? Alternatively, if there is something wrong with storage servers and one is crashing couldn't that also get us stuck here?
Or are these just not likely scenarios to happen for FDB?

return fmt.Errorf(
"cannot change coordinators: %s, cluster minimum uptime is %.2f seconds but %.2f seconds required for safe coordinator change",
reason,
currentMinimumUptime,
requiredUptime,
)
}

return nil
}