Skip to content

Commit f80171e

Browse files
authored
RUST-1411 Add create_encrypted_collection helper (#853)
1 parent 7a09f63 commit f80171e

File tree

12 files changed

+259
-25
lines changed

12 files changed

+259
-25
lines changed

src/client/csfle/state_machine.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,12 @@ use tokio::{
1414
};
1515

1616
use crate::{
17-
client::{auth::Credential, options::ServerAddress, WeakClient},
17+
client::{options::ServerAddress, WeakClient},
1818
coll::options::FindOptions,
1919
error::{Error, Result},
2020
operation::{RawOutput, RunCommand},
2121
options::ReadConcern,
22-
runtime::{AsyncStream, HttpClient, Process, TlsConfig},
22+
runtime::{AsyncStream, Process, TlsConfig},
2323
Client,
2424
Namespace,
2525
};
@@ -209,6 +209,7 @@ impl CryptExecutor {
209209
}
210210
State::NeedKmsCredentials => {
211211
let ctx = result_mut(&mut ctx)?;
212+
#[allow(unused_mut)]
212213
let mut out = rawdoc! {};
213214
if self
214215
.kms_providers
@@ -219,8 +220,8 @@ impl CryptExecutor {
219220
#[cfg(feature = "aws-auth")]
220221
{
221222
let aws_creds = crate::client::auth::aws::AwsCredential::get(
222-
&Credential::default(),
223-
&HttpClient::default(),
223+
&crate::client::auth::Credential::default(),
224+
&crate::runtime::HttpClient::default(),
224225
)
225226
.await?;
226227
let mut creds = rawdoc! {

src/db/mod.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use std::{fmt::Debug, sync::Arc};
66
use bson::doc;
77
use futures_util::stream::TryStreamExt;
88

9+
#[cfg(feature = "in-use-encryption-unstable")]
10+
use crate::client_encryption::{ClientEncryption, MasterKey};
911
use crate::{
1012
bson::{Bson, Document},
1113
change_stream::{
@@ -421,6 +423,56 @@ impl Database {
421423
self.create_collection_common(name, options, session).await
422424
}
423425

426+
/// Creates a new collection with encrypted fields, automatically creating new data encryption
427+
/// keys when needed based on the configured [`CreateCollectionOptions::encrypted_fields`].
428+
///
429+
/// Returns the potentially updated `encrypted_fields` along with status, as keys may have been
430+
/// created even when a failure occurs.
431+
///
432+
/// Does not affect any auto encryption settings on existing MongoClients that are already
433+
/// configured with auto encryption.
434+
#[cfg(feature = "in-use-encryption-unstable")]
435+
pub async fn create_encrypted_collection(
436+
&self,
437+
ce: &ClientEncryption,
438+
name: impl AsRef<str>,
439+
options: impl Into<Option<CreateCollectionOptions>>,
440+
master_key: MasterKey,
441+
) -> (Document, Result<()>) {
442+
let options: Option<CreateCollectionOptions> = options.into();
443+
let ef = match options.as_ref().and_then(|o| o.encrypted_fields.as_ref()) {
444+
Some(ef) => ef,
445+
None => {
446+
return (
447+
doc! {},
448+
Err(Error::invalid_argument(
449+
"no encrypted_fields defined for collection",
450+
)),
451+
);
452+
}
453+
};
454+
let mut ef_prime = ef.clone();
455+
if let Ok(fields) = ef_prime.get_array_mut("fields") {
456+
for f in fields {
457+
let f_doc = if let Some(d) = f.as_document_mut() {
458+
d
459+
} else {
460+
continue;
461+
};
462+
if f_doc.get("keyId") == Some(&Bson::Null) {
463+
let d = match ce.create_data_key(master_key.clone()).run().await {
464+
Ok(v) => v,
465+
Err(e) => return (ef_prime, Err(e)),
466+
};
467+
f_doc.insert("keyId", d);
468+
}
469+
}
470+
}
471+
let mut opts_prime = options.unwrap().clone(); // safe unwrap: no options would be caught by the encrypted_fields check
472+
opts_prime.encrypted_fields = Some(ef_prime.clone());
473+
(ef_prime, self.create_collection(name, opts_prime).await)
474+
}
475+
424476
pub(crate) async fn run_command_common(
425477
&self,
426478
command: Document,

src/error.rs

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,15 +158,15 @@ impl Error {
158158
}
159159

160160
pub(crate) fn is_max_time_ms_expired_error(&self) -> bool {
161-
self.code() == Some(50)
161+
self.sdam_code() == Some(50)
162162
}
163163

164164
/// Whether a read operation should be retried if this error occurs.
165165
pub(crate) fn is_read_retryable(&self) -> bool {
166166
if self.is_network_error() {
167167
return true;
168168
}
169-
match self.code() {
169+
match self.sdam_code() {
170170
Some(code) => RETRYABLE_READ_CODES.contains(&code),
171171
None => false,
172172
}
@@ -187,7 +187,7 @@ impl Error {
187187
if self.is_network_error() {
188188
return true;
189189
}
190-
match &self.code() {
190+
match &self.sdam_code() {
191191
Some(code) => RETRYABLE_WRITE_CODES.contains(code),
192192
None => false,
193193
}
@@ -201,7 +201,7 @@ impl Error {
201201
{
202202
return true;
203203
}
204-
match self.code() {
204+
match self.sdam_code() {
205205
Some(code) => UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL_CODES.contains(&code),
206206
None => false,
207207
}
@@ -259,7 +259,7 @@ impl Error {
259259

260260
/// Gets the code from this error for performing SDAM updates, if applicable.
261261
/// Any codes contained in WriteErrors are ignored.
262-
pub(crate) fn code(&self) -> Option<i32> {
262+
pub(crate) fn sdam_code(&self) -> Option<i32> {
263263
match self.kind.as_ref() {
264264
ErrorKind::Command(command_error) => Some(command_error.code),
265265
// According to SDAM spec, write concern error codes MUST also be checked, and
@@ -271,7 +271,22 @@ impl Error {
271271
ErrorKind::Write(WriteFailure::WriteConcernError(wc_error)) => Some(wc_error.code),
272272
_ => None,
273273
}
274-
.or_else(|| self.source.as_ref().and_then(|s| s.code()))
274+
.or_else(|| self.source.as_ref().and_then(|s| s.sdam_code()))
275+
}
276+
277+
/// Gets the code from this error.
278+
#[allow(unused)]
279+
pub(crate) fn code(&self) -> Option<i32> {
280+
match self.kind.as_ref() {
281+
ErrorKind::Command(command_error) => Some(command_error.code),
282+
ErrorKind::BulkWrite(BulkWriteFailure {
283+
write_concern_error: Some(wc_error),
284+
..
285+
}) => Some(wc_error.code),
286+
ErrorKind::Write(e) => Some(e.code()),
287+
_ => None,
288+
}
289+
.or_else(|| self.source.as_ref().and_then(|s| s.sdam_code()))
275290
}
276291

277292
/// Gets the message for this error, if applicable, for use in testing.
@@ -333,21 +348,21 @@ impl Error {
333348

334349
/// If this error corresponds to a "not writable primary" error as per the SDAM spec.
335350
pub(crate) fn is_notwritableprimary(&self) -> bool {
336-
self.code()
351+
self.sdam_code()
337352
.map(|code| NOTWRITABLEPRIMARY_CODES.contains(&code))
338353
.unwrap_or(false)
339354
}
340355

341356
/// If this error corresponds to a "node is recovering" error as per the SDAM spec.
342357
pub(crate) fn is_recovering(&self) -> bool {
343-
self.code()
358+
self.sdam_code()
344359
.map(|code| RECOVERING_CODES.contains(&code))
345360
.unwrap_or(false)
346361
}
347362

348363
/// If this error corresponds to a "node is shutting down" error as per the SDAM spec.
349364
pub(crate) fn is_shutting_down(&self) -> bool {
350-
self.code()
365+
self.sdam_code()
351366
.map(|code| SHUTTING_DOWN_CODES.contains(&code))
352367
.unwrap_or(false)
353368
}
@@ -361,7 +376,7 @@ impl Error {
361376
if !self.is_server_error() {
362377
return true;
363378
}
364-
let code = self.code();
379+
let code = self.sdam_code();
365380
if code == Some(43) {
366381
return true;
367382
}
@@ -388,6 +403,11 @@ impl Error {
388403
matches!(self.kind.as_ref(), ErrorKind::IncompatibleServer { .. })
389404
}
390405

406+
#[allow(unused)]
407+
pub(crate) fn is_invalid_argument(&self) -> bool {
408+
matches!(self.kind.as_ref(), ErrorKind::InvalidArgument { .. })
409+
}
410+
391411
pub(crate) fn with_source<E: Into<Option<Error>>>(mut self, source: E) -> Self {
392412
self.source = source.into().map(Box::new);
393413
self
@@ -825,6 +845,13 @@ impl WriteFailure {
825845
.into())
826846
}
827847
}
848+
849+
pub(crate) fn code(&self) -> i32 {
850+
match self {
851+
Self::WriteConcernError(e) => e.code,
852+
Self::WriteError(e) => e.code,
853+
}
854+
}
828855
}
829856

830857
/// An error that occurred during a GridFS operation.

src/gridfs/upload.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl GridFsBucket {
180180
.build();
181181
// Ignore NamespaceExists errors if the collection has already been created.
182182
if let Err(error) = self.inner.db.create_collection(coll.name(), options).await {
183-
if error.code() != Some(48) {
183+
if error.sdam_code() != Some(48) {
184184
return Err(error);
185185
}
186186
}

src/sync/db.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,28 @@ impl Database {
219219
))
220220
}
221221

222+
/// Creates a new collection with encrypted fields, automatically creating new data encryption
223+
/// keys when needed based on the configured [`CreateCollectionOptions::encrypted_fields`].
224+
///
225+
/// Returns the potentially updated `encrypted_fields` along with status, as keys may have been
226+
/// created even when a failure occurs.
227+
///
228+
/// Does not affect any auto encryption settings on existing MongoClients that are already
229+
/// configured with auto encryption.
230+
#[cfg(feature = "in-use-encryption-unstable")]
231+
pub fn create_encrypted_collection(
232+
&self,
233+
ce: &crate::client_encryption::ClientEncryption,
234+
name: impl AsRef<str>,
235+
options: impl Into<Option<CreateCollectionOptions>>,
236+
master_key: crate::client_encryption::MasterKey,
237+
) -> (Document, Result<()>) {
238+
runtime::block_on(
239+
self.async_database
240+
.create_encrypted_collection(ce, name, options, master_key),
241+
)
242+
}
243+
222244
/// Runs a database-level command.
223245
///
224246
/// Note that no inspection is done on `doc`, so the command will not use the database's default

0 commit comments

Comments
 (0)