Skip to content

Commit 974e6a5

Browse files
committed
WIP flink cluster + beamyaml deplpyment
1 parent 4c9b07d commit 974e6a5

21 files changed

+971
-122
lines changed

go.mod

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ require (
1212
github.com/spf13/viper v1.18.2
1313
gopkg.in/yaml.v2 v2.4.0
1414
helm.sh/helm/v3 v3.15.2
15-
k8s.io/api v0.30.2
16-
k8s.io/apimachinery v0.30.2
17-
k8s.io/client-go v0.30.2
15+
k8s.io/api v0.30.3
16+
k8s.io/apimachinery v0.30.3
17+
k8s.io/client-go v0.30.3
1818
)
1919

2020
require (

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,16 +575,22 @@ honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWh
575575
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
576576
k8s.io/api v0.30.2 h1:+ZhRj+28QT4UOH+BKznu4CBgPWgkXO7XAvMcMl0qKvI=
577577
k8s.io/api v0.30.2/go.mod h1:ULg5g9JvOev2dG0u2hig4Z7tQ2hHIuS+m8MNZ+X6EmI=
578+
k8s.io/api v0.30.3 h1:ImHwK9DCsPA9uoU3rVh4QHAHHK5dTSv1nxJUapx8hoQ=
579+
k8s.io/api v0.30.3/go.mod h1:GPc8jlzoe5JG3pb0KJCSLX5oAFIW3/qNJITlDj8BH04=
578580
k8s.io/apiextensions-apiserver v0.30.0 h1:jcZFKMqnICJfRxTgnC4E+Hpcq8UEhT8B2lhBcQ+6uAs=
579581
k8s.io/apiextensions-apiserver v0.30.0/go.mod h1:N9ogQFGcrbWqAY9p2mUAL5mGxsLqwgtUce127VtRX5Y=
580582
k8s.io/apimachinery v0.30.2 h1:fEMcnBj6qkzzPGSVsAZtQThU62SmQ4ZymlXRC5yFSCg=
581583
k8s.io/apimachinery v0.30.2/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
584+
k8s.io/apimachinery v0.30.3 h1:q1laaWCmrszyQuSQCfNB8cFgCuDAoPszKY4ucAjDwHc=
585+
k8s.io/apimachinery v0.30.3/go.mod h1:iexa2somDaxdnj7bha06bhb43Zpa6eWH8N8dbqVjTUc=
582586
k8s.io/apiserver v0.30.0 h1:QCec+U72tMQ+9tR6A0sMBB5Vh6ImCEkoKkTDRABWq6M=
583587
k8s.io/apiserver v0.30.0/go.mod h1:smOIBq8t0MbKZi7O7SyIpjPsiKJ8qa+llcFCluKyqiY=
584588
k8s.io/cli-runtime v0.30.0 h1:0vn6/XhOvn1RJ2KJOC6IRR2CGqrpT6QQF4+8pYpWQ48=
585589
k8s.io/cli-runtime v0.30.0/go.mod h1:vATpDMATVTMA79sZ0YUCzlMelf6rUjoBzlp+RnoM+cg=
586590
k8s.io/client-go v0.30.2 h1:sBIVJdojUNPDU/jObC+18tXWcTJVcwyqS9diGdWHk50=
587591
k8s.io/client-go v0.30.2/go.mod h1:JglKSWULm9xlJLx4KCkfLLQ7XwtlbflV6uFFSHTMgVs=
592+
k8s.io/client-go v0.30.3 h1:bHrJu3xQZNXIi8/MoxYtZBBWQQXwy16zqJwloXXfD3k=
593+
k8s.io/client-go v0.30.3/go.mod h1:8d4pf8vYu665/kUbsxWAQ/JDBNWqfFeZnvFiVdmx89U=
588594
k8s.io/component-base v0.30.0 h1:cj6bp38g0ainlfYtaOQuRELh5KSYjhKxM+io7AUIk4o=
589595
k8s.io/component-base v0.30.0/go.mod h1:V9x/0ePFNaKeKYA3bOvIbrNoluTSG+fSJKjLdjOoeXQ=
590596
k8s.io/klog/v2 v2.120.1 h1:QXU6cPEOIslTGvZaXvFWiP9VKyeet3sawzTOvdXb4Vw=

src/cmd/create/create.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
Copyright © 2024 NAME HERE <EMAIL ADDRESS>
3+
*/
4+
package create
5+
6+
import (
7+
"github.com/spf13/cobra"
8+
)
9+
10+
// infoCmd represents the info command
11+
var CreateCmd = &cobra.Command{
12+
Use: "create",
13+
Short: "'create' sub commands",
14+
Long: `create flink or spark clusters`,
15+
}
16+
17+
func init() {
18+
19+
CreateCmd.AddCommand(FlinkClusterCmd)
20+
}

src/cmd/create/flink_cluster.go

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
package create
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
7+
"github.com/BeamStackProj/beamstack-cli/src/objects"
8+
"github.com/BeamStackProj/beamstack-cli/src/types"
9+
"github.com/BeamStackProj/beamstack-cli/src/utils"
10+
"github.com/spf13/cobra"
11+
12+
v1 "k8s.io/api/core/v1"
13+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14+
"k8s.io/client-go/kubernetes"
15+
)
16+
17+
var (
18+
cpu string = "2"
19+
memory string = "2048Mi"
20+
cpuLimit string = "2"
21+
memoryLimit string = "2048Mi"
22+
volumeSize string = "1Gi"
23+
taskslots uint8 = 10
24+
replicas uint8 = 1
25+
)
26+
27+
// infoCmd represents the info command
28+
var FlinkClusterCmd = &cobra.Command{
29+
Use: "flink-cluster",
30+
Short: "create a flink cluster",
31+
Long: `create a flink cluster`,
32+
Args: cobra.ExactArgs(1),
33+
Run: func(cmd *cobra.Command, args []string) {
34+
35+
profile, err := utils.ValidateCluster()
36+
37+
if err != nil {
38+
fmt.Println(err)
39+
return
40+
}
41+
if profile.Operators.Flink == nil {
42+
fmt.Println("Flink Operator not initialized on this cluster")
43+
return
44+
}
45+
46+
namespace := "flink"
47+
flinkVersion := strings.Replace(profile.Operators.Flink.Version, ".", "_", 1)
48+
flinkImage := fmt.Sprintf("beamstackproj/flink-v%s:latest", flinkVersion)
49+
taskmanagerImage := fmt.Sprintf("beamstackproj/flink-harness%s:latest", flinkVersion)
50+
ClaimName := fmt.Sprintf("%s-pvc", args[0])
51+
fmt.Printf("creating flink cluster %s\n", args[0])
52+
53+
config := utils.GetKubeConfig()
54+
55+
clientset, err := kubernetes.NewForConfig(config)
56+
if err != nil {
57+
fmt.Println(err)
58+
return
59+
}
60+
61+
if err := objects.CreatePVC(clientset, ClaimName, namespace, volumeSize); err != nil {
62+
fmt.Println(err)
63+
return
64+
}
65+
66+
spec := types.FlinkDeploymentSpec{
67+
Image: &flinkImage,
68+
ImagePullPolicy: "IfNotPresent",
69+
FlinkVersion: flinkVersion,
70+
FlinkConfiguration: map[string]string{
71+
"taskmanager.numberOfTaskSlots": string(taskslots),
72+
},
73+
ServiceAccountName: "flink",
74+
PodTemplate: &v1.PodTemplateSpec{
75+
Spec: v1.PodSpec{
76+
Containers: []v1.Container{
77+
{
78+
Name: "flink-main-container",
79+
VolumeMounts: []v1.VolumeMount{
80+
{
81+
MountPath: "/opt/flink/log",
82+
Name: "flink-logs",
83+
},
84+
},
85+
},
86+
},
87+
Volumes: []v1.Volume{
88+
{
89+
Name: "flink-logs",
90+
},
91+
},
92+
},
93+
},
94+
JobManager: types.JobManagerSpec{
95+
Resource: types.Resource{
96+
Memory: memory,
97+
CPU: cpu,
98+
},
99+
},
100+
TaskManager: types.TaskManagerSpec{
101+
Replicas: replicas,
102+
Resource: types.Resource{
103+
Memory: memory,
104+
CPU: cpu,
105+
},
106+
107+
PodTemplate: &v1.PodTemplateSpec{
108+
Spec: v1.PodSpec{
109+
Containers: []v1.Container{
110+
{
111+
Name: "worker",
112+
Image: taskmanagerImage,
113+
Args: []string{"-worker_pool"},
114+
Ports: []v1.ContainerPort{
115+
{
116+
Name: "harness-port",
117+
ContainerPort: 50000,
118+
},
119+
},
120+
VolumeMounts: []v1.VolumeMount{
121+
{
122+
MountPath: "/pvc",
123+
Name: "flink-cluster-pvc",
124+
},
125+
},
126+
},
127+
},
128+
Volumes: []v1.Volume{
129+
{
130+
Name: "flink-cluster-pvc",
131+
VolumeSource: v1.VolumeSource{
132+
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
133+
ClaimName: ClaimName,
134+
},
135+
},
136+
},
137+
},
138+
},
139+
},
140+
},
141+
}
142+
143+
err = objects.CreateDynamicResource(
144+
metav1.TypeMeta{
145+
APIVersion: "flink.apache.org/v1beta1",
146+
Kind: "FlinkDeployment",
147+
},
148+
metav1.ObjectMeta{
149+
Name: args[0],
150+
Namespace: "flink",
151+
},
152+
spec,
153+
)
154+
155+
if err != nil {
156+
fmt.Println(err)
157+
return
158+
}
159+
},
160+
}
161+
162+
func init() {
163+
FlinkClusterCmd.Flags().StringVar(&cpu, "cpu", cpu, "Cpu request for task manager")
164+
FlinkClusterCmd.Flags().StringVar(&cpuLimit, "cpuLimit", cpu, "Cpu request for task manager")
165+
FlinkClusterCmd.Flags().StringVar(&memory, "memory", memory, "Cpu request for task manager")
166+
FlinkClusterCmd.Flags().StringVar(&memoryLimit, "memoryLimit", memoryLimit, "Cpu request for task manager")
167+
FlinkClusterCmd.Flags().Uint8Var(&replicas, "replicas", replicas, "numbers of replicas sets for task manager")
168+
FlinkClusterCmd.Flags().Uint8Var(&taskslots, "taskslots", taskslots, "numbers of taskslots to be created for the task manager")
169+
FlinkClusterCmd.Flags().StringVar(&volumeSize, "volumeSize", volumeSize, "size of persistent volume to be attached to flink cluster")
170+
}

src/cmd/deploy/deploy.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,30 +4,16 @@ Copyright © 2024 NAME HERE <EMAIL ADDRESS>
44
package deploy
55

66
import (
7-
"fmt"
8-
97
"github.com/spf13/cobra"
108
)
119

1210
// infoCmd represents the info command
1311
var DeployCmd = &cobra.Command{
1412
Use: "deploy",
15-
Short: "Pallete that contains information based commands",
16-
Long: `Pallete that contains information based commands`,
17-
Run: func(cmd *cobra.Command, args []string) {
18-
fmt.Println("deploying resources!!")
19-
},
13+
Short: "deploy resources to k8s cluster",
14+
Long: `deploy resources to k8s cluster`,
2015
}
2116

2217
func init() {
23-
24-
// Here you will define your flags and configuration settings.
25-
26-
// Cobra supports Persistent Flags which will work for this command
27-
// and all subcommands, e.g.:
28-
// infoCmd.PersistentFlags().String("foo", "", "A help for foo")
29-
30-
// Cobra supports local flags which will only run when this command
31-
// is called directly, e.g.:
32-
// infoCmd.Flags().BoolP("toggle", "t", false, "Help message for toggle")
18+
DeployCmd.AddCommand(PipelineCmd)
3319
}

0 commit comments

Comments
 (0)