From ff445912533d0d46b02404b3034520cb43713065 Mon Sep 17 00:00:00 2001 From: Anderson Laurentino Date: Thu, 12 May 2022 00:30:07 -0300 Subject: [PATCH] Add parameters on 'sparkctl create' to delete SparkApplication if exists and to follow logs (#1506) --- sparkctl/cmd/create.go | 14 +++++++ sparkctl/cmd/delete.go | 7 +++- sparkctl/cmd/log.go | 86 +++++++++++++++++++++++++++++++----------- 3 files changed, 82 insertions(+), 25 deletions(-) diff --git a/sparkctl/cmd/create.go b/sparkctl/cmd/create.go index 318829a87d..21b56aa5d2 100644 --- a/sparkctl/cmd/create.go +++ b/sparkctl/cmd/create.go @@ -41,6 +41,8 @@ import ( const bufferSize = 1024 +var DeleteIfExists bool +var LogsEnabled bool var RootPath string var UploadToPath string var UploadToEndpoint string @@ -90,6 +92,10 @@ var createCmd = &cobra.Command{ } func init() { + createCmd.Flags().BoolVarP(&DeleteIfExists, "delete", "d", false, + "delete the SparkApplication if already exists") + createCmd.Flags().BoolVarP(&LogsEnabled, "logs", "l", false, + "watch the SparkApplication logs") createCmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "", "the name of the bucket where local application dependencies are to be uploaded") createCmd.Flags().StringVarP(&RootPath, "upload-prefix", "p", "", @@ -151,6 +157,10 @@ func createFromScheduledSparkApplication(name string, kubeClient clientset.Inter } func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.Interface, crdClient crdclientset.Interface) error { + if DeleteIfExists { + deleteSparkApplication(app.Name, crdClient) + } + v1beta2.SetSparkApplicationDefaults(app) if err := validateSpec(app.Spec); err != nil { return err @@ -177,6 +187,10 @@ func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset. fmt.Printf("SparkApplication \"%s\" created\n", app.Name) + if LogsEnabled { + doLog(app.Name, true, kubeClient, crdClient) + } + return nil } diff --git a/sparkctl/cmd/delete.go b/sparkctl/cmd/delete.go index bd70301805..293fc345a2 100644 --- a/sparkctl/cmd/delete.go +++ b/sparkctl/cmd/delete.go @@ -51,8 +51,7 @@ var deleteCmd = &cobra.Command{ } func doDelete(name string, crdClientset crdclientset.Interface) error { - err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) - if err != nil { + if err := deleteSparkApplication(name, crdClientset); err != nil { return err } @@ -60,3 +59,7 @@ func doDelete(name string, crdClientset crdclientset.Interface) error { return nil } + +func deleteSparkApplication(name string, crdClientset crdclientset.Interface) error { + return crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{}) +} diff --git a/sparkctl/cmd/log.go b/sparkctl/cmd/log.go index 8d61f42cac..f7ce655fb4 100644 --- a/sparkctl/cmd/log.go +++ b/sparkctl/cmd/log.go @@ -21,7 +21,7 @@ import ( "fmt" "io" "os" - "strings" + "time" "github.com/spf13/cobra" @@ -57,7 +57,7 @@ var logCommand = &cobra.Command{ return } - if err := doLog(args[0], kubeClientset, crdClientset); err != nil { + if err := doLog(args[0], FollowLogs, kubeClientset, crdClientset); err != nil { fmt.Fprintf(os.Stderr, "failed to get driver logs of SparkApplication %s: %v\n", args[0], err) } }, @@ -69,36 +69,76 @@ func init() { logCommand.Flags().BoolVarP(&FollowLogs, "follow", "f", false, "whether to stream the logs") } -func doLog(name string, kubeClientset clientset.Interface, crdClientset crdclientset.Interface) error { - app, err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to get SparkApplication %s: %v", name, err) - } +func doLog( + name string, + followLogs bool, + kubeClient clientset.Interface, + crdClient crdclientset.Interface) error { + timeout := 30 * time.Second + + podNameChannel := getPodNameChannel(name, crdClient) var podName string - if ExecutorId < 0 { - podName = app.Status.DriverInfo.PodName - } else { - podName = strings.NewReplacer("driver", fmt.Sprintf("exec-%d", ExecutorId)). - Replace(app.Status.DriverInfo.PodName) + + select { + case podName = <-podNameChannel: + case <-time.After(timeout): + return fmt.Errorf("not found pod name") } - if podName == "" { - return fmt.Errorf("unable to fetch logs as the name of the target pod is empty") + waitLogsChannel := waitForLogsFromPodChannel(podName, kubeClient, crdClient) + + select { + case <-waitLogsChannel: + case <-time.After(timeout): + return fmt.Errorf("timeout to fetch logs from pod \"%s\"", podName) } - out := os.Stdout - if FollowLogs { - if err := streamLogs(out, kubeClientset, podName); err != nil { - return err - } + if followLogs { + return streamLogs(os.Stdout, kubeClient, podName) } else { - if err := printLogs(out, kubeClientset, podName); err != nil { - return err - } + return printLogs(os.Stdout, kubeClient, podName) } +} - return nil +func getPodNameChannel( + sparkApplicationName string, + crdClient crdclientset.Interface) chan string { + + channel := make(chan string, 1) + go func() { + for true { + app, _ := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Get( + context.TODO(), + sparkApplicationName, + metav1.GetOptions{}) + + if app.Status.DriverInfo.PodName != "" { + channel <- app.Status.DriverInfo.PodName + break + } + } + }() + return channel +} + +func waitForLogsFromPodChannel( + podName string, + kubeClient clientset.Interface, + crdClient crdclientset.Interface) chan bool { + + channel := make(chan bool, 1) + go func() { + for true { + _, err := kubeClient.CoreV1().Pods(Namespace).GetLogs(podName, &apiv1.PodLogOptions{}).Do(context.TODO()).Raw() + + if err == nil { + channel <- true + break + } + } + }() + return channel } // printLogs is a one time operation that prints the fetched logs of the given pod.