Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 108 additions & 91 deletions src/api/fleet_addon_config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
use std::collections::BTreeMap;
use std::{fmt::Display, str::FromStr};

use fleet_api_rs::fleet_cluster::{ClusterAgentEnvVars, ClusterAgentTolerations};
use k8s_openapi::{
api::core::v1::{ConfigMap, ObjectReference},
apimachinery::pkg::apis::meta::v1::{Condition, LabelSelector},
};
use kube::{
api::{ObjectMeta, TypeMeta},
core::{ParseExpressionError, Selector},
CELSchema, CustomResource,
CELSchema, CustomResource, Resource,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_yaml::{Value, Error};
use serde::{ser, Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
use serde_yaml::Value;

pub const AGENT_NAMESPACE: &str = "fleet-addon-agent";
pub const EXPERIMENTAL_OCI_STORAGE: &str = "EXPERIMENTAL_OCI_STORAGE";
Expand Down Expand Up @@ -141,6 +143,46 @@ pub struct ClusterConfig {
pub agent_initiated: Option<bool>,
}

#[derive(Resource, Serialize, Deserialize, Default, Clone, Debug)]
#[resource(inherit = ConfigMap)]
pub struct FleetSettings {
#[serde(flatten, default)]
pub types: Option<TypeMeta>,
pub metadata: ObjectMeta,
pub data: Option<FleetSettingsSpec>,
}

#[serde_as]
#[derive(Serialize, Deserialize, Default, Clone, Debug)]
pub struct FleetSettingsSpec {
#[serde_as(as = "DisplayFromStr")]
pub fleet: FleetChartValues,

#[serde(flatten)]
pub other: Value,
}

impl FromStr for FleetChartValues {
type Err = serde_yaml::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
serde_yaml::from_str(s)
}
}

impl Display for FleetChartValues {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&serde_yaml::to_string(self).map_err(ser::Error::custom)?)
}
}

impl FleetAddonConfigSpec {
/// Returns reference to FeatureGates if defined.
pub(crate) fn feature_gates(&self) -> Option<&FeatureGates> {
self.config.as_ref()?.feature_gates.as_ref()
}
}

impl ClusterConfig {
pub(crate) fn agent_install_namespace(&self) -> String {
self.agent_namespace
Expand Down Expand Up @@ -249,55 +291,47 @@ pub struct FeatureGates {
pub config_map: Option<FeaturesConfigMap>,
}

impl FeatureGates {
/// Returns true if a ConfigMap reference is defined.
pub(crate) fn has_config_map_ref(&self) -> bool {
self.config_map.as_ref().and_then(|c | c.reference.as_ref()).is_some()
impl Display for FeatureGates {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let oci = self.experimental_oci_storage;
let helm_ops = self.experimental_helm_ops;
let config_map = self.config_map_ref();
f.write_str(&format!("ref={config_map:#?}, oci={oci}, helm={helm_ops}"))
}
}

/// Syncs the `fleet` data key on the ConfigMap with FeatureGates
pub(crate) fn sync_configmap(&self, config_map: &mut ConfigMap) -> Result<(), Error> {
let mut empty_data: BTreeMap<String,String> = BTreeMap::new();
let data = config_map.data.as_mut().unwrap_or(&mut empty_data);
match data.get("fleet") {
Some(fleet_data) => {
data.insert(String::from("fleet"), self.merge_features(Some(fleet_data))?);
}
None => {
data.insert(String::from("fleet"), self.merge_features(None)?);
}
}
config_map.data = Some(data.clone());
Ok(())
impl FeatureGates {
/// Returns reference to a ConfigMap if defined.
pub(crate) fn config_map_ref(&self) -> Option<&ObjectReference> {
self.config_map.as_ref()?.reference.as_ref()
}

/// Merge the feature gates environment variables with a provided optional input.
fn merge_features(&self, input: Option<&String>) -> Result<String, Error> {
let mut extra_env_map: BTreeMap<String, String> = BTreeMap::new();
let mut values = FleetChartValues {
..Default::default()
};

if let Some(input) = input {
values = serde_yaml::from_str(input)?;
// If there are existing extraEnv entries, convert them to a map.
if let Some(extra_env) = values.extra_env {
extra_env_map = extra_env.into_iter().map(|var| (var.name.unwrap_or_default(), var.value.unwrap_or_default())).collect();
}
}

pub(crate) fn merge_features(&self, settings: &mut FleetSettingsSpec) {
// Sync the feature flags to the map.
extra_env_map.insert(String::from(EXPERIMENTAL_HELM_OPS), self.experimental_helm_ops.to_string());
extra_env_map.insert(String::from(EXPERIMENTAL_OCI_STORAGE), self.experimental_oci_storage.to_string());

// Convert the map back to list.
values.extra_env = Some(extra_env_map.iter()
.map(|(key, value)|
EnvironmentVariable{name: Some(key.clone()), value: Some(value.clone())})
.collect());
let env_map = settings.fleet.extra_env.get_or_insert_default();

match env_map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

.iter_mut()
.find(|env| env.name == EXPERIMENTAL_HELM_OPS)
{
Some(helm_ops) => helm_ops.value = self.experimental_helm_ops.to_string(),
None => env_map.push(EnvironmentVariable {
name: EXPERIMENTAL_HELM_OPS.to_string(),
value: self.experimental_helm_ops.to_string(),
}),
};

//Return the serialized updated values.
serde_yaml::to_string(&values)
match env_map
.iter_mut()
.find(|env| env.name == EXPERIMENTAL_OCI_STORAGE)
{
Some(helm_ops) => helm_ops.value = self.experimental_oci_storage.to_string(),
None => env_map.push(EnvironmentVariable {
name: EXPERIMENTAL_OCI_STORAGE.to_string(),
value: self.experimental_oci_storage.to_string(),
}),
};
}
}

Expand Down Expand Up @@ -328,15 +362,15 @@ pub struct FeaturesConfigMap {
pub struct FleetChartValues {
pub extra_env: Option<Vec<EnvironmentVariable>>,
#[serde(flatten)]
pub other: Value
pub other: Value,
}

/// EnvironmentVariable is a simple name/value pair.
#[derive(Clone, Default, Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct EnvironmentVariable {
pub name: Option<String>,
pub value: Option<String>
pub name: String,
pub value: String,
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
Expand Down Expand Up @@ -469,11 +503,11 @@ impl FleetAddonConfig {

#[cfg(test)]
mod tests {
use std::collections::BTreeMap;

use k8s_openapi::api::core::v1::ConfigMap;
use std::str::FromStr;

use crate::api::fleet_addon_config::{FeatureGates, NamingStrategy};
use crate::api::fleet_addon_config::{
FeatureGates, FleetChartValues, FleetSettingsSpec, NamingStrategy,
};

#[tokio::test]
async fn test_naming_strategy() {
Expand Down Expand Up @@ -527,41 +561,30 @@ mod tests {
let want_fleet_data = r#"extraEnv:
- name: EXPERIMENTAL_HELM_OPS
value: 'true'
- name: EXPERIMENTAL_OCI_STORAGE
value: 'true'
- name: foo
value: bar
- name: EXPERIMENTAL_OCI_STORAGE
value: 'true'
foo:
bar: foobar
"#;
let fleet_data = r#"foo:
bar: foobar
extraEnv:
- name: foo
value: bar
- name: EXPERIMENTAL_OCI_STORAGE
value: "false"
let fleet_data = r#"extraEnv:
- name: EXPERIMENTAL_HELM_OPS
value: "false"
- name: foo
value: bar
foo:
bar: foobar
"#;
let mut data: BTreeMap<String, String> = BTreeMap::new();
data.insert(String::from("fleet"), String::from(fleet_data));
let mut config_map = ConfigMap {
data: Some(data),
let mut data = FleetSettingsSpec {
fleet: FleetChartValues::from_str(fleet_data).unwrap(),
..Default::default()
};
let feature_gates = FeatureGates {
experimental_oci_storage: true,
experimental_helm_ops: true,
config_map: None
};
let _ = feature_gates.sync_configmap(&mut config_map);

let synced_fleet_data = config_map.data.as_ref().and_then(|d | d.get("fleet")).unwrap();
assert_eq!(
want_fleet_data.to_string(),
synced_fleet_data.clone()
)
let feature_gates = FeatureGates::default();

feature_gates.merge_features(&mut data);

assert_eq!(want_fleet_data.to_string(), data.fleet.to_string())
}

#[tokio::test]
Expand All @@ -572,21 +595,15 @@ extraEnv:
- name: EXPERIMENTAL_OCI_STORAGE
value: 'false'
"#;
let mut config_map = ConfigMap {
data: None,
..Default::default()
};
let mut data = FleetSettingsSpec::default();
let feature_gates = FeatureGates {
experimental_oci_storage: false,
experimental_helm_ops: false,
config_map: None
config_map: None,
};
let _ = feature_gates.sync_configmap(&mut config_map);

let synced_fleet_data = config_map.data.as_ref().and_then(|d | d.get("fleet")).unwrap();
assert_eq!(
want_fleet_data.to_string(),
synced_fleet_data.clone()
)
}

feature_gates.merge_features(&mut data);

assert_eq!(want_fleet_data.to_string(), data.fleet.to_string())
}
}
77 changes: 35 additions & 42 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,17 @@ use crate::api::fleet_addon_config::FleetAddonConfig;
use crate::api::fleet_cluster;
use crate::api::fleet_clustergroup::ClusterGroup;
use crate::controllers::addon_config::FleetConfig;
use crate::controllers::controller::{fetch_config, Context, FleetController};
use crate::controllers::controller::{fetch_config, Context, DynamicStream, FleetController};
use crate::metrics::Diagnostics;
use crate::multi_dispatcher::{broadcaster, BroadcastStream, MultiDispatcher};
use crate::{Error, Metrics};

use chrono::Local;
use clap::Parser;
use futures::stream::SelectAll;
use futures::{Stream, StreamExt};

use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
use kube::api::{DynamicObject, Patch, PatchParams};
use kube::api::{Patch, PatchParams};
use kube::core::DeserializeGuard;
use kube::runtime::reflector::store::Writer;
use kube::runtime::reflector::ObjectRef;
Expand All @@ -34,15 +33,10 @@ use kube::{Resource, ResourceExt};
use std::collections::BTreeMap;

use std::ops::Deref;
use std::pin::Pin;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::{self, warn};

type DynamicStream = SelectAll<
Pin<Box<dyn Stream<Item = Result<watcher::Event<DynamicObject>, watcher::Error>> + Send>>,
>;

/// State shared between the controller and the web server
#[derive(Clone)]
pub struct State {
Expand Down Expand Up @@ -211,42 +205,41 @@ pub async fn run_fleet_helm_controller(state: State) {
|obj, ctx| async move {
let mut obj = obj.deref().clone();
obj.metadata.managed_fields = None;
obj.status = Some(obj.status.clone().unwrap_or_default());
let res = FleetAddonConfig::reconcile_helm(&mut obj, ctx.clone()).await;
if let Some(ref mut status) = obj.status {
let conditions = &mut status.conditions;
let mut message = "Addon provider is ready".to_string();
let mut status = "True";
if let Err(ref e) = res {
message = format!("FleetAddonConfig reconcile error: {e}");
status = "False";
}
conditions.push(Condition {
last_transition_time: Time(Local::now().to_utc()),
message,
observed_generation: obj.metadata.generation,
reason: "Ready".into(),
status: status.into(),
type_: "Ready".into(),
});
}
if let Some(ref mut status) = obj.status {
let mut uniques: BTreeMap<String, Condition> = BTreeMap::new();
status
.conditions
.iter()
.for_each(|e| match uniques.get(&e.type_) {
Some(existing)
if existing.message == e.message
&& existing.reason == e.reason
&& existing.status == e.status
&& existing.observed_generation == e.observed_generation => {}
_ => {
uniques.insert(e.type_.clone(), e.clone());
}
});
status.conditions = uniques.into_values().collect();
let status = obj.status.get_or_insert_default();
let conditions = &mut status.conditions;
let mut message = "Addon provider is ready".to_string();
let mut status_message = "True";
if let Err(ref e) = res {
message = format!("FleetAddonConfig reconcile error: {e}");
status_message = "False";
}
conditions.push(Condition {
last_transition_time: Time(Local::now().to_utc()),
message,
observed_generation: obj.metadata.generation,
reason: "Ready".into(),
status: status_message.into(),
type_: "Ready".into(),
});

let status = obj.status.get_or_insert_default();
let mut uniques: BTreeMap<String, Condition> = BTreeMap::new();
status
.conditions
.iter()
.for_each(|e| match uniques.get(&e.type_) {
Some(existing)
if existing.message == e.message
&& existing.reason == e.reason
&& existing.status == e.status
&& existing.observed_generation == e.observed_generation => {}
_ => {
uniques.insert(e.type_.clone(), e.clone());
}
});
status.conditions = uniques.into_values().collect();

let api: Api<FleetAddonConfig> = Api::all(ctx.client.clone());
let patch = api
.patch_status(
Expand Down
Loading