@@ -27,6 +27,7 @@ import (
27
27
"context"
28
28
"fmt"
29
29
"net/url"
30
+ "reflect"
30
31
"strings"
31
32
"sync"
32
33
"time"
@@ -73,9 +74,6 @@ type UpgradeManager interface {
73
74
74
75
// RunWatchUpgradePlan keeps watching the upgrade plan until the given context is canceled.
75
76
RunWatchUpgradePlan (context.Context )
76
-
77
- // UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
78
- UpgradePlanChangedCallback ()
79
77
}
80
78
81
79
// UpgradeManagerContext holds methods used by the upgrade manager to control its context.
@@ -155,6 +153,11 @@ func (p *UpgradePlan) ResetFailures() {
155
153
}
156
154
}
157
155
156
+ // Equals returns true if other plan is the same
157
+ func (p UpgradePlan ) Equals (other UpgradePlan ) bool {
158
+ return reflect .DeepEqual (p , other )
159
+ }
160
+
158
161
// UpgradeEntryType is a strongly typed upgrade plan item
159
162
type UpgradeEntryType string
160
163
@@ -823,39 +826,76 @@ func (m *upgradeManager) removeUpgradePlan(ctx context.Context) error {
823
826
return nil
824
827
}
825
828
829
+ // waitForPlanChanges returns unbuffered channel on which
830
+ func (m * upgradeManager ) waitForPlanChanges (ctx context.Context ) chan UpgradePlan {
831
+ ch := make (chan UpgradePlan )
832
+
833
+ go func () {
834
+ var oldPlan UpgradePlan
835
+ for {
836
+ delay := time .Second * 3
837
+ plan , err := m .readUpgradePlan (ctx )
838
+ if agency .IsKeyNotFound (err ) || plan .IsEmpty () {
839
+ // Just try later
840
+ } else if err != nil {
841
+ // Failed to read plan
842
+ m .log .Info ().Err (err ).Msg ("Failed to read upgrade plan" )
843
+ } else if ! oldPlan .Equals (plan ) {
844
+ ch <- plan
845
+ oldPlan = plan
846
+ delay = time .Millisecond * 500
847
+ }
848
+
849
+ timer := time .NewTimer (delay )
850
+ select {
851
+ case <- timer .C :
852
+ // Continue
853
+ case <- ctx .Done ():
854
+ // Context canceled
855
+ if ! timer .Stop () {
856
+ <- timer .C
857
+ }
858
+ return
859
+ }
860
+ }
861
+ }()
862
+
863
+ return ch
864
+ }
865
+
826
866
// RunWatchUpgradePlan keeps watching the upgrade plan in the agency.
827
867
// Once it detects that this starter has to act, it does.
828
868
func (m * upgradeManager ) RunWatchUpgradePlan (ctx context.Context ) {
829
- _ , myPeer , mode := m .upgradeManagerContext .ClusterConfig ()
830
- ownURL := myPeer .CreateStarterURL ("/" )
869
+ _ , _ , mode := m .upgradeManagerContext .ClusterConfig ()
831
870
if ! mode .HasAgency () {
832
871
// Nothing to do here without an agency
833
872
return
834
873
}
835
- registeredCallback := false
836
- defer func () {
837
- if registeredCallback {
838
- m .unregisterUpgradePlanChangedCallback (ctx , ownURL )
839
- }
840
- }()
874
+
875
+ planChanges := m .waitForPlanChanges (ctx )
876
+
877
+ var plan UpgradePlan
878
+ delay := 1 * time .Second
841
879
for {
842
- delay := time .Minute
843
- if ! registeredCallback {
844
- m .log .Debug ().Msg ("Registering upgrade plan changed callback..." )
845
- if err := m .registerUpgradePlanChangedCallback (ctx , ownURL ); err != nil {
846
- m .log .Info ().Err (err ).Msg ("Failed to register upgrade plan changed callback" )
847
- } else {
848
- registeredCallback = true
880
+ timer := time .NewTimer (delay )
881
+ select {
882
+ case <- timer .C :
883
+ // Continue
884
+ case plan = <- planChanges :
885
+ // Continue
886
+ if ! timer .Stop () {
887
+ <- timer .C
849
888
}
889
+ case <- ctx .Done ():
890
+ // Context canceled
891
+ if ! timer .Stop () {
892
+ <- timer .C
893
+ }
894
+ return
850
895
}
851
- plan , err := m .readUpgradePlan (ctx )
852
- if agency .IsKeyNotFound (err ) || plan .IsEmpty () {
853
- // Just try later
854
- } else if err != nil {
855
- // Failed to read plan
856
- m .log .Info ().Err (err ).Msg ("Failed to read upgrade plan" )
857
- } else if plan .IsReady () {
858
- // Plan entries have aal been processes
896
+
897
+ if plan .IsReady () {
898
+ // Plan entries have been processed
859
899
if ! plan .Finished {
860
900
// Let's show the user that we're done
861
901
if err := m .finishUpgradePlan (ctx , plan ); err != nil {
@@ -871,62 +911,7 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
871
911
}
872
912
delay = time .Second
873
913
}
874
-
875
- select {
876
- case <- time .After (delay ):
877
- // Continue
878
- case <- m .cbTrigger .Done ():
879
- // Continue
880
- case <- ctx .Done ():
881
- // Context canceled
882
- return
883
- }
884
- }
885
- }
886
-
887
- // UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
888
- func (m * upgradeManager ) UpgradePlanChangedCallback () {
889
- m .cbTrigger .Trigger ()
890
- }
891
-
892
- // registerUpgradePlanChangedCallback registers our callback URL with the agency
893
- func (m * upgradeManager ) registerUpgradePlanChangedCallback (ctx context.Context , ownURL string ) error {
894
- // Get api client
895
- api , err := m .createAgencyAPI ()
896
- if err != nil {
897
- return maskAny (err )
898
- }
899
- // Register callback
900
- cbURL , err := getURLWithPath (ownURL , "/cb/upgradePlanChanged" )
901
- if err != nil {
902
- return maskAny (err )
903
914
}
904
- ctx , cancel := context .WithTimeout (ctx , time .Second * 10 )
905
- defer cancel ()
906
- if err := api .RegisterChangeCallback (ctx , upgradePlanKey , cbURL ); err != nil {
907
- return maskAny (err )
908
- }
909
- return nil
910
- }
911
-
912
- // unregisterUpgradePlanChangedCallback removes our callback URL from the agency
913
- func (m * upgradeManager ) unregisterUpgradePlanChangedCallback (ctx context.Context , ownURL string ) error {
914
- // Get api client
915
- api , err := m .createAgencyAPI ()
916
- if err != nil {
917
- return maskAny (err )
918
- }
919
- // Register callback
920
- cbURL , err := getURLWithPath (ownURL , "/cb/upgradePlanChanged" )
921
- if err != nil {
922
- return maskAny (err )
923
- }
924
- ctx , cancel := context .WithTimeout (ctx , time .Second * 10 )
925
- defer cancel ()
926
- if err := api .UnregisterChangeCallback (ctx , upgradePlanKey , cbURL ); err != nil {
927
- return maskAny (err )
928
- }
929
- return nil
930
915
}
931
916
932
917
// processUpgradePlan inspects the first entry of the given plan and acts upon
0 commit comments