Skip to content

Commit 62fb251

Browse files
committed
Correct IP handling in case of global coordination
1 parent 93117ff commit 62fb251

File tree

8 files changed

+72
-32
lines changed

8 files changed

+72
-32
lines changed

controllers/exclude_processes.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func (e excludeProcesses) reconcile(ctx context.Context, r *FoundationDBClusterR
112112
// Make sure it's safe to exclude processes.
113113
err = fdbstatus.CanSafelyExcludeProcessesWithRecoveryState(cluster, status, r.MinimumRecoveryTimeForExclusion)
114114
if err != nil {
115-
return &requeue{curError: err, delayedRequeue: true}
115+
return &requeue{curError: err, delayedRequeue: true, delay: 10 * time.Second}
116116
}
117117

118118
var fdbProcessesToExclude []fdbv1beta2.ProcessAddress

controllers/remove_process_groups.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ func (u removeProcessGroups) reconcile(ctx context.Context, r *FoundationDBClust
7373
// If no process groups are marked to remove we have to check if all process groups are excluded.
7474
if len(processGroupsToRemove) == 0 {
7575
if !allExcluded {
76-
return &requeue{message: "Reconciliation needs to exclude more processes"}
76+
return &requeue{message: "Reconciliation needs to exclude more processes", delay: 15 * time.Second}
7777
}
78+
7879
return nil
7980
}
8081

controllers/update_status.go

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ func (c updateStatus) reconcile(ctx context.Context, r *FoundationDBClusterRecon
242242
return &requeue{curError: clientErr}
243243
}
244244

245-
err = coordination.UpdateGlobalCoordinationState(logger, cluster, adminClient)
245+
err = coordination.UpdateGlobalCoordinationState(logger, cluster, adminClient, processMap)
246246
if err != nil {
247247
return &requeue{curError: err}
248248
}
@@ -384,6 +384,12 @@ func checkAndSetProcessStatus(logger logr.Logger, r *FoundationDBClusterReconcil
384384
return nil
385385
}
386386

387+
// If the process group is excluded, ensure that the process group is marked as excluded.
388+
if excluded {
389+
logger.Info("process group is excluded", "processGroupID", processGroupStatus.ProcessGroupID)
390+
processGroupStatus.SetExclude()
391+
}
392+
387393
// If the processes of this process group are not being excluded anymore, we will reset the exclusion timestamp.
388394
// This allows us to handle cases where a process was fully excluded but not yet removed and someone manually includes
389395
// the processes back. If multiple processes are running inside the pod and at least one process is excluded,
@@ -920,24 +926,20 @@ func getFaultDomainFromProcesses(processes []fdbv1beta2.FoundationDBStatusProces
920926
}
921927

922928
// updateFaultDomains will update the process groups fault domain, based on the last seen zone id in the cluster status.
923-
func updateFaultDomains(logger logr.Logger, processes map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.FoundationDBStatusProcessInfo, status *fdbv1beta2.FoundationDBClusterStatus) {
929+
func updateFaultDomains(logger logr.Logger, processesMap map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.FoundationDBStatusProcessInfo, status *fdbv1beta2.FoundationDBClusterStatus) {
924930
// If the process map is empty we can skip any further steps.
925-
if len(processes) == 0 {
931+
if len(processesMap) == 0 {
926932
return
927933
}
928934

929935
for idx, processGroup := range status.ProcessGroups {
930-
process, ok := processes[processGroup.ProcessGroupID]
931-
if !ok || len(processes) == 0 {
932-
// Fallback for multiple storage or log servers, those will contain the process information with the process number as a suffix.
933-
process, ok = processes[processGroup.ProcessGroupID+"-1"]
934-
if !ok || len(processes) == 0 {
935-
logger.Info("skip updating fault domain for process group with missing process in FoundationDB cluster status", "processGroupID", processGroup.ProcessGroupID)
936-
continue
937-
}
936+
processes := coordination.GetProcessesFromProcessMap(processGroup.ProcessGroupID, processesMap)
937+
if len(processes) == 0 {
938+
logger.Info("skip updating fault domain for process group with missing process in FoundationDB cluster status", "processGroupID", processGroup.ProcessGroupID)
939+
continue
938940
}
939941

940-
faultDomain := getFaultDomainFromProcesses(process)
942+
faultDomain := getFaultDomainFromProcesses(processes)
941943
if faultDomain == "" {
942944
logger.Info("skip updating fault domain for process group with missing zoneid", "processGroupID", processGroup.ProcessGroupID)
943945
continue

e2e/test_operator/operator_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ var _ = Describe("Operator", Label("e2e", "pr"), func() {
383383
var useLocalitiesForExclusion bool
384384

385385
JustBeforeEach(func() {
386-
initialPods := fdbCluster.GetLogPods()
386+
initialPods := fdbCluster.GetPods()
387387
coordinators := fdbstatus.GetCoordinatorsFromStatus(fdbCluster.GetStatus())
388388

389389
for _, pod := range initialPods.Items {

fdbclient/admin_client.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,20 @@ func (client *cliAdminClient) ExcludeProcesses(addresses []fdbv1beta2.ProcessAdd
380380
return client.ExcludeProcessesWithNoWait(addresses, client.Cluster.GetUseNonBlockingExcludes())
381381
}
382382

383+
// getAddressStringForIncludeAndExclude will return a string with addresses or localities that can be used for exclusion
384+
// or inclusion. If any of the addresses defines a port, it will be reset by this method to ensure the whole pod gets
385+
// excluded or included.
386+
func getAddressStringForIncludeAndExclude(addresses []fdbv1beta2.ProcessAddress) string {
387+
// Ensure that the ports are set to 0, as the operator will always exclude whole pods.
388+
for idx, address := range addresses {
389+
if address.Port != 0 {
390+
addresses[idx].Port = 0
391+
}
392+
}
393+
394+
return fdbv1beta2.ProcessAddressesStringWithoutFlags(addresses, " ")
395+
}
396+
383397
// ExcludeProcessesWithNoWait starts evacuating processes so that they can be removed from the database. If noWait is
384398
// set to true, the exclude command will not block until all data is moved away from the processes.
385399
func (client *cliAdminClient) ExcludeProcessesWithNoWait(addresses []fdbv1beta2.ProcessAddress, noWait bool) error {
@@ -393,7 +407,7 @@ func (client *cliAdminClient) ExcludeProcessesWithNoWait(addresses []fdbv1beta2.
393407
excludeCommand.WriteString("no_wait ")
394408
}
395409

396-
excludeCommand.WriteString(fdbv1beta2.ProcessAddressesString(addresses, " "))
410+
excludeCommand.WriteString(getAddressStringForIncludeAndExclude(addresses))
397411

398412
_, err := client.runCommand(cliCommand{command: excludeCommand.String(), timeout: client.getTimeout()})
399413

@@ -405,10 +419,7 @@ func (client *cliAdminClient) IncludeProcesses(addresses []fdbv1beta2.ProcessAdd
405419
if len(addresses) == 0 {
406420
return nil
407421
}
408-
_, err := client.runCommand(cliCommand{command: fmt.Sprintf(
409-
"include %s",
410-
fdbv1beta2.ProcessAddressesString(addresses, " "),
411-
)})
422+
_, err := client.runCommand(cliCommand{command: fmt.Sprintf("include %s", getAddressStringForIncludeAndExclude(addresses))})
412423
return err
413424
}
414425

fdbclient/admin_client_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -801,7 +801,7 @@ protocol fdb00b071010000`,
801801

802802
When("the cluster specifies that blocking exclusions should be used", func() {
803803
It("should return that the exclusion command is called without no_wait", func() {
804-
Expect(mockRunner.receivedArgs[0]).To(ContainElement("exclude 127.0.0.1:4500"))
804+
Expect(mockRunner.receivedArgs[0]).To(ContainElement("exclude 127.0.0.1"))
805805
})
806806
})
807807

@@ -811,7 +811,7 @@ protocol fdb00b071010000`,
811811
})
812812

813813
It("should return that the exclusion command is called with no_wait", func() {
814-
Expect(mockRunner.receivedArgs[0]).To(ContainElement("exclude no_wait 127.0.0.1:4500"))
814+
Expect(mockRunner.receivedArgs[0]).To(ContainElement("exclude no_wait 127.0.0.1"))
815815
})
816816
})
817817
})
@@ -933,7 +933,7 @@ protocol fdb00b071010000`,
933933

934934
It("should issue an exclude command to verify the exclusion", func() {
935935
Expect(mockRunner.receivedBinary[0]).To(HaveSuffix(fdbcliStr))
936-
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.1:4500 192.168.0.2:4500"))
936+
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.1 192.168.0.2"))
937937
})
938938
})
939939

@@ -965,7 +965,7 @@ protocol fdb00b071010000`,
965965

966966
It("should issue an exclude command to verify the exclusion", func() {
967967
Expect(mockRunner.receivedBinary[0]).To(HaveSuffix(fdbcliStr))
968-
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.1:4500 192.168.0.2:4500"))
968+
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.1 192.168.0.2"))
969969
})
970970
})
971971

@@ -1107,15 +1107,15 @@ protocol fdb00b071010000`,
11071107
It("should return the missing address list and no error", func() {
11081108
Expect(result).To(ConsistOf(fdbv1beta2.ProcessAddress{
11091109
IPAddress: net.ParseIP("192.168.0.5"),
1110-
Port: 4500,
1110+
Port: 0,
11111111
}))
11121112
Expect(err).NotTo(HaveOccurred())
11131113
})
11141114

11151115
It("should issue an exclude command", func() {
11161116
Expect(mockRunner.receivedBinary[0]).NotTo(BeEmpty())
1117-
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.5:4500"))
1118-
Expect(mockRunner.receivedArgs[1]).To(ContainElements("exclude 192.168.0.1:4500 192.168.0.2:4500"))
1117+
Expect(mockRunner.receivedArgs[0]).To(ContainElements("exclude 192.168.0.5"))
1118+
Expect(mockRunner.receivedArgs[1]).To(ContainElements("exclude 192.168.0.1 192.168.0.2"))
11191119
})
11201120

11211121
When("the exclude command returns an error different from the timeout error", func() {
@@ -1126,7 +1126,7 @@ protocol fdb00b071010000`,
11261126
It("should return the missing address and no error", func() {
11271127
Expect(result).To(ConsistOf(fdbv1beta2.ProcessAddress{
11281128
IPAddress: net.ParseIP("192.168.0.5"),
1129-
Port: 4500,
1129+
Port: 0,
11301130
}))
11311131
Expect(err).NotTo(HaveOccurred())
11321132
})

internal/coordination/coordination.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,9 +199,27 @@ func GetAddressesFromStatus(logger logr.Logger, status *fdbv1beta2.FoundationDBS
199199
return addresses
200200
}
201201

202+
// GetProcessesFromProcessMap returns the slice of processes matching the process group ID.
203+
func GetProcessesFromProcessMap(processGroupID fdbv1beta2.ProcessGroupID, processesMap map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.FoundationDBStatusProcessInfo) []fdbv1beta2.FoundationDBStatusProcessInfo {
204+
if len(processesMap) == 0 {
205+
return nil
206+
}
207+
208+
processes, ok := processesMap[processGroupID]
209+
if !ok || len(processes) == 0 {
210+
// Fallback for multiple storage or log servers, those will contain the process information with the process number as a suffix.
211+
processes, ok = processesMap[processGroupID+"-1"]
212+
if !ok || len(processes) == 0 {
213+
return nil
214+
}
215+
}
216+
217+
return processes
218+
}
219+
202220
// UpdateGlobalCoordinationState will update the state for global synchronization. If the synchronization mode is local,
203221
// this method will skip all work.
204-
func UpdateGlobalCoordinationState(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, adminClient fdbadminclient.AdminClient) error {
222+
func UpdateGlobalCoordinationState(logger logr.Logger, cluster *fdbv1beta2.FoundationDBCluster, adminClient fdbadminclient.AdminClient, processesMap map[fdbv1beta2.ProcessGroupID][]fdbv1beta2.FoundationDBStatusProcessInfo) error {
205223
// If the synchronization mode is local (default) skip all work. If the mode is changed from global to local
206224
// the human operator must clean up.
207225
if cluster.GetSynchronizationMode() == fdbv1beta2.SynchronizationModeLocal {
@@ -273,8 +291,16 @@ func UpdateGlobalCoordinationState(logger logr.Logger, cluster *fdbv1beta2.Found
273291
updatesPendingForRemoval[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
274292
}
275293

276-
// Only add the process group if the exclusion is not done yet.
277-
if !processGroup.IsExcluded() {
294+
processes := GetProcessesFromProcessMap(processGroup.ProcessGroupID, processesMap)
295+
296+
var excluded bool
297+
for _, process := range processes {
298+
excluded = excluded || process.Excluded
299+
}
300+
301+
// Only add the process group if the exclusion is not done yet. The exclusion is considered done when the process
302+
// group has the exclusion timestamp set or when at least one of its processes is reported as excluded.
303+
if !(processGroup.IsExcluded() || excluded) {
278304
if _, ok := pendingForExclusion[processGroup.ProcessGroupID]; !ok {
279305
updatesPendingForExclusion[processGroup.ProcessGroupID] = fdbv1beta2.UpdateActionAdd
280306
}

internal/coordination/coordination_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ var _ = Describe("operator_coordination", func() {
5959
JustBeforeEach(func() {
6060
adminClient, err := mock.NewMockAdminClientUncast(cluster, k8sClient)
6161
Expect(err).NotTo(HaveOccurred())
62-
Expect(UpdateGlobalCoordinationState(logr.Discard(), cluster, adminClient)).To(Succeed())
62+
Expect(UpdateGlobalCoordinationState(logr.Discard(), cluster, adminClient, nil)).To(Succeed())
6363
})
6464

6565
When("a process group is marked for removal", func() {

0 commit comments

Comments
 (0)