Skip to content

Log init containers and remove previous flag #393

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Aug 29, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 80 additions & 87 deletions pkg/operator/workloads/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"io"
"os"
"regexp"
"sort"
"strings"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"github.com/cortexlabs/cortex/pkg/lib/errors"
"github.com/cortexlabs/cortex/pkg/lib/k8s"
"github.com/cortexlabs/cortex/pkg/lib/sets/strset"
s "github.com/cortexlabs/cortex/pkg/lib/strings"
"github.com/cortexlabs/cortex/pkg/operator/config"
)

Expand Down Expand Up @@ -73,30 +75,26 @@ func ReadLogs(appName string, podSearchLabels map[string]string, verbose bool, s

switch {
case len(podMap[k8s.PodStatusSucceeded]) > 0:
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusSucceeded], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusRunning]) > 0:
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusRunning], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusPending]) > 0:
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusPending], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusKilled]) > 0:
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusKilled], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusKilledOOM]) > 0:
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusKilledOOM], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusFailed]) > 0:
previous := false
if pods[0].Labels["workloadType"] == workloadTypeAPI {
previous = true
}
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, previous, socket)
getKubectlLogs(podMap[k8s.PodStatusFailed], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusTerminating]) > 0:
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusTerminating], verbose, wrotePending, socket)
case len(podMap[k8s.PodStatusUnknown]) > 0:
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, false, socket)
getKubectlLogs(podMap[k8s.PodStatusUnknown], verbose, wrotePending, socket)
default: // unexpected
if len(pods) > maxParallelPodLogging {
pods = pods[:maxParallelPodLogging]
}
getKubectlLogs(pods, verbose, wrotePending, false, socket)
getKubectlLogs(pods, verbose, wrotePending, socket)
}
return
}
Expand Down Expand Up @@ -126,7 +124,7 @@ func ReadLogs(appName string, podSearchLabels map[string]string, verbose bool, s
}
}

func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous bool, socket *websocket.Conn) {
func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, socket *websocket.Conn) {
if !wrotePending {
isAllPending := true
for _, pod := range pods {
Expand Down Expand Up @@ -154,55 +152,64 @@ func getKubectlLogs(pods []kcore.Pod, verbose bool, wrotePending bool, previous
podCheckCancel := make(chan struct{})
defer close(podCheckCancel)

go podCheck(podCheckCancel, socket, pods, previous, verbose, wrotePending, inr)
go podCheck(podCheckCancel, socket, pods, verbose, wrotePending, inr)
pumpStdin(socket, inw)
podCheckCancel <- struct{}{}
}

func startKubectlProcess(pod kcore.Pod, previous bool, attrs *os.ProcAttr) ([]*os.Process, error) {
processList := []*os.Process{}
// LogKey identifies a single log stream: one container of one pod at a
// particular restart count. GetLogKey fills it from a pod and one of its
// container statuses.
type LogKey struct {
// PodName is the name of the pod (pod.Name in GetLogKey).
PodName string
// ContainerName is the container's name as reported in its status.
ContainerName string
// RestartCount is the restart count copied from the container status.
RestartCount int32
}

kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true"}
if previous {
kubectlArgs = append(kubectlArgs, "--previous")
}
// String encodes the key as "<PodName>,<ContainerName>,<RestartCount>", the
// comma-separated form that StringToLogKey splits back apart.
func (l LogKey) String() string {
	const layout = "%s,%s,%d"
	return fmt.Sprintf(layout, l.PodName, l.ContainerName, l.RestartCount)
}

identifier := pod.Name
kubectlArgs = append(kubectlArgs, pod.Name)
if pod.Labels["workloadType"] == workloadTypeAPI && pod.Labels["userFacing"] == "true" {

for _, container := range pod.Spec.Containers {
if container.Name == tfServingContainerName {
tfServingArgs := make([]string, len(kubectlArgs))
copy(tfServingArgs, kubectlArgs)
tfServingArgs = append(tfServingArgs, tfServingContainerName)
tfServingIdentifier := pod.Name + " " + tfServingContainerName
process, err := createKubectlProcess(tfServingArgs, tfServingIdentifier, attrs)
if err != nil {
return nil, err
}
processList = append(processList, process)
}
}
identifier += " " + apiContainerName
kubectlArgs = append(kubectlArgs, apiContainerName)
// KubeIdentifier returns "<PodName> <ContainerName>". createKubectlProcess
// uses this value as the bracketed label it prepends to each log line.
func (l LogKey) KubeIdentifier() string {
	pod, container := l.PodName, l.ContainerName
	return fmt.Sprintf("%s %s", pod, container)
}

// StringToLogKey parses the "<PodName>,<ContainerName>,<RestartCount>" form
// produced by LogKey.String back into a LogKey.
//
// Input with fewer than three comma-separated fields does not panic: the
// fields that are present are filled in and the rest keep their zero values.
// Fields beyond the third are ignored.
func StringToLogKey(str string) LogKey {
	parts := strings.Split(str, ",")

	// Split with a non-empty separator always yields at least one element,
	// so parts[0] is safe even for the empty string.
	key := LogKey{PodName: parts[0]}
	if len(parts) > 1 {
		key.ContainerName = parts[1]
	}
	if len(parts) > 2 {
		// The parse status is intentionally discarded; whatever value
		// ParseInt32 yields for a non-numeric field is stored as-is.
		key.RestartCount, _ = s.ParseInt32(parts[2])
	}
	return key
}

// GetLogKey builds the LogKey for one container of pod, taking the pod name
// from pod and the container name and restart count from status.
func GetLogKey(pod kcore.Pod, status kcore.ContainerStatus) LogKey {
	var key LogKey
	key.PodName = pod.Name
	key.ContainerName = status.Name
	key.RestartCount = status.RestartCount
	return key
}

// CurrentLoggingProcesses returns the keys of logProcesses as a string set.
// The process values themselves are not inspected.
func CurrentLoggingProcesses(logProcesses map[string]*os.Process) strset.Set {
	keys := strset.New()
	for key := range logProcesses {
		keys.Add(key)
	}
	return keys
}

process, err := createKubectlProcess(kubectlArgs, identifier, attrs)
if err != nil {
for _, processToKill := range processList {
processToKill.Kill()
func GetLogKeys(pod kcore.Pod) strset.Set {
containerStatuses := append(pod.Status.InitContainerStatuses, pod.Status.ContainerStatuses...)
logKeys := strset.New()
for _, status := range containerStatuses {
if status.State.Terminated != nil && (status.State.Terminated.ExitCode != 0 || status.State.Terminated.StartedAt.After(time.Now().Add(-newPodCheckInterval))) {
logKeys.Add(GetLogKey(pod, status).String())
} else if status.State.Running != nil {
logKeys.Add(GetLogKey(pod, status).String())
}
return nil, err
}

processList = append(processList, process)
return processList, nil
return logKeys
}

func createKubectlProcess(kubectlArgs []string, identifier string, attrs *os.ProcAttr) (*os.Process, error) {
func createKubectlProcess(logKey LogKey, attrs *os.ProcAttr) (*os.Process, error) {
cmdPath := "/bin/bash"

kubectlArgs := []string{"kubectl", "-n=" + config.Cortex.Namespace, "logs", "--follow=true", logKey.PodName, logKey.ContainerName}

identifier := logKey.KubeIdentifier()

kubectlArgs = append(kubectlArgs, fmt.Sprintf("--tail=%d", initLogTailLines))
labelLog := fmt.Sprintf(" | while read -r; do echo \"[%s] $REPLY\" | tail -n +1; done", identifier)
kubectlArgsCmd := strings.Join(kubectlArgs, " ")
Expand All @@ -214,11 +221,11 @@ func createKubectlProcess(kubectlArgs []string, identifier string, attrs *os.Pro
return process, nil
}

func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, previous bool, verbose bool, wrotePending bool, inr *os.File) {
func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodList []kcore.Pod, verbose bool, wrotePending bool, inr *os.File) {
timer := time.NewTimer(0)
defer timer.Stop()

processMap := make(map[string][]*os.Process)
processMap := make(map[string]*os.Process)
defer deleteProcesses(processMap)
labels := initialPodList[0].GetLabels()
podSearchLabels := map[string]string{
Expand Down Expand Up @@ -266,33 +273,23 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
}
}

prevRunningPods := strset.New()
for podName := range processMap {
prevRunningPods.Add(podName)
}

newPods := strset.Difference(latestRunningPods, prevRunningPods)
podsToDelete := strset.Difference(prevRunningPods, latestRunningPods)
podsToKeep := strset.Intersection(prevRunningPods, latestRunningPods)
runningLogProcesses := CurrentLoggingProcesses(processMap)

// Prioritize adding running pods
podsToAddRunning := []string{}
podsToAddNotRunning := []string{}
sortedPods := latestRunningPods.Slice()
sort.Slice(sortedPods, func(i, j int) bool {
return latestRunningPodsMap[sortedPods[i]].CreationTimestamp.After(latestRunningPodsMap[sortedPods[j]].CreationTimestamp.Time)
})

for podName := range newPods {
pod := latestRunningPodsMap[podName]
if k8s.GetPodStatus(&pod) == k8s.PodStatusRunning {
podsToAddRunning = append(podsToAddRunning, podName)
} else {
podsToAddNotRunning = append(podsToAddNotRunning, podName)
expectedLogProcesses := strset.New()
for i, podName := range sortedPods {
if i >= maxParallelPodLogging {
break
}
expectedLogProcesses.Merge(GetLogKeys(latestRunningPodsMap[podName]))
}
podsToAdd := append(podsToAddRunning, podsToAddNotRunning...)

maxPodsToAdd := maxParallelPodLogging - len(podsToKeep)
if len(podsToAdd) < maxPodsToAdd {
maxPodsToAdd = len(podsToAdd)
}
processesToDelete := strset.Difference(runningLogProcesses, expectedLogProcesses)
processesToAdd := strset.Difference(expectedLogProcesses, runningLogProcesses)

if wrotePending && len(latestRunningPods) > 0 {
if !writeSocket("Streaming logs:", socket) {
Expand All @@ -301,20 +298,20 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
wrotePending = false
}

for _, podName := range podsToAdd[:maxPodsToAdd] {
processList, err := startKubectlProcess(latestRunningPodsMap[podName], previous, &os.ProcAttr{
for logProcess := range processesToAdd {
process, err := createKubectlProcess(StringToLogKey(logProcess), &os.ProcAttr{
Files: []*os.File{inr, outw, outw},
})
if err != nil {
socketWriterError <- err
return
}
processMap[podName] = processList
processMap[logProcess] = process
}

deleteMap := make(map[string][]*os.Process, len(podsToDelete))
deleteMap := make(map[string]*os.Process, len(processesToDelete))

for podName := range podsToDelete {
for podName := range processesToDelete {
deleteMap[podName] = processMap[podName]
delete(processMap, podName)
}
Expand All @@ -330,17 +327,13 @@ func podCheck(podCheckCancel chan struct{}, socket *websocket.Conn, initialPodLi
}
}

func deleteProcesses(processMap map[string][]*os.Process) {
for _, processes := range processMap {
for _, process := range processes {
process.Signal(os.Interrupt)
}
func deleteProcesses(processMap map[string]*os.Process) {
for _, process := range processMap {
process.Signal(os.Interrupt)
}
time.Sleep(5 * time.Second)
for _, processes := range processMap {
for _, process := range processes {
process.Signal(os.Kill)
}
time.Sleep(3 * time.Second)
for _, process := range processMap {
process.Signal(os.Kill)
}
}

Expand Down