-
Notifications
You must be signed in to change notification settings - Fork 180
RUST-1400 GridFS download methods #747
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,52 +1,72 @@ | ||
#![allow(dead_code, unused_variables)] | ||
// TODO(RUST-1395) Remove these allows. | ||
#![allow(dead_code, unused_variables)] | ||
|
||
mod download; | ||
pub mod options; | ||
|
||
use core::task::{Context, Poll}; | ||
use std::pin::Pin; | ||
use std::{ | ||
pin::Pin, | ||
sync::{atomic::AtomicBool, Arc}, | ||
}; | ||
|
||
use serde::{Deserialize, Serialize}; | ||
use tokio::io::ReadBuf; | ||
|
||
use crate::{ | ||
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef}, | ||
concern::{ReadConcern, WriteConcern}, | ||
cursor::Cursor, | ||
error::Result, | ||
selection_criteria::SelectionCriteria, | ||
options::SelectionCriteria, | ||
Collection, | ||
Database, | ||
}; | ||
use bson::{oid::ObjectId, Bson, DateTime, Document}; | ||
|
||
use options::*; | ||
use serde::{Deserialize, Serialize}; | ||
use tokio::io::ReadBuf; | ||
|
||
/// The bucket name used when none is specified in the options.
pub const DEFAULT_BUCKET_NAME: &str = "fs";
/// The default chunk size: 255 KiB.
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
|
||
// Contained in a "chunks" collection for each user file | ||
struct Chunk { | ||
#[derive(Debug, Deserialize, Serialize)] | ||
struct Chunk<'a> { | ||
#[serde(rename = "_id")] | ||
id: ObjectId, | ||
files_id: Bson, | ||
n: u32, | ||
// default size is 255 KiB | ||
data: Vec<u8>, | ||
#[serde(borrow)] | ||
data: RawBinaryRef<'a>, | ||
} | ||
|
||
/// A collection in which information about stored files is stored. There will be one files | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know this is existing code, but I think this docstring could be a tad clearer. For instance, this struct models the documents in the files collection, rather than being the collection itself. Also, it could be better to say something like "metadata" rather than information, and also could be useful to mention the collection in which the files are actually stored (the "chunks" collection, right?). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I left a comment about doing a later pass over documentation but it became outdated when I made some subsequent changes: #747 (comment) I think most of the documentation needs some cleaning up so I figured I'd do it in one swoop at the end (although I made a few update in this PR) -- does that sound good re the rest of your documentation comments or would you rather have them addressed here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Follow up PR sounds good to me! Sorry, yeah I missed your previous comment. |
||
/// collection document per stored file. | ||
#[derive(Serialize, Deserialize)] | ||
#[derive(Debug, Deserialize, Serialize)] | ||
#[serde(rename_all = "camelCase")] | ||
#[non_exhaustive] | ||
pub struct FilesCollectionDocument { | ||
patrickfreed marked this conversation as resolved.
Show resolved
Hide resolved
|
||
id: Bson, | ||
length: i64, | ||
chunk_size: u32, | ||
upload_date: DateTime, | ||
filename: String, | ||
metadata: Document, | ||
#[serde(rename = "_id")] | ||
pub id: Bson, | ||
pub length: u64, | ||
pub chunk_size: u32, | ||
pub upload_date: DateTime, | ||
pub filename: Option<String>, | ||
#[serde(skip_serializing_if = "Option::is_none")] | ||
pub metadata: Option<Document>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would be good to document each of these fields, similar to what we do for |
||
} | ||
|
||
#[derive(Debug)] | ||
struct GridFsBucketInner { | ||
options: GridFsBucketOptions, | ||
files: Collection<FilesCollectionDocument>, | ||
chunks: Collection<Chunk<'static>>, | ||
created_indexes: AtomicBool, | ||
} | ||
|
||
/// Struct for storing GridFS managed files within a [`Database`]. | ||
#[derive(Debug, Clone)] | ||
pub struct GridFsBucket { | ||
// Contains a "chunks" collection | ||
pub(crate) db: Database, | ||
pub(crate) options: GridFsBucketOptions, | ||
inner: Arc<GridFsBucketInner>, | ||
kmahar marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
// TODO: RUST-1395 Add documentation and example code for this struct. | ||
|
@@ -134,30 +154,67 @@ impl tokio::io::AsyncRead for GridFsDownloadStream { | |
} | ||
} | ||
|
||
impl futures_util::io::AsyncRead for GridFsDownloadStream { | ||
fn poll_read( | ||
self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &mut [u8], | ||
) -> Poll<core::result::Result<usize, futures_util::io::Error>> { | ||
todo!() | ||
impl GridFsBucket { | ||
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket { | ||
if options.read_concern.is_none() { | ||
options.read_concern = db.read_concern().cloned(); | ||
} | ||
if options.write_concern.is_none() { | ||
options.write_concern = db.write_concern().cloned(); | ||
} | ||
if options.selection_criteria.is_none() { | ||
options.selection_criteria = db.selection_criteria().cloned(); | ||
} | ||
|
||
let bucket_name = options | ||
.bucket_name | ||
.as_deref() | ||
.unwrap_or(DEFAULT_BUCKET_NAME); | ||
|
||
let files = db.collection::<FilesCollectionDocument>(&format!("{}.files", bucket_name)); | ||
let chunks = db.collection::<Chunk>(&format!("{}.chunks", bucket_name)); | ||
|
||
GridFsBucket { | ||
inner: Arc::new(GridFsBucketInner { | ||
options, | ||
files, | ||
chunks, | ||
created_indexes: AtomicBool::new(false), | ||
}), | ||
} | ||
} | ||
} | ||
|
||
impl GridFsBucket { | ||
/// Gets the read concern of the [`GridFsBucket`]. | ||
pub fn read_concern(&self) -> Option<&ReadConcern> { | ||
self.options.read_concern.as_ref() | ||
self.inner.options.read_concern.as_ref() | ||
} | ||
|
||
/// Gets the write concern of the [`GridFsBucket`]. | ||
pub fn write_concern(&self) -> Option<&WriteConcern> { | ||
self.options.write_concern.as_ref() | ||
self.inner.options.write_concern.as_ref() | ||
} | ||
|
||
/// Gets the selection criteria of the [`GridFsBucket`]. | ||
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> { | ||
self.options.selection_criteria.as_ref() | ||
self.inner.options.selection_criteria.as_ref() | ||
} | ||
|
||
/// Gets the chunk size in bytes for the [`GridFsBucket`]. | ||
fn chunk_size_bytes(&self) -> u32 { | ||
self.inner | ||
.options | ||
.chunk_size_bytes | ||
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES) | ||
} | ||
|
||
/// Gets a handle to the files collection for the [`GridFsBucket`]. | ||
fn files(&self) -> &Collection<FilesCollectionDocument> { | ||
&self.inner.files | ||
} | ||
|
||
/// Gets a handle to the chunks collection for the [`GridFsBucket`]. | ||
fn chunks(&self) -> &Collection<Chunk> { | ||
&self.inner.chunks | ||
} | ||
|
||
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. | ||
|
@@ -173,19 +230,6 @@ impl GridFsBucket { | |
todo!() | ||
} | ||
|
||
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. | ||
/// The driver generates a unique [`Bson::ObjectId`] for the file id. | ||
/// | ||
/// Returns a [`GridFsUploadStream`] to which the application will write the contents. | ||
pub async fn open_upload_stream( | ||
&self, | ||
filename: String, | ||
options: impl Into<Option<GridFsUploadOptions>>, | ||
) -> Result<GridFsUploadStream> { | ||
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) | ||
.await | ||
} | ||
|
||
/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the | ||
/// `tokio` crate's `AsyncRead` trait for the `source`. | ||
pub async fn upload_from_tokio_reader_with_id( | ||
|
@@ -244,6 +288,19 @@ impl GridFsBucket { | |
.await | ||
} | ||
|
||
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to. | ||
/// The driver generates a unique [`Bson::ObjectId`] for the file id. | ||
/// | ||
/// Returns a [`GridFsUploadStream`] to which the application will write the contents. | ||
pub async fn open_upload_stream( | ||
&self, | ||
filename: String, | ||
options: impl Into<Option<GridFsUploadOptions>>, | ||
) -> Result<GridFsUploadStream> { | ||
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options) | ||
.await | ||
} | ||
|
||
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read | ||
/// the contents of the stored file specified by `id`. | ||
pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> { | ||
|
@@ -261,52 +318,6 @@ impl GridFsBucket { | |
todo!() | ||
} | ||
|
||
/// Downloads the contents of the stored file specified by `id` and writes | ||
/// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite` | ||
/// trait for the `destination`. | ||
pub async fn download_to_tokio_writer( | ||
&self, | ||
id: Bson, | ||
destination: impl tokio::io::AsyncWrite, | ||
) { | ||
todo!() | ||
} | ||
|
||
/// Downloads the contents of the stored file specified by `id` and writes | ||
/// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite` | ||
/// trait for the `destination`. | ||
pub async fn download_to_futures_0_3_writer( | ||
&self, | ||
id: Bson, | ||
destination: impl futures_util::AsyncWrite, | ||
) { | ||
todo!() | ||
} | ||
|
||
/// Downloads the contents of the stored file specified by `filename` and by | ||
/// the revision in `options` and writes the contents to the `destination`. Uses the | ||
/// `tokio` crate's `AsyncWrite` trait for the `destination`. | ||
pub async fn download_to_tokio_writer_by_name( | ||
&self, | ||
filename: String, | ||
destination: impl tokio::io::AsyncWrite, | ||
options: impl Into<Option<GridFsDownloadByNameOptions>>, | ||
) { | ||
todo!() | ||
} | ||
|
||
/// Downloads the contents of the stored file specified by `filename` and by | ||
/// the revision in `options` and writes the contents to the `destination`. Uses the | ||
/// `futures-0.3` crate's `AsyncWrite` trait for the `destination`. | ||
pub async fn download_to_futures_0_3_writer_by_name( | ||
&self, | ||
filename: String, | ||
destination: impl futures_util::AsyncWrite, | ||
options: impl Into<Option<GridFsDownloadByNameOptions>>, | ||
) { | ||
todo!() | ||
} | ||
|
||
/// Given an `id`, deletes the stored file's files collection document and | ||
/// associated chunks from a [`GridFsBucket`]. | ||
pub async fn delete(&self, id: Bson) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a little convoluted, but the
`FileNotFound`
error can be returned from either `download`
(which has an id) or `download_by_name`
(which has a filename), and it didn't seem much better to have separate error cases for those.