Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/api/capi_cluster.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::BTreeMap;

use cluster_api_rs::capi_cluster::{ClusterSpec, ClusterStatus};
use fleet_api_rs::{fleet_bundle_namespace_mapping::BundleNamespaceMappingNamespaceSelector, fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec}};
use fleet_api_rs::{
fleet_bundle_namespace_mapping::BundleNamespaceMappingNamespaceSelector,
fleet_clustergroup::{ClusterGroupSelector, ClusterGroupSpec},
};
use kube::{
api::{ObjectMeta, TypeMeta},
Resource, ResourceExt as _,
Expand Down
6 changes: 0 additions & 6 deletions src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use clap::Parser;
use futures::stream::SelectAll;
use futures::{Stream, StreamExt};

use k8s_openapi::api::core::v1::Namespace;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{Condition, Time};
use kube::api::{DynamicObject, Patch, PatchParams};
use kube::core::DeserializeGuard;
Expand All @@ -33,7 +32,6 @@ use kube::{
use kube::{Resource, ResourceExt};

use std::collections::BTreeMap;
use std::future;

use std::ops::Deref;
use std::pin::Pin;
Expand Down Expand Up @@ -280,10 +278,6 @@ pub async fn run_cluster_controller(state: State) {
.expect("failed to create kube Client");

let (sub, reader) = state.dispatcher.subscribe();
let sub = sub
.map(|n: Arc<Namespace>| Ok(n.deref().clone()))
.predicate_filter(predicates::labels)
.filter_map(|n| future::ready(n.ok().map(Arc::new)));
let ns_controller = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(
Expand Down
38 changes: 28 additions & 10 deletions src/controllers/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ where
.map_err(GetOrCreateError::Create)?;

info!("Created fleet object");
ctx.diagnostics
match ctx
.diagnostics
.read()
.await
.recorder(ctx.client.clone())
Expand All @@ -95,7 +96,12 @@ where
},
&res.object_ref(&()),
)
.await?;
.await
{
// Ignore forbidden errors on namespace deletion
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
e => e?,
};

Ok(Action::await_change())
}
Expand All @@ -120,7 +126,8 @@ where
.map_err(PatchError::Patch)?;

info!("Updated fleet object");
ctx.diagnostics
match ctx
.diagnostics
.read()
.await
.recorder(ctx.client.clone())
Expand All @@ -139,7 +146,12 @@ where
},
&res.object_ref(&()),
)
.await?;
.await
{
// Ignore forbidden errors on namespace deletion
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
e => e?,
};

Ok(Action::await_change())
}
Expand Down Expand Up @@ -193,7 +205,12 @@ where
}

async fn cleanup(&self, ctx: Arc<Context>) -> crate::Result<Action> {
ctx.diagnostics
if let Some(mut bundle) = self.to_bundle(ctx.clone()).await? {
return Ok(bundle.cleanup(ctx).await?);
}

match ctx
.diagnostics
.read()
.await
.recorder(ctx.client.clone())
Expand All @@ -208,11 +225,12 @@ where
},
&self.object_ref(&()),
)
.await?;

if let Some(mut bundle) = self.to_bundle(ctx.clone()).await? {
return Ok(bundle.cleanup(ctx).await?);
}
.await
{
// Ignore forbidden errors on namespace deletion
Err(kube::Error::Api(e)) if &e.reason == "Forbidden" => (),
e => e?,
};

Ok(Action::await_change())
}
Expand Down
9 changes: 8 additions & 1 deletion src/controllers/helm/install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,14 @@ impl FleetOptions {
pub fn patch_fleet(&self, version: &str) -> FleetPatchResult<Child> {
let mut upgrade = Command::new("helm");

upgrade.args(["upgrade", "fleet", "fleet/fleet", "--reuse-values", "--version", version]);
upgrade.args([
"upgrade",
"fleet",
"fleet/fleet",
"--reuse-values",
"--version",
version,
]);

if !self.namespace.is_empty() {
upgrade.args(["--namespace", &self.namespace]);
Expand Down
11 changes: 11 additions & 0 deletions src/multi_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use async_broadcast::{InactiveReceiver, Receiver, Sender};
Expand All @@ -18,6 +19,7 @@ use kube::{
};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use tokio::time::sleep;

#[derive(Clone)]
pub struct MultiDispatcher {
Expand Down Expand Up @@ -70,6 +72,11 @@ impl MultiDispatcher {
}
}
}

// Subscribers count returns the number of current receiving streams
pub(crate) fn subscribers_count(&self) -> usize {
self.dispatch_tx.receiver_count()
}
}

/// `BroadcastStream` allows to stream shared list of dynamic objects,
Expand Down Expand Up @@ -223,6 +230,10 @@ where
W: Stream<Item = Result<Event<DynamicObject>>> + Unpin,
{
stream! {
while writer.subscribers_count() == 0 {
sleep(Duration::from_millis(100)).await;
}

while let Some(event) = broadcast.next().await {
match event {
Ok(ev) => {
Expand Down