
Commit 486d5d6

idanovinda and FxKu authored
Allow drop slots when it gets deleted from the manifest (#2089)
* Allow drop slots when it gets deleted from the manifest
* use leader instead replica to query slots
* fix and extend unit tests for config update checks

Co-authored-by: Felix Kunde <felix-kunde@gmx.de>
1 parent 819e410 commit 486d5d6
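
In short, the operator now remembers which replication slots it has applied from the manifest (the new replicationSlots map on the Cluster struct, seeded in Create() and updated during sync) and, when checking the global configuration, marks every slot that has disappeared from the manifest with a null value in the Patroni configuration patch so that Patroni drops it. Slots created directly through Patroni and never listed in the manifest stay untracked and are therefore left alone, which the e2e test below verifies with test_patroni_slot. The following is a minimal, self-contained sketch of that reconciliation step, simplified from pkg/cluster/sync.go: it diffs only against the tracked map, whereas the real code also compares against Patroni's effective config, and the names slotSpec and diffSlots are illustrative rather than taken from the codebase.

```go
package main

import (
    "fmt"
    "reflect"
)

// slotSpec mirrors how the operator represents a slot definition in the
// manifest: a plain map of settings such as "type", "database" and "plugin".
type slotSpec map[string]string

// diffSlots compares the slots applied last time (tracked, cf. the new
// Cluster.replicationSlots field) with the slots currently in the manifest
// (desired). Entries that vanished from the manifest are set to nil, so the
// resulting Patroni config patch carries them as null and Patroni drops the
// slot; new or changed entries are written with their desired definition and
// recorded in the tracked map.
func diffSlots(tracked map[string]interface{}, desired map[string]slotSpec) map[string]interface{} {
    patch := make(map[string]interface{})

    // deletions: tracked before, but no longer (or differently) desired
    for name, trackedSlot := range tracked {
        if desiredSlot, exists := desired[name]; exists && reflect.DeepEqual(trackedSlot, desiredSlot) {
            continue
        }
        patch[name] = nil
        delete(tracked, name)
    }

    // additions and updates: desired, but missing from or different in tracking
    for name, desiredSlot := range desired {
        if trackedSlot, exists := tracked[name]; exists && reflect.DeepEqual(trackedSlot, desiredSlot) {
            continue
        }
        patch[name] = desiredSlot
        tracked[name] = desiredSlot
    }
    return patch
}

func main() {
    // state after the operator created both slots from the manifest
    tracked := map[string]interface{}{
        "test_slot":   slotSpec{"type": "logical", "database": "foo", "plugin": "pgoutput"},
        "test_slot_2": slotSpec{"type": "physical"},
    }
    // the manifest now only keeps test_slot, pointing at another database
    desired := map[string]slotSpec{
        "test_slot": {"type": "logical", "database": "bar", "plugin": "pgoutput"},
    }
    // test_slot_2 is nulled out (deleted), test_slot gets its new definition
    fmt.Println(diffSlots(tracked, desired))
}
```

Running it prints a patch that keeps test_slot with its new database and carries test_slot_2 as nil, i.e. as null in the JSON that ends up in Patroni's /config.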

File tree: 4 files changed, +223 -14 lines

  e2e/tests/test_e2e.py
  pkg/cluster/cluster.go
  pkg/cluster/sync.go
  pkg/cluster/sync_test.go

e2e/tests/test_e2e.py (83 additions, 2 deletions)
@@ -395,12 +395,13 @@ def test_config_update(self):
             "spec": {
                 "postgresql": {
                     "parameters": {
-                        "max_connections": new_max_connections_value
+                        "max_connections": new_max_connections_value,
+                        "wal_level": "logical"
                     }
                 },
                 "patroni": {
                     "slots": {
-                        "test_slot": {
+                        "first_slot": {
                             "type": "physical"
                         }
                     },
@@ -437,6 +438,8 @@ def compare_config():
                              "synchronous_mode not updated")
             self.assertEqual(desired_config["failsafe_mode"], effective_config["failsafe_mode"],
                              "failsafe_mode not updated")
+            self.assertEqual(desired_config["slots"], effective_config["slots"],
+                             "slots not updated")
             return True

         # check if Patroni config has been updated
@@ -497,6 +500,84 @@ def compare_config():
             self.eventuallyEqual(lambda: self.query_database(replica.metadata.name, "postgres", setting_query)[0], lower_max_connections_value,
                 "Previous max_connections setting not applied on replica", 10, 5)

+            # patch new slot via Patroni REST
+            patroni_slot = "test_patroni_slot"
+            patch_slot_command = """curl -s -XPATCH -d '{"slots": {"test_patroni_slot": {"type": "physical"}}}' localhost:8008/config"""
+            pg_patch_config["spec"]["patroni"]["slots"][patroni_slot] = {"type": "physical"}
+
+            k8s.exec_with_kubectl(leader.metadata.name, patch_slot_command)
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # test adding new slots
+            pg_add_new_slots_patch = {
+                "spec": {
+                    "patroni": {
+                        "slots": {
+                            "test_slot": {
+                                "type": "logical",
+                                "database": "foo",
+                                "plugin": "pgoutput"
+                            },
+                            "test_slot_2": {
+                                "type": "physical"
+                            }
+                        }
+                    }
+                }
+            }
+
+            for slot_name, slot_details in pg_add_new_slots_patch["spec"]["patroni"]["slots"].items():
+                pg_patch_config["spec"]["patroni"]["slots"][slot_name] = slot_details
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_add_new_slots_patch)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            # delete test_slot_2 from config and change the database type for test_slot
+            slot_to_change = "test_slot"
+            slot_to_remove = "test_slot_2"
+            pg_delete_slot_patch = {
+                "spec": {
+                    "patroni": {
+                        "slots": {
+                            "test_slot": {
+                                "type": "logical",
+                                "database": "bar",
+                                "plugin": "pgoutput"
+                            },
+                            "test_slot_2": None
+                        }
+                    }
+                }
+            }
+
+            pg_patch_config["spec"]["patroni"]["slots"][slot_to_change]["database"] = "bar"
+            del pg_patch_config["spec"]["patroni"]["slots"][slot_to_remove]
+
+            k8s.api.custom_objects_api.patch_namespaced_custom_object(
+                "acid.zalan.do", "v1", "default", "postgresqls", "acid-minimal-cluster", pg_delete_slot_patch)
+
+            self.eventuallyEqual(lambda: k8s.get_operator_state(), {"0": "idle"}, "Operator does not get in sync")
+            self.eventuallyTrue(compare_config, "Postgres config not applied")
+
+            get_slot_query = """
+                SELECT %s
+                  FROM pg_replication_slots
+                 WHERE slot_name = '%s';
+            """
+            self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", slot_to_remove))), 0,
+                "The replication slot cannot be deleted", 10, 5)
+
+            self.eventuallyEqual(lambda: self.query_database(leader.metadata.name, "postgres", get_slot_query%("database", slot_to_change))[0], "bar",
+                "The replication slot cannot be updated", 10, 5)
+
+            # make sure slot from Patroni didn't get deleted
+            self.eventuallyEqual(lambda: len(self.query_database(leader.metadata.name, "postgres", get_slot_query%("slot_name", patroni_slot))), 1,
+                "The replication slot from Patroni gets deleted", 10, 5)
+
         except timeout_decorator.TimeoutError:
             print('Operator log: {}'.format(k8s.get_operator_log()))
             raise

pkg/cluster/cluster.go (6 additions, 0 deletions)
@@ -84,6 +84,7 @@ type Cluster struct {
     userSyncStrategy spec.UserSyncer
     deleteOptions    metav1.DeleteOptions
     podEventsQueue   *cache.FIFO
+    replicationSlots map[string]interface{}

     teamsAPIClient   teams.Interface
     oauthTokenGetter OAuthTokenGetter
@@ -140,6 +141,7 @@ func New(cfg Config, kubeClient k8sutil.KubernetesClient, pgSpec acidv1.Postgres
         podEventsQueue:      podEventsQueue,
         KubeClient:          kubeClient,
         currentMajorVersion: 0,
+        replicationSlots:    make(map[string]interface{}),
     }
     cluster.logger = logger.WithField("pkg", "cluster").WithField("cluster-name", cluster.clusterName())
     cluster.teamsAPIClient = teams.NewTeamsAPI(cfg.OpConfig.TeamsAPIUrl, logger)
@@ -374,6 +376,10 @@ func (c *Cluster) Create() error {
         }
     }

+    for slotName, desiredSlot := range c.Spec.Patroni.Slots {
+        c.replicationSlots[slotName] = desiredSlot
+    }
+
     return nil
 }

pkg/cluster/sync.go (13 additions, 2 deletions)
@@ -579,15 +579,26 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
         }
     }

+    slotsToSet := make(map[string]interface{})
+    // check if there is any slot deletion
+    for slotName, effectiveSlot := range c.replicationSlots {
+        if desiredSlot, exists := desiredPatroniConfig.Slots[slotName]; exists {
+            if reflect.DeepEqual(effectiveSlot, desiredSlot) {
+                continue
+            }
+        }
+        slotsToSet[slotName] = nil
+        delete(c.replicationSlots, slotName)
+    }
     // check if specified slots exist in config and if they differ
-    slotsToSet := make(map[string]map[string]string)
     for slotName, desiredSlot := range desiredPatroniConfig.Slots {
         if effectiveSlot, exists := effectivePatroniConfig.Slots[slotName]; exists {
             if reflect.DeepEqual(desiredSlot, effectiveSlot) {
                 continue
             }
         }
         slotsToSet[slotName] = desiredSlot
+        c.replicationSlots[slotName] = desiredSlot
     }
     if len(slotsToSet) > 0 {
         configToSet["slots"] = slotsToSet
@@ -614,7 +625,7 @@ func (c *Cluster) checkAndSetGlobalPostgreSQLConfiguration(pod *v1.Pod, effectiv
     }

     // check if there exist only config updates that require a restart of the primary
-    if !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 {
+    if len(restartPrimary) > 0 && !util.SliceContains(restartPrimary, false) && len(configToSet) == 0 {
         requiresMasterRestart = true
     }
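
One more detail in this hunk: the primary-restart check gains a length guard. When no parameter differs at all, restartPrimary stays empty, and !util.SliceContains(restartPrimary, false) is then trivially true (assuming SliceContains simply scans the slice and finds nothing in an empty one), so together with an empty configToSet the old condition flagged a primary restart where none was needed. That is also likely why the expected restartPrimary flips from true to false in the failsafe test case at the end of sync_test.go below. A small sketch of the old versus new condition, with a local contains helper standing in for util.SliceContains:

```go
package main

import "fmt"

// contains stands in for util.SliceContains as used with the restartPrimary
// []bool slice: it reports whether any element equals needle, so for an empty
// slice it can only return false.
func contains(s []bool, needle bool) bool {
    for _, v := range s {
        if v == needle {
            return true
        }
    }
    return false
}

func main() {
    // no parameter differed, so nothing was appended to restartPrimary
    var restartPrimary []bool

    // old condition: !contains(empty, false) is true, which (with an empty
    // configToSet) wrongly requested a restart of the primary
    oldCond := !contains(restartPrimary, false)

    // new condition: the length guard keeps the empty case out
    newCond := len(restartPrimary) > 0 && !contains(restartPrimary, false)

    fmt.Println(oldCond, newCond) // true false
}
```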

pkg/cluster/sync_test.go (121 additions, 10 deletions)
@@ -144,6 +144,13 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
     client, _ := newFakeK8sSyncClient()
     clusterName := "acid-test-cluster"
     namespace := "default"
+    testSlots := map[string]map[string]string{
+        "slot1": {
+            "type":     "logical",
+            "plugin":   "wal2json",
+            "database": "foo",
+        },
+    }

     ctrl := gomock.NewController(t)
     defer ctrl.Finish()
@@ -208,11 +215,26 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {

     // simulate existing config that differs from cluster.Spec
     tests := []struct {
-        subtest        string
-        patroni        acidv1.Patroni
-        pgParams       map[string]string
-        restartPrimary bool
+        subtest         string
+        patroni         acidv1.Patroni
+        desiredSlots    map[string]map[string]string
+        removedSlots    map[string]map[string]string
+        pgParams        map[string]string
+        shouldBePatched bool
+        restartPrimary  bool
     }{
+        {
+            subtest: "Patroni and Postgresql.Parameters do not differ",
+            patroni: acidv1.Patroni{
+                TTL: 20,
+            },
+            pgParams: map[string]string{
+                "log_min_duration_statement": "200",
+                "max_connections":            "50",
+            },
+            shouldBePatched: false,
+            restartPrimary:  false,
+        },
         {
             subtest: "Patroni and Postgresql.Parameters differ - restart replica first",
             patroni: acidv1.Patroni{
@@ -222,7 +244,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
                 "log_min_duration_statement": "500", // desired 200
                 "max_connections":            "100", // desired 50
             },
-            restartPrimary: false,
+            shouldBePatched: true,
+            restartPrimary:  false,
         },
         {
             subtest: "multiple Postgresql.Parameters differ - restart replica first",
@@ -231,7 +254,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
                 "log_min_duration_statement": "500", // desired 200
                 "max_connections":            "100", // desired 50
             },
-            restartPrimary: false,
+            shouldBePatched: true,
+            restartPrimary:  false,
         },
         {
             subtest: "desired max_connections bigger - restart replica first",
@@ -240,7 +264,8 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
                 "log_min_duration_statement": "200",
                 "max_connections":            "30", // desired 50
             },
-            restartPrimary: false,
+            shouldBePatched: true,
+            restartPrimary:  false,
         },
         {
             subtest: "desired max_connections smaller - restart master first",
@@ -249,19 +274,105 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
                 "log_min_duration_statement": "200",
                 "max_connections":            "100", // desired 50
             },
-            restartPrimary: true,
+            shouldBePatched: true,
+            restartPrimary:  true,
+        },
+        {
+            subtest: "slot does not exist but is desired",
+            patroni: acidv1.Patroni{
+                TTL: 20,
+            },
+            desiredSlots: testSlots,
+            pgParams: map[string]string{
+                "log_min_duration_statement": "200",
+                "max_connections":            "50",
+            },
+            shouldBePatched: true,
+            restartPrimary:  false,
+        },
+        {
+            subtest: "slot exist, nothing specified in manifest",
+            patroni: acidv1.Patroni{
+                TTL: 20,
+                Slots: map[string]map[string]string{
+                    "slot1": {
+                        "type":     "logical",
+                        "plugin":   "pgoutput",
+                        "database": "foo",
+                    },
+                },
+            },
+            pgParams: map[string]string{
+                "log_min_duration_statement": "200",
+                "max_connections":            "50",
+            },
+            shouldBePatched: false,
+            restartPrimary:  false,
+        },
+        {
+            subtest: "slot is removed from manifest",
+            patroni: acidv1.Patroni{
+                TTL: 20,
+                Slots: map[string]map[string]string{
+                    "slot1": {
+                        "type":     "logical",
+                        "plugin":   "pgoutput",
+                        "database": "foo",
+                    },
+                },
+            },
+            removedSlots: testSlots,
+            pgParams: map[string]string{
+                "log_min_duration_statement": "200",
+                "max_connections":            "50",
+            },
+            shouldBePatched: true,
+            restartPrimary:  false,
+        },
+        {
+            subtest: "slot plugin differs",
+            patroni: acidv1.Patroni{
+                TTL: 20,
+                Slots: map[string]map[string]string{
+                    "slot1": {
+                        "type":     "logical",
+                        "plugin":   "pgoutput",
+                        "database": "foo",
+                    },
+                },
+            },
+            desiredSlots: testSlots,
+            pgParams: map[string]string{
+                "log_min_duration_statement": "200",
+                "max_connections":            "50",
+            },
+            shouldBePatched: true,
+            restartPrimary:  false,
         },
     }

     for _, tt := range tests {
+        if len(tt.desiredSlots) > 0 {
+            cluster.Spec.Patroni.Slots = tt.desiredSlots
+        }
+        if len(tt.removedSlots) > 0 {
+            for slotName, removedSlot := range tt.removedSlots {
+                cluster.replicationSlots[slotName] = removedSlot
+            }
+        }
+
         configPatched, requirePrimaryRestart, err := cluster.checkAndSetGlobalPostgreSQLConfiguration(mockPod, tt.patroni, cluster.Spec.Patroni, tt.pgParams, cluster.Spec.Parameters)
         assert.NoError(t, err)
-        if configPatched != true {
+        if configPatched != tt.shouldBePatched {
             t.Errorf("%s - %s: expected config update did not happen", testName, tt.subtest)
         }
         if requirePrimaryRestart != tt.restartPrimary {
             t.Errorf("%s - %s: wrong master restart strategy, got restart %v, expected restart %v", testName, tt.subtest, requirePrimaryRestart, tt.restartPrimary)
         }
+
+        // reset slots for next tests
+        cluster.Spec.Patroni.Slots = nil
+        cluster.replicationSlots = make(map[string]interface{})
     }

     testsFailsafe := []struct {
@@ -342,7 +453,7 @@ func TestCheckAndSetGlobalPostgreSQLConfiguration(t *testing.T) {
             effectiveVal:    util.True(),
             desiredVal:      true,
             shouldBePatched: false, // should not require patching
-            restartPrimary:  true,
+            restartPrimary:  false,
         },
     }
