Skip to content

Commit

Permalink
Add parameters on 'sparkctl create' to delete SparkApplication if exi…
Browse files Browse the repository at this point in the history
…sts and to follow logs (kubeflow#1506)
  • Loading branch information
alaurentinoofficial authored May 12, 2022
1 parent 0e919bf commit ff44591
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 25 deletions.
14 changes: 14 additions & 0 deletions sparkctl/cmd/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (

const bufferSize = 1024

var DeleteIfExists bool
var LogsEnabled bool
var RootPath string
var UploadToPath string
var UploadToEndpoint string
Expand Down Expand Up @@ -90,6 +92,10 @@ var createCmd = &cobra.Command{
}

func init() {
createCmd.Flags().BoolVarP(&DeleteIfExists, "delete", "d", false,
"delete the SparkApplication if already exists")
createCmd.Flags().BoolVarP(&LogsEnabled, "logs", "l", false,
"watch the SparkApplication logs")
createCmd.Flags().StringVarP(&UploadToPath, "upload-to", "u", "",
"the name of the bucket where local application dependencies are to be uploaded")
createCmd.Flags().StringVarP(&RootPath, "upload-prefix", "p", "",
Expand Down Expand Up @@ -151,6 +157,10 @@ func createFromScheduledSparkApplication(name string, kubeClient clientset.Inter
}

func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.Interface, crdClient crdclientset.Interface) error {
if DeleteIfExists {
deleteSparkApplication(app.Name, crdClient)
}

v1beta2.SetSparkApplicationDefaults(app)
if err := validateSpec(app.Spec); err != nil {
return err
Expand All @@ -177,6 +187,10 @@ func createSparkApplication(app *v1beta2.SparkApplication, kubeClient clientset.

fmt.Printf("SparkApplication \"%s\" created\n", app.Name)

if LogsEnabled {
doLog(app.Name, true, kubeClient, crdClient)
}

return nil
}

Expand Down
7 changes: 5 additions & 2 deletions sparkctl/cmd/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,15 @@ var deleteCmd = &cobra.Command{
}

func doDelete(name string, crdClientset crdclientset.Interface) error {
err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
if err := deleteSparkApplication(name, crdClientset); err != nil {
return err
}

fmt.Printf("SparkApplication \"%s\" deleted\n", name)

return nil
}

func deleteSparkApplication(name string, crdClientset crdclientset.Interface) error {
return crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}
86 changes: 63 additions & 23 deletions sparkctl/cmd/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"fmt"
"io"
"os"
"strings"
"time"

"github.com/spf13/cobra"

Expand Down Expand Up @@ -57,7 +57,7 @@ var logCommand = &cobra.Command{
return
}

if err := doLog(args[0], kubeClientset, crdClientset); err != nil {
if err := doLog(args[0], FollowLogs, kubeClientset, crdClientset); err != nil {
fmt.Fprintf(os.Stderr, "failed to get driver logs of SparkApplication %s: %v\n", args[0], err)
}
},
Expand All @@ -69,36 +69,76 @@ func init() {
logCommand.Flags().BoolVarP(&FollowLogs, "follow", "f", false, "whether to stream the logs")
}

func doLog(name string, kubeClientset clientset.Interface, crdClientset crdclientset.Interface) error {
app, err := crdClientset.SparkoperatorV1beta2().SparkApplications(Namespace).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get SparkApplication %s: %v", name, err)
}
func doLog(
name string,
followLogs bool,
kubeClient clientset.Interface,
crdClient crdclientset.Interface) error {

timeout := 30 * time.Second

podNameChannel := getPodNameChannel(name, crdClient)
var podName string
if ExecutorId < 0 {
podName = app.Status.DriverInfo.PodName
} else {
podName = strings.NewReplacer("driver", fmt.Sprintf("exec-%d", ExecutorId)).
Replace(app.Status.DriverInfo.PodName)

select {
case podName = <-podNameChannel:
case <-time.After(timeout):
return fmt.Errorf("not found pod name")
}

if podName == "" {
return fmt.Errorf("unable to fetch logs as the name of the target pod is empty")
waitLogsChannel := waitForLogsFromPodChannel(podName, kubeClient, crdClient)

select {
case <-waitLogsChannel:
case <-time.After(timeout):
return fmt.Errorf("timeout to fetch logs from pod \"%s\"", podName)
}

out := os.Stdout
if FollowLogs {
if err := streamLogs(out, kubeClientset, podName); err != nil {
return err
}
if followLogs {
return streamLogs(os.Stdout, kubeClient, podName)
} else {
if err := printLogs(out, kubeClientset, podName); err != nil {
return err
}
return printLogs(os.Stdout, kubeClient, podName)
}
}

return nil
func getPodNameChannel(
sparkApplicationName string,
crdClient crdclientset.Interface) chan string {

channel := make(chan string, 1)
go func() {
for true {
app, _ := crdClient.SparkoperatorV1beta2().SparkApplications(Namespace).Get(
context.TODO(),
sparkApplicationName,
metav1.GetOptions{})

if app.Status.DriverInfo.PodName != "" {
channel <- app.Status.DriverInfo.PodName
break
}
}
}()
return channel
}

func waitForLogsFromPodChannel(
podName string,
kubeClient clientset.Interface,
crdClient crdclientset.Interface) chan bool {

channel := make(chan bool, 1)
go func() {
for true {
_, err := kubeClient.CoreV1().Pods(Namespace).GetLogs(podName, &apiv1.PodLogOptions{}).Do(context.TODO()).Raw()

if err == nil {
channel <- true
break
}
}
}()
return channel
}

// printLogs is a one time operation that prints the fetched logs of the given pod.
Expand Down

0 comments on commit ff44591

Please sign in to comment.