Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cluster/majorversionupgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (c *Cluster) majorVersionUpgrade() error {
return nil
}

if !c.isInMainternanceWindow() {
if !isInMainternanceWindow(c.Spec.MaintenanceWindows) {
c.logger.Infof("skipping major version upgrade, not in maintenance window")
return nil
}
Expand Down
56 changes: 46 additions & 10 deletions pkg/cluster/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
createPublications[slotName] = tableList
} else if currentTables != tableList {
alterPublications[slotName] = tableList
} else {
(*slotsToSync)[slotName] = slotAndPublication.Slot
}
}

Expand All @@ -142,30 +144,34 @@ func (c *Cluster) syncPublication(dbName string, databaseSlotsList map[string]za
return nil
}

var errorMessage error = nil
errors := make([]string, 0)
for publicationName, tables := range createPublications {
if err = c.executeCreatePublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("creation of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("creation of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for publicationName, tables := range alterPublications {
if err = c.executeAlterPublication(publicationName, tables); err != nil {
errorMessage = fmt.Errorf("update of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("update of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = databaseSlotsList[publicationName].Slot
}
for _, publicationName := range deletePublications {
if err = c.executeDropPublication(publicationName); err != nil {
errorMessage = fmt.Errorf("deletion of publication %q failed: %v", publicationName, err)
errors = append(errors, fmt.Sprintf("deletion of publication %q failed: %v", publicationName, err))
continue
}
(*slotsToSync)[publicationName] = nil
}

return errorMessage
if len(errors) > 0 {
return fmt.Errorf("%v", strings.Join(errors, `', '`))
}

return nil
}

func (c *Cluster) generateFabricEventStream(appId string) *zalandov1.FabricEventStream {
Expand Down Expand Up @@ -370,7 +376,7 @@ func (c *Cluster) syncStreams() error {
for dbName, databaseSlotsList := range databaseSlots {
err := c.syncPublication(dbName, databaseSlotsList, &slotsToSync)
if err != nil {
c.logger.Warningf("could not sync publications in database %q: %v", dbName, err)
c.logger.Warningf("could not sync all publications in database %q: %v", dbName, err)
continue
}
}
Expand Down Expand Up @@ -398,7 +404,7 @@ func (c *Cluster) syncStreams() error {
c.logger.Warningf("could not sync event streams with applicationId %s: %v", appId, err)
}
} else {
c.logger.Warningf("database replication slots for streams with applicationId %s not in sync, skipping event stream sync", appId)
c.logger.Warningf("database replication slots %#v for streams with applicationId %s not in sync, skipping event stream sync", slotsToSync, appId)
}
}

Expand All @@ -415,8 +421,9 @@ func hasSlotsInSync(appId string, databaseSlots map[string]map[string]zalandov1.
for dbName, slots := range databaseSlots {
for slotName := range slots {
if slotName == getSlotName(dbName, appId) {
if _, exists := slotsToSync[slotName]; !exists {
if slot, exists := slotsToSync[slotName]; !exists || slot == nil {
allSlotsInSync = false
continue
}
}
}
Expand All @@ -432,7 +439,17 @@ func (c *Cluster) syncStream(appId string) error {
if appId == stream.Spec.ApplicationId {
streamExists = true
desiredStreams := c.generateFabricEventStream(appId)
if match, reason := sameStreams(stream.Spec.EventStreams, desiredStreams.Spec.EventStreams); !match {
if !reflect.DeepEqual(stream.ObjectMeta.OwnerReferences, desiredStreams.ObjectMeta.OwnerReferences) {
c.logger.Infof("owner references of event streams with applicationId %s do not match the current ones", appId)
stream.ObjectMeta.OwnerReferences = desiredStreams.ObjectMeta.OwnerReferences
c.setProcessName("updating event streams with applicationId %s", appId)
stream, err := c.KubeClient.FabricEventStreams(stream.Namespace).Update(context.TODO(), stream, metav1.UpdateOptions{})
if err != nil {
return fmt.Errorf("could not update event streams with applicationId %s: %v", appId, err)
}
c.Streams[appId] = stream
}
if match, reason := c.compareStreams(stream, desiredStreams); !match {
c.logger.Debugf("updating event streams with applicationId %s: %s", appId, reason)
desiredStreams.ObjectMeta = stream.ObjectMeta
updatedStream, err := c.updateStreams(desiredStreams)
Expand All @@ -459,7 +476,26 @@ func (c *Cluster) syncStream(appId string) error {
return nil
}

func sameStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
func (c *Cluster) compareStreams(curEventStreams, newEventStreams *zalandov1.FabricEventStream) (match bool, reason string) {
reasons := make([]string, 0)
match = true

// stream operator can add extra annotations so incl. current annotations in desired annotations
desiredAnnotations := c.annotationsSet(curEventStreams.Annotations)
if changed, reason := c.compareAnnotations(curEventStreams.ObjectMeta.Annotations, desiredAnnotations); changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams annotations do not match: %s", reason))
}

if changed, reason := sameEventStreams(curEventStreams.Spec.EventStreams, newEventStreams.Spec.EventStreams); !changed {
match = false
reasons = append(reasons, fmt.Sprintf("new streams EventStreams array does not match : %s", reason))
}

return match, strings.Join(reasons, ", ")
}

func sameEventStreams(curEventStreams, newEventStreams []zalandov1.EventStream) (match bool, reason string) {
if len(newEventStreams) != len(curEventStreams) {
return false, "number of defined streams is different"
}
Expand Down
Loading