Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,129 changes: 1,110 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ mailparse = "0.16.1"
log = "0.4.29"
env_logger = "0.11.8"
governor = "0.10.4"
viadkim = { version = "0.2.0" }
hickory-resolver = "0.25.2"
lru = "0.16.3"
parking_lot = "0.12.5"

[dev-dependencies]
rstest = "0.26.1"
Expand Down
674 changes: 674 additions & 0 deletions LICENSE-GPL

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ Rust drop-in reimplementation of chatmaild's filtermail.
```plain
filtermail <config> (incoming|outgoing)
```

## License

Code licensed under [MIT](LICENSE).

Binaries distributed under [GPL-3.0-or-later](LICENSE).
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub struct Config {
pub passthrough_senders: Vec<String>,
#[serde(default, deserialize_with = "deserialize_sequence")]
pub passthrough_recipients: Vec<String>,
mail_domain: String,
pub mail_domain: String,
mailboxes_dir: Option<PathBuf>,
}

Expand Down
221 changes: 221 additions & 0 deletions src/dkim_verifier.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
use crate::utils::get_domain_from_address;
use hickory_resolver::name_server::TokioConnectionProvider;
use hickory_resolver::{Name, TokioResolver};
use lru::LruCache;
use std::io;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use viadkim::VerificationStatus;
use viadkim::message_hash::BodyHasherStance;
use viadkim::verifier::LookupTxt;

// ~500kB when fully saturated (~420B per RDATA + selector).
// "top 1000 relays" is much more than enough, the limit is mostly to prevent DoS attacks.
const LRU_CACHE_CAPACITY: NonZeroUsize = NonZeroUsize::new(1000).expect("1000 != 0");

/// DNS resolver for DKIM TXT records, that caches RDATA in memory.
#[derive(Clone)]
struct CachedResolver {
dns_resolver: TokioResolver,
// Note: Arc is required despite we are holding the whole handler in an Arc,
// because viadkim will internally clone the resolver (LookupTxt + Clone + 'static)
// to parallelize lookups in case of multiple signatures...
cache: Arc<parking_lot::Mutex<LruCache<Name, Vec<u8>>>>,
}

impl CachedResolver {
/// Creates a new [`CachedResolver`].
pub fn new() -> Result<Self, crate::error::Error> {
// Use resolv.conf
let dns_resolver = TokioResolver::builder(TokioConnectionProvider::default())?.build();

let cache = Arc::new(parking_lot::Mutex::new(LruCache::new(LRU_CACHE_CAPACITY)));

Ok(Self {
dns_resolver,
cache,
})
}

/// Normalizes a TXT record RDATA by removing whitespace characters.
///
/// Some DKIM key records use e.g. LF + WSP line breaks.
/// This is technically not correct, and `viadkim` fails to parse such records,
/// but in practice this is accepted by many implementations.
fn normalize_rdata(txt_data: &str) -> String {
txt_data.replace([' ', '\t', '\n', '\r'], "")
}

/// Invalidates the cached RDATA for a given selector and domain.
///
/// Fails silently.
fn invalidate_cache(&self, selector: &str, domain: &str) {
let selector_domain_str = format!("{}._domainkey.{}.", selector, domain);
if let Ok(selector_domain) = Name::from_ascii(&selector_domain_str) {
let mut cache = self.cache.lock();
cache.pop(&selector_domain);
log::debug!("Cache invalidated for {}", selector_domain_str);
} else {
log::warn!(
"Failed to parse selector domain for cache invalidation: {}",
selector_domain_str
);
}
}
}

impl LookupTxt for CachedResolver {
type Answer = Box<dyn Iterator<Item = io::Result<Vec<u8>>>>;
type Query<'a> = Pin<Box<dyn Future<Output = io::Result<Self::Answer>> + Send + 'a>>;

fn lookup_txt(&self, domain: &str) -> Self::Query<'_> {
let name = Name::from_ascii(domain);
Box::pin(async move {
let name = name.map_err(|_| io::ErrorKind::InvalidInput)?;

{
let mut cache = self.cache.lock();
if let Some(rdata) = cache.get(&name) {
let txts: Self::Answer = Box::new(std::iter::once(Ok(rdata.clone())));
log::debug!("Using cached RDATA for {}", name);
return Ok(txts);
}
}

log::debug!("Trying to resolve TXT for {}", name);
let txt = {
let lookup = self.dns_resolver.txt_lookup(name.clone()).await?;

// viadkim would filter out non-DKIM TXT records,
// but we filter it here anyway so that we know which one should be cached.
lookup
.into_iter()
.find_map(|txt| {
let rdata_raw = txt.txt_data().concat();
let rdata = String::from_utf8_lossy(&rdata_raw);
// naive check, but this can't be an attack vector,
// and selector domain should only have a single TXT record anyway.
if rdata.contains("DKIM") {
Some(Self::normalize_rdata(&rdata).into_bytes())
} else {
None
}
})
.ok_or(io::ErrorKind::NotFound)?
};

{
let mut cache = self.cache.lock();
cache.put(name, txt.clone());
}

let txts: Self::Answer = Box::new(std::iter::once(Ok(txt)));
Ok(txts)
})
}
}

/// DKIM verifier using a pre-configured [`viadkim`] verifier, a [`CachedResolver`] for DNS lookups,
/// and strict domain name alignment check.
pub struct DkimVerifier {
resolver: CachedResolver,
config: viadkim::Config,
}

impl DkimVerifier {
/// Creates a new [`DkimVerifier`] with the provided resolver.
pub fn new() -> Result<Self, crate::error::Error> {
let resolver = CachedResolver::new()?;
let config = viadkim::Config {
lookup_timeout: Duration::from_secs(60),
..Default::default()
};
Ok(Self { resolver, config })
}

/// Verifies the DKIM signature of a raw email message and alignment with the domain of
/// provided `From` address.
pub async fn verify(&self, raw_mail: &[u8], from_address: &str) -> Result<(), String> {
let (headers, body_start) = {
use viadkim::{FieldBody, FieldName, HeaderField};

let Ok((headers, body_start)) = mailparse::parse_headers(raw_mail) else {
return Err("500 Failed to parse message headers".to_string());
};

let mut viadkim_headers: Vec<HeaderField> = Vec::new();
for header in headers {
match (
FieldName::new(header.get_key()),
FieldBody::new(header.get_value_raw()),
) {
(Ok(name), Ok(body)) => viadkim_headers.push((name, body)),
(Err(e), _) | (_, Err(e)) => {
log::debug!("Failed to parse header {header:?}, skipping: {e}");
}
}
}

let viadkim_headers = viadkim::HeaderFields::new(viadkim_headers).map_err(|e| {
log::error!("Failed to parse headers for DKIM verification: {e}");
"500 Failed to parse message headers".to_string()
})?;

(viadkim_headers, body_start)
};

let Some(from_domain) = get_domain_from_address(from_address) else {
return Err("501 Invalid From address".to_string());
};

log::debug!("`From` header domain: {from_domain}");

let Some(mut verifier) =
viadkim::Verifier::verify_header(&self.resolver, &headers, &self.config).await
else {
return Err("554 5.7.1 No DKIM signature found".to_string());
};

'hasher: for chunk in raw_mail.get(body_start..).unwrap_or_default().chunks(8192) {
if verifier.process_body_chunk(chunk) == BodyHasherStance::Done {
break 'hasher;
}
}

for res in verifier.finish() {
log::debug!("Signature {}: {:?}", res.index, res.status);

let Some(signature) = &res.signature else {
log::debug!("Signature {}: No signature found, skipping", res.index);
continue;
};

if matches!(res.status, VerificationStatus::Failure(_)) {
log::debug!("Signature {}: Verification failed, skipping", res.index);
// We only invalidate cache on actual validation error, and not alignment error.
// TODO: ideally we should retry without cache and swap cached value only on success.
self.resolver
.invalidate_cache(signature.selector.as_ref(), signature.domain.as_ref());
continue;
}

if !signature
.domain
.to_string()
.eq_ignore_ascii_case(&from_domain)
{
log::debug!(
"Signature {}: Domain different than in From header, skipping",
res.index
);
continue;
}

return Ok(());
}

Err("554 5.7.1 No valid DKIM signature found".to_string())
}
}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub enum Error {
Config(#[from] serini::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
#[error(transparent)]
Resolve(#[from] hickory_resolver::ResolveError),
#[error("OpenPGP packet header is truncated - can't validate!")]
TruncatedHeader,
#[error("Unable to send email, Error during {context}, server said: {raw_smtp_answer}")]
Expand Down
47 changes: 28 additions & 19 deletions src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,26 @@

use crate::ENCRYPTION_NEEDED_523;
use crate::config::Config;
use crate::dkim_verifier::DkimVerifier;
use crate::message::{check_encrypted, is_securejoin};
pub use crate::smtp_server::Envelope;
use crate::smtp_server::SmtpHandler;
use crate::utils::extract_address;
use async_trait::async_trait;
use mailparse::{MailHeaderMap, parse_mail};

pub use crate::smtp_server::Envelope;
use crate::utils::extract_address;

/// Handler for incoming SMTP messages.
pub struct IncomingBeforeQueueHandler {
config: Config,
dkim_verifier: DkimVerifier,
}

impl IncomingBeforeQueueHandler {
pub fn new(config: Config) -> Self {
Self { config }
pub fn new(config: Config) -> Result<Self, crate::error::Error> {
Ok(Self {
config,
dkim_verifier: DkimVerifier::new()?,
})
}
}

Expand All @@ -27,14 +31,29 @@ impl SmtpHandler for IncomingBeforeQueueHandler {
Ok(())
}

fn check_data(&self, envelope: &Envelope) -> Result<(), String> {
async fn check_data(&self, envelope: &Envelope) -> Result<(), String> {
log::debug!("Processing DATA message from {}", envelope.mail_from);

let message = match parse_mail(&envelope.data) {
Ok(m) => m,
Err(e) => return Err(format!("500 Failed to parse message: {}", e)),
};

let from_header = message
.headers
.get_first_value("From")
.unwrap_or_default()
.trim()
.to_string();

let Some(from_addr) = extract_address(&from_header) else {
return Err(format!("500 Invalid FROM header: {from_header}"));
};

self.dkim_verifier
.verify(&envelope.data, &from_addr)
.await?;

let mail_encrypted = check_encrypted(&message, false);
log::debug!("mail_encrypted: {mail_encrypted}");
log::debug!("is_securejoin: {}", is_securejoin(&message));
Expand All @@ -50,20 +69,10 @@ impl SmtpHandler for IncomingBeforeQueueHandler {
// Allow cleartext mailer-daemon messages
if let Some(auto_submitted) = message.headers.get_first_value("Auto-Submitted")
&& !auto_submitted.is_empty()
&& from_addr.to_lowercase().starts_with("mailer-daemon@")
&& message.ctype.mimetype == "multipart/report"
{
let from_header = message
.headers
.get_first_value("From")
.unwrap_or_default()
.trim()
.to_string();

if let Some(from_addr) = extract_address(&from_header)
&& from_addr.to_lowercase().starts_with("mailer-daemon@")
&& message.ctype.mimetype == "multipart/report"
{
return Ok(());
}
return Ok(());
}

for recipient in &envelope.rcpt_to {
Expand Down
6 changes: 5 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
clippy::bool_to_int_with_if
)]
mod config;
mod dkim_verifier;
pub(crate) mod error;
pub(crate) mod inbound;
pub(crate) mod message;
Expand Down Expand Up @@ -95,7 +96,10 @@ async fn main() {
process::exit(1);
}
} else {
let handler = Arc::new(IncomingBeforeQueueHandler::new(config.clone()));
let handler = Arc::new(
// We want to panic here if the handler cannot be created.
IncomingBeforeQueueHandler::new(config.clone()).unwrap(),
);
let addr = format!("127.0.0.1:{}", config.filtermail_smtp_port_incoming);
let max_size = config.max_message_size;
log::debug!("Incoming SMTP server listening on {addr}");
Expand Down
2 changes: 1 addition & 1 deletion src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl SmtpHandler for OutgoingBeforeQueueHandler {
Ok(())
}

fn check_data(&self, envelope: &Envelope) -> Result<(), String> {
async fn check_data(&self, envelope: &Envelope) -> Result<(), String> {
log::debug!("Processing DATA message from {}", envelope.mail_from);

let message = match parse_mail(&envelope.data) {
Expand Down
4 changes: 2 additions & 2 deletions src/smtp_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ pub trait SmtpHandler: Send + Sync {
fn handle_mail(&self, address: &str) -> Result<(), String>;

/// Checks the DATA command before reinjection.
fn check_data(&self, envelope: &Envelope) -> Result<(), String>;
async fn check_data(&self, envelope: &Envelope) -> Result<(), String>;

/// Reinjects the mail back to postfix.
async fn reinject_mail(&self, envelope: &Envelope) -> Result<(), String>;

/// Handles the DATA command.
async fn handle_data(&self, envelope: &Envelope) -> Result<String, String> {
log::debug!("handle_DATA before-queue");
self.check_data(envelope)?;
self.check_data(envelope).await?;
self.reinject_mail(envelope).await.map_err(|e| {
log::warn!("Failed to reinject mail: {e}");
e
Expand Down
Loading