Skip to content

Commit 38ffa7d

Browse files
authored
CaaS - batch probe fixes (#2220)
1 parent 2882436 commit 38ffa7d

File tree

3 files changed

+37
-22
lines changed

3 files changed

+37
-22
lines changed

cmd/dequeuer/main.go

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ func main() {
8787
log.Fatal("--api-name is a required option")
8888
case apiKind == "":
8989
log.Fatal("--api-kind is a required option")
90+
case adminPort == 0:
91+
log.Fatal("--admin-port is a required option")
9092
}
9193

9294
targetURL := "http://127.0.0.1:" + strconv.Itoa(userContainerPort)
@@ -142,8 +144,6 @@ func main() {
142144
var dequeuerConfig dequeuer.SQSDequeuerConfig
143145
var messageHandler dequeuer.MessageHandler
144146

145-
errCh := make(chan error)
146-
147147
switch apiKind {
148148
case userconfig.BatchAPIKind.String():
149149
if jobID == "" {
@@ -169,9 +169,6 @@ func main() {
169169
if clusterUID == "" {
170170
log.Fatal("--cluster-uid is a required option")
171171
}
172-
if adminPort == 0 {
173-
log.Fatal("--admin-port is a required option")
174-
}
175172

176173
config := dequeuer.AsyncMessageHandlerConfig{
177174
ClusterUID: clusterUID,
@@ -187,24 +184,26 @@ func main() {
187184
StopIfNoMessages: false,
188185
}
189186

190-
adminHandler := http.NewServeMux()
191-
adminHandler.Handle("/healthz", dequeuer.HealthcheckHandler(func() bool {
192-
return probe.AreProbesHealthy(probes)
193-
}))
194-
195-
go func() {
196-
server := &http.Server{
197-
Addr: ":" + strconv.Itoa(adminPort),
198-
Handler: adminHandler,
199-
}
200-
log.Infof("Starting %s server on %s", "admin", server.Addr)
201-
errCh <- server.ListenAndServe()
202-
}()
203-
204187
default:
205188
exit(log, err, fmt.Sprintf("kind %s is not supported", apiKind))
206189
}
207190

191+
errCh := make(chan error)
192+
193+
adminHandler := http.NewServeMux()
194+
adminHandler.Handle("/healthz", dequeuer.HealthcheckHandler(func() bool {
195+
return probe.AreProbesHealthy(probes)
196+
}))
197+
198+
go func() {
199+
server := &http.Server{
200+
Addr: ":" + strconv.Itoa(adminPort),
201+
Handler: adminHandler,
202+
}
203+
log.Infof("Starting %s server on %s", "admin", server.Addr)
204+
errCh <- server.ListenAndServe()
205+
}()
206+
208207
sigint := make(chan os.Signal, 1)
209208
signal.Notify(sigint, os.Interrupt)
210209

pkg/probe/probe.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,9 @@ func (p *Probe) httpProbe() error {
173173
"http://",
174174
)
175175

176-
httpClient := &http.Client{}
176+
httpClient := &http.Client{
177+
Timeout: time.Duration(p.TimeoutSeconds) * time.Second,
178+
}
177179
req, err := http.NewRequest(http.MethodGet, targetURL, nil)
178180
if err != nil {
179181
return err

pkg/workloads/k8s.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func asyncDequeuerProxyContainer(api spec.API, queueURL string) (kcore.Container
152152
},
153153
InitialDelaySeconds: 1,
154154
TimeoutSeconds: 1,
155-
PeriodSeconds: 5,
155+
PeriodSeconds: 10,
156156
SuccessThreshold: 1,
157157
FailureThreshold: 1,
158158
},
@@ -180,6 +180,7 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co
180180
"--job-id", jobID,
181181
"--user-port", s.Int32(*api.Pod.Port),
182182
"--statsd-port", consts.StatsDPortStr,
183+
"--admin-port", consts.AdminPortStr,
183184
},
184185
Env: append(baseEnvVars, kcore.EnvVar{
185186
Name: "HOST_IP",
@@ -189,6 +190,19 @@ func batchDequeuerProxyContainer(api spec.API, jobID, queueURL string) (kcore.Co
189190
},
190191
},
191192
}),
193+
ReadinessProbe: &kcore.Probe{
194+
Handler: kcore.Handler{
195+
HTTPGet: &kcore.HTTPGetAction{
196+
Path: "/healthz",
197+
Port: intstr.FromInt(int(consts.AdminPortInt32)),
198+
},
199+
},
200+
InitialDelaySeconds: 1,
201+
TimeoutSeconds: 1,
202+
PeriodSeconds: 10,
203+
SuccessThreshold: 1,
204+
FailureThreshold: 1,
205+
},
192206
VolumeMounts: []kcore.VolumeMount{
193207
ClusterConfigMount(),
194208
CortexMount(),
@@ -233,7 +247,7 @@ func realtimeProxyContainer(api spec.API) (kcore.Container, kcore.Volume) {
233247
},
234248
InitialDelaySeconds: 1,
235249
TimeoutSeconds: 1,
236-
PeriodSeconds: 5,
250+
PeriodSeconds: 10,
237251
SuccessThreshold: 1,
238252
FailureThreshold: 1,
239253
},

0 commit comments

Comments
 (0)