Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix sync streams and add diffs for annotations and owner references #2728

Merged
merged 4 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix sync streams and add diffs for annotations and owner references
  • Loading branch information
FxKu committed Aug 14, 2024
commit 612fe5a7ed0e29247418f21a2a7f8827b084311c
2 changes: 1 addition & 1 deletion pkg/cluster/majorversionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}

if !c.isInMainternanceWindow() {
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
Expand Down
58 changes: 45 additions & 13 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
createPublications[slotName] = tableList
} else if currentTables != tableList {
alterPublications[slotName] = tableList
} else {
(*slotsToSync)[slotName] = slotAndPublication.Slot
}
}

Expand All @@ -142,30 +144,34 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil
}

var errorMessage error = nil
errors := make([]string, 0)
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for _, publicationName := range deletePublications {
if err = c.executeDropPublication(publicationName); err != nil {
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = nil
}

return errorMessage
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -370,7 +376,7 @@ func (c *Cluster) syncStreams() error {
for dbName, databaseSlotsList := range databaseSlots {
err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
if err != nil {
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err)
continue
}
}
Expand All @@ -393,12 +399,12 @@ func (c *Cluster) syncStreams() error {
// there will be a separate event stream resource for each ID
appIds := getDistinctApplicationIds(c.Spec.Streams)
for _, appId := range appIds {
if c.hasSlotsInSync(appId, databaseSlots, slotsToSync) {
if hasSlotsInSync(appId, databaseSlots, slotsToSync) {
if err = c.syncStream(appId); err != nil {
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
}
} else {
c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId)
c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId)
}
}

Expand All @@ -410,14 +416,13 @@ func (c *Cluster) syncStreams() error {
return nil
}

func (c *Cluster) hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.Slot, slotsToSync map[string]map[string]string) bool {
allSlotsInSync := true
for dbName, slots := range databaseSlots {
for slotName := range slots {
if slotName == getSlotName(dbName, appId) {
if _, exists := slotsToSync[slotName]; !exists {
if slot, exists := slotsToSync[slotName]; !exists || slot == nil {
allSlotsInSync = false
c.logger.Warnf("replication slot %q for applicationId %s not found in database", slotName, appId)
continue
}
}
Expand All @@ -434,7 +439,17 @@ func (c *Cluster) syncStream(appId string) error {
if appId == stream.Spec.ApplicationId {
streamExists = true
desiredStreams := c.generateFabricEventStream(appId)
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
c.setProcessName("updating event streams with applicationId %s", appId)
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
}
c.Streams[appId] = stream
}
if match, reason := c.compareStreams(stream, desiredStreams); !match {
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
desiredStreams.ObjectMeta = stream.ObjectMeta
updatedStream, err := c.updateStreams(desiredStreams)
Expand All @@ -461,7 +476,24 @@ func (c *Cluster) syncStream(appId string) error {
return nil
}

func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) {
reasons := make([]string, 0)
match = true

if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, newEventStreams.ObjectMeta.Annotations); changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}

if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
}

return match, strings.Join(reasons, ", ")
}

func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
if len(newEventStreams) != len(curEventStreams) {
return false, "number of defined streams is different"
}
Expand Down
Loading
Loading