@@ -14,13 +14,14 @@ import (
14
14
)
15
15
16
16
var (
17
- cpu string = "2 "
17
+ cpu string = "4 "
18
18
memory string = "2048Mi"
19
- cpuLimit string = "2 "
19
+ cpuLimit string = "4 "
20
20
memoryLimit string = "2048Mi"
21
21
volumeSize string = "1Gi"
22
22
taskslots uint8 = 10
23
23
replicas uint8 = 1
24
+ Previledged bool = false
24
25
)
25
26
26
27
// Description and Examples for creating flink clsuters
@@ -35,7 +36,12 @@ var FlinkClusterCmd = &cobra.Command{
35
36
Use : "flink [NAME]" ,
36
37
Short : "create a flink cluster" ,
37
38
Long : flinkLongDesc ,
38
- Args : cobra .ExactArgs (1 ),
39
+ Args : func (cmd * cobra.Command , args []string ) error {
40
+ if len (args ) != 1 {
41
+ return fmt .Errorf ("flink command requires exactly one argument: cluster Name. Provided %d arguments" , len (args ))
42
+ }
43
+ return nil
44
+ },
39
45
Run : func (cmd * cobra.Command , args []string ) {
40
46
profile , err := utils .ValidateCluster ()
41
47
@@ -47,11 +53,9 @@ var FlinkClusterCmd = &cobra.Command{
47
53
fmt .Println ("Flink Operator not initialized on this cluster" )
48
54
return
49
55
}
50
-
51
56
namespace := "flink"
57
+
52
58
flinkVersion := "v1_16"
53
- flinkImage := fmt .Sprintf ("beamstackproj/flink-%s:latest" , flinkVersion )
54
- taskmanagerImage := fmt .Sprintf ("beamstackproj/beam-harness-%s:latest" , flinkVersion )
55
59
ClaimName := fmt .Sprintf ("%s-pvc" , args [0 ])
56
60
fmt .Printf ("creating flink cluster %s\n " , args [0 ])
57
61
@@ -68,82 +72,188 @@ var FlinkClusterCmd = &cobra.Command{
68
72
return
69
73
}
70
74
71
- spec := types.FlinkDeploymentSpec {
72
- Image : & flinkImage ,
73
- ImagePullPolicy : "IfNotPresent" ,
74
- FlinkVersion : flinkVersion ,
75
- FlinkConfiguration : map [string ]string {
76
- "taskmanager.numberOfTaskSlots" : fmt .Sprintf ("%d" , taskslots ),
77
- },
78
- ServiceAccount : "flink" ,
79
- PodTemplate : & v1.PodTemplateSpec {
80
- Spec : v1.PodSpec {
81
- Containers : []v1.Container {
82
- {
83
- Name : "flink-main-container" ,
84
- VolumeMounts : []v1.VolumeMount {
85
- {
86
- MountPath : "/opt/flink/log" ,
87
- Name : "flink-logs" ,
75
+ var spec types.FlinkDeploymentSpec
76
+ taskmanagerImage := fmt .Sprintf ("beamstackproj/beam-harness-%s:latest" , flinkVersion )
77
+
78
+ if Previledged {
79
+ flinkImage := fmt .Sprintf ("beamstackproj/flink-%s-docker:latest" , flinkVersion )
80
+
81
+ spec = types.FlinkDeploymentSpec {
82
+ Image : & flinkImage ,
83
+ ImagePullPolicy : "IfNotPresent" ,
84
+ FlinkVersion : flinkVersion ,
85
+ FlinkConfiguration : map [string ]string {
86
+ "taskmanager.numberOfTaskSlots" : fmt .Sprintf ("%d" , taskslots ),
87
+ },
88
+ ServiceAccount : "flink" ,
89
+ PodTemplate : & v1.PodTemplateSpec {
90
+ Spec : v1.PodSpec {
91
+ Containers : []v1.Container {
92
+ {
93
+ Name : "flink-main-container" ,
94
+ Image : flinkImage ,
95
+ VolumeMounts : []v1.VolumeMount {},
96
+ SecurityContext : & v1.SecurityContext {
97
+ Privileged : func (b bool ) * bool { return & b }(true ),
88
98
},
89
99
},
90
100
},
101
+ Volumes : []v1.Volume {},
91
102
},
92
- Volumes : []v1.Volume {
93
- {
94
- Name : "flink-logs" ,
95
- },
103
+ },
104
+ JobManager : types.JobManagerSpec {
105
+ Replicas : 1 ,
106
+ Resource : types.Resource {
107
+ Memory : memory ,
108
+ CPU : cpu ,
96
109
},
97
110
},
98
- },
99
- JobManager : types.JobManagerSpec {
100
- Replicas : 1 ,
101
- Resource : types.Resource {
102
- Memory : memory ,
103
- CPU : cpu ,
111
+ TaskManager : types.TaskManagerSpec {
112
+ Replicas : replicas ,
113
+ Resource : types.Resource {
114
+ Memory : memory ,
115
+ CPU : cpu ,
116
+ },
117
+
118
+ PodTemplate : & v1.PodTemplateSpec {
119
+ Spec : v1.PodSpec {
120
+ Containers : []v1.Container {
121
+ {
122
+ Name : "worker" ,
123
+ Image : taskmanagerImage ,
124
+ Args : []string {"-worker_pool" },
125
+ Ports : []v1.ContainerPort {
126
+ {
127
+ Name : "harness-port" ,
128
+ ContainerPort : 50000 ,
129
+ },
130
+ },
131
+ VolumeMounts : []v1.VolumeMount {
132
+ {
133
+ MountPath : "/pvc" ,
134
+ Name : "flink-cluster-pvc" ,
135
+ },
136
+ },
137
+ },
138
+ {
139
+ Name : "flink-main-container" ,
140
+ SecurityContext : & v1.SecurityContext {
141
+ Privileged : func (b bool ) * bool { return & b }(true ),
142
+ },
143
+ VolumeMounts : []v1.VolumeMount {
144
+ {
145
+ MountPath : "/var/run/docker.sock" ,
146
+ Name : "docker-socket" ,
147
+ },
148
+ },
149
+ },
150
+ },
151
+ Volumes : []v1.Volume {
152
+ {
153
+ Name : "flink-cluster-pvc" ,
154
+ VolumeSource : v1.VolumeSource {
155
+ PersistentVolumeClaim : & v1.PersistentVolumeClaimVolumeSource {
156
+ ClaimName : ClaimName ,
157
+ },
158
+ },
159
+ },
160
+ {
161
+ Name : "docker-socket" ,
162
+ VolumeSource : v1.VolumeSource {
163
+ HostPath : & v1.HostPathVolumeSource {
164
+ Path : "/var/run/docker.sock" ,
165
+ Type : func () * v1.HostPathType {
166
+ t := v1 .HostPathSocket
167
+ return & t
168
+ }(),
169
+ },
170
+ },
171
+ },
172
+ },
173
+ },
174
+ },
104
175
},
105
- },
106
- TaskManager : types.TaskManagerSpec {
107
- Replicas : replicas ,
108
- Resource : types.Resource {
109
- Memory : memory ,
110
- CPU : cpu ,
176
+ }
177
+ } else {
178
+ flinkImage := fmt .Sprintf ("beamstackproj/flink-%s:latest" , flinkVersion )
179
+ spec = types.FlinkDeploymentSpec {
180
+ Image : & flinkImage ,
181
+ ImagePullPolicy : "IfNotPresent" ,
182
+ FlinkVersion : flinkVersion ,
183
+ FlinkConfiguration : map [string ]string {
184
+ "taskmanager.numberOfTaskSlots" : fmt .Sprintf ("%d" , taskslots ),
111
185
},
112
-
186
+ ServiceAccount : "flink" ,
113
187
PodTemplate : & v1.PodTemplateSpec {
114
188
Spec : v1.PodSpec {
115
189
Containers : []v1.Container {
116
190
{
117
- Name : "worker" ,
118
- Image : taskmanagerImage ,
119
- Args : []string {"-worker_pool" },
120
- Ports : []v1.ContainerPort {
121
- {
122
- Name : "harness-port" ,
123
- ContainerPort : 50000 ,
124
- },
125
- },
191
+ Name : "flink-main-container" ,
126
192
VolumeMounts : []v1.VolumeMount {
127
193
{
128
- MountPath : "/pvc " ,
129
- Name : "flink-cluster-pvc " ,
194
+ MountPath : "/opt/flink/log " ,
195
+ Name : "flink-logs " ,
130
196
},
131
197
},
132
198
},
133
199
},
134
200
Volumes : []v1.Volume {
135
201
{
136
- Name : "flink-cluster-pvc" ,
137
- VolumeSource : v1.VolumeSource {
138
- PersistentVolumeClaim : & v1.PersistentVolumeClaimVolumeSource {
139
- ClaimName : ClaimName ,
202
+ Name : "flink-logs" ,
203
+ },
204
+ },
205
+ },
206
+ },
207
+ JobManager : types.JobManagerSpec {
208
+ Replicas : 1 ,
209
+ Resource : types.Resource {
210
+ Memory : memory ,
211
+ CPU : cpu ,
212
+ },
213
+ },
214
+ TaskManager : types.TaskManagerSpec {
215
+ Replicas : replicas ,
216
+ Resource : types.Resource {
217
+ Memory : memory ,
218
+ CPU : cpu ,
219
+ },
220
+
221
+ PodTemplate : & v1.PodTemplateSpec {
222
+ Spec : v1.PodSpec {
223
+ Containers : []v1.Container {
224
+ {
225
+ Name : "worker" ,
226
+ Image : taskmanagerImage ,
227
+ Args : []string {"-worker_pool" },
228
+ Ports : []v1.ContainerPort {
229
+ {
230
+ Name : "harness-port" ,
231
+ ContainerPort : 50000 ,
232
+ },
233
+ },
234
+ VolumeMounts : []v1.VolumeMount {
235
+ {
236
+ MountPath : "/pvc" ,
237
+ Name : "flink-cluster-pvc" ,
238
+ },
239
+ },
240
+ },
241
+ },
242
+ Volumes : []v1.Volume {
243
+ {
244
+ Name : "flink-cluster-pvc" ,
245
+ VolumeSource : v1.VolumeSource {
246
+ PersistentVolumeClaim : & v1.PersistentVolumeClaimVolumeSource {
247
+ ClaimName : ClaimName ,
248
+ },
140
249
},
141
250
},
142
251
},
143
252
},
144
253
},
145
254
},
146
- },
255
+ }
256
+
147
257
}
148
258
149
259
err = objects .CreateDynamicResource (
@@ -176,4 +286,5 @@ func init() {
176
286
FlinkClusterCmd .Flags ().Uint8Var (& replicas , "replicas" , replicas , "numbers of replicas sets for task manager" )
177
287
FlinkClusterCmd .Flags ().Uint8Var (& taskslots , "taskslots" , taskslots , "numbers of taskslots to be created for the task manager" )
178
288
FlinkClusterCmd .Flags ().StringVar (& volumeSize , "volumeSize" , volumeSize , "size of persistent volume to be attached to flink cluster" )
289
+ FlinkClusterCmd .Flags ().BoolVar (& Previledged , "Previledged" , Previledged , "" )
179
290
}
0 commit comments