-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Add support for decrypting S/MIME messages #11555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 20 commits
24a89b8
370ddfb
5da5053
f5cb0d5
aa1cf3d
3bfa0ec
1be21b9
aac5dc3
008a67d
d20b72d
79ccf96
d318990
3ffcfa2
cd11be3
fa92695
4defb66
be4db81
c398b60
cde2c71
8ef2ac0
4ba77de
47e5004
0f39f75
4cd74b8
115eada
8c8523b
7b8a7c9
d521ebd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,8 +16,10 @@ use openssl::pkcs7::Pkcs7; | |
use pyo3::types::{PyAnyMethods, PyBytesMethods, PyListMethods}; | ||
|
||
use crate::asn1::encode_der_data; | ||
use crate::backend::ciphers; | ||
use crate::buf::CffiBuf; | ||
use crate::error::{CryptographyError, CryptographyResult}; | ||
use crate::padding::PKCS7UnpaddingContext; | ||
use crate::pkcs12::symmetric_encrypt; | ||
#[cfg(not(CRYPTOGRAPHY_IS_BORINGSSL))] | ||
use crate::x509::certificate::load_der_x509_certificate; | ||
|
@@ -164,6 +166,273 @@ fn encrypt_and_serialize<'p>( | |
} | ||
} | ||
|
||
#[pyo3::pyfunction] | ||
fn decrypt_smime<'p>( | ||
py: pyo3::Python<'p>, | ||
data: CffiBuf<'p>, | ||
certificate: pyo3::Bound<'p, x509::certificate::Certificate>, | ||
private_key: pyo3::Bound<'p, pyo3::types::PyAny>, | ||
options: &pyo3::Bound<'p, pyo3::types::PyList>, | ||
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> { | ||
let decoded_smime_data = types::SMIME_ENVELOPED_DECODE | ||
.get(py)? | ||
.call1((data.as_bytes(),))?; | ||
let data = decoded_smime_data.extract()?; | ||
|
||
decrypt_der(py, data, certificate, private_key, options) | ||
} | ||
#[pyo3::pyfunction] | ||
fn decrypt_pem<'p>( | ||
py: pyo3::Python<'p>, | ||
data: &[u8], | ||
certificate: pyo3::Bound<'p, x509::certificate::Certificate>, | ||
private_key: pyo3::Bound<'p, pyo3::types::PyAny>, | ||
options: &pyo3::Bound<'p, pyo3::types::PyList>, | ||
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> { | ||
let pem_str = std::str::from_utf8(data) | ||
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Invalid PEM data"))?; | ||
let pem = pem::parse(pem_str) | ||
.map_err(|_| pyo3::exceptions::PyValueError::new_err("Failed to parse PEM data"))?; | ||
|
||
// Raise error if the PEM tag is not PKCS7 | ||
if pem.tag() != "PKCS7" { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"The provided PEM data does not have the PKCS7 tag.", | ||
), | ||
)); | ||
} | ||
|
||
decrypt_der(py, &pem.into_contents(), certificate, private_key, options) | ||
} | ||
|
||
#[pyo3::pyfunction] | ||
fn decrypt_der<'p>( | ||
py: pyo3::Python<'p>, | ||
data: &[u8], | ||
certificate: pyo3::Bound<'p, x509::certificate::Certificate>, | ||
private_key: pyo3::Bound<'p, pyo3::types::PyAny>, | ||
options: &pyo3::Bound<'p, pyo3::types::PyList>, | ||
) -> CryptographyResult<pyo3::Bound<'p, pyo3::types::PyBytes>> { | ||
// Check the decrypt parameters | ||
check_decrypt_parameters(py, &certificate, &private_key, options)?; | ||
|
||
// Decrypt the data | ||
let content_info = asn1::parse_single::<pkcs7::ContentInfo<'_>>(data)?; | ||
let plain_content = match content_info.content { | ||
pkcs7::Content::EnvelopedData(data) => { | ||
// Extract enveloped data | ||
let enveloped_data = data.into_inner(); | ||
|
||
// Get recipients, and the one matching with the given certificate (if any) | ||
let mut recipient_infos = enveloped_data.recipient_infos.unwrap_read().clone(); | ||
let recipient_certificate = certificate.get().raw.borrow_dependent(); | ||
let recipient_serial_number = recipient_certificate.tbs_cert.serial; | ||
let recipient_issuer = recipient_certificate.tbs_cert.issuer.clone(); | ||
let found_recipient_info = recipient_infos.find(|info| { | ||
info.issuer_and_serial_number.serial_number == recipient_serial_number | ||
&& info.issuer_and_serial_number.issuer == recipient_issuer | ||
}); | ||
|
||
// Raise error when no recipient is found | ||
let recipient_info = match found_recipient_info { | ||
Some(info) => info, | ||
None => { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"No recipient found that matches the given certificate.", | ||
), | ||
)); | ||
} | ||
}; | ||
|
||
// Raise error when the key encryption algorithm is not RSA | ||
let key = match recipient_info.key_encryption_algorithm.oid() { | ||
&oid::RSA_OID => { | ||
let padding = types::PKCS1V15.get(py)?.call0()?; | ||
private_key | ||
.call_method1( | ||
pyo3::intern!(py, "decrypt"), | ||
(recipient_info.encrypted_key, &padding), | ||
)? | ||
.extract::<pyo3::pybacked::PyBackedBytes>()? | ||
} | ||
_ => { | ||
return Err(CryptographyError::from( | ||
exceptions::UnsupportedAlgorithm::new_err(( | ||
"Only RSA with PKCS #1 v1.5 padding is currently supported for key decryption.", | ||
exceptions::Reasons::UNSUPPORTED_SERIALIZATION, | ||
)), | ||
)); | ||
} | ||
}; | ||
|
||
// Get algorithm | ||
// TODO: implement all the possible algorithms | ||
let algorithm_identifier = enveloped_data | ||
.encrypted_content_info | ||
.content_encryption_algorithm; | ||
let (algorithm, mode) = match algorithm_identifier.params { | ||
AlgorithmParameters::Aes128Cbc(iv) => ( | ||
types::AES128.get(py)?.call1((key,))?, | ||
types::CBC | ||
.get(py)? | ||
.call1((pyo3::types::PyBytes::new(py, &iv),))?, | ||
), | ||
_ => { | ||
return Err(CryptographyError::from( | ||
exceptions::UnsupportedAlgorithm::new_err(( | ||
"Only AES-128-CBC is currently supported for content decryption.", | ||
exceptions::Reasons::UNSUPPORTED_SERIALIZATION, | ||
)), | ||
)); | ||
} | ||
}; | ||
|
||
// Decrypt the content using the key and proper algorithm | ||
let encrypted_content = match enveloped_data.encrypted_content_info.encrypted_content { | ||
Some(content) => content, | ||
None => { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"The EnvelopedData structure does not contain encrypted content.", | ||
), | ||
)); | ||
} | ||
}; | ||
let decrypted_content = symmetric_decrypt(py, algorithm, mode, encrypted_content)?; | ||
pyo3::types::PyBytes::new(py, decrypted_content.as_slice()) | ||
} | ||
_ => { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"The PKCS7 data is not an EnvelopedData structure.", | ||
), | ||
)); | ||
} | ||
}; | ||
|
||
// If text_mode, strip the header if possible, else return an error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @reaperhulk can I get your opinion on the right behavior for text vs. binary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We added these flags to behave like OpenSSL (sigh). OpenSSL calls This code appears to make two assumptions that may not match OpenSSL's though:
The behavior I believe we want is to parse the MIME headers, check that content-type is text/plain, and then discard them entirely and only keep the payload. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you have any suggestion of a lib to handle MIME efficiently in rust? For now, I'm thinking about There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would just use Python for this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. First version pushed, will compare with And yeah, it should be up to the user to sanitize the decrypted output after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've tested against some custom headers (you can see them in tests), behavior is the same as |
||
let plain_data = plain_content.as_bytes(); | ||
let plain_data = if options.contains(types::PKCS7_TEXT.get(py)?)? { | ||
match plain_data.strip_prefix(b"Content-Type: text/plain\r\n\r\n") { | ||
Some(stripped_data) => stripped_data, | ||
None => { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"No 'Content-Type' header to be deleted in the decrypted data. Please remove the 'text' option.", | ||
), | ||
)); | ||
} | ||
} | ||
} else { | ||
plain_data | ||
}; | ||
|
||
let plain_data = pyo3::types::PyBytes::new(py, plain_data); | ||
Ok(plain_data) | ||
} | ||
|
||
fn check_decrypt_parameters<'p>( | ||
py: pyo3::Python<'p>, | ||
certificate: &pyo3::Bound<'p, x509::certificate::Certificate>, | ||
private_key: &pyo3::Bound<'p, pyo3::PyAny>, | ||
options: &pyo3::Bound<'p, pyo3::types::PyList>, | ||
) -> Result<(), CryptographyError> { | ||
// Check if RSA encryption with PKCS1 v1.5 padding is supported (dependent of FIPS mode) | ||
if cryptography_openssl::fips::is_enabled() { | ||
return Err(CryptographyError::from( | ||
exceptions::UnsupportedAlgorithm::new_err(( | ||
"RSA with PKCS1 v1.5 padding is not supported by this version of OpenSSL.", | ||
exceptions::Reasons::UNSUPPORTED_PADDING, | ||
)), | ||
)); | ||
} | ||
|
||
// Check if all options are from the PKCS7Options enum | ||
let pkcs7_option_type = types::PKCS7_TEXT.get(py)?.get_type(); | ||
nitneuqr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if !options | ||
.iter() | ||
.all(|opt| opt.is_instance(&pkcs7_option_type).unwrap()) | ||
{ | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err("options must be from the PKCS7Options enum"), | ||
)); | ||
} | ||
nitneuqr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
// Check if any option is not PKCS7Options::Text | ||
let text_option = types::PKCS7_TEXT.get(py)?; | ||
if options | ||
.iter() | ||
.any(|opt| !opt.eq(text_option.clone()).unwrap()) | ||
nitneuqr marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyValueError::new_err( | ||
"Only the following options are supported for decryption: Text", | ||
), | ||
)); | ||
} | ||
|
||
// Check if certificate's public key is an RSA public key | ||
let public_key_type = types::RSA_PUBLIC_KEY.get(py)?; | ||
if !certificate | ||
.call_method0(pyo3::intern!(py, "public_key"))? | ||
.is_instance(&public_key_type)? | ||
{ | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyTypeError::new_err( | ||
"Only certificate with RSA public keys are supported at this time.", | ||
), | ||
)); | ||
} | ||
|
||
// Check if private_key is an instance of RSA private key | ||
let private_key_type = types::RSA_PRIVATE_KEY.get(py)?; | ||
if !private_key.is_instance(&private_key_type)? { | ||
return Err(CryptographyError::from( | ||
pyo3::exceptions::PyTypeError::new_err( | ||
"Only RSA private keys are supported at this time.", | ||
), | ||
)); | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
pub(crate) fn symmetric_decrypt( | ||
py: pyo3::Python<'_>, | ||
algorithm: pyo3::Bound<'_, pyo3::PyAny>, | ||
mode: pyo3::Bound<'_, pyo3::PyAny>, | ||
data: &[u8], | ||
) -> CryptographyResult<Vec<u8>> { | ||
let block_size = algorithm | ||
.getattr(pyo3::intern!(py, "block_size"))? | ||
.extract()?; | ||
|
||
let mut cipher = | ||
ciphers::CipherContext::new(py, algorithm, mode, openssl::symm::Mode::Decrypt)?; | ||
|
||
// Decrypt the data | ||
let mut decrypted_data = vec![0; data.len() + (block_size / 8)]; | ||
let count = cipher.update_into(py, data, &mut decrypted_data)?; | ||
let final_block = cipher.finalize(py)?; | ||
assert!(final_block.as_bytes().is_empty()); | ||
decrypted_data.truncate(count); | ||
|
||
// Unpad the data | ||
let mut unpadder = PKCS7UnpaddingContext::new(block_size); | ||
let unpadded_first_blocks = unpadder.update(py, CffiBuf::from_bytes(py, &decrypted_data))?; | ||
let unpadded_last_block = unpadder.finalize(py)?; | ||
|
||
let unpadded_data = [ | ||
unpadded_first_blocks.as_bytes(), | ||
unpadded_last_block.as_bytes(), | ||
] | ||
.concat(); | ||
|
||
Ok(unpadded_data) | ||
} | ||
|
||
#[pyo3::pyfunction] | ||
fn sign_and_serialize<'p>( | ||
py: pyo3::Python<'p>, | ||
|
@@ -507,8 +776,9 @@ fn load_der_pkcs7_certificates<'p>( | |
pub(crate) mod pkcs7_mod { | ||
#[pymodule_export] | ||
use super::{ | ||
encrypt_and_serialize, load_der_pkcs7_certificates, load_pem_pkcs7_certificates, | ||
serialize_certificates, sign_and_serialize, | ||
decrypt_der, decrypt_pem, decrypt_smime, encrypt_and_serialize, | ||
load_der_pkcs7_certificates, load_pem_pkcs7_certificates, serialize_certificates, | ||
sign_and_serialize, | ||
}; | ||
} | ||
|
||
|
Uh oh!
There was an error while loading. Please reload this page.