Skip to content

Commit

Permalink
Added WatchDeploymentLogsByLabelSelector function
Browse files Browse the repository at this point in the history
  • Loading branch information
adilGhaffarDev committed Aug 9, 2022
1 parent 2fc48a8 commit e6abdc7
Showing 1 changed file with 92 additions and 7 deletions.
99 changes: 92 additions & 7 deletions test/framework/deployment_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ func DescribeFailedDeployment(input WaitForDeploymentsAvailableInput, deployment
return b.String()
}

// WatchDeploymentLogsBySelectorInput is the input for WatchDeploymentLogsByLabelSelector.
type WatchDeploymentLogsBySelectorInput struct {
GetLister GetLister
ClientSet *kubernetes.Clientset
Labels map[string]string
LogPath string
}

// WatchDeploymentLogsInput is the input for WatchDeploymentLogs.
type WatchDeploymentLogsInput struct {
GetLister GetLister
Expand All @@ -105,13 +113,90 @@ type WatchDeploymentLogsInput struct {
// logMetadata contains metadata about the logs.
// The format is very similar to the one used by promtail.
type logMetadata struct {
Job string `json:"job"`
Namespace string `json:"namespace"`
App string `json:"app"`
Pod string `json:"pod"`
Container string `json:"container"`
NodeName string `json:"node_name"`
Stream string `json:"stream"`
Job string `json:"job"`
Namespace string `json:"namespace"`
App string `json:"app"`
Pod string `json:"pod"`
Container string `json:"container"`
NodeName string `json:"node_name"`
Stream string `json:"stream"`
Labels map[string]string `json:"labels,omitempty"`
}

// WatchDeploymentLogsByLabelSelector streams logs for all containers for all pods belonging to a deployment on the basis of label. Each container's logs are streamed
// in a separate goroutine so they can all be streamed concurrently. This only causes a test failure if there are errors
// retrieving the deployment, its pods, or setting up a log file. If there is an error with the log streaming itself,
// that does not cause the test to fail.
func WatchDeploymentLogsByLabelSelector(ctx context.Context, input WatchDeploymentLogsBySelectorInput) {
Expect(ctx).NotTo(BeNil(), "ctx is required for WatchControllerLogs")
Expect(input.ClientSet).NotTo(BeNil(), "input.ClientSet is required for WatchControllerLogs")
Expect(input.Labels).NotTo(BeNil(), "input.Selector is required for WatchControllerLogs")

deploymentList := &appsv1.DeploymentList{}
Eventually(func() error {
return input.GetLister.List(ctx, deploymentList, client.MatchingLabels(input.Labels))
}, retryableOperationTimeout, retryableOperationInterval).Should(Succeed(), "Failed to get deployment for labels")

for _, deployment := range deploymentList.Items {
pods := &corev1.PodList{}
Expect(input.GetLister.List(ctx, pods, client.InNamespace(deployment.Namespace), client.MatchingLabels(input.Labels))).To(Succeed(), "Failed to list Pods for deployment %s/%s", deployment.Namespace, deployment.Name)
for _, pod := range pods.Items {
for _, container := range deployment.Spec.Template.Spec.Containers {
log.Logf("Creating log watcher for controller %s/%s, pod %s, container %s", deployment.Namespace, deployment.Name, pod.Name, container.Name)

// Create log metadata file.
logMetadataFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+"-log-metadata.json"))
Expect(os.MkdirAll(filepath.Dir(logMetadataFile), 0750)).To(Succeed())

metadata := logMetadata{
Job: deployment.Namespace + "/" + deployment.Name,
Namespace: deployment.Namespace,
App: deployment.Name,
Pod: pod.Name,
Container: container.Name,
NodeName: pod.Spec.NodeName,
Stream: "stderr",
Labels: input.Labels,
}
metadataBytes, err := json.Marshal(&metadata)
Expect(err).To(BeNil())
Expect(os.WriteFile(logMetadataFile, metadataBytes, 0600)).To(Succeed())

// Watch each container's logs in a goroutine so we can stream them all concurrently.
go func(pod corev1.Pod, container corev1.Container) {
defer GinkgoRecover()

logFile := filepath.Clean(path.Join(input.LogPath, deployment.Name, pod.Name, container.Name+".log"))
Expect(os.MkdirAll(filepath.Dir(logFile), 0750)).To(Succeed())

f, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600)
Expect(err).NotTo(HaveOccurred())
defer f.Close()

opts := &corev1.PodLogOptions{
Container: container.Name,
Follow: true,
}

podLogs, err := input.ClientSet.CoreV1().Pods(deployment.Namespace).GetLogs(pod.Name, opts).Stream(ctx)
if err != nil {
// Failing to stream logs should not cause the test to fail
log.Logf("Error starting logs stream for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
return
}
defer podLogs.Close()

out := bufio.NewWriter(f)
defer out.Flush()
_, err = out.ReadFrom(podLogs)
if err != nil && err != io.ErrUnexpectedEOF {
// Failing to stream logs should not cause the test to fail
log.Logf("Got error while streaming logs for pod %s/%s, container %s: %v", deployment.Namespace, pod.Name, container.Name, err)
}
}(pod, container)
}
}
}
}

// WatchDeploymentLogs streams logs for all containers for all pods belonging to a deployment. Each container's logs are streamed
Expand Down

0 comments on commit e6abdc7

Please sign in to comment.