RUST-1400 GridFS download methods #747

Merged
merged 3 commits on Oct 12, 2022
4 changes: 3 additions & 1 deletion Cargo.toml
@@ -78,6 +78,7 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" }
chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] }
derivative = "2.1.1"
flate2 = { version = "1.0", optional = true }
futures-io = "0.3.21"
futures-core = "0.3.14"
futures-util = { version = "0.3.14", features = ["io"] }
futures-executor = "0.3.14"
@@ -150,7 +151,7 @@ features = ["dangerous_configuration"]

[dependencies.tokio-util]
version = "0.7.0"
features = ["io"]
features = ["io", "compat"]

[dependencies.uuid]
version = "1.1.2"
@@ -163,6 +164,7 @@ ctrlc = "3.2.2"
derive_more = "0.99.13"
function_name = "0.2.1"
futures = "0.3"
hex = "0.4"
home = "0.5"
lambda_runtime = "0.6.0"
pretty_assertions = "1.3.0"
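
A note on the "compat" feature added above: tokio-util's compat layer bridges the tokio and futures-0.3 I/O traits, presumably so the driver can expose both flavors of the upload/download APIs over one implementation. A minimal sketch of that bridging, not taken from this PR (the read_all helper is hypothetical):

```rust
use futures_util::io::AsyncRead;
use tokio::io::AsyncReadExt;
use tokio_util::compat::FuturesAsyncReadCompatExt;

// Adapt any futures-io reader so tokio utilities can drive it.
async fn read_all(source: impl AsyncRead + Unpin) -> std::io::Result<Vec<u8>> {
    // `.compat()` wraps the reader in a type that implements
    // tokio::io::AsyncRead.
    let mut reader = source.compat();
    let mut buf = Vec::new();
    reader.read_to_end(&mut buf).await?;
    Ok(buf)
}
```
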
2 changes: 1 addition & 1 deletion src/client/csfle/state_machine.rs
@@ -184,7 +184,7 @@ impl CryptExecutor {
State::NeedKmsCredentials => {
// TODO(RUST-1314, RUST-1417): support fetching KMS credentials.
return Err(Error::internal("KMS credentials are not yet supported"));
},
}
State::Ready => {
let (tx, rx) = oneshot::channel();
let mut thread_ctx = std::mem::replace(
26 changes: 2 additions & 24 deletions src/db/mod.rs
@@ -19,12 +19,7 @@ use crate::{
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
error::{Error, ErrorKind, Result},
gridfs::{
options::GridFsBucketOptions,
GridFsBucket,
DEFAULT_BUCKET_NAME,
DEFAULT_CHUNK_SIZE_BYTES,
},
gridfs::{options::GridFsBucketOptions, GridFsBucket},
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
options::{
AggregateOptions,
@@ -573,23 +568,6 @@ impl Database {

/// Creates a new GridFsBucket in the database with the given options.
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
let mut options = options.into().unwrap_or_default();
options.read_concern = options
.read_concern
.or_else(|| self.read_concern().cloned());
options.write_concern = options
.write_concern
.or_else(|| self.write_concern().cloned());
options.selection_criteria = options
.selection_criteria
.or_else(|| self.selection_criteria().cloned());
options.bucket_name = options
.bucket_name
.or_else(|| Some(DEFAULT_BUCKET_NAME.to_string()));
options.chunk_size_bytes = options.chunk_size_bytes.or(Some(DEFAULT_CHUNK_SIZE_BYTES));
GridFsBucket {
db: self.clone(),
options,
}
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
}
}
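
For context, a hedged sketch of the call site this refactor serves; the default_bucket helper is hypothetical, and passing None relies on the impl Into<Option<GridFsBucketOptions>> parameter shown above:

```rust
use mongodb::Database;

fn default_bucket(db: &Database) {
    // `None` yields the defaults now applied in `GridFsBucket::new`:
    // the "fs" bucket name, 255 KiB chunks, and the database's
    // read/write concerns and selection criteria.
    let _bucket = db.gridfs_bucket(None);
}
```
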
48 changes: 48 additions & 0 deletions src/error.rs
@@ -457,6 +457,11 @@ pub enum ErrorKind {
#[non_exhaustive]
DnsResolve { message: String },

/// A GridFS error occurred.
#[error("{0:?}")]
#[non_exhaustive]
GridFs(GridFsErrorKind),

#[error("Internal error: {message}")]
#[non_exhaustive]
Internal { message: String },
@@ -693,6 +698,49 @@ impl WriteFailure {
}
}

/// An error that occurred during a GridFS operation.
#[derive(Clone, Debug)]
#[allow(missing_docs)]
#[non_exhaustive]
pub enum GridFsErrorKind {
/// The file with the given identifier was not found.
#[non_exhaustive]
FileNotFound { identifier: GridFsFileIdentifier },

Contributor Author:
This is a little convoluted, but the FileNotFound error can be returned from either download (which has an id) or download_by_name (which has a filename), and it didn't seem much better to have separate error cases for those.


/// The file with the given revision was not found.
#[non_exhaustive]
RevisionNotFound { revision: i32 },

/// The chunk at index 'n' was missing.
#[non_exhaustive]
MissingChunk { n: u32 },

/// The chunk was the incorrect size.
#[non_exhaustive]
WrongSizeChunk {
actual_size: u32,
expected_size: u32,
},

/// An incorrect number of chunks was present for the file.
#[non_exhaustive]
WrongNumberOfChunks {
actual_number: u32,
expected_number: u32,
},
}

/// An identifier for a file stored in a GridFS bucket.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum GridFsFileIdentifier {
/// The name of the file. Not guaranteed to be unique.
Filename(String),

/// The file's unique [`Bson`] ID.
Id(Bson),
}
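
To illustrate how the new variants compose at a call site, here is a hedged sketch (the describe helper is hypothetical; because the enums and their struct variants are #[non_exhaustive], matches from outside the crate need `..` patterns and a wildcard arm):

```rust
use mongodb::error::{GridFsErrorKind, GridFsFileIdentifier};

// Turn a GridFS error into a human-readable message.
fn describe(kind: &GridFsErrorKind) -> String {
    match kind {
        GridFsErrorKind::FileNotFound { identifier, .. } => match identifier {
            GridFsFileIdentifier::Filename(name) => format!("no file named {}", name),
            GridFsFileIdentifier::Id(id) => format!("no file with id {}", id),
            _ => "file not found".to_string(),
        },
        GridFsErrorKind::MissingChunk { n, .. } => format!("chunk {} is missing", n),
        GridFsErrorKind::WrongSizeChunk {
            actual_size,
            expected_size,
            ..
        } => format!("chunk was {} bytes, expected {}", actual_size, expected_size),
        _ => "other GridFS error".to_string(),
    }
}
```
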

/// Translates ErrorKind::BulkWriteError cases to ErrorKind::WriteErrors, leaving all other errors
/// untouched.
pub(crate) fn convert_bulk_errors(error: Error) -> Error {
191 changes: 101 additions & 90 deletions src/gridfs.rs
@@ -1,52 +1,72 @@
#![allow(dead_code, unused_variables)]
// TODO(RUST-1395) Remove these allows.
#![allow(dead_code, unused_variables)]

mod download;
pub mod options;

use core::task::{Context, Poll};
use std::pin::Pin;
use std::{
pin::Pin,
sync::{atomic::AtomicBool, Arc},
};

use serde::{Deserialize, Serialize};
use tokio::io::ReadBuf;

use crate::{
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef},
concern::{ReadConcern, WriteConcern},
cursor::Cursor,
error::Result,
selection_criteria::SelectionCriteria,
options::SelectionCriteria,
Collection,
Database,
};
use bson::{oid::ObjectId, Bson, DateTime, Document};

use options::*;
use serde::{Deserialize, Serialize};
use tokio::io::ReadBuf;

pub const DEFAULT_BUCKET_NAME: &str = "fs";
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;

// Contained in a "chunks" collection for each user file
struct Chunk {
#[derive(Debug, Deserialize, Serialize)]
struct Chunk<'a> {
#[serde(rename = "_id")]
id: ObjectId,
files_id: Bson,
n: u32,
// default size is 255 KiB
data: Vec<u8>,
#[serde(borrow)]
data: RawBinaryRef<'a>,
}

/// A collection in which information about stored files is stored. There will be one files

Contributor:
I know this is existing code, but I think this docstring could be a tad clearer. For instance, this struct models the documents in the files collection, rather than being the collection itself. Also, it could be better to say something like "metadata" rather than information, and also could be useful to mention the collection in which the files are actually stored (the "chunks" collection, right?).


Contributor Author:
Ah I left a comment about doing a later pass over documentation but it became outdated when I made some subsequent changes: #747 (comment)

I think most of the documentation needs some cleaning up, so I figured I'd do it in one swoop at the end (although I made a few updates in this PR) -- does that sound good re the rest of your documentation comments, or would you rather have them addressed here?


Contributor:
Follow up PR sounds good to me! Sorry, yeah I missed your previous comment.

/// collection document per stored file.
#[derive(Serialize, Deserialize)]
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
#[non_exhaustive]
pub struct FilesCollectionDocument {
id: Bson,
length: i64,
chunk_size: u32,
upload_date: DateTime,
filename: String,
metadata: Document,
#[serde(rename = "_id")]
pub id: Bson,
pub length: u64,
pub chunk_size: u32,
pub upload_date: DateTime,
pub filename: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<Document>,

Contributor:
Would be good to document each of these fields, similar to what we do for ChangeStreamEvent.

}

#[derive(Debug)]
struct GridFsBucketInner {
options: GridFsBucketOptions,
files: Collection<FilesCollectionDocument>,
chunks: Collection<Chunk<'static>>,
created_indexes: AtomicBool,
}

/// Struct for storing GridFS managed files within a [`Database`].
#[derive(Debug, Clone)]
pub struct GridFsBucket {
// Contains a "chunks" collection
pub(crate) db: Database,
pub(crate) options: GridFsBucketOptions,
inner: Arc<GridFsBucketInner>,
}

// TODO: RUST-1395 Add documentation and example code for this struct.
@@ -134,30 +154,67 @@ impl tokio::io::AsyncRead for GridFsDownloadStream {
}
}

impl futures_util::io::AsyncRead for GridFsDownloadStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<core::result::Result<usize, futures_util::io::Error>> {
todo!()
impl GridFsBucket {
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
if options.read_concern.is_none() {
options.read_concern = db.read_concern().cloned();
}
if options.write_concern.is_none() {
options.write_concern = db.write_concern().cloned();
}
if options.selection_criteria.is_none() {
options.selection_criteria = db.selection_criteria().cloned();
}

let bucket_name = options
.bucket_name
.as_deref()
.unwrap_or(DEFAULT_BUCKET_NAME);

let files = db.collection::<FilesCollectionDocument>(&format!("{}.files", bucket_name));
let chunks = db.collection::<Chunk>(&format!("{}.chunks", bucket_name));

GridFsBucket {
inner: Arc::new(GridFsBucketInner {
options,
files,
chunks,
created_indexes: AtomicBool::new(false),
}),
}
}
}
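
A hedged sketch of the fallback behavior new implements above: options left unset inherit from the Database, and the files/chunks collection names derive from the bucket name. The connection string and database name below are placeholders:

```rust
use mongodb::{
    options::{ClientOptions, ReadConcern},
    Client,
};

async fn inherits_read_concern() -> mongodb::error::Result<()> {
    let mut opts = ClientOptions::parse("mongodb://localhost:27017").await?;
    opts.read_concern = Some(ReadConcern::majority());
    let client = Client::with_options(opts)?;
    let db = client.database("app");

    // No read concern in the bucket options, so `GridFsBucket::new`
    // falls back to the database's "majority" read concern.
    let bucket = db.gridfs_bucket(None);
    println!("{:?}", bucket.read_concern());
    Ok(())
}
```
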

impl GridFsBucket {
/// Gets the read concern of the [`GridFsBucket`].
pub fn read_concern(&self) -> Option<&ReadConcern> {
self.options.read_concern.as_ref()
self.inner.options.read_concern.as_ref()
}

/// Gets the write concern of the [`GridFsBucket`].
pub fn write_concern(&self) -> Option<&WriteConcern> {
self.options.write_concern.as_ref()
self.inner.options.write_concern.as_ref()
}

/// Gets the selection criteria of the [`GridFsBucket`].
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
self.options.selection_criteria.as_ref()
self.inner.options.selection_criteria.as_ref()
}

/// Gets the chunk size in bytes for the [`GridFsBucket`].
fn chunk_size_bytes(&self) -> u32 {
self.inner
.options
.chunk_size_bytes
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES)
}

/// Gets a handle to the files collection for the [`GridFsBucket`].
fn files(&self) -> &Collection<FilesCollectionDocument> {
&self.inner.files
}

/// Gets a handle to the chunks collection for the [`GridFsBucket`].
fn chunks(&self) -> &Collection<Chunk> {
&self.inner.chunks
}

/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
@@ -173,19 +230,6 @@ impl GridFsBucket {
todo!()
}

/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
///
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
pub async fn open_upload_stream(
&self,
filename: String,
options: impl Into<Option<GridFsUploadOptions>>,
) -> Result<GridFsUploadStream> {
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
.await
}

/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
/// `tokio` crate's `AsyncRead` trait for the `source`.
pub async fn upload_from_tokio_reader_with_id(
@@ -244,6 +288,19 @@ impl GridFsBucket {
.await
}

/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
///
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
pub async fn open_upload_stream(
&self,
filename: String,
options: impl Into<Option<GridFsUploadOptions>>,
) -> Result<GridFsUploadStream> {
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
.await
}

/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
/// the contents of the stored file specified by `id`.
pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> {
@@ -261,52 +318,6 @@ impl GridFsBucket {
todo!()
}

/// Downloads the contents of the stored file specified by `id` and writes
/// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite`
/// trait for the `destination`.
pub async fn download_to_tokio_writer(
&self,
id: Bson,
destination: impl tokio::io::AsyncWrite,
) {
todo!()
}

/// Downloads the contents of the stored file specified by `id` and writes
/// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite`
/// trait for the `destination`.
pub async fn download_to_futures_0_3_writer(
&self,
id: Bson,
destination: impl futures_util::AsyncWrite,
) {
todo!()
}

/// Downloads the contents of the stored file specified by `filename` and by
/// the revision in `options` and writes the contents to the `destination`. Uses the
/// `tokio` crate's `AsyncWrite` trait for the `destination`.
pub async fn download_to_tokio_writer_by_name(
&self,
filename: String,
destination: impl tokio::io::AsyncWrite,
options: impl Into<Option<GridFsDownloadByNameOptions>>,
) {
todo!()
}

/// Downloads the contents of the stored file specified by `filename` and by
/// the revision in `options` and writes the contents to the `destination`. Uses the
/// `futures-0.3` crate's `AsyncWrite` trait for the `destination`.
pub async fn download_to_futures_0_3_writer_by_name(
&self,
filename: String,
destination: impl futures_util::AsyncWrite,
options: impl Into<Option<GridFsDownloadByNameOptions>>,
) {
todo!()
}

/// Given an `id`, deletes the stored file's files collection document and
/// associated chunks from a [`GridFsBucket`].
pub async fn delete(&self, id: Bson) {
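
Taken together, a hedged sketch of the download path these stubs anticipate (the bodies are still todo!() in this PR, so this shows the intended shape rather than working behavior). GridFsDownloadStream implements tokio's AsyncRead per the impl above, so a file's contents can be drained with read_to_end; the fetch helper is hypothetical:

```rust
use mongodb::{bson::Bson, Database};
use tokio::io::AsyncReadExt;

// Read an entire stored file into memory by its id.
async fn fetch(db: &Database, id: Bson) -> mongodb::error::Result<Vec<u8>> {
    let bucket = db.gridfs_bucket(None);

    // Stream the file's chunks back in order.
    let mut stream = bucket.open_download_stream(id).await?;
    let mut contents = Vec::new();
    stream.read_to_end(&mut contents).await?;
    Ok(contents)
}
```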