@@ -27,6 +27,7 @@ import (
27
27
"context"
28
28
"fmt"
29
29
"net/url"
30
+ "reflect"
30
31
"strings"
31
32
"sync"
32
33
"time"
@@ -73,9 +74,6 @@ type UpgradeManager interface {
73
74
74
75
// RunWatchUpgradePlan keeps watching the upgrade plan until the given context is canceled.
75
76
RunWatchUpgradePlan (context.Context )
76
-
77
- // UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
78
- UpgradePlanChangedCallback ()
79
77
}
80
78
81
79
// UpgradeManagerContext holds methods used by the upgrade manager to control its context.
@@ -155,6 +153,11 @@ func (p *UpgradePlan) ResetFailures() {
155
153
}
156
154
}
157
155
156
+ // Equals returns true if other plan is the same
157
+ func (p UpgradePlan ) Equals (other UpgradePlan ) bool {
158
+ return reflect .DeepEqual (p , other )
159
+ }
160
+
158
161
// UpgradeEntryType is a strongly typed upgrade plan item
159
162
type UpgradeEntryType string
160
163
@@ -823,39 +826,74 @@ func (m *upgradeManager) removeUpgradePlan(ctx context.Context) error {
823
826
return nil
824
827
}
825
828
829
+ // waitForPlanChanges returns unbuffered channel on which updated UpgradePlan will be delivered.
830
+ // Closing channel means that there will be no more changes.
831
+ func (m * upgradeManager ) waitForPlanChanges (ctx context.Context ) chan * UpgradePlan {
832
+ ch := make (chan * UpgradePlan )
833
+
834
+ go func () {
835
+ defer close (ch )
836
+ var oldPlan UpgradePlan
837
+ for {
838
+ delay := time .Second * 3
839
+ plan , err := m .readUpgradePlan (ctx )
840
+ if agency .IsKeyNotFound (err ) || plan .IsEmpty () {
841
+ // Just try later
842
+ } else if err != nil {
843
+ // Failed to read plan
844
+ m .log .Info ().Err (err ).Msg ("Failed to read upgrade plan" )
845
+ } else if ! oldPlan .Equals (plan ) {
846
+ ch <- & plan
847
+ oldPlan = plan
848
+ delay = time .Second
849
+ }
850
+
851
+ timer := time .NewTimer (delay )
852
+ select {
853
+ case <- timer .C :
854
+ // Continue
855
+ case <- ctx .Done ():
856
+ // Context canceled
857
+ if ! timer .Stop () {
858
+ <- timer .C
859
+ }
860
+ return
861
+ }
862
+ }
863
+ }()
864
+
865
+ return ch
866
+ }
867
+
826
868
// RunWatchUpgradePlan keeps watching the upgrade plan in the agency.
827
869
// Once it detects that this starter has to act, it does.
828
870
func (m * upgradeManager ) RunWatchUpgradePlan (ctx context.Context ) {
829
- _ , myPeer , mode := m .upgradeManagerContext .ClusterConfig ()
830
- ownURL := myPeer .CreateStarterURL ("/" )
871
+ _ , _ , mode := m .upgradeManagerContext .ClusterConfig ()
831
872
if ! mode .HasAgency () {
832
873
// Nothing to do here without an agency
833
874
return
834
875
}
835
- registeredCallback := false
836
- defer func () {
837
- if registeredCallback {
838
- m .unregisterUpgradePlanChangedCallback (ctx , ownURL )
839
- }
840
- }()
876
+
877
+ planChanges := m .waitForPlanChanges (ctx )
841
878
for {
842
- delay := time .Minute
843
- if ! registeredCallback {
844
- m .log .Debug ().Msg ("Registering upgrade plan changed callback..." )
845
- if err := m .registerUpgradePlanChangedCallback (ctx , ownURL ); err != nil {
846
- m .log .Info ().Err (err ).Msg ("Failed to register upgrade plan changed callback" )
847
- } else {
848
- registeredCallback = true
849
- }
879
+ var newPlan * UpgradePlan
880
+ select {
881
+ case newPlan = <- planChanges :
882
+ // Plan changed
883
+ case <- ctx .Done ():
884
+ // Context canceled
885
+ m .log .Info ().Msg ("Stopping watching for plan changes: context canceled" )
886
+ return
887
+ }
888
+ if newPlan == nil {
889
+ // channel was closed
890
+ m .log .Info ().Msg ("Stopping watching for plan changes" )
891
+ return
850
892
}
851
- plan , err := m .readUpgradePlan (ctx )
852
- if agency .IsKeyNotFound (err ) || plan .IsEmpty () {
853
- // Just try later
854
- } else if err != nil {
855
- // Failed to read plan
856
- m .log .Info ().Err (err ).Msg ("Failed to read upgrade plan" )
857
- } else if plan .IsReady () {
858
- // Plan entries have aal been processes
893
+
894
+ plan := * newPlan
895
+ if plan .IsReady () {
896
+ // Plan entries have been processed
859
897
if ! plan .Finished {
860
898
// Let's show the user that we're done
861
899
if err := m .finishUpgradePlan (ctx , plan ); err != nil {
@@ -869,66 +907,10 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
869
907
if err := m .processUpgradePlan (ctx , plan ); err != nil {
870
908
m .log .Error ().Err (err ).Msg ("Failed to process upgrade plan entry" )
871
909
}
872
- delay = time .Second
873
- }
874
-
875
- select {
876
- case <- time .After (delay ):
877
- // Continue
878
- case <- m .cbTrigger .Done ():
879
- // Continue
880
- case <- ctx .Done ():
881
- // Context canceled
882
- return
883
910
}
884
911
}
885
912
}
886
913
887
- // UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
888
- func (m * upgradeManager ) UpgradePlanChangedCallback () {
889
- m .cbTrigger .Trigger ()
890
- }
891
-
892
- // registerUpgradePlanChangedCallback registers our callback URL with the agency
893
- func (m * upgradeManager ) registerUpgradePlanChangedCallback (ctx context.Context , ownURL string ) error {
894
- // Get api client
895
- api , err := m .createAgencyAPI ()
896
- if err != nil {
897
- return maskAny (err )
898
- }
899
- // Register callback
900
- cbURL , err := getURLWithPath (ownURL , "/cb/upgradePlanChanged" )
901
- if err != nil {
902
- return maskAny (err )
903
- }
904
- ctx , cancel := context .WithTimeout (ctx , time .Second * 10 )
905
- defer cancel ()
906
- if err := api .RegisterChangeCallback (ctx , upgradePlanKey , cbURL ); err != nil {
907
- return maskAny (err )
908
- }
909
- return nil
910
- }
911
-
912
- // unregisterUpgradePlanChangedCallback removes our callback URL from the agency
913
- func (m * upgradeManager ) unregisterUpgradePlanChangedCallback (ctx context.Context , ownURL string ) error {
914
- // Get api client
915
- api , err := m .createAgencyAPI ()
916
- if err != nil {
917
- return maskAny (err )
918
- }
919
- // Register callback
920
- cbURL , err := getURLWithPath (ownURL , "/cb/upgradePlanChanged" )
921
- if err != nil {
922
- return maskAny (err )
923
- }
924
- ctx , cancel := context .WithTimeout (ctx , time .Second * 10 )
925
- defer cancel ()
926
- if err := api .UnregisterChangeCallback (ctx , upgradePlanKey , cbURL ); err != nil {
927
- return maskAny (err )
928
- }
929
- return nil
930
- }
931
-
932
914
// processUpgradePlan inspects the first entry of the given plan and acts upon
933
915
// it when needed.
934
916
func (m * upgradeManager ) processUpgradePlan (ctx context.Context , plan UpgradePlan ) error {
0 commit comments