@@ -19,6 +19,8 @@ import (
1919 "strings"
2020 "time"
2121
22+ apps "k8s.io/api/apps/v1beta1"
23+ "k8s.io/api/core/v1"
2224 apierrors "k8s.io/apimachinery/pkg/api/errors"
2325 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2426 "k8s.io/apimachinery/pkg/labels"
@@ -35,6 +37,7 @@ import (
3537 "github.com/oracle/mysql-operator/pkg/controllers/cluster/labeler"
3638 mysqlclientset "github.com/oracle/mysql-operator/pkg/generated/clientset/versioned"
3739 "github.com/oracle/mysql-operator/pkg/resources/secrets"
40+ "github.com/oracle/mysql-operator/pkg/resources/statefulsets"
3841)
3942
4043// TestDBName is the name of database to use when executing test SQL queries.
@@ -112,7 +115,7 @@ func (j *ClusterTestJig) CreateAndAwaitClusterOrFail(namespace string, members i
112115 return cluster
113116}
114117
115- func (j * ClusterTestJig ) waitForConditionOrFail (namespace , name string , timeout time.Duration , message string , conditionFn func (* v1alpha1.Cluster ) bool ) * v1alpha1.Cluster {
118+ func (j * ClusterTestJig ) WaitForConditionOrFail (namespace , name string , timeout time.Duration , message string , conditionFn func (* v1alpha1.Cluster ) bool ) * v1alpha1.Cluster {
116119 var cluster * v1alpha1.Cluster
117120 pollFunc := func () (bool , error ) {
118121 c , err := j .MySQLClient .MySQLV1alpha1 ().Clusters (namespace ).Get (name , metav1.GetOptions {})
@@ -135,12 +138,99 @@ func (j *ClusterTestJig) waitForConditionOrFail(namespace, name string, timeout
135138// the running phase.
136139func (j * ClusterTestJig ) WaitForClusterReadyOrFail (namespace , name string , timeout time.Duration ) * v1alpha1.Cluster {
137140 Logf ("Waiting up to %v for Cluster \" %s/%s\" to be ready" , timeout , namespace , name )
138- cluster := j .waitForConditionOrFail (namespace , name , timeout , "have all nodes ready" , func (cluster * v1alpha1.Cluster ) bool {
141+ cluster := j .WaitForConditionOrFail (namespace , name , timeout , "have all nodes ready" , func (cluster * v1alpha1.Cluster ) bool {
139142 return clusterutil .IsClusterReady (cluster )
140143 })
141144 return cluster
142145}
143146
147+ // WaitForClusterUpgradedOrFail waits for a MySQL cluster to be upgraded to the
148+ // given version or fails.
149+ func (j * ClusterTestJig ) WaitForClusterUpgradedOrFail (namespace , name , version string , timeout time.Duration ) * v1alpha1.Cluster {
150+ Logf ("Waiting up to %v for Cluster \" %s/%s\" to be upgraded" , timeout , namespace , name )
151+
152+ cluster := j .WaitForConditionOrFail (namespace , name , timeout , "be upgraded " , func (cluster * v1alpha1.Cluster ) bool {
153+ set , err := j .KubeClient .AppsV1beta1 ().StatefulSets (cluster .Namespace ).Get (cluster .Name , metav1.GetOptions {})
154+ if err != nil {
155+ Failf ("Failed to get StatefulSet %[1]q for Cluster %[1]q: %[2]v" , name , err )
156+ }
157+
158+ set = j .waitForSSRollingUpdate (set )
159+
160+ var actualVersion string
161+ {
162+ var found bool
163+ var index int
164+ for i , c := range set .Spec .Template .Spec .Containers {
165+ if c .Name == statefulsets .MySQLServerName {
166+ index = i
167+ found = true
168+ break
169+ }
170+ }
171+
172+ if ! found {
173+ Failf ("no %q container found for StatefulSet %q" , statefulsets .MySQLServerName , set .Name )
174+ }
175+ image := set .Spec .Template .Spec .Containers [index ].Image
176+ parts := strings .Split (image , ":" )
177+ if len (parts ) < 2 {
178+ Failf ("invalid image %q for StatefulSet %q" , image , set .Name )
179+ }
180+ actualVersion = parts [len (parts )- 1 ]
181+ }
182+
183+ return actualVersion == version
184+ })
185+ return cluster
186+ }
187+
188+ // waitForSSState periodically polls for the ss and its pods until the until function returns either true or an error
189+ func (j * ClusterTestJig ) waitForSSState (ss * apps.StatefulSet , until func (* apps.StatefulSet , * v1.PodList ) (bool , error )) {
190+ pollErr := wait .PollImmediate (Poll , DefaultTimeout ,
191+ func () (bool , error ) {
192+ ssGet , err := j .KubeClient .AppsV1beta1 ().StatefulSets (ss .Namespace ).Get (ss .Name , metav1.GetOptions {})
193+ if err != nil {
194+ return false , err
195+ }
196+
197+ selector , err := metav1 .LabelSelectorAsSelector (ss .Spec .Selector )
198+ ExpectNoError (err )
199+ podList , err := j .KubeClient .CoreV1 ().Pods (ss .Namespace ).List (metav1.ListOptions {LabelSelector : selector .String ()})
200+ ExpectNoError (err )
201+
202+ return until (ssGet , podList )
203+ })
204+ if pollErr != nil {
205+ Failf ("Failed waiting for state update: %v" , pollErr )
206+ }
207+ }
208+
209+ // waitForRollingUpdate waits for all Pods in set to exist and have the correct revision and for the RollingUpdate to
210+ // complete. set must have a RollingUpdateStatefulSetStrategyType.
211+ func (j * ClusterTestJig ) waitForSSRollingUpdate (set * apps.StatefulSet ) * apps.StatefulSet {
212+ var pods * v1.PodList
213+ if set .Spec .UpdateStrategy .Type != apps .RollingUpdateStatefulSetStrategyType {
214+ Failf ("StatefulSet %s/%s attempt to wait for rolling update with updateStrategy %s" ,
215+ set .Namespace ,
216+ set .Name ,
217+ set .Spec .UpdateStrategy .Type )
218+ }
219+ Logf ("Waiting for StatefulSet %s/%s to complete update" , set .Namespace , set .Name )
220+ j .waitForSSState (set , func (set2 * apps.StatefulSet , pods2 * v1.PodList ) (bool , error ) {
221+ set = set2
222+ pods = pods2
223+ if len (pods .Items ) < int (* set .Spec .Replicas ) {
224+ return false , nil
225+ }
226+ if set .Status .UpdateRevision != set .Status .CurrentRevision {
227+ return false , nil
228+ }
229+ return true , nil
230+ })
231+ return set
232+ }
233+
144234// SanityCheckCluster checks basic properties of a given Cluster match
145235// our expectations.
146236func (j * ClusterTestJig ) SanityCheckCluster (cluster * v1alpha1.Cluster ) {
0 commit comments