Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log 11792 journal toggle #457

Merged
merged 14 commits into from
Dec 22, 2022
53 changes: 34 additions & 19 deletions bin/src/_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,29 +286,44 @@ pub async fn _main(
.map(|s| Duration::from_millis(s.parse().unwrap()))
.unwrap_or(FS_EVENT_DELAY);

debug!("Initialising journald source");
#[cfg(all(feature = "libjournald", target_os = "linux"))]
let (journalctl_source, journald_source) = if config.journald.paths.is_empty() {
let journalctl_source = create_journalctl_source()
.map(|s| s.map(StrictOrLazyLineBuilder::Strict))
.map_err(|e| {
info!("Journalctl source was not initialized");
debug!("Journalctl source initialization error: {}", e);
});
(journalctl_source.ok(), None)
} else {
(
None,
Some(create_source(&config.journald.paths).map(StrictOrLazyLineBuilder::Strict)),
)
let journald_source = match config.journald.systemd_journal_tailer {
true => {
if !config.journald.paths.is_empty() {
Some(create_source(&config.journald.paths).map(StrictOrLazyLineBuilder::Strict))
} else {
None
}
}
false => None,
};

#[cfg(all(feature = "libjournald", target_os = "linux"))]
let journalctl_source = match config.journald.systemd_journal_tailer {
true => {
if config.journald.paths.is_empty() {
create_journalctl_source()
.map(|s| s.map(StrictOrLazyLineBuilder::Strict))
.map_err(|e| {
info!("Journalctl source was not initialized");
debug!("Journalctl source initialization error: {}", e);
})
.ok()
} else {
None
}
}
false => None,
};

#[cfg(all(not(feature = "libjournald"), target_os = "linux"))]
let journalctl_source = create_journalctl_source()
.map(|s| s.map(StrictOrLazyLineBuilder::Strict))
.map_err(|e| warn!("Error initializing journalctl source: {}", e))
.ok();
debug!("Initialised journald source");
let journalctl_source = match config.journald.systemd_journal_tailer {
true => create_journalctl_source()
.map(|s| s.map(StrictOrLazyLineBuilder::Strict))
.map_err(|e| warn!("Error initializing journalctl source: {}", e))
.ok(),
false => None,
};

debug!("Initialising offset state");
if let Some(offset_state) = offset_state {
Expand Down
57 changes: 55 additions & 2 deletions bin/tests/it/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,9 @@ async fn test_z_journald_support() {
settings.journald_dirs = Some(dir);
settings.features = Some("libjournald");
settings.exclusion_regex = Some(r"^(?!/var/log/journal).*$");
assert_eq!(systemd::journal::print(6, "Sample info"), 0);
settings.log_journal_d = Some("true");

assert_eq!(systemd::journal::print(6, "Sample info"), 0);
let mut agent_handle = common::spawn_agent(settings);
let mut agent_stderr = BufReader::new(agent_handle.stderr.take().unwrap());

Expand All @@ -624,7 +625,7 @@ async fn test_z_journald_support() {
}

// Wait for the data to be received by the mock ingester
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
tokio::time::sleep(tokio::time::Duration::from_millis(1500)).await;

let map = received.lock().await;
let file_info = map.values().next().unwrap();
Expand All @@ -641,6 +642,57 @@ async fn test_z_journald_support() {
agent_handle.kill().expect("Could not kill process");
}

#[tokio::test]
#[cfg(all(target_os = "linux", feature = "integration_tests"))]
async fn test_z_journald_support_no_flag() {
let _ = env_logger::Builder::from_default_env().try_init();
tokio::time::sleep(Duration::from_millis(500)).await;
let dir = "/var/log/journal";
let (server, _received, shutdown_handle, addr) = common::start_http_ingester();
let mut settings = AgentSettings::with_mock_ingester("/var/log/journal", &addr);
settings.journald_dirs = Some(dir);
settings.features = Some("libjournald");
settings.exclusion_regex = Some(r"^(?!/var/log/journal).*$");

assert_eq!(systemd::journal::print(6, "Sample info"), 0);
let mut agent_handle = common::spawn_agent(settings);
let mut agent_stderr = BufReader::new(agent_handle.stderr.take().unwrap());

let (server_result, _) = tokio::join!(server, async {
common::wait_for_event("monitoring journald path", &mut agent_stderr);
shutdown_handle();
});

server_result.unwrap();
common::assert_agent_running(&mut agent_handle);
agent_handle.kill().expect("Could not kill process");
}

#[tokio::test]
#[cfg(all(target_os = "linux", feature = "integration_tests"))]
async fn test_z_journalctl_support_true_flag_no_path() {
let _ = env_logger::Builder::from_default_env().try_init();
tokio::time::sleep(Duration::from_millis(500)).await;
let (server, _received, shutdown_handle, addr) = common::start_http_ingester();
let mut settings = AgentSettings::with_mock_ingester("/var/log/journal", &addr);
settings.features = Some("libjournald");
settings.journald_dirs = None;
settings.log_journal_d = Some("true");

assert_eq!(systemd::journal::print(6, "Sample info"), 0);
let mut agent_handle = common::spawn_agent(settings);
let mut agent_stderr = BufReader::new(agent_handle.stderr.take().unwrap());

let (server_result, _) = tokio::join!(server, async {
common::wait_for_event("journalctl", &mut agent_stderr);
shutdown_handle();
});

server_result.unwrap();
common::assert_agent_running(&mut agent_handle);
agent_handle.kill().expect("Could not kill process");
}

#[tokio::test]
#[cfg(all(target_os = "linux", feature = "integration_tests"))]
async fn test_journalctl_support() {
Expand All @@ -650,6 +702,7 @@ async fn test_journalctl_support() {
let (server, received, shutdown_handle, addr) = common::start_http_ingester();
let mut settings = AgentSettings::with_mock_ingester("/var/log/journal", &addr);
settings.journald_dirs = None;
settings.log_journal_d = Some("true");
settings.exclusion_regex = Some(r"^(?!/var/log/journal).*$");

assert_eq!(systemd::journal::print(6, "Sample info"), 0);
Expand Down
8 changes: 8 additions & 0 deletions bin/tests/it/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub struct AgentSettings<'a> {
pub exclusion_regex: Option<&'a str>,
pub features: Option<&'a str>,
pub journald_dirs: Option<&'a str>,
pub log_journal_d: Option<&'a str>,
pub startup_lease: Option<&'a str>,
pub ssl_cert_file: Option<&'a std::path::Path>,
pub lookback: Option<&'a str>,
Expand All @@ -99,6 +100,7 @@ impl<'a> AgentSettings<'a> {
log_dirs,
exclusion_regex: Some(r"^/var.*"),
use_ssl: true,
log_journal_d: None,
..Default::default()
}
}
Expand All @@ -110,6 +112,7 @@ impl<'a> AgentSettings<'a> {
use_ssl: false,
ingester_key: Some("mock_key"),
exclusion: Some("/var/log/**"),
log_journal_d: None,
..Default::default()
}
}
Expand Down Expand Up @@ -246,6 +249,10 @@ pub fn spawn_agent(settings: AgentSettings) -> Child {
agent.env("LOGDNA_INGEST_BUFFER_SIZE", ingest_buffer_size);
}

if let Some(log_journal_d) = settings.log_journal_d {
agent.env("MZ_SYSTEMD_JOURNAL_TAILER", log_journal_d);
}

agent.spawn().expect("Failed to start agent")
}

Expand Down Expand Up @@ -288,6 +295,7 @@ where
continue;
}
debug!("{}", line.trim());

lines_buffer.push_str(&line);
lines_buffer.push('\n');
if condition(&line) {
Expand Down
10 changes: 10 additions & 0 deletions common/config/src/argv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ pub struct ArgumentOptions {
#[structopt(long = "include-regex", env = env_vars::INCLUSION_REGEX_RULES)]
inclusion_regex: Vec<String>,

/// Turning on or off systemd-journal
#[structopt(long, env = env_vars::SYSTEMD_JOURNAL_TAILER)]
systemd_journal_tailer: Option<bool>,

/// List of paths (directories or files) of journald paths to monitor,
/// for example: /var/log/journal or /run/systemd/journal
#[structopt(long, env = env_vars::JOURNALD_PATHS)]
Expand Down Expand Up @@ -315,6 +319,10 @@ impl ArgumentOptions {
.for_each(|v| paths.push(PathBuf::from(v)));
}

if self.systemd_journal_tailer.is_some() {
raw.journald.systemd_journal_tailer = self.systemd_journal_tailer
}

if self.lookback.is_some() {
raw.log.lookback = self.lookback.map(|v| v.to_string());
}
Expand Down Expand Up @@ -715,6 +723,7 @@ mod test {
log_k8s_events: Some(K8sTrackingConf::Never),
log_metric_server_stats: Some(K8sTrackingConf::Always),
journald_paths: vec_strings!("/a"),
systemd_journal_tailer: Some(false),
k8s_startup_lease: Some(K8sLeaseConf::Always),
ingest_timeout: Some(1111111),
ingest_buffer_size: Some(222222),
Expand Down Expand Up @@ -748,6 +757,7 @@ mod test {
assert_eq!(config.log.db_path, Some(PathBuf::from("a/b/c")));
assert_eq!(config.log.metrics_port, Some(9089));
assert_eq!(config.journald.paths, Some(vec_paths!["/a"]));
assert_eq!(config.journald.systemd_journal_tailer, Some(false));
assert_eq!(config.startup.option, Some(String::from("always")));
}

Expand Down
1 change: 1 addition & 0 deletions common/config/src/env_vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub const K8S_METADATA_LINE_EXCLUSION: &str = "MZ_K8S_METADATA_LINE_EXCLUSION";
pub const HOSTNAME: &str = "MZ_HOSTNAME";
pub const IP: &str = "MZ_IP";
pub const MAC: &str = "MZ_MAC";
pub const SYSTEMD_JOURNAL_TAILER: &str = "MZ_SYSTEMD_JOURNAL_TAILER";
pub const JOURNALD_PATHS: &str = "MZ_JOURNALD_PATHS";
pub const LOOKBACK: &str = "MZ_LOOKBACK";
pub const DB_PATH: &str = "MZ_DB_PATH";
Expand Down
2 changes: 2 additions & 0 deletions common/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ pub struct LogConfig {
#[derive(Debug)]
pub struct JournaldConfig {
pub paths: Vec<PathBuf>,
pub systemd_journal_tailer: bool,
}

#[derive(Clone, core::fmt::Debug, Display, EnumString, Eq, PartialEq)]
Expand Down Expand Up @@ -422,6 +423,7 @@ impl TryFrom<RawConfig> for Config {

let journald = JournaldConfig {
paths: raw.journald.paths.unwrap_or_default().into_iter().collect(),
systemd_journal_tailer: raw.journald.systemd_journal_tailer.unwrap_or(true),
};

Ok(Config {
Expand Down
6 changes: 6 additions & 0 deletions common/config/src/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ from_env_name!(INCLUSION_REGEX_RULES);
from_env_name!(IP);
from_env_name!(MAC);
from_env_name!(JOURNALD_PATHS);
from_env_name!(SYSTEMD_JOURNAL_TAILER);
from_env_name!(LOOKBACK);
from_env_name!(DB_PATH);
from_env_name!(METRICS_PORT);
Expand Down Expand Up @@ -256,6 +257,11 @@ fn from_property_map(map: HashMap<String, String>) -> Result<Config, ConfigError
.for_each(|v| paths.push(PathBuf::from(v)));
}

if let Some(value) = map.get(&SYSTEMD_JOURNAL_TAILER) {
log::error!("HEYYY {}", value);
result.journald.systemd_journal_tailer = Some(value.parse().unwrap_or(true));
}

result.log.lookback = map.get_string(&LOOKBACK);
result.log.use_k8s_enrichment = map.get_string(&USE_K8S_LOG_ENRICHMENT);
result.log.log_k8s_events = map.get_string(&LOG_K8S_EVENTS);
Expand Down
22 changes: 21 additions & 1 deletion common/config/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,15 +314,30 @@ impl Merge for K8sStartupLeaseConfig {
}
}

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone, Default)]
#[derive(Serialize, Deserialize, Debug, Eq, PartialEq, Clone)]
pub struct JournaldConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub paths: Option<Vec<PathBuf>>,

pub systemd_journal_tailer: Option<bool>,
}

impl Default for JournaldConfig {
fn default() -> Self {
JournaldConfig {
systemd_journal_tailer: Some(true),
paths: None,
}
}
}

impl Merge for JournaldConfig {
fn merge(&mut self, other: &Self, default: &Self) {
self.paths.merge(&other.paths, &default.paths);
self.systemd_journal_tailer.merge(
&other.systemd_journal_tailer,
&default.systemd_journal_tailer,
)
}
}

Expand Down Expand Up @@ -516,6 +531,8 @@ mod tests {
assert_eq!(config.startup, k8s_config);
assert_eq!(config.log.k8s_metadata_include, None);
assert_eq!(config.log.k8s_metadata_exclude, None);
assert_eq!(config.journald.systemd_journal_tailer, Some(true));
assert_eq!(config.journald.paths, None);
}

#[test]
Expand Down Expand Up @@ -1054,10 +1071,12 @@ ingest_buffer_size = 3145728
fn journald_config_merge() {
let mut left_conf = JournaldConfig {
paths: Some(vec![Path::new("/left").to_path_buf()]),
systemd_journal_tailer: Some(false),
};

let right_conf = JournaldConfig {
paths: Some(vec![Path::new("/right").to_path_buf()]),
systemd_journal_tailer: Some(true),
};

left_conf.merge(&right_conf, &JournaldConfig::default());
Expand All @@ -1066,6 +1085,7 @@ ingest_buffer_size = 3145728
.paths
.expect("expected paths to not be None after merge");
assert_eq!(actual_paths, vec![PathBuf::from("/right")]);
assert_eq!(left_conf.systemd_journal_tailer, Some(false));
}

#[test]
Expand Down
3 changes: 2 additions & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ For backward compatibility agent v1 configuration file format is still supported
|`LOGDNA_LINE_INCLUSION_REGEX`|`log.line_inclusion_regex[]`|List of regex patterns to include log lines. When set, the Agent will send ONLY log lines that match any of these patterns.||
|`LOGDNA_REDACT_REGEX`|`log.line_redact_regex`|List of regex patterns used to mask matching sensitive information (such as PII) before sending it in the log line.||
|`LOGDNA_JOURNALD_PATHS`|`journald.paths[]`|List of paths (directories or files) of journald paths to monitor||
|`MZ_SYSTEMD_JOURNAL_TAILER`||True/False toggles journald on the agent|'true'|
|`LOGDNA_LOOKBACK`|`log.lookback`|The lookback strategy on startup|`none`|
|`LOGDNA_K8S_STARTUP_LEASE`||Determines whether or not to use K8 leases on startup|`never`|
|`LOGDNA_USE_K8S_LOG_ENRICHMENT`||Determines whether the agent should query the K8s API to enrich log lines from other pods.|`always`|
Expand Down Expand Up @@ -294,7 +295,7 @@ file in docs directory of this repository.

### Configuring Journald

If the agent pods have access to journald log files or directories, monitoring can be enabled on them with the `LOGDNA_JOURNALD_PATHS`. Common values include `/var/log/journal` and `/run/systemd/journal`. To specify both, use a comma separated list: `/var/log/journal,/run/systemd/journal`.
If the agent pods have access to journald log files or directories, monitoring can be enabled on them with the `LOGDNA_JOURNALD_PATHS` as well as setting `MZ_SYSTEMD_JOURNAL_TAILER` to true, or leaving blank which defaults to true. Both variables are required to be configured. Common values include for `LOGDNA_JOURNALD_PATHS` - `/var/log/journal` and `/run/systemd/journal`. To specify both, use a comma separated list: `/var/log/journal,/run/systemd/journal`.

Take a look at enabling journald monitoring for [Kubernetes](KUBERNETES.md#collecting-node-journald-logs) or [OpenShift](OPENSHIFT.md#collecting-node-journald-logs).

Expand Down