Skip to content

Commit 3e95067

Browse files
authored
ROX-30836: implement hotreloading configuration for outputs (#119)
* ROX-30836: implement hotreloading configuration for outputs With this patch, it is possible to reload the configuration used for gRPC communication. Allowing changes to the certificate directory or output URL to be done without a full restart being required. The output code has also been slightly refactored, the module now has a single start function that will spawn a task for gRPC output and another for JSON to stdout (if this is required at start up or the gRPC output is not configured). Both the gRPC and stdout code now live as submodules of the output module. The stdout output is meant mostly for debugging, so hotreloading was not implemented for it. Lastly, there used to be a task that would receive `Arc<Event>` messages from the BPF worker, translate them to the `FileActivity` type and forward them out a separate channel that ultimately was the gRPC stream, this whole workflow has now been replaced by a filter_map operation that does the translating on the gRPC stream directly, leading to more concise code. * Implement integration test for gRPC hotreloading * Rename grpc::Client::is_active to is_enabled
1 parent 0f77f32 commit 3e95067

File tree

14 files changed

+561
-325
lines changed

14 files changed

+561
-325
lines changed

fact/src/config/mod.rs

Lines changed: 66 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,7 @@ const CONFIG_FILES: [&str; 4] = [
2525
#[derive(Debug, Default, PartialEq, Eq, Clone)]
2626
pub struct FactConfig {
2727
paths: Option<Vec<PathBuf>>,
28-
url: Option<String>,
29-
certs: Option<PathBuf>,
28+
pub grpc: GrpcConfig,
3029
pub endpoint: EndpointConfig,
3130
skip_pre_flight: Option<bool>,
3231
json: Option<bool>,
@@ -78,14 +77,7 @@ impl FactConfig {
7877
self.paths = Some(paths.to_owned());
7978
}
8079

81-
if let Some(url) = from.url.as_deref() {
82-
self.url = Some(url.to_owned());
83-
}
84-
85-
if let Some(certs) = from.certs.as_deref() {
86-
self.certs = Some(certs.to_owned());
87-
}
88-
80+
self.grpc.update(&from.grpc);
8981
self.endpoint.update(&from.endpoint);
9082

9183
if let Some(skip_pre_flight) = from.skip_pre_flight {
@@ -109,14 +101,6 @@ impl FactConfig {
109101
self.paths.as_ref().map(|v| v.as_ref()).unwrap_or(&[])
110102
}
111103

112-
pub fn url(&self) -> Option<&str> {
113-
self.url.as_deref()
114-
}
115-
116-
pub fn certs(&self) -> Option<&Path> {
117-
self.certs.as_deref()
118-
}
119-
120104
pub fn skip_pre_flight(&self) -> bool {
121105
self.skip_pre_flight.unwrap_or(false)
122106
}
@@ -193,17 +177,9 @@ impl TryFrom<Vec<Yaml>> for FactConfig {
193177
"paths" if v.is_null() => {
194178
config.paths = Some(Vec::new());
195179
}
196-
"url" => {
197-
let Some(url) = v.as_str() else {
198-
bail!("url field has incorrect type: {v:?}");
199-
};
200-
config.url = Some(url.to_owned());
201-
}
202-
"certs" => {
203-
let Some(certs) = v.as_str() else {
204-
bail!("certs field has incorrect type: {v:?}");
205-
};
206-
config.certs = Some(PathBuf::from(certs));
180+
"grpc" if v.is_hash() => {
181+
let grpc = v.as_hash().unwrap();
182+
config.grpc = GrpcConfig::try_from(grpc)?;
207183
}
208184
"endpoint" if v.is_hash() => {
209185
let endpoint = v.as_hash().unwrap();
@@ -325,6 +301,63 @@ impl TryFrom<&yaml::Hash> for EndpointConfig {
325301
}
326302
}
327303

304+
#[derive(Debug, Default, PartialEq, Eq, Clone)]
305+
pub struct GrpcConfig {
306+
url: Option<String>,
307+
certs: Option<PathBuf>,
308+
}
309+
310+
impl GrpcConfig {
311+
fn update(&mut self, from: &GrpcConfig) {
312+
if let Some(url) = from.url.as_deref() {
313+
self.url = Some(url.to_owned());
314+
}
315+
316+
if let Some(certs) = from.certs.as_deref() {
317+
self.certs = Some(certs.to_owned());
318+
}
319+
}
320+
321+
pub fn url(&self) -> Option<&str> {
322+
self.url.as_deref()
323+
}
324+
325+
pub fn certs(&self) -> Option<&Path> {
326+
self.certs.as_deref()
327+
}
328+
}
329+
330+
impl TryFrom<&yaml::Hash> for GrpcConfig {
331+
type Error = anyhow::Error;
332+
333+
fn try_from(value: &yaml::Hash) -> Result<Self, Self::Error> {
334+
let mut grpc = GrpcConfig::default();
335+
for (k, v) in value.iter() {
336+
let Some(k) = k.as_str() else {
337+
bail!("key is not string: {k:?}");
338+
};
339+
340+
match k {
341+
"url" => {
342+
let Some(url) = v.as_str() else {
343+
bail!("url field has incorrect type: {v:?}");
344+
};
345+
grpc.url = Some(url.to_owned());
346+
}
347+
"certs" => {
348+
let Some(certs) = v.as_str() else {
349+
bail!("certs field has incorrect type: {v:?}");
350+
};
351+
grpc.certs = Some(PathBuf::from(certs));
352+
}
353+
name => bail!("Invalid field 'grpc.{name}' with value: {v:?}"),
354+
}
355+
}
356+
357+
Ok(grpc)
358+
}
359+
}
360+
328361
#[derive(Debug, Parser)]
329362
#[clap(version = crate::version::FACT_VERSION, about)]
330363
pub struct FactCli {
@@ -402,8 +435,10 @@ impl FactCli {
402435
fn to_config(&self) -> FactConfig {
403436
FactConfig {
404437
paths: self.paths.clone(),
405-
url: self.url.clone(),
406-
certs: self.certs.clone(),
438+
grpc: GrpcConfig {
439+
url: self.url.clone(),
440+
certs: self.certs.clone(),
441+
},
407442
endpoint: EndpointConfig {
408443
address: self.address,
409444
expose_metrics: resolve_bool_arg(self.expose_metrics, self.no_expose_metrics),

fact/src/config/reloader.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ use tokio::{
99
time::interval,
1010
};
1111

12-
use super::{EndpointConfig, FactConfig, CONFIG_FILES};
12+
use super::{EndpointConfig, FactConfig, GrpcConfig, CONFIG_FILES};
1313

1414
pub struct Reloader {
1515
config: FactConfig,
1616
endpoint: watch::Sender<EndpointConfig>,
17+
grpc: watch::Sender<GrpcConfig>,
1718
files: HashMap<&'static str, i64>,
1819
trigger: Arc<Notify>,
1920
}
@@ -51,12 +52,22 @@ impl Reloader {
5152
Some(handle)
5253
}
5354

55+
pub fn config(&self) -> &FactConfig {
56+
&self.config
57+
}
58+
5459
/// Subscribe to get notifications when endpoint configuration is
5560
/// changed.
5661
pub fn endpoint(&self) -> watch::Receiver<EndpointConfig> {
5762
self.endpoint.subscribe()
5863
}
5964

65+
/// Subscribe to get notifications when grpc configuration is
66+
/// changed.
67+
pub fn grpc(&self) -> watch::Receiver<GrpcConfig> {
68+
self.grpc.subscribe()
69+
}
70+
6071
/// Get a reference to the internal trigger for manual reloading of
6172
/// configuration.
6273
///
@@ -132,6 +143,16 @@ impl Reloader {
132143
}
133144
});
134145

146+
self.grpc.send_if_modified(|old| {
147+
if *old != new.grpc {
148+
debug!("Sending new gRPC configuration...");
149+
*old = new.grpc.clone();
150+
true
151+
} else {
152+
false
153+
}
154+
});
155+
135156
if self.config.hotreload() != new.hotreload() {
136157
warn!("Changes to the hotreload field only take effect on startup");
137158
}
@@ -162,10 +183,13 @@ impl From<FactConfig> for Reloader {
162183
})
163184
.collect();
164185
let (endpoint, _) = watch::channel(config.endpoint.clone());
186+
let (grpc, _) = watch::channel(config.grpc.clone());
165187
let trigger = Arc::new(Notify::new());
188+
166189
Reloader {
167190
config,
168191
endpoint,
192+
grpc,
169193
files,
170194
trigger,
171195
}

0 commit comments

Comments
 (0)