Skip to content

Commit

Permalink
Properly handle missing security features worker-side; big refactor
Browse files Browse the repository at this point in the history
- Shut down the worker when we can’t enable security features that we were able to enable in the host.

- Add `secure_validator_mode` to `SecurityStatus`, pass it to workers this way.

- Pass security options to worker over the socket instead of CLI args. (Looks much cleaner.)

- Refactor `spawn_with_program_path` to only log once.

- Refactored worker data (see “Worker data” at #1576). This was driving me crazy.

- Cleaned up a couple of old TODOs.
  • Loading branch information
mrcnski committed Nov 22, 2023
1 parent f44ca5f commit a9cf82d
Show file tree
Hide file tree
Showing 13 changed files with 215 additions and 260 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/node/core/pvf/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pin-project = "1.0.9"
rand = "0.8.5"
slotmap = "1.0"
tempfile = "3.3.0"
thiserror = "1.0.31"
tokio = { version = "1.24.2", features = ["fs", "process"] }

parity-scale-codec = { version = "3.6.1", default-features = false, features = ["derive"] }
Expand Down
11 changes: 10 additions & 1 deletion polkadot/node/core/pvf/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const LOG_TARGET: &str = "parachain::pvf-common";

pub const RUNTIME_VERSION: &str = env!("SUBSTRATE_WASMTIME_VERSION");

use parity_scale_codec::{Decode, Encode};
use std::{
io::{self, Read, Write},
mem,
Expand All @@ -47,8 +48,10 @@ pub mod tests {
}

/// Status of security features on the current system.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone, Default, Encode, Decode)]
pub struct SecurityStatus {
/// Whether Secure Validator Mode is enabled.
pub secure_validator_mode: bool,
/// Whether the landlock features we use are fully available on this system.
pub can_enable_landlock: bool,
/// Whether the seccomp features we use are fully available on this system.
Expand All @@ -57,6 +60,12 @@ pub struct SecurityStatus {
pub can_unshare_user_namespace_and_change_root: bool,
}

/// A handshake with information for the worker.
///
/// Derives `Encode`/`Decode` so the value can be serialized with `parity_scale_codec`; the
/// worker decodes it from a length-framed message read off its socket before enabling any
/// security features.
#[derive(Debug, Encode, Decode)]
pub struct WorkerHandshake {
/// Status of the security features, used by the worker to decide which features to enable
/// and whether a failure to enable them must shut the worker down.
pub security_status: SecurityStatus,
}

/// Write some data prefixed by its length into `w`. Sync version of `framed_send` to avoid
/// dependency on tokio.
pub fn framed_send_blocking(w: &mut (impl Write + Unpin), buf: &[u8]) -> io::Result<()> {
Expand Down
200 changes: 95 additions & 105 deletions polkadot/node/core/pvf/common/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
pub mod security;

use crate::{SecurityStatus, LOG_TARGET};
use crate::{framed_recv_blocking, WorkerHandshake, LOG_TARGET};
use cpu_time::ProcessTime;
use futures::never::Never;
use parity_scale_codec::Decode;
use std::{
any::Any,
fmt, io,
Expand Down Expand Up @@ -50,8 +51,6 @@ macro_rules! decl_worker_main {
#[cfg(target_os = "linux")]
use $crate::worker::security;

// TODO: Remove this dependency, and `pub use sp_tracing` in `lib.rs`.
// See <https://github.com/paritytech/polkadot/issues/7117>.
$crate::sp_tracing::try_init_simple();

let worker_pid = std::process::id();
Expand Down Expand Up @@ -134,9 +133,6 @@ macro_rules! decl_worker_main {
let mut socket_path = None;
let mut worker_dir_path = None;
let mut node_version = None;
let mut can_enable_landlock = false;
let mut can_enable_seccomp = false;
let mut can_unshare_user_namespace_and_change_root = false;

let mut i = 2;
while i < args.len() {
Expand All @@ -153,10 +149,6 @@ macro_rules! decl_worker_main {
node_version = Some(args[i + 1].as_str());
i += 1
},
"--can-enable-landlock" => can_enable_landlock = true,
"--can-enable-seccomp" => can_enable_seccomp = true,
"--can-unshare-user-namespace-and-change-root" =>
can_unshare_user_namespace_and_change_root = true,
arg => panic!("Unexpected argument found: {}", arg),
}
i += 1;
Expand All @@ -167,19 +159,8 @@ macro_rules! decl_worker_main {

let socket_path = std::path::Path::new(socket_path).to_owned();
let worker_dir_path = std::path::Path::new(worker_dir_path).to_owned();
let security_status = $crate::SecurityStatus {
can_enable_landlock,
can_enable_seccomp,
can_unshare_user_namespace_and_change_root,
};

$entrypoint(
socket_path,
worker_dir_path,
node_version,
Some($worker_version),
security_status,
);

$entrypoint(socket_path, worker_dir_path, node_version, Some($worker_version));
}
};
}
Expand All @@ -205,6 +186,16 @@ impl fmt::Display for WorkerKind {
}
}

/// Identifying details of the running worker, grouped so that they can be attached to log
/// messages as a single `?worker_info` field and handed to the security helpers together.
// The code is not really dead, it's only used for logging, and dead-code analysis ignores Debug.
#[allow(dead_code)]
#[derive(Debug)]
pub struct WorkerInfo {
/// OS process ID of this worker, taken from `std::process::id()`.
pid: u32,
/// Which kind of worker this is.
kind: WorkerKind,
/// Version string of the worker, if one was supplied.
version: Option<String>,
/// Path of the worker's directory; reassigned to `/` once the worker has changed its root.
worker_dir_path: PathBuf,
}

// NOTE: The worker version must be passed in so that we accurately get the version of the worker,
// and not the version that this crate was compiled with.
//
Expand All @@ -216,62 +207,54 @@ impl fmt::Display for WorkerKind {
pub fn run_worker<F>(
worker_kind: WorkerKind,
socket_path: PathBuf,
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))] mut worker_dir_path: PathBuf,
worker_dir_path: PathBuf,
node_version: Option<&str>,
worker_version: Option<&str>,
security_status: &SecurityStatus,
mut event_loop: F,
) where
F: FnMut(UnixStream, PathBuf) -> io::Result<Never>,
{
let worker_pid = std::process::id();
#[cfg_attr(not(target_os = "linux"), allow(unused_mut))]
let mut worker_info = WorkerInfo {
pid: std::process::id(),
kind: worker_kind,
version: worker_version.map(|v| v.to_string()),
worker_dir_path,
};
gum::debug!(
target: LOG_TARGET,
%worker_pid,
?worker_info,
?socket_path,
?worker_dir_path,
?security_status,
"starting pvf worker ({})",
worker_kind
worker_info.kind
);

// Check for a mismatch between the node and worker versions.
if let (Some(node_version), Some(worker_version)) = (node_version, worker_version) {
if let (Some(node_version), Some(worker_version)) = (node_version, &worker_info.version) {
if node_version != worker_version {
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
?worker_info,
%node_version,
%worker_version,
"Node and worker version mismatch, node needs restarting, forcing shutdown",
);
kill_parent_node_in_emergency();
worker_shutdown_message(worker_kind, worker_pid, "Version mismatch");
return
worker_shutdown(worker_info, "Version mismatch");
}
}

// Make sure that we can read the worker dir path, and log its contents.
let entries = || -> Result<Vec<_>, io::Error> {
std::fs::read_dir(&worker_dir_path)?
std::fs::read_dir(&worker_info.worker_dir_path)?
.map(|res| res.map(|e| e.file_name()))
.collect()
}();
match entries {
Ok(entries) =>
gum::trace!(target: LOG_TARGET, %worker_pid, ?worker_dir_path, "content of worker dir: {:?}", entries),
gum::trace!(target: LOG_TARGET, ?worker_info, "content of worker dir: {:?}", entries),
Err(err) => {
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
?worker_dir_path,
"Could not read worker dir: {}",
err.to_string()
);
worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
return
let err = format!("Could not read worker dir: {}", err.to_string());
worker_shutdown_error(worker_info, &err);
},
}

Expand All @@ -281,23 +264,20 @@ pub fn run_worker<F>(
let _ = std::fs::remove_file(&socket_path);
Ok(stream)
}();
let stream = match stream {
Ok(s) => s,
Err(err) => {
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
"{}",
err
);
worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
return
},
let mut stream = match stream {
Ok(ok) => ok,
Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
};

let WorkerHandshake { security_status } = match recv_worker_handshake(&mut stream) {
Ok(ok) => ok,
Err(err) => worker_shutdown_error(worker_info, &err.to_string()),
};

// Enable some security features.
{
gum::trace!(target: LOG_TARGET, ?security_status, "Enabling security features");

// Call based on whether we can change root. Error out if it should work but fails.
//
// NOTE: This should not be called in a multi-threaded context (i.e. inside the tokio
Expand All @@ -306,88 +286,86 @@ pub fn run_worker<F>(
// > CLONE_NEWUSER requires that the calling process is not threaded.
#[cfg(target_os = "linux")]
if security_status.can_unshare_user_namespace_and_change_root {
if let Err(err) = security::unshare_user_namespace_and_change_root(
worker_kind,
worker_pid,
&worker_dir_path,
) {
// The filesystem may be in an inconsistent state, bail out.
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
?worker_dir_path,
"Could not change root to be the worker cache path: {}",
err
);
worker_shutdown_message(worker_kind, worker_pid, &err);
return
if let Err(err) = security::unshare_user_namespace_and_change_root(&worker_info) {
// The filesystem may be in an inconsistent state, always bail out.
let err = format!("Could not change root to be the worker cache path: {}", err);
worker_shutdown_error(worker_info, &err);
}
worker_dir_path = std::path::Path::new("/").to_owned();
worker_info.worker_dir_path = std::path::Path::new("/").to_owned();
}

#[cfg(target_os = "linux")]
if security_status.can_enable_landlock {
let landlock_status =
security::landlock::enable_for_worker(worker_kind, worker_pid, &worker_dir_path);
let landlock_status = security::landlock::enable_for_worker(&worker_info);
if !matches!(landlock_status, Ok(landlock::RulesetStatus::FullyEnforced)) {
// We previously were able to enable, so this should never happen.
// We previously were able to enable, so this should never happen. Shut down if
// running in secure mode.
let err = format!("could not fully enable landlock: {:?}", landlock_status);
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
"could not fully enable landlock: {:?}. This should not happen, please report an issue",
landlock_status
?worker_info,
"{}. This should not happen, please report an issue",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, &err);
}
}
}

// TODO: We can enable the seccomp networking blacklist on aarch64 as well, but we need a CI
// job to catch regressions. See <https://github.com/paritytech/ci_cd/issues/609>.
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
if security_status.can_enable_seccomp {
let seccomp_status =
security::seccomp::enable_for_worker(worker_kind, worker_pid, &worker_dir_path);
let seccomp_status = security::seccomp::enable_for_worker(&worker_info);
if !matches!(seccomp_status, Ok(())) {
// We previously were able to enable, so this should never happen.
//
// TODO: Make this a real error in secure-mode. See:
// <https://github.com/paritytech/polkadot-sdk/issues/1444>
// We previously were able to enable, so this should never happen. Shut down if
// running in secure mode.
let err = format!("could not fully enable seccomp: {:?}", seccomp_status);
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
"could not fully enable seccomp: {:?}. This should not happen, please report an issue",
seccomp_status
?worker_info,
"{}. This should not happen, please report an issue",
err
);
if security_status.secure_validator_mode {
worker_shutdown(worker_info, &err);
}
}
}

if !security::check_env_vars_were_cleared(worker_kind, worker_pid) {
if !security::check_env_vars_were_cleared(&worker_info) {
let err = "not all env vars were cleared when spawning the process";
gum::error!(
target: LOG_TARGET,
%worker_kind,
%worker_pid,
?worker_info,
"{}",
err
);
worker_shutdown_message(worker_kind, worker_pid, err);
return
if security_status.secure_validator_mode {
worker_shutdown(worker_info, err);
}
}
}

// Run the main worker loop.
let err = event_loop(stream, worker_dir_path)
let err = event_loop(stream, worker_info.worker_dir_path.clone())
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();

worker_shutdown_message(worker_kind, worker_pid, &err.to_string());
worker_shutdown(worker_info, &err.to_string());
}

/// Provide a consistent message on worker shutdown.
fn worker_shutdown_message(worker_kind: WorkerKind, worker_pid: u32, err: &str) {
gum::debug!(target: LOG_TARGET, %worker_pid, "quitting pvf worker ({}): {}", worker_kind, err);
/// Emit the standard "quitting pvf worker" message at warning level and terminate the process.
///
/// The message carries the full `worker_info`, the worker kind and the reason `err`. This
/// function never returns: the process exits with status code 1.
fn worker_shutdown(worker_info: WorkerInfo, err: &str) -> ! {
    let kind = &worker_info.kind;
    gum::warn!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", kind, err);
    std::process::exit(1)
}

/// Provide a consistent error on unexpected worker shutdown.
fn worker_shutdown_error(worker_info: WorkerInfo, err: &str) -> ! {
gum::error!(target: LOG_TARGET, ?worker_info, "quitting pvf worker ({}): {}", worker_info.kind, err);
std::process::exit(1);
}

/// Loop that runs in the CPU time monitor thread on prepare and execute jobs. Continuously wakes up
Expand Down Expand Up @@ -458,6 +436,18 @@ fn kill_parent_node_in_emergency() {
}
}

/// Reads one length-framed message from `stream` and decodes it as a [`WorkerHandshake`].
///
/// # Errors
///
/// Propagates any I/O error from `framed_recv_blocking`. If the received bytes do not decode
/// into a `WorkerHandshake`, returns an `io::Error` of kind `Other` describing the failure.
fn recv_worker_handshake(stream: &mut UnixStream) -> io::Result<WorkerHandshake> {
    let encoded = framed_recv_blocking(stream)?;
    WorkerHandshake::decode(&mut &encoded[..]).map_err(|decode_err| {
        let message =
            format!("recv_worker_handshake: failed to decode WorkerHandshake: {}", decode_err);
        io::Error::new(io::ErrorKind::Other, message)
    })
}

/// Functionality related to threads spawned by the workers.
///
/// The motivation for this module is to coordinate worker threads without using async Rust.
Expand Down
Loading

0 comments on commit a9cf82d

Please sign in to comment.