@@ -31,8 +31,11 @@ import (
31
31
"helm.sh/helm/v3/pkg/chart/loader"
32
32
"helm.sh/helm/v3/pkg/chartutil"
33
33
"helm.sh/helm/v3/pkg/cli"
34
+ admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
34
35
corev1 "k8s.io/api/core/v1"
35
36
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
37
+ "k8s.io/apimachinery/pkg/types"
38
+ "k8s.io/apimachinery/pkg/util/wait"
36
39
"k8s.io/client-go/kubernetes"
37
40
"k8s.io/client-go/kubernetes/scheme"
38
41
"k8s.io/client-go/rest"
@@ -53,6 +56,12 @@ import (
53
56
const (
54
57
ReleaseName = "spark-operator"
55
58
ReleaseNamespace = "spark-operator"
59
+
60
+ MutatingWebhookName = "spark-operator-webhook"
61
+ ValidatingWebhookName = "spark-operator-webhook"
62
+
63
+ PollInterval = 1 * time .Second
64
+ WaitTimeout = 5 * time .Minute
56
65
)
57
66
58
67
var (
@@ -123,7 +132,7 @@ var _ = BeforeSuite(func() {
123
132
installAction .ReleaseName = ReleaseName
124
133
installAction .Namespace = envSettings .Namespace ()
125
134
installAction .Wait = true
126
- installAction .Timeout = 5 * time . Minute
135
+ installAction .Timeout = WaitTimeout
127
136
chartPath := filepath .Join (".." , ".." , "charts" , "spark-operator-chart" )
128
137
chart , err := loader .Load (chartPath )
129
138
Expect (err ).NotTo (HaveOccurred ())
@@ -134,6 +143,12 @@ var _ = BeforeSuite(func() {
134
143
release , err := installAction .Run (chart , values )
135
144
Expect (err ).NotTo (HaveOccurred ())
136
145
Expect (release ).NotTo (BeNil ())
146
+
147
+ By ("Waiting for the webhooks to be ready" )
148
+ mutatingWebhookKey := types.NamespacedName {Name : MutatingWebhookName }
149
+ validatingWebhookKey := types.NamespacedName {Name : ValidatingWebhookName }
150
+ Expect (waitForMutatingWebhookReady (context .Background (), mutatingWebhookKey )).NotTo (HaveOccurred ())
151
+ Expect (waitForValidatingWebhookReady (context .Background (), validatingWebhookKey )).NotTo (HaveOccurred ())
137
152
})
138
153
139
154
var _ = AfterSuite (func () {
@@ -147,7 +162,7 @@ var _ = AfterSuite(func() {
147
162
uninstallAction := action .NewUninstall (actionConfig )
148
163
Expect (uninstallAction ).NotTo (BeNil ())
149
164
uninstallAction .Wait = true
150
- uninstallAction .Timeout = 5 * time . Minute
165
+ uninstallAction .Timeout = WaitTimeout
151
166
resp , err := uninstallAction .Run (ReleaseName )
152
167
Expect (err ).To (BeNil ())
153
168
Expect (resp ).NotTo (BeNil ())
@@ -160,3 +175,95 @@ var _ = AfterSuite(func() {
160
175
err = testEnv .Stop ()
161
176
Expect (err ).ToNot (HaveOccurred ())
162
177
})
178
+
179
+ func waitForMutatingWebhookReady (ctx context.Context , key types.NamespacedName ) error {
180
+ cancelCtx , cancelFunc := context .WithTimeout (ctx , WaitTimeout )
181
+ defer cancelFunc ()
182
+
183
+ mutatingWebhook := admissionregistrationv1.MutatingWebhookConfiguration {}
184
+ err := wait .PollUntilContextCancel (cancelCtx , PollInterval , true , func (ctx context.Context ) (bool , error ) {
185
+ if err := k8sClient .Get (ctx , key , & mutatingWebhook ); err != nil {
186
+ return false , err
187
+ }
188
+
189
+ for _ , wh := range mutatingWebhook .Webhooks {
190
+ // Checkout webhook CA certificate
191
+ if wh .ClientConfig .CABundle == nil {
192
+ return false , nil
193
+ }
194
+
195
+ // Checkout webhook service endpoints
196
+ svcRef := wh .ClientConfig .Service
197
+ if svcRef == nil {
198
+ return false , fmt .Errorf ("webhook service is nil" )
199
+ }
200
+ endpoints := corev1.Endpoints {}
201
+ endpointsKey := types.NamespacedName {Namespace : svcRef .Namespace , Name : svcRef .Name }
202
+ if err := k8sClient .Get (ctx , endpointsKey , & endpoints ); err != nil {
203
+ return false , err
204
+ }
205
+ if len (endpoints .Subsets ) == 0 {
206
+ return false , nil
207
+ }
208
+ }
209
+
210
+ return true , nil
211
+ })
212
+ return err
213
+ }
214
+
215
+ func waitForValidatingWebhookReady (ctx context.Context , key types.NamespacedName ) error {
216
+ cancelCtx , cancelFunc := context .WithTimeout (ctx , WaitTimeout )
217
+ defer cancelFunc ()
218
+
219
+ validatingWebhook := admissionregistrationv1.ValidatingWebhookConfiguration {}
220
+ err := wait .PollUntilContextCancel (cancelCtx , PollInterval , true , func (ctx context.Context ) (bool , error ) {
221
+ if err := k8sClient .Get (ctx , key , & validatingWebhook ); err != nil {
222
+ return false , err
223
+ }
224
+
225
+ for _ , wh := range validatingWebhook .Webhooks {
226
+ // Checkout webhook CA certificate
227
+ if wh .ClientConfig .CABundle == nil {
228
+ return false , nil
229
+ }
230
+
231
+ // Checkout webhook service endpoints
232
+ svcRef := wh .ClientConfig .Service
233
+ if svcRef == nil {
234
+ return false , fmt .Errorf ("webhook service is nil" )
235
+ }
236
+ endpoints := corev1.Endpoints {}
237
+ endpointsKey := types.NamespacedName {Namespace : svcRef .Namespace , Name : svcRef .Name }
238
+ if err := k8sClient .Get (ctx , endpointsKey , & endpoints ); err != nil {
239
+ return false , err
240
+ }
241
+ if len (endpoints .Subsets ) == 0 {
242
+ return false , nil
243
+ }
244
+ }
245
+
246
+ return true , nil
247
+ })
248
+ return err
249
+ }
250
+
251
+ func waitForSparkApplicationCompleted (ctx context.Context , key types.NamespacedName ) error {
252
+ cancelCtx , cancelFunc := context .WithTimeout (ctx , WaitTimeout )
253
+ defer cancelFunc ()
254
+
255
+ app := & v1beta2.SparkApplication {}
256
+ err := wait .PollUntilContextCancel (cancelCtx , PollInterval , true , func (ctx context.Context ) (bool , error ) {
257
+ if err := k8sClient .Get (ctx , key , app ); err != nil {
258
+ return false , err
259
+ }
260
+ switch app .Status .AppState .State {
261
+ case v1beta2 .ApplicationStateFailedSubmission , v1beta2 .ApplicationStateFailed :
262
+ return false , fmt .Errorf (app .Status .AppState .ErrorMessage )
263
+ case v1beta2 .ApplicationStateCompleted :
264
+ return true , nil
265
+ }
266
+ return false , nil
267
+ })
268
+ return err
269
+ }
0 commit comments