Skip to content

Commit 546344f

Browse files
committed
CSPL-3551 Update of conf files when queue name or type change
1 parent 4b064a6 commit 546344f

File tree

5 files changed

+64
-31
lines changed

5 files changed

+64
-31
lines changed

pkg/splunk/client/enterprise.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,16 @@ func (c *SplunkClient) UpdateConfFile(fileName, property string, propertyKVList
10041004
err = c.Do(request, expectedStatus, nil)
10051005
return err
10061006
}
1007+
1008+
// Deletes conf files properties
1009+
func (c *SplunkClient) DeleteConfFileProperty(fileName, property string) error {
1010+
endpoint := fmt.Sprintf("%s/servicesNS/nobody/system/configs/conf-%s/%s", c.ManagementURI, fileName, property)
1011+
1012+
request, err := http.NewRequest("DELETE", endpoint, nil)
1013+
if err != nil {
1014+
return err
1015+
}
1016+
1017+
expectedStatus := []int{200, 201, 404}
1018+
return c.Do(request, expectedStatus, nil)
1019+
}

pkg/splunk/enterprise/indexercluster.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,17 @@ func (mgr *indexerClusterPodManager) handlePullBusOrPipelineConfigChange(ctx con
12771277
}
12781278
splunkClient := newSplunkClientForPullBusPipeline(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd))
12791279

1280-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR)
1280+
afterDelete := false
1281+
if (newCR.Spec.PullBus.SQS.QueueName != "" && newCR.Status.PullBus.SQS.QueueName != "" && newCR.Spec.PullBus.SQS.QueueName != newCR.Status.PullBus.SQS.QueueName) ||
1282+
(newCR.Spec.PullBus.Type != "" && newCR.Status.PullBus.Type != "" && newCR.Spec.PullBus.Type != newCR.Status.PullBus.Type) ||
1283+
(newCR.Spec.PullBus.SQS.RetryPolicy != "" && newCR.Status.PullBus.SQS.RetryPolicy != "" && newCR.Spec.PullBus.SQS.RetryPolicy != newCR.Status.PullBus.SQS.RetryPolicy) {
1284+
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.PullBus.SQS.QueueName)); err != nil {
1285+
updateErr = err
1286+
}
1287+
afterDelete = true
1288+
}
1289+
1290+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR, afterDelete)
12811291

12821292
for _, pbVal := range pullBusChangedFieldsOutputs {
12831293
if err := splunkClient.UpdateConfFile("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PullBus.SQS.QueueName), [][]string{pbVal}); err != nil {
@@ -1302,15 +1312,15 @@ func (mgr *indexerClusterPodManager) handlePullBusOrPipelineConfigChange(ctx con
13021312
return updateErr
13031313
}
13041314

1305-
func getChangedPullBusAndPipelineFieldsIndexer(oldCrStatus *enterpriseApi.IndexerClusterStatus, newCR *enterpriseApi.IndexerCluster) (pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields [][]string) {
1315+
func getChangedPullBusAndPipelineFieldsIndexer(oldCrStatus *enterpriseApi.IndexerClusterStatus, newCR *enterpriseApi.IndexerCluster, afterDelete bool) (pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields [][]string) {
13061316
// Compare PullBus fields
13071317
oldPB := oldCrStatus.PullBus
13081318
newPB := newCR.Spec.PullBus
13091319
oldPC := oldCrStatus.PipelineConfig
13101320
newPC := newCR.Spec.PipelineConfig
13111321

13121322
// Push all PullBus fields
1313-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs = pullBusChanged(oldPB, newPB)
1323+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs = pullBusChanged(oldPB, newPB, afterDelete)
13141324

13151325
// Always set all pipeline fields, not just changed ones
13161326
pipelineChangedFields = pipelineConfigChanged(oldPC, newPC, oldCrStatus.PullBus.SQS.QueueName != "", true)
@@ -1329,37 +1339,37 @@ func imageUpdatedTo9(previousImage string, currentImage string) bool {
13291339
return strings.HasPrefix(previousVersion, "8") && strings.HasPrefix(currentVersion, "9")
13301340
}
13311341

1332-
func pullBusChanged(oldPullBus, newPullBus enterpriseApi.PushBusSpec) (inputs, outputs [][]string) {
1333-
if oldPullBus.Type != newPullBus.Type {
1342+
func pullBusChanged(oldPullBus, newPullBus enterpriseApi.PushBusSpec, afterDelete bool) (inputs, outputs [][]string) {
1343+
if oldPullBus.Type != newPullBus.Type || afterDelete {
13341344
inputs = append(inputs, []string{"remote_queue.type", newPullBus.Type})
13351345
}
1336-
if oldPullBus.SQS.AuthRegion != newPullBus.SQS.AuthRegion {
1346+
if oldPullBus.SQS.AuthRegion != newPullBus.SQS.AuthRegion || afterDelete {
13371347
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.auth_region", newPullBus.Type), newPullBus.SQS.AuthRegion})
13381348
}
1339-
if oldPullBus.SQS.Endpoint != newPullBus.SQS.Endpoint {
1349+
if oldPullBus.SQS.Endpoint != newPullBus.SQS.Endpoint || afterDelete {
13401350
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.endpoint", newPullBus.Type), newPullBus.SQS.Endpoint})
13411351
}
1342-
if oldPullBus.SQS.LargeMessageStoreEndpoint != newPullBus.SQS.LargeMessageStoreEndpoint {
1352+
if oldPullBus.SQS.LargeMessageStoreEndpoint != newPullBus.SQS.LargeMessageStoreEndpoint || afterDelete {
13431353
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", newPullBus.Type), newPullBus.SQS.LargeMessageStoreEndpoint})
13441354
}
1345-
if oldPullBus.SQS.LargeMessageStorePath != newPullBus.SQS.LargeMessageStorePath {
1355+
if oldPullBus.SQS.LargeMessageStorePath != newPullBus.SQS.LargeMessageStorePath || afterDelete {
13461356
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.large_message_store.path", newPullBus.Type), newPullBus.SQS.LargeMessageStorePath})
13471357
}
1348-
if oldPullBus.SQS.DeadLetterQueueName != newPullBus.SQS.DeadLetterQueueName {
1358+
if oldPullBus.SQS.DeadLetterQueueName != newPullBus.SQS.DeadLetterQueueName || afterDelete {
13491359
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.dead_letter_queue.name", newPullBus.Type), newPullBus.SQS.DeadLetterQueueName})
13501360
}
1351-
if oldPullBus.SQS.MaxRetriesPerPart != newPullBus.SQS.MaxRetriesPerPart || oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy {
1361+
if oldPullBus.SQS.MaxRetriesPerPart != newPullBus.SQS.MaxRetriesPerPart || oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy || afterDelete {
13521362
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newPullBus.SQS.RetryPolicy, newPullBus.Type), fmt.Sprintf("%d", newPullBus.SQS.MaxRetriesPerPart)})
13531363
}
1354-
if oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy {
1364+
if oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy || afterDelete {
13551365
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.retry_policy", newPullBus.Type), newPullBus.SQS.RetryPolicy})
13561366
}
13571367

13581368
outputs = inputs
1359-
if oldPullBus.SQS.SendInterval != newPullBus.SQS.SendInterval {
1369+
if oldPullBus.SQS.SendInterval != newPullBus.SQS.SendInterval || afterDelete {
13601370
outputs = append(outputs, []string{fmt.Sprintf("remote_queue.%s.send_interval", newPullBus.Type), newPullBus.SQS.SendInterval})
13611371
}
1362-
if oldPullBus.SQS.EncodingFormat != newPullBus.SQS.EncodingFormat {
1372+
if oldPullBus.SQS.EncodingFormat != newPullBus.SQS.EncodingFormat || afterDelete {
13631373
outputs = append(outputs, []string{fmt.Sprintf("remote_queue.%s.encoding_format", newPullBus.Type), newPullBus.SQS.EncodingFormat})
13641374
}
13651375

pkg/splunk/enterprise/indexercluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,7 @@ func TestGetChangedPullBusAndPipelineFieldsIndexer(t *testing.T) {
20472047
},
20482048
}
20492049

2050-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR)
2050+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR, false)
20512051
assert.Equal(t, 8, len(pullBusChangedFieldsInputs))
20522052
assert.Equal(t, [][]string{
20532053
{"remote_queue.type", newCR.Spec.PullBus.Type},

pkg/splunk/enterprise/ingestorcluster.go

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,17 @@ func (mgr *ingestorClusterPodManager) handlePushBusOrPipelineConfigChange(ctx co
393393
}
394394
splunkClient := mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd))
395395

396-
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR)
396+
afterDelete := false
397+
if (newCR.Spec.PushBus.SQS.QueueName != "" && newCR.Status.PushBus.SQS.QueueName != "" && newCR.Spec.PushBus.SQS.QueueName != newCR.Status.PushBus.SQS.QueueName) ||
398+
(newCR.Spec.PushBus.Type != "" && newCR.Status.PushBus.Type != "" && newCR.Spec.PushBus.Type != newCR.Status.PushBus.Type) ||
399+
(newCR.Spec.PushBus.SQS.RetryPolicy != "" && newCR.Status.PushBus.SQS.RetryPolicy != "" && newCR.Spec.PushBus.SQS.RetryPolicy != newCR.Status.PushBus.SQS.RetryPolicy) {
400+
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.PushBus.SQS.QueueName)); err != nil {
401+
updateErr = err
402+
}
403+
afterDelete = true
404+
}
405+
406+
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR, afterDelete)
397407

398408
for _, pbVal := range pushBusChangedFields {
399409
if err := splunkClient.UpdateConfFile("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PushBus.SQS.QueueName), [][]string{pbVal}); err != nil {
@@ -413,14 +423,14 @@ func (mgr *ingestorClusterPodManager) handlePushBusOrPipelineConfigChange(ctx co
413423
}
414424

415425
// Returns the names of PushBus and PipelineConfig fields that changed between oldCR and newCR.
416-
func getChangedPushBusAndPipelineFields(oldCrStatus *enterpriseApi.IngestorClusterStatus, newCR *enterpriseApi.IngestorCluster) (pushBusChangedFields, pipelineChangedFields [][]string) {
426+
func getChangedPushBusAndPipelineFields(oldCrStatus *enterpriseApi.IngestorClusterStatus, newCR *enterpriseApi.IngestorCluster, afterDelete bool) (pushBusChangedFields, pipelineChangedFields [][]string) {
417427
oldPB := oldCrStatus.PushBus
418428
newPB := newCR.Spec.PushBus
419429
oldPC := oldCrStatus.PipelineConfig
420430
newPC := newCR.Spec.PipelineConfig
421431

422432
// Push changed PushBus fields
423-
pushBusChangedFields = pushBusChanged(oldPB, newPB)
433+
pushBusChangedFields = pushBusChanged(oldPB, newPB, afterDelete)
424434

425435
// Always changed pipeline fields
426436
pipelineChangedFields = pipelineConfigChanged(oldPC, newPC, oldCrStatus.PushBus.SQS.QueueName != "", false)
@@ -468,35 +478,35 @@ func pipelineConfigChanged(oldPipelineConfig, newPipelineConfig enterpriseApi.Pi
468478
return output
469479
}
470480

471-
func pushBusChanged(oldPushBus, newPushBus enterpriseApi.PushBusSpec) (output [][]string) {
472-
if oldPushBus.Type != newPushBus.Type {
481+
func pushBusChanged(oldPushBus, newPushBus enterpriseApi.PushBusSpec, afterDelete bool) (output [][]string) {
482+
if oldPushBus.Type != newPushBus.Type || afterDelete {
473483
output = append(output, []string{"remote_queue.type", newPushBus.Type})
474484
}
475-
if oldPushBus.SQS.EncodingFormat != newPushBus.SQS.EncodingFormat {
485+
if oldPushBus.SQS.EncodingFormat != newPushBus.SQS.EncodingFormat || afterDelete {
476486
output = append(output, []string{fmt.Sprintf("remote_queue.%s.encoding_format", newPushBus.Type), newPushBus.SQS.EncodingFormat})
477487
}
478-
if oldPushBus.SQS.AuthRegion != newPushBus.SQS.AuthRegion {
488+
if oldPushBus.SQS.AuthRegion != newPushBus.SQS.AuthRegion || afterDelete {
479489
output = append(output, []string{fmt.Sprintf("remote_queue.%s.auth_region", newPushBus.Type), newPushBus.SQS.AuthRegion})
480490
}
481-
if oldPushBus.SQS.Endpoint != newPushBus.SQS.Endpoint {
491+
if oldPushBus.SQS.Endpoint != newPushBus.SQS.Endpoint || afterDelete {
482492
output = append(output, []string{fmt.Sprintf("remote_queue.%s.endpoint", newPushBus.Type), newPushBus.SQS.Endpoint})
483493
}
484-
if oldPushBus.SQS.LargeMessageStoreEndpoint != newPushBus.SQS.LargeMessageStoreEndpoint {
494+
if oldPushBus.SQS.LargeMessageStoreEndpoint != newPushBus.SQS.LargeMessageStoreEndpoint || afterDelete {
485495
output = append(output, []string{fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", newPushBus.Type), newPushBus.SQS.LargeMessageStoreEndpoint})
486496
}
487-
if oldPushBus.SQS.LargeMessageStorePath != newPushBus.SQS.LargeMessageStorePath {
497+
if oldPushBus.SQS.LargeMessageStorePath != newPushBus.SQS.LargeMessageStorePath || afterDelete {
488498
output = append(output, []string{fmt.Sprintf("remote_queue.%s.large_message_store.path", newPushBus.Type), newPushBus.SQS.LargeMessageStorePath})
489499
}
490-
if oldPushBus.SQS.DeadLetterQueueName != newPushBus.SQS.DeadLetterQueueName {
500+
if oldPushBus.SQS.DeadLetterQueueName != newPushBus.SQS.DeadLetterQueueName || afterDelete {
491501
output = append(output, []string{fmt.Sprintf("remote_queue.%s.dead_letter_queue.name", newPushBus.Type), newPushBus.SQS.DeadLetterQueueName})
492502
}
493-
if oldPushBus.SQS.MaxRetriesPerPart != newPushBus.SQS.MaxRetriesPerPart || oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy {
503+
if oldPushBus.SQS.MaxRetriesPerPart != newPushBus.SQS.MaxRetriesPerPart || oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy || afterDelete {
494504
output = append(output, []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newPushBus.SQS.RetryPolicy, newPushBus.Type), fmt.Sprintf("%d", newPushBus.SQS.MaxRetriesPerPart)})
495505
}
496-
if oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy {
506+
if oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy || afterDelete {
497507
output = append(output, []string{fmt.Sprintf("remote_queue.%s.retry_policy", newPushBus.Type), newPushBus.SQS.RetryPolicy})
498508
}
499-
if oldPushBus.SQS.SendInterval != newPushBus.SQS.SendInterval {
509+
if oldPushBus.SQS.SendInterval != newPushBus.SQS.SendInterval || afterDelete {
500510
output = append(output, []string{fmt.Sprintf("remote_queue.%s.send_interval", newPushBus.Type), newPushBus.SQS.SendInterval})
501511
}
502512
return output

pkg/splunk/enterprise/ingestorcluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func TestApplyIngestorCluster(t *testing.T) {
273273
for i := 0; i < int(cr.Status.ReadyReplicas); i++ {
274274
podName := fmt.Sprintf("splunk-test-ingestor-%d", i)
275275
baseURL := fmt.Sprintf("https://%s.splunk-%s-ingestor-headless.%s.svc.cluster.local:8089/servicesNS/nobody/system/configs/conf-default-mode", podName, cr.GetName(), cr.GetNamespace())
276-
276+
277277
for _, field := range propertyKVList {
278278
req, _ := http.NewRequest("POST", baseURL, strings.NewReader(fmt.Sprintf("name=%s", field[0])))
279279
mockHTTPClient.AddHandler(req, 200, "", nil)
@@ -417,7 +417,7 @@ func TestGetChangedPushBusAndPipelineFieldsIngestor(t *testing.T) {
417417
},
418418
}
419419

420-
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR)
420+
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR, false)
421421

422422
assert.Equal(t, 10, len(pushBusChangedFields))
423423
assert.Equal(t, [][]string{

0 commit comments

Comments
 (0)