1
- //! Ensures that `Pod`s are configured and running for each [`AirflowCluster`]
1
+ //! Ensures that `Pod`s are configured and running for each [`v1alpha1:: AirflowCluster`]
2
2
use std:: {
3
3
collections:: { BTreeMap , BTreeSet , HashMap } ,
4
4
io:: Write ,
@@ -13,18 +13,6 @@ use product_config::{
13
13
ProductConfigManager ,
14
14
} ;
15
15
use snafu:: { OptionExt , ResultExt , Snafu } ;
16
- use stackable_airflow_crd:: {
17
- authentication:: {
18
- AirflowAuthenticationClassResolved , AirflowClientAuthenticationDetailsResolved ,
19
- } ,
20
- build_recommended_labels,
21
- git_sync:: GitSync ,
22
- AirflowCluster , AirflowClusterStatus , AirflowConfig , AirflowConfigOptions , AirflowExecutor ,
23
- AirflowRole , Container , ExecutorConfig , ExecutorConfigFragment , AIRFLOW_CONFIG_FILENAME ,
24
- AIRFLOW_UID , APP_NAME , CONFIG_PATH , GIT_CONTENT , GIT_ROOT , GIT_SYNC_NAME , LOG_CONFIG_DIR ,
25
- OPERATOR_NAME , STACKABLE_LOG_DIR , TEMPLATE_CONFIGMAP_NAME , TEMPLATE_LOCATION , TEMPLATE_NAME ,
26
- TEMPLATE_VOLUME_NAME ,
27
- } ;
28
16
use stackable_operator:: {
29
17
builder:: {
30
18
self ,
@@ -86,6 +74,18 @@ use strum::{EnumDiscriminants, IntoEnumIterator, IntoStaticStr};
86
74
use crate :: {
87
75
config:: { self , PYTHON_IMPORTS } ,
88
76
controller_commons:: { self , CONFIG_VOLUME_NAME , LOG_CONFIG_VOLUME_NAME , LOG_VOLUME_NAME } ,
77
+ crd:: {
78
+ self ,
79
+ authentication:: {
80
+ AirflowAuthenticationClassResolved , AirflowClientAuthenticationDetailsResolved ,
81
+ } ,
82
+ build_recommended_labels,
83
+ git_sync:: { GitSync , GIT_SYNC_CONTENT , GIT_SYNC_NAME , GIT_SYNC_ROOT } ,
84
+ v1alpha1, AirflowClusterStatus , AirflowConfig , AirflowConfigOptions , AirflowExecutor ,
85
+ AirflowRole , Container , ExecutorConfig , ExecutorConfigFragment , AIRFLOW_CONFIG_FILENAME ,
86
+ AIRFLOW_UID , APP_NAME , CONFIG_PATH , LOG_CONFIG_DIR , OPERATOR_NAME , STACKABLE_LOG_DIR ,
87
+ TEMPLATE_CONFIGMAP_NAME , TEMPLATE_LOCATION , TEMPLATE_NAME , TEMPLATE_VOLUME_NAME ,
88
+ } ,
89
89
env_vars:: {
90
90
self , build_airflow_template_envs, build_gitsync_statefulset_envs, build_gitsync_template,
91
91
} ,
@@ -113,7 +113,6 @@ pub struct Ctx {
113
113
114
114
#[ derive( Snafu , Debug , EnumDiscriminants ) ]
115
115
#[ strum_discriminants( derive( IntoStaticStr ) ) ]
116
- #[ allow( clippy:: enum_variant_names) ]
117
116
pub enum Error {
118
117
#[ snafu( display( "object has no namespace" ) ) ]
119
118
ObjectHasNoNamespace ,
@@ -129,19 +128,19 @@ pub enum Error {
129
128
#[ snafu( display( "failed to apply Service for {rolegroup}" ) ) ]
130
129
ApplyRoleGroupService {
131
130
source : stackable_operator:: cluster_resources:: Error ,
132
- rolegroup : RoleGroupRef < AirflowCluster > ,
131
+ rolegroup : RoleGroupRef < v1alpha1 :: AirflowCluster > ,
133
132
} ,
134
133
135
134
#[ snafu( display( "failed to apply ConfigMap for {rolegroup}" ) ) ]
136
135
ApplyRoleGroupConfig {
137
136
source : stackable_operator:: cluster_resources:: Error ,
138
- rolegroup : RoleGroupRef < AirflowCluster > ,
137
+ rolegroup : RoleGroupRef < v1alpha1 :: AirflowCluster > ,
139
138
} ,
140
139
141
140
#[ snafu( display( "failed to apply StatefulSet for {rolegroup}" ) ) ]
142
141
ApplyRoleGroupStatefulSet {
143
142
source : stackable_operator:: cluster_resources:: Error ,
144
- rolegroup : RoleGroupRef < AirflowCluster > ,
143
+ rolegroup : RoleGroupRef < v1alpha1 :: AirflowCluster > ,
145
144
} ,
146
145
147
146
#[ snafu( display( "invalid product config" ) ) ]
@@ -192,19 +191,17 @@ pub enum Error {
192
191
#[ snafu( display( "failed to build config file for {rolegroup}" ) ) ]
193
192
BuildRoleGroupConfigFile {
194
193
source : FlaskAppConfigWriterError ,
195
- rolegroup : RoleGroupRef < AirflowCluster > ,
194
+ rolegroup : RoleGroupRef < v1alpha1 :: AirflowCluster > ,
196
195
} ,
197
196
198
197
#[ snafu( display( "failed to build ConfigMap for {rolegroup}" ) ) ]
199
198
BuildRoleGroupConfig {
200
199
source : stackable_operator:: builder:: configmap:: Error ,
201
- rolegroup : RoleGroupRef < AirflowCluster > ,
200
+ rolegroup : RoleGroupRef < v1alpha1 :: AirflowCluster > ,
202
201
} ,
203
202
204
203
#[ snafu( display( "failed to resolve and merge config for role and role group" ) ) ]
205
- FailedToResolveConfig {
206
- source : stackable_airflow_crd:: Error ,
207
- } ,
204
+ FailedToResolveConfig { source : crd:: Error } ,
208
205
209
206
#[ snafu( display( "could not parse Airflow role [{role}]" ) ) ]
210
207
UnidentifiedAirflowRole {
@@ -249,9 +246,7 @@ pub enum Error {
249
246
} ,
250
247
251
248
#[ snafu( display( "failed to apply authentication configuration" ) ) ]
252
- InvalidAuthenticationConfig {
253
- source : stackable_airflow_crd:: authentication:: Error ,
254
- } ,
249
+ InvalidAuthenticationConfig { source : crd:: authentication:: Error } ,
255
250
256
251
#[ snafu( display( "pod template serialization" ) ) ]
257
252
PodTemplateSerde { source : serde_yaml:: Error } ,
@@ -336,7 +331,7 @@ impl ReconcilerError for Error {
336
331
}
337
332
338
333
pub async fn reconcile_airflow (
339
- airflow : Arc < DeserializeGuard < AirflowCluster > > ,
334
+ airflow : Arc < DeserializeGuard < v1alpha1 :: AirflowCluster > > ,
340
335
ctx : Arc < Ctx > ,
341
336
) -> Result < Action > {
342
337
tracing:: info!( "Starting reconcile" ) ;
@@ -558,7 +553,7 @@ pub async fn reconcile_airflow(
558
553
559
554
#[ allow( clippy:: too_many_arguments) ]
560
555
async fn build_executor_template (
561
- airflow : & AirflowCluster ,
556
+ airflow : & v1alpha1 :: AirflowCluster ,
562
557
common_config : & CommonConfiguration < ExecutorConfigFragment , GenericProductSpecificCommonConfig > ,
563
558
resolved_product_image : & ResolvedProductImage ,
564
559
authentication_config : & AirflowClientAuthenticationDetailsResolved ,
@@ -612,7 +607,7 @@ async fn build_executor_template(
612
607
/// The server-role service is the primary endpoint that should be used by clients that do not perform internal load balancing,
613
608
/// including targets outside the cluster.
614
609
fn build_role_service (
615
- airflow : & AirflowCluster ,
610
+ airflow : & v1alpha1 :: AirflowCluster ,
616
611
resolved_product_image : & ResolvedProductImage ,
617
612
role_name : & str ,
618
613
port : u16 ,
@@ -674,9 +669,9 @@ fn role_port(role_name: &str) -> Option<u16> {
674
669
/// The rolegroup [`ConfigMap`] configures the rolegroup based on the configuration given by the administrator
675
670
#[ allow( clippy:: too_many_arguments) ]
676
671
fn build_rolegroup_config_map (
677
- airflow : & AirflowCluster ,
672
+ airflow : & v1alpha1 :: AirflowCluster ,
678
673
resolved_product_image : & ResolvedProductImage ,
679
- rolegroup : & RoleGroupRef < AirflowCluster > ,
674
+ rolegroup : & RoleGroupRef < v1alpha1 :: AirflowCluster > ,
680
675
rolegroup_config : & HashMap < PropertyNameKind , BTreeMap < String , String > > ,
681
676
authentication_config : & AirflowClientAuthenticationDetailsResolved ,
682
677
logging : & Logging < Container > ,
@@ -760,9 +755,9 @@ fn build_rolegroup_config_map(
760
755
///
761
756
/// This is mostly useful for internal communication between peers, or for clients that perform client-side load balancing.
762
757
fn build_rolegroup_service (
763
- airflow : & AirflowCluster ,
758
+ airflow : & v1alpha1 :: AirflowCluster ,
764
759
resolved_product_image : & ResolvedProductImage ,
765
- rolegroup : & RoleGroupRef < AirflowCluster > ,
760
+ rolegroup : & RoleGroupRef < v1alpha1 :: AirflowCluster > ,
766
761
) -> Result < Service > {
767
762
let mut ports = vec ! [ ServicePort {
768
763
name: Some ( METRICS_PORT_NAME . into( ) ) ,
@@ -807,9 +802,9 @@ fn build_rolegroup_service(
807
802
}
808
803
809
804
fn build_rolegroup_metadata (
810
- airflow : & AirflowCluster ,
805
+ airflow : & v1alpha1 :: AirflowCluster ,
811
806
resolved_product_image : & & ResolvedProductImage ,
812
- rolegroup : & & RoleGroupRef < AirflowCluster > ,
807
+ rolegroup : & & RoleGroupRef < v1alpha1 :: AirflowCluster > ,
813
808
prometheus_label : Label ,
814
809
) -> Result < ObjectMeta , Error > {
815
810
let metadata = ObjectMetaBuilder :: new ( )
@@ -835,10 +830,10 @@ fn build_rolegroup_metadata(
835
830
/// The [`Pod`](`stackable_operator::k8s_openapi::api::core::v1::Pod`)s are accessible through the corresponding [`Service`] (from [`build_rolegroup_service`]).
836
831
#[ allow( clippy:: too_many_arguments) ]
837
832
fn build_server_rolegroup_statefulset (
838
- airflow : & AirflowCluster ,
833
+ airflow : & v1alpha1 :: AirflowCluster ,
839
834
resolved_product_image : & ResolvedProductImage ,
840
835
airflow_role : & AirflowRole ,
841
- rolegroup_ref : & RoleGroupRef < AirflowCluster > ,
836
+ rolegroup_ref : & RoleGroupRef < v1alpha1 :: AirflowCluster > ,
842
837
rolegroup_config : & HashMap < PropertyNameKind , BTreeMap < String , String > > ,
843
838
authentication_config : & AirflowClientAuthenticationDetailsResolved ,
844
839
service_account : & ServiceAccount ,
@@ -1008,7 +1003,7 @@ fn build_server_rolegroup_statefulset(
1008
1003
) ?;
1009
1004
1010
1005
pb. add_volume (
1011
- VolumeBuilder :: new ( GIT_CONTENT )
1006
+ VolumeBuilder :: new ( GIT_SYNC_CONTENT )
1012
1007
. empty_dir ( EmptyDirVolumeSource :: default ( ) )
1013
1008
. build ( ) ,
1014
1009
)
@@ -1116,14 +1111,14 @@ fn build_logging_container(
1116
1111
1117
1112
#[ allow( clippy:: too_many_arguments) ]
1118
1113
fn build_executor_template_config_map (
1119
- airflow : & AirflowCluster ,
1114
+ airflow : & v1alpha1 :: AirflowCluster ,
1120
1115
resolved_product_image : & ResolvedProductImage ,
1121
1116
authentication_config : & AirflowClientAuthenticationDetailsResolved ,
1122
1117
sa_name : & str ,
1123
1118
merged_executor_config : & ExecutorConfig ,
1124
1119
env_overrides : & HashMap < String , String > ,
1125
1120
pod_overrides : & PodTemplateSpec ,
1126
- rolegroup_ref : & RoleGroupRef < AirflowCluster > ,
1121
+ rolegroup_ref : & RoleGroupRef < v1alpha1 :: AirflowCluster > ,
1127
1122
) -> Result < ConfigMap > {
1128
1123
let mut pb = PodBuilder :: new ( ) ;
1129
1124
let pb_metadata = ObjectMetaBuilder :: new ( )
@@ -1203,7 +1198,7 @@ fn build_executor_template_config_map(
1203
1198
airflow. volume_mounts ( ) ,
1204
1199
) ?;
1205
1200
pb. add_volume (
1206
- VolumeBuilder :: new ( GIT_CONTENT )
1201
+ VolumeBuilder :: new ( GIT_SYNC_CONTENT )
1207
1202
. empty_dir ( EmptyDirVolumeSource :: default ( ) )
1208
1203
. build ( ) ,
1209
1204
)
@@ -1275,7 +1270,7 @@ fn build_gitsync_container(
1275
1270
"-c" . to_string( ) ,
1276
1271
] )
1277
1272
. args ( vec ! [ gitsync. get_args( one_time) . join( "\n " ) ] )
1278
- . add_volume_mount ( GIT_CONTENT , GIT_ROOT )
1273
+ . add_volume_mount ( GIT_SYNC_CONTENT , GIT_SYNC_ROOT )
1279
1274
. context ( AddVolumeMountSnafu ) ?
1280
1275
. add_volume_mounts ( volume_mounts)
1281
1276
. context ( AddVolumeMountSnafu ) ?
@@ -1292,7 +1287,7 @@ fn build_gitsync_container(
1292
1287
}
1293
1288
1294
1289
pub fn error_policy (
1295
- _obj : Arc < DeserializeGuard < AirflowCluster > > ,
1290
+ _obj : Arc < DeserializeGuard < v1alpha1 :: AirflowCluster > > ,
1296
1291
error : & Error ,
1297
1292
_ctx : Arc < Ctx > ,
1298
1293
) -> Action {
0 commit comments