Skip to content

Add support for decrypting S/MIME messages #11555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 28 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
24a89b8
first python API proposition
nitneuqr Nov 19, 2024
370ddfb
Update src/cryptography/hazmat/primitives/serialization/pkcs7.py
nitneuqr Nov 19, 2024
5da5053
fix: removed use of deprecated new_bound for PyBytes
nitneuqr Nov 19, 2024
f5cb0d5
corrected some error types
nitneuqr Nov 19, 2024
aa1cf3d
updated tests accordingly
nitneuqr Nov 19, 2024
3bfa0ec
fix: handling other key encryption algorithms
nitneuqr Nov 19, 2024
1be21b9
first attempts raising error when no header to remove
nitneuqr Nov 19, 2024
aac5dc3
one more test to handle text data without header
nitneuqr Nov 19, 2024
008a67d
fix: went back to the previous implementation
nitneuqr Nov 19, 2024
d20b72d
refacto: removed the return part
nitneuqr Nov 19, 2024
79ccf96
feat: Binary option does not seem useful for decryption
nitneuqr Nov 20, 2024
d318990
moved logic into rust
nitneuqr Nov 20, 2024
3ffcfa2
removed pyfunction for the inner decrypt one
nitneuqr Nov 20, 2024
cd11be3
added checks in rust now :)
nitneuqr Nov 20, 2024
fa92695
removed unused function
nitneuqr Nov 20, 2024
4defb66
some checks not needed anymore
nitneuqr Nov 20, 2024
be4db81
removed a parameter
nitneuqr Nov 20, 2024
c398b60
took comments into account
nitneuqr Nov 21, 2024
cde2c71
removed unused import
nitneuqr Nov 21, 2024
8ef2ac0
added first unwrap corrections
nitneuqr Nov 24, 2024
4ba77de
no more unwrap for parameter checks
nitneuqr Nov 24, 2024
47e5004
removing headers is Python now
nitneuqr Nov 25, 2024
0f39f75
final corrections?
nitneuqr Nov 26, 2024
4cd74b8
first version of documentation
nitneuqr Nov 26, 2024
115eada
corrected doctests
nitneuqr Nov 26, 2024
8c8523b
better indentation
nitneuqr Nov 26, 2024
7b8a7c9
doctest: added RSA private key
nitneuqr Nov 26, 2024
d521ebd
oops
nitneuqr Nov 26, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs/development/test-vectors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,9 @@ Custom PKCS7 Test Vectors
* ``pkcs7/enveloped-rsa-oaep.pem``- A PEM encoded PKCS7 file with
enveloped data, with key encrypted using RSA-OAEP, under the public key of
``x509/custom/ca/rsa_ca.pem``.
* ``pkcs7/enveloped-no-content.der``- A DER encoded PKCS7 file with
enveloped data, without encrypted content, with key encrypted under the
public key of ``x509/custom/ca/rsa_ca.pem``.

Custom OpenSSH Test Vectors
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down
19 changes: 19 additions & 0 deletions src/cryptography/hazmat/bindings/_rust/pkcs7.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import typing

from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.serialization import pkcs7

def serialize_certificates(
Expand All @@ -22,6 +23,24 @@ def sign_and_serialize(
encoding: serialization.Encoding,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def decrypt_der(
data: bytes,
certificate: x509.Certificate,
private_key: rsa.RSAPrivateKey,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def decrypt_pem(
data: bytes,
certificate: x509.Certificate,
private_key: rsa.RSAPrivateKey,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def decrypt_smime(
data: bytes,
certificate: x509.Certificate,
private_key: rsa.RSAPrivateKey,
options: typing.Iterable[pkcs7.PKCS7Options],
) -> bytes: ...
def load_pem_pkcs7_certificates(
data: bytes,
) -> list[x509.Certificate]: ...
Expand Down
7 changes: 0 additions & 7 deletions src/cryptography/hazmat/bindings/_rust/test_support.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ class TestCertificate:
subject_value_tags: list[int]

def test_parse_certificate(data: bytes) -> TestCertificate: ...
def pkcs7_decrypt(
encoding: serialization.Encoding,
msg: bytes,
pkey: serialization.pkcs7.PKCS7PrivateKeyTypes,
cert_recipient: x509.Certificate,
options: list[pkcs7.PKCS7Options],
) -> bytes: ...
def pkcs7_verify(
encoding: serialization.Encoding,
sig: bytes,
Expand Down
15 changes: 15 additions & 0 deletions src/cryptography/hazmat/primitives/serialization/pkcs7.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ def encrypt(
return rust_pkcs7.encrypt_and_serialize(self, encoding, options)


pkcs7_decrypt_der = rust_pkcs7.decrypt_der
pkcs7_decrypt_pem = rust_pkcs7.decrypt_pem
pkcs7_decrypt_smime = rust_pkcs7.decrypt_smime


def _smime_signed_encode(
data: bytes, signature: bytes, micalg: str, text_mode: bool
) -> bytes:
Expand Down Expand Up @@ -328,6 +333,16 @@ def _smime_enveloped_encode(data: bytes) -> bytes:
return m.as_bytes(policy=m.policy.clone(linesep="\n", max_line_length=0))


def _smime_enveloped_decode(data: bytes) -> bytes:
m = email.message_from_bytes(data)
if m.get_content_type() not in {
"application/x-pkcs7-mime",
"application/pkcs7-mime",
}:
raise ValueError("Not an S/MIME enveloped message")
return bytes(m.get_payload(decode=True))


class OpenSSLMimePart(email.message.MIMEPart):
# A MIMEPart subclass that replicates OpenSSL's behavior of not including
# a newline if there are no headers.
Expand Down
274 changes: 272 additions & 2 deletions src/rust/src/pkcs7.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ use openssl::pkcs7::Pkcs7;
use pyo3::types::{PyAnyMethods, PyBytesMethods, PyListMethods};

use crate::asn1::encode_der_data;
use crate::backend::ciphers;
use crate::buf::CffiBuf;
use crate::error::{CryptographyError, CryptographyResult};
use crate::padding::PKCS7UnpaddingContext;
use crate::pkcs12::symmetric_encrypt;
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))]
use crate::x509::certificate::load_der_x509_certificate;
Expand Down Expand Up @@ -164,6 +166,273 @@ fn encrypt_and_serialize<'p>(
}
}

#[pyo3::pyfunction]
fn decrypt_smime<'p>(
py: pyo3::Python<'p>,
data: CffiBuf<'p>,
certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let decoded_smime_data = types::SMIME_ENVELOPED_DECODE
.get(py)?
.call1((data.as_bytes(),))?;
let data = decoded_smime_data.extract()?;

decrypt_der(py, data, certificate, private_key, options)
}
#[pyo3::pyfunction]
fn decrypt_pem<'p>(
py: pyo3::Python<'p>,
data: &[u8],
certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
let pem_str = std::str::from_utf8(data)
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Invalid PEM data"))?;
let pem = pem::parse(pem_str)
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Failed to parse PEM data"))?;

// Raise error if the PEM tag is not PKCS7
if pem.tag() != "PKCS7" {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The provided PEM data does not have the PKCS7 tag.",
),
));
}

decrypt_der(py, &pem.into_contents(), certificate, private_key, options)
}

#[pyo3::pyfunction]
fn decrypt_der<'p>(
py: pyo3::Python<'p>,
data: &[u8],
certificate: pyo3::Bound<'p, x509::certificate::Certificate>,
private_key: pyo3::Bound<'p, pyo3::types::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> {
// Check the decrypt parameters
check_decrypt_parameters(py, &certificate, &private_key, options)?;

// Decrypt the data
let content_info = asn1::parse_single::<pkcs7::ContentInfo<'_>>(data)?;
let plain_content = match content_info.content {
pkcs7::Content::EnvelopedData(data) => {
// Extract enveloped data
let enveloped_data = data.into_inner();

// Get recipients, and the one matching with the given certificate (if any)
let mut recipient_infos = enveloped_data.recipient_infos.unwrap_read().clone();
let recipient_certificate = certificate.get().raw.borrow_dependent();
let recipient_serial_number = recipient_certificate.tbs_cert.serial;
let recipient_issuer = recipient_certificate.tbs_cert.issuer.clone();
let found_recipient_info = recipient_infos.find(|info| {
info.issuer_and_serial_number.serial_number == recipient_serial_number
&& info.issuer_and_serial_number.issuer == recipient_issuer
});

// Raise error when no recipient is found
let recipient_info = match found_recipient_info {
Some(info) => info,
None => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"No recipient found that matches the given certificate.",
),
));
}
};

// Raise error when the key encryption algorithm is not RSA
let key = match recipient_info.key_encryption_algorithm.oid() {
&oid::RSA_OID => {
let padding = types::PKCS1V15.get(py)?.call0()?;
private_key
.call_method1(
pyo3::intern!(py, "decrypt"),
(recipient_info.encrypted_key, &padding),
)?
.extract::<pyo3::pybacked::PyBackedBytes>()?
}
_ => {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"Only RSA with PKCS #1 v1.5 padding is currently supported for key decryption.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
};

// Get algorithm
// TODO: implement all the possible algorithms
let algorithm_identifier = enveloped_data
.encrypted_content_info
.content_encryption_algorithm;
let (algorithm, mode) = match algorithm_identifier.params {
AlgorithmParameters::Aes128Cbc(iv) => (
types::AES128.get(py)?.call1((key,))?,
types::CBC
.get(py)?
.call1((pyo3::types::PyBytes::new(py, &iv),))?,
),
_ => {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"Only AES-128-CBC is currently supported for content decryption.",
exceptions::Reasons::UNSUPPORTED_SERIALIZATION,
)),
));
}
};

// Decrypt the content using the key and proper algorithm
let encrypted_content = match enveloped_data.encrypted_content_info.encrypted_content {
Some(content) => content,
None => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The EnvelopedData structure does not contain encrypted content.",
),
));
}
};
let decrypted_content = symmetric_decrypt(py, algorithm, mode, encrypted_content)?;
pyo3::types::PyBytes::new(py, decrypted_content.as_slice())
}
_ => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"The PKCS7 data is not an EnvelopedData structure.",
),
));
}
};

// If text_mode, strip the header if possible, else return an error
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@reaperhulk can I get your opinion on the right behavior for text vs. binary?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We added these flags to behave like OpenSSL (sigh).

OpenSSL calls SMIME_text in its decryption path when you pass PKCS7_TEXT. That function errors for a few conditions, but most notably if parsing the MIME headers fails, if there is no “content-type” header, or if the content type is not “text/plain”.

This code appears to make two assumptions that may not match OpenSSL's though:

  • Case sensitivity on Content-Type
  • That the Content-Type header is guaranteed to be the last header before the payload.

The behavior I believe we want is to parse the MIME headers, check that content-type is text/plain, and then discard them entirely and only keep the payload.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have any suggestion of a lib to handle MIME efficiently in rust? For now, I'm thinking about
using some Python function. Seems simpler to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would just use Python for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First version pushed, will compare with openssl and let you know!

And yeah, it should be up to the user to sanitize the decrypted output after pkcs7_decrypt, especially with the libraries available on Python that are perfect for this ...

Copy link
Contributor Author

@nitneuqr nitneuqr Nov 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tested against some custom headers (you can see them in tests), behavior is the same as openssl (using test_support.pkcs7_decrypt from main).

let plain_data = plain_content.as_bytes();
let plain_data = if options.contains(types::PKCS7_TEXT.get(py)?)? {
match plain_data.strip_prefix(b"Content-Type: text/plain\r\n\r\n") {
Some(stripped_data) => stripped_data,
None => {
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"No 'Content-Type' header to be deleted in the decrypted data. Please remove the 'text' option.",
),
));
}
}
} else {
plain_data
};

let plain_data = pyo3::types::PyBytes::new(py, plain_data);
Ok(plain_data)
}

fn check_decrypt_parameters<'p>(
py: pyo3::Python<'p>,
certificate: &pyo3::Bound<'p, x509::certificate::Certificate>,
private_key: &pyo3::Bound<'p, pyo3::PyAny>,
options: &pyo3::Bound<'p, pyo3::types::PyList>,
) -> Result<(), CryptographyError> {
// Check if RSA encryption with PKCS1 v1.5 padding is supported (dependent of FIPS mode)
if cryptography_openssl::fips::is_enabled() {
return Err(CryptographyError::from(
exceptions::UnsupportedAlgorithm::new_err((
"RSA with PKCS1 v1.5 padding is not supported by this version of OpenSSL.",
exceptions::Reasons::UNSUPPORTED_PADDING,
)),
));
}

// Check if all options are from the PKCS7Options enum
let pkcs7_option_type = types::PKCS7_TEXT.get(py)?.get_type();
if !options
.iter()
.all(|opt| opt.is_instance(&pkcs7_option_type).unwrap())
{
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err("options must be from the PKCS7Options enum"),
));
}

// Check if any option is not PKCS7Options::Text
let text_option = types::PKCS7_TEXT.get(py)?;
if options
.iter()
.any(|opt| !opt.eq(text_option.clone()).unwrap())
{
return Err(CryptographyError::from(
pyo3::exceptions::PyValueError::new_err(
"Only the following options are supported for decryption: Text",
),
));
}

// Check if certificate's public key is an RSA public key
let public_key_type = types::RSA_PUBLIC_KEY.get(py)?;
if !certificate
.call_method0(pyo3::intern!(py, "public_key"))?
.is_instance(&public_key_type)?
{
return Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Only certificate with RSA public keys are supported at this time.",
),
));
}

// Check if private_key is an instance of RSA private key
let private_key_type = types::RSA_PRIVATE_KEY.get(py)?;
if !private_key.is_instance(&private_key_type)? {
return Err(CryptographyError::from(
pyo3::exceptions::PyTypeError::new_err(
"Only RSA private keys are supported at this time.",
),
));
}

Ok(())
}

pub(crate) fn symmetric_decrypt(
py: pyo3::Python<'_>,
algorithm: pyo3::Bound<'_, pyo3::PyAny>,
mode: pyo3::Bound<'_, pyo3::PyAny>,
data: &[u8],
) -> CryptographyResult<Vec<u8>> {
let block_size = algorithm
.getattr(pyo3::intern!(py, "block_size"))?
.extract()?;

let mut cipher =
ciphers::CipherContext::new(py, algorithm, mode, openssl::symm::Mode::Decrypt)?;

// Decrypt the data
let mut decrypted_data = vec![0; data.len() + (block_size / 8)];
let count = cipher.update_into(py, data, &mut decrypted_data)?;
let final_block = cipher.finalize(py)?;
assert!(final_block.as_bytes().is_empty());
decrypted_data.truncate(count);

// Unpad the data
let mut unpadder = PKCS7UnpaddingContext::new(block_size);
let unpadded_first_blocks = unpadder.update(py, CffiBuf::from_bytes(py, &decrypted_data))?;
let unpadded_last_block = unpadder.finalize(py)?;

let unpadded_data = [
unpadded_first_blocks.as_bytes(),
unpadded_last_block.as_bytes(),
]
.concat();

Ok(unpadded_data)
}

#[pyo3::pyfunction]
fn sign_and_serialize<'p>(
py: pyo3::Python<'p>,
Expand Down Expand Up @@ -507,8 +776,9 @@ fn load_der_pkcs7_certificates<'p>(
pub(crate) mod pkcs7_mod {
#[pymodule_export]
use super::{
encrypt_and_serialize, load_der_pkcs7_certificates, load_pem_pkcs7_certificates,
serialize_certificates, sign_and_serialize,
decrypt_der, decrypt_pem, decrypt_smime, encrypt_and_serialize,
load_der_pkcs7_certificates, load_pem_pkcs7_certificates, serialize_certificates,
sign_and_serialize,
};
}

Expand Down
Loading