Skip to content

Commit 54a875a

Browse files
committed
Show TF Serving logs
1 parent ee4b3fc commit 54a875a

File tree

1 file changed

+48
-16
lines changed

1 file changed

+48
-16
lines changed

pkg/operator/workloads/logs.go

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -160,8 +160,8 @@ func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous
160160
podCheckCancel <- struct{}{}
161161
}
162162

163-
func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) (*os.Process, error) {
164-
cmdPath := "/bin/bash"
163+
func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) ([]*os.Process, error) {
164+
processList := []*os.Process{}
165165

166166
kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
167167
if previous {
@@ -171,27 +171,55 @@ func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) (*os.
171171
identifier := pod.Name
172172
kubectlArgs = append(kubectlArgs, pod.Name)
173173
if pod.Labels["workloadType"] == workloadTypeAPI && pod.Labels["userFacing"] == "true" {
174-
kubectlArgs = append(kubectlArgs, apiContainerName)
175-
kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
174+
175+
for _, container := range pod.Spec.Containers {
176+
if container.Name == tfServingContainerName {
177+
tfServingArgs := make([]string, len(kubectlArgs))
178+
copy(tfServingArgs, kubectlArgs)
179+
tfServingArgs = append(tfServingArgs, tfServingContainerName)
180+
tfServingIdentifier := pod.Name + " " + tfServingContainerName
181+
process, err := createKubectlProcess(tfServingArgs, tfServingIdentifier, attrs)
182+
if err != nil {
183+
return nil, err
184+
}
185+
processList = append(processList, process)
186+
}
187+
}
176188
identifier += " " + apiContainerName
189+
kubectlArgs = append(kubectlArgs, apiContainerName)
177190
}
178191

192+
process, err := createKubectlProcess(kubectlArgs, identifier, attrs)
193+
if err != nil {
194+
for _, processToKill := range processList {
195+
processToKill.Kill()
196+
}
197+
return nil, err
198+
}
199+
200+
processList = append(processList, process)
201+
return processList, nil
202+
}
203+
204+
func createKubectlProcess(kubectlArgs []string, identifier string, attrs *os.ProcAttr) (*os.Process, error) {
205+
cmdPath := "/bin/bash"
206+
207+
kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
179208
labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY\" | tail -n +1; done", identifier)
180-
kubectlCmd := strings.Join(kubectlArgs, " ")
181-
bashArgs := []string{"/bin/bash", "-c", kubectlCmd + labelLog}
209+
kubectlArgsCmd := strings.Join(kubectlArgs, " ")
210+
bashArgs := []string{"/bin/bash", "-c", kubectlArgsCmd + labelLog}
182211
process, err := os.StartProcess(cmdPath, bashArgs, attrs)
183212
if err != nil {
184213
return nil, errors.Wrap(err, strings.Join(bashArgs, " "))
185214
}
186-
187215
return process, nil
188216
}
189217

190218
func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, wrotePending bool, inr *os.File) {
191219
timer := time.NewTimer(0)
192220
defer timer.Stop()
193221

194-
processMap := make(map[string]*os.Process)
222+
processMap := make(map[string][]*os.Process)
195223
defer deleteProcesses(processMap)
196224
labels := initialPodList[0].GetLabels()
197225
podSearchLabels := map[string]string{
@@ -275,17 +303,17 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
275303
}
276304

277305
for _, podName := range podsToAdd[:maxPodsToAdd] {
278-
process, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
306+
processList, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
279307
Files: []*os.File{inr, outw, outw},
280308
})
281309
if err != nil {
282310
socketWriterError <- err
283311
return
284312
}
285-
processMap[podName] = process
313+
processMap[podName] = processList
286314
}
287315

288-
deleteMap := make(map[string]*os.Process, len(podsToDelete))
316+
deleteMap := make(map[string][]*os.Process, len(podsToDelete))
289317

290318
for podName := range podsToDelete {
291319
deleteMap[podName] = processMap[podName]
@@ -303,13 +331,17 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
303331
}
304332
}
305333

306-
func deleteProcesses(processMap map[string]*os.Process) {
307-
for _, process := range processMap {
308-
process.Signal(os.Interrupt)
334+
func deleteProcesses(processMap map[string][]*os.Process) {
335+
for _, processes := range processMap {
336+
for _, process := range processes {
337+
process.Signal(os.Interrupt)
338+
}
309339
}
310340
time.Sleep(5 * time.Second)
311-
for _, process := range processMap {
312-
process.Signal(os.Kill)
341+
for _, processes := range processMap {
342+
for _, process := range processes {
343+
process.Signal(os.Kill)
344+
}
313345
}
314346
}
315347

0 commit comments

Comments
 (0)