
Remove systemd units without a corresponding pod #312

Merged · 3 commits · Sep 24, 2021

6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,12 @@

## [Unreleased]

### Added
- Cleanup stage added where systemd units without corresponding pods are
removed on startup ([#312]).

[#312]: https://github.com/stackabletech/agent/pull/312

## [0.6.1] - 2021-09-14

### Changed
3 changes: 3 additions & 0 deletions docs/modules/ROOT/nav.adoc
@@ -5,5 +5,8 @@
* xref:limitations.adoc[]
* xref:services.adoc[]
* xref:jobs.adoc[]
* Stages
** xref:stages/overview.adoc[]
** xref:stages/cleanup.adoc[]
* Monitoring
** xref:monitoring/logs.adoc[]
8 changes: 8 additions & 0 deletions docs/modules/ROOT/pages/stages/cleanup.adoc
@@ -0,0 +1,8 @@
= Cleanup stage

On startup, the systemd units in the `system-stackable` slice are
compared to the pods assigned to this node. If a systemd unit matches
the specification of a corresponding pod, it is kept, and the Stackable
Agent takes ownership of it again in a later stage. If there is no
corresponding pod, or the systemd unit differs from the pod
specification, the unit is removed and the Stackable Agent creates a
new systemd unit afterwards.
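
For troubleshooting it can be useful to see which units are currently
grouped under the slice. A minimal example using plain systemd tooling,
assuming the agent runs in system mode (add `--user` for user mode):

[source,shell]
----
# Show the slice together with the units currently running in it
systemctl status system-stackable.slice
----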
26 changes: 26 additions & 0 deletions docs/modules/ROOT/pages/stages/overview.adoc
@@ -0,0 +1,26 @@
= Overview

When the Stackable Agent starts, it runs through the following stages:

* Check the configured directories and files.
** Check that optional files can be opened if they exist.
** Create directories that require write access and do not exist yet.
** Check that the configured directories are writable by the current
process.
* Bootstrap the cluster with TLS certificates, but only if no existing
kubeconfig can be found.
* Remove all systemd units from a previous run that have no
corresponding pod (see xref:stages/cleanup.adoc[]).
* Start the kubelet.

After the kubelet has started, assigned pods run through the following
stages:

* Download the package from a registered Stackable repository.
* Unpack the package and install it.
* Create the configuration files according to the config maps.
* Create, start, and enable the systemd units (see the illustrative
sketch after this list).
* Monitor the systemd units and patch the pod status accordingly.
* Stop, disable, and remove the systemd units on termination or when the
pod is deleted.
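
The systemd-related pod stages correspond roughly to ordinary systemd
operations. The following sketch is purely illustrative; the unit name
`zookeeper-server.service` is hypothetical:

[source,shell]
----
# CreatingService: create the unit, then enable and start it
systemctl enable --now zookeeper-server.service

# Monitoring: query the state that is patched into the pod status
systemctl show zookeeper-server.service --property=ActiveState,SubState

# Termination: stop and disable the unit before its file is removed
systemctl disable --now zookeeper-server.service
----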
2 changes: 2 additions & 0 deletions src/bin/stackable-agent.rs
@@ -135,6 +135,8 @@ async fn main() -> anyhow::Result<()> {
        .await
        .expect("Error initializing provider.");

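    // Remove systemd units left over from a previous run that no longer have
    // a corresponding pod, before the kubelet takes ownership of the node.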
    provider.cleanup(&krustlet_config.node_name).await;

    let kubelet = Kubelet::new(provider, kubeconfig, krustlet_config).await?;
    kubelet.start().await
}
222 changes: 222 additions & 0 deletions src/provider/cleanup.rs
@@ -0,0 +1,222 @@
//! Initial cleanup
//!
//! On startup the systemd units in the `system-stackable` slice are compared to the pods assigned
//! to this node. If a systemd unit matches the specification of a corresponding pod then it is
//! kept and the Stackable Agent takes ownership of it again in the `Starting` stage. If there is
//! no corresponding pod or the systemd unit differs from the pod specification then it is removed
//! and the Stackable Agent creates a new systemd unit in the `CreatingService` stage.
//!
//! The cleanup stage is implemented as part of the [`StackableProvider`] because the expected
//! content of a systemd unit file can only be determined with the directories configured in the
//! provider.
//!
//! The cleanup code resides in a separate module because the amount of code justifies it and the
//! log output becomes more meaningful: it is clearer whether a systemd unit was removed in the
//! cleanup stage or in the normal process.
use std::collections::HashMap;

use anyhow::Context;
use k8s_openapi::api::core::v1::Pod as KubePod;
use kube::api::{ListParams, Meta, ObjectList};
use kube::Api;
use kubelet::pod::Pod;
use kubelet::provider::Provider;
use log::{debug, error, info, warn};
use tokio::fs::{read_to_string, remove_file};

use super::systemdmanager::systemdunit::SystemDUnit;
use super::systemdmanager::systemdunit::STACKABLE_SLICE;
use super::StackableProvider;

impl StackableProvider {
    /// Removes systemd units without corresponding pods.
    ///
    /// The systemd units in the `system-stackable` slice are compared with the pods assigned to
    /// this node and all units without corresponding pods or which differ from the pod
    /// specifications are removed.
    pub async fn cleanup(&self, node_name: &str) {
        let systemd_manager = &self.shared.systemd_manager;

        if let Err(error) = systemd_manager.reload().await {
            error!(
                "Skipping the cleanup stage because the systemd daemon reload failed. {}",
                error
            );
            return;
        }

        let units_in_slice = match systemd_manager.slice_content(STACKABLE_SLICE).await {
            Ok(units_in_slice) => units_in_slice,
            Err(error) => {
                debug!(
                    "Skipping the cleanup stage because no systemd units were found in the slice \
                     [{}]. {}",
                    STACKABLE_SLICE, error
                );
                return;
            }
        };

        let pods = match self.assigned_pods(node_name).await {
            Ok(pods) => pods.items,
            Err(error) => {
                error!(
                    "The assigned pods could not be retrieved. All systemd units in the slice \
                     [{}] will be removed. {}",
                    STACKABLE_SLICE, error
                );
                Vec::new()
            }
        };

        let mut expected_units = HashMap::new();
        for pod in pods {
            match self.units_from_pod(&pod).await {
                Ok(units) => expected_units.extend(units.into_iter()),
                Err(error) => warn!(
                    "Systemd units could not be generated for pod [{}/{}]. {}",
                    pod.namespace().unwrap_or_else(|| String::from("default")),
                    pod.name(),
                    error
                ),
            }
        }

        for unit_name in &units_in_slice {
            let delete = match expected_units.get(unit_name) {
                Some(expected_content) => match self.unit_file_content(unit_name).await {
                    Ok(Some(content)) if &content == expected_content => {
                        info!(
                            "The systemd unit [{}] will be kept because a corresponding pod \
                             exists.",
                            unit_name
                        );
                        false
                    }
                    Ok(Some(content)) => {
                        info!(
                            "The systemd unit [{}] will be removed because it differs from the \
                             corresponding pod specification.\n\
                             expected content:\n\
                             {}\n\n\
                             actual content:\n\
                             {}",
                            unit_name, expected_content, content
                        );
                        true
                    }
                    Ok(None) => {
                        info!(
                            "The systemd unit [{}] will be removed because its file path could \
                             not be determined.",
                            unit_name
                        );
                        true
                    }
                    Err(error) => {
                        warn!(
                            "The systemd unit [{}] will be removed because the file content could \
                             not be retrieved. {}",
                            unit_name, error
                        );
                        true
                    }
                },
                None => {
                    info!(
                        "The systemd unit [{}] will be removed because no corresponding pod \
                         exists.",
                        unit_name
                    );
                    true
                }
            };

            if delete {
                self.remove_unit(unit_name).await;
            };
        }
    }

    /// Returns a list of all pods assigned to the given node.
    async fn assigned_pods(&self, node_name: &str) -> anyhow::Result<ObjectList<KubePod>> {
        let client = &self.shared.client;

        let api: Api<KubePod> = Api::all(client.to_owned());
        let lp = ListParams::default().fields(&format!("spec.nodeName={}", node_name));
        api.list(&lp).await.with_context(|| {
            format!(
                "The pods assigned to this node (nodeName = [{}]) could not be retrieved.",
                node_name
            )
        })
    }

    /// Creates the systemd unit files for the given pod in memory.
    ///
    /// A mapping from systemd unit file names to the file content is returned.
    async fn units_from_pod(&self, kubepod: &KubePod) -> anyhow::Result<HashMap<String, String>> {
        let systemd_manager = &self.shared.systemd_manager;

        let mut units = HashMap::new();
        let pod = Pod::from(kubepod.to_owned());
        let pod_state = self.initialize_pod_state(&pod).await?;

        for container in pod.containers() {
            let unit = SystemDUnit::new(
                systemd_manager.is_user_mode(),
                &pod_state,
                &self.shared.kubeconfig_path,
                &pod,
                &container,
            )?;
            units.insert(unit.get_name(), unit.get_unit_file_content());
        }

        Ok(units)
    }

    /// Returns the content of the given systemd unit file.
    async fn unit_file_content(&self, unit_name: &str) -> anyhow::Result<Option<String>> {
        let systemd_manager = &self.shared.systemd_manager;

        let file_path_result = systemd_manager
            .fragment_path(unit_name)
            .await
            .with_context(|| {
                format!(
                    "The file path of the unit [{}] could not be determined.",
                    unit_name
                )
            });

        match file_path_result {
            Ok(Some(file_path)) => {
                let file_content = read_to_string(&file_path)
                    .await
                    .with_context(|| format!("The file [{}] could not be read.", file_path))?;
                Ok(Some(file_content))
            }
            Ok(None) => Ok(None),
            Err(error) => Err(error),
        }
    }

    /// Stops, disables and removes the given systemd unit.
    async fn remove_unit(&self, unit_name: &str) {
        let systemd_manager = &self.shared.systemd_manager;

        if let Err(error) = systemd_manager.stop(unit_name).await {
            warn!("{}", error);
        }
        if let Err(error) = systemd_manager.disable(unit_name).await {
            warn!("{}", error);
        }
        if let Ok(Some(file_path)) = systemd_manager.fragment_path(unit_name).await {
            debug!("Removing file [{}].", file_path);
            if let Err(error) = remove_file(file_path).await {
                warn!("{}", error);
            }
        }
    }
}