Skip to content

Commit ee4ec61

Browse files
authored
Log init containers and remove previous flag (#393)
1 parent 89cd04e commit ee4ec61

File tree

1 file changed

+80
-87
lines changed

1 file changed

+80
-87
lines changed

pkg/operator/workloads/logs.go

Lines changed: 80 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"io"
2323
"os"
2424
"regexp"
25+
"sort"
2526
"strings"
2627
"time"
2728

@@ -31,6 +32,7 @@ import (
3132
"github.com/cortexlabs/cortex/pkg/lib/errors"
3233
"github.com/cortexlabs/cortex/pkg/lib/k8s"
3334
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
35+
s "github.com/cortexlabs/cortex/pkg/lib/strings"
3436
"github.com/cortexlabs/cortex/pkg/operator/config"
3537
)
3638

@@ -73,30 +75,26 @@ func ReadLogs(appName string, podSearchLabels map[string]string, verbose bool, s
7375

7476
switch {
7577
case len(podMap[k8s.PodStatusSucceeded]) > 0:
76-
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, false, socket)
78+
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, socket)
7779
case len(podMap[k8s.PodStatusRunning]) > 0:
78-
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, false, socket)
80+
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, socket)
7981
case len(podMap[k8s.PodStatusPending]) > 0:
80-
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, false, socket)
82+
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, socket)
8183
case len(podMap[k8s.PodStatusKilled]) > 0:
82-
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, false, socket)
84+
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, socket)
8385
case len(podMap[k8s.PodStatusKilledOOM]) > 0:
84-
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, false, socket)
86+
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, socket)
8587
case len(podMap[k8s.PodStatusFailed]) > 0:
86-
previous := false
87-
if pods[0].Labels["workloadType"] == workloadTypeAPI {
88-
previous = true
89-
}
90-
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, previous, socket)
88+
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, socket)
9189
case len(podMap[k8s.PodStatusTerminating]) > 0:
92-
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, false, socket)
90+
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, socket)
9391
case len(podMap[k8s.PodStatusUnknown]) > 0:
94-
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, false, socket)
92+
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, socket)
9593
default: // unexpected
9694
if len(pods) > maxParallelPodLogging {
9795
pods = pods[:maxParallelPodLogging]
9896
}
99-
getKubectlLogs(pods, verbose, wrotePending, false, socket)
97+
getKubectlLogs(pods, verbose, wrotePending, socket)
10098
}
10199
return
102100
}
@@ -126,7 +124,7 @@ func ReadLogs(appName string, podSearchLabels map[string]string, verbose bool, s
126124
}
127125
}
128126

129-
func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
127+
func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, socket *websocket.Conn) {
130128
if !wrotePending {
131129
isAllPending := true
132130
for _, pod := range pods {
@@ -154,55 +152,64 @@ func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous
154152
podCheckCancel := make(chan struct{})
155153
defer close(podCheckCancel)
156154

157-
go podCheck(podCheckCancel, socket, pods, previous, verbose, wrotePending, inr)
155+
go podCheck(podCheckCancel, socket, pods, verbose, wrotePending, inr)
158156
pumpStdin(socket, inw)
159157
podCheckCancel <- struct{}{}
160158
}
161159

162-
func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) ([]*os.Process, error) {
163-
processList := []*os.Process{}
160+
type LogKey struct {
161+
PodName string
162+
ContainerName string
163+
RestartCount int32
164+
}
164165

165-
kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
166-
if previous {
167-
kubectlArgs = append(kubectlArgs, "--previous")
168-
}
166+
func (l LogKey) String() string {
167+
return fmt.Sprintf("%s,%s,%d", l.PodName, l.ContainerName, l.RestartCount)
168+
}
169169

170-
identifier := pod.Name
171-
kubectlArgs = append(kubectlArgs, pod.Name)
172-
if pod.Labels["workloadType"] == workloadTypeAPI && pod.Labels["userFacing"] == "true" {
173-
174-
for _, container := range pod.Spec.Containers {
175-
if container.Name == tfServingContainerName {
176-
tfServingArgs := make([]string, len(kubectlArgs))
177-
copy(tfServingArgs, kubectlArgs)
178-
tfServingArgs = append(tfServingArgs, tfServingContainerName)
179-
tfServingIdentifier := pod.Name + " " + tfServingContainerName
180-
process, err := createKubectlProcess(tfServingArgs, tfServingIdentifier, attrs)
181-
if err != nil {
182-
return nil, err
183-
}
184-
processList = append(processList, process)
185-
}
186-
}
187-
identifier += " " + apiContainerName
188-
kubectlArgs = append(kubectlArgs, apiContainerName)
170+
func (l LogKey) KubeIdentifier() string {
171+
return fmt.Sprintf("%s %s", l.PodName, l.ContainerName)
172+
}
173+
174+
func StringToLogKey(str string) LogKey {
175+
split := strings.Split(str, ",")
176+
restartCount, _ := s.ParseInt32(split[2])
177+
return LogKey{PodName: split[0], ContainerName: split[1], RestartCount: restartCount}
178+
}
179+
180+
func GetLogKey(pod kcore.Pod, status kcore.ContainerStatus) LogKey {
181+
return LogKey{PodName: pod.Name, ContainerName: status.Name, RestartCount: status.RestartCount}
182+
}
183+
184+
func CurrentLoggingProcesses(logProcesses map[string]*os.Process) strset.Set {
185+
set := strset.New()
186+
for identifier := range logProcesses {
187+
set.Add(identifier)
189188
}
189+
return set
190+
}
190191

191-
process, err := createKubectlProcess(kubectlArgs, identifier, attrs)
192-
if err != nil {
193-
for _, processToKill := range processList {
194-
processToKill.Kill()
192+
func GetLogKeys(pod kcore.Pod) strset.Set {
193+
containerStatuses := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
194+
logKeys := strset.New()
195+
for _, status := range containerStatuses {
196+
if status.State.Terminated != nil && (status.State.Terminated.ExitCode != 0 || status.State.Terminated.StartedAt.After(time.Now().Add(-newPodCheckInterval))) {
197+
logKeys.Add(GetLogKey(pod, status).String())
198+
} else if status.State.Running != nil {
199+
logKeys.Add(GetLogKey(pod, status).String())
195200
}
196-
return nil, err
197201
}
198202

199-
processList = append(processList, process)
200-
return processList, nil
203+
return logKeys
201204
}
202205

203-
func createKubectlProcess(kubectlArgs []string, identifier string, attrs *os.ProcAttr) (*os.Process, error) {
206+
func createKubectlProcess(logKey LogKey, attrs *os.ProcAttr) (*os.Process, error) {
204207
cmdPath := "/bin/bash"
205208

209+
kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true", logKey.PodName, logKey.ContainerName}
210+
211+
identifier := logKey.KubeIdentifier()
212+
206213
kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
207214
labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY\" | tail -n +1; done", identifier)
208215
kubectlArgsCmd := strings.Join(kubectlArgs, " ")
@@ -214,11 +221,11 @@ func createKubectlProcess(kubectlArgs []string, identifier string, attrs *os.Pro
214221
return process, nil
215222
}
216223

217-
func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, wrotePending bool, inr *os.File) {
224+
func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, verbose bool, wrotePending bool, inr *os.File) {
218225
timer := time.NewTimer(0)
219226
defer timer.Stop()
220227

221-
processMap := make(map[string][]*os.Process)
228+
processMap := make(map[string]*os.Process)
222229
defer deleteProcesses(processMap)
223230
labels := initialPodList[0].GetLabels()
224231
podSearchLabels := map[string]string{
@@ -266,33 +273,23 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
266273
}
267274
}
268275

269-
prevRunningPods := strset.New()
270-
for podName := range processMap {
271-
prevRunningPods.Add(podName)
272-
}
273-
274-
newPods := strset.Difference(latestRunningPods, prevRunningPods)
275-
podsToDelete := strset.Difference(prevRunningPods, latestRunningPods)
276-
podsToKeep := strset.Intersection(prevRunningPods, latestRunningPods)
276+
runningLogProcesses := CurrentLoggingProcesses(processMap)
277277

278-
// Prioritize adding running pods
279-
podsToAddRunning := []string{}
280-
podsToAddNotRunning := []string{}
278+
sortedPods := latestRunningPods.Slice()
279+
sort.Slice(sortedPods, func(i, j int) bool {
280+
return latestRunningPodsMap[sortedPods[i]].CreationTimestamp.After(latestRunningPodsMap[sortedPods[j]].CreationTimestamp.Time)
281+
})
281282

282-
for podName := range newPods {
283-
pod := latestRunningPodsMap[podName]
284-
if k8s.GetPodStatus(&pod) == k8s.PodStatusRunning {
285-
podsToAddRunning = append(podsToAddRunning, podName)
286-
} else {
287-
podsToAddNotRunning = append(podsToAddNotRunning, podName)
283+
expectedLogProcesses := strset.New()
284+
for i, podName := range sortedPods {
285+
if i >= maxParallelPodLogging {
286+
break
288287
}
288+
expectedLogProcesses.Merge(GetLogKeys(latestRunningPodsMap[podName]))
289289
}
290-
podsToAdd := append(podsToAddRunning, podsToAddNotRunning...)
291290

292-
maxPodsToAdd := maxParallelPodLogging - len(podsToKeep)
293-
if len(podsToAdd) < maxPodsToAdd {
294-
maxPodsToAdd = len(podsToAdd)
295-
}
291+
processesToDelete := strset.Difference(runningLogProcesses, expectedLogProcesses)
292+
processesToAdd := strset.Difference(expectedLogProcesses, runningLogProcesses)
296293

297294
if wrotePending && len(latestRunningPods) > 0 {
298295
if !writeSocket("Streaming logs:", socket) {
@@ -301,20 +298,20 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
301298
wrotePending = false
302299
}
303300

304-
for _, podName := range podsToAdd[:maxPodsToAdd] {
305-
processList, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
301+
for logProcess := range processesToAdd {
302+
process, err := createKubectlProcess(StringToLogKey(logProcess), &os.ProcAttr{
306303
Files: []*os.File{inr, outw, outw},
307304
})
308305
if err != nil {
309306
socketWriterError <- err
310307
return
311308
}
312-
processMap[podName] = processList
309+
processMap[logProcess] = process
313310
}
314311

315-
deleteMap := make(map[string][]*os.Process, len(podsToDelete))
312+
deleteMap := make(map[string]*os.Process, len(processesToDelete))
316313

317-
for podName := range podsToDelete {
314+
for podName := range processesToDelete {
318315
deleteMap[podName] = processMap[podName]
319316
delete(processMap, podName)
320317
}
@@ -330,17 +327,13 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
330327
}
331328
}
332329

333-
func deleteProcesses(processMap map[string][]*os.Process) {
334-
for _, processes := range processMap {
335-
for _, process := range processes {
336-
process.Signal(os.Interrupt)
337-
}
330+
func deleteProcesses(processMap map[string]*os.Process) {
331+
for _, process := range processMap {
332+
process.Signal(os.Interrupt)
338333
}
339-
time.Sleep(5 * time.Second)
340-
for _, processes := range processMap {
341-
for _, process := range processes {
342-
process.Signal(os.Kill)
343-
}
334+
time.Sleep(3 * time.Second)
335+
for _, process := range processMap {
336+
process.Signal(os.Kill)
344337
}
345338
}
346339

0 commit comments

Comments
 (0)