Skip to content

RUST-1358 Remove most type constraints on cursor values #891

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/change_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use crate::{
#[derivative(Debug)]
pub struct ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
/// The cursor to iterate over event instances.
cursor: Cursor<T>,
Expand All @@ -103,7 +103,7 @@ where

impl<T> ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
pub(crate) fn new(cursor: Cursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
let pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>> = None;
Expand All @@ -126,7 +126,7 @@ where
}

/// Update the type streamed values will be parsed as.
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
ChangeStream {
cursor: self.cursor.with_type(),
args: self.args,
Expand Down Expand Up @@ -256,7 +256,7 @@ fn get_resume_token(

impl<T> CursorStream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
loop {
Expand Down Expand Up @@ -316,7 +316,7 @@ where

impl<T> Stream for ChangeStream<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
type Item = Result<T>;

Expand Down
39 changes: 4 additions & 35 deletions src/cursor/common.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
use std::{
collections::VecDeque,
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use bson::{RawDocument, RawDocumentBuf};
use derivative::Derivative;
use futures_core::{future::BoxFuture, Future, Stream};
use serde::{de::DeserializeOwned, Deserialize};
use futures_core::{future::BoxFuture, Future};
#[cfg(test)]
use tokio::sync::oneshot;

Expand All @@ -29,7 +27,7 @@ use crate::{
/// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`.
#[derive(Derivative)]
#[derivative(Debug)]
pub(super) struct GenericCursor<P, T>
pub(super) struct GenericCursor<P>
where
P: GetMoreProvider,
{
Expand All @@ -40,10 +38,9 @@ where
/// This is an `Option` to allow it to be "taken" when the cursor is no longer needed
/// but may be resumed in the future for `SessionCursor`.
state: Option<CursorState>,
_phantom: PhantomData<T>,
}

impl<P, T> GenericCursor<P, T>
impl<P> GenericCursor<P>
where
P: GetMoreProvider,
{
Expand All @@ -64,7 +61,6 @@ where
post_batch_resume_token: None,
pinned_connection,
}),
_phantom: Default::default(),
}
}

Expand All @@ -78,7 +74,6 @@ where
provider,
client,
info,
_phantom: Default::default(),
state: state.into(),
}
}
Expand Down Expand Up @@ -192,19 +187,6 @@ where
pub(super) fn provider_mut(&mut self) -> &mut P {
&mut self.provider
}

pub(super) fn with_type<'a, D>(self) -> GenericCursor<P, D>
where
D: Deserialize<'a>,
{
GenericCursor {
client: self.client,
provider: self.provider,
info: self.info,
state: self.state,
_phantom: Default::default(),
}
}
}

pub(crate) trait CursorStream {
Expand All @@ -217,10 +199,9 @@ pub(crate) enum BatchValue {
Exhausted,
}

impl<P, T> CursorStream for GenericCursor<P, T>
impl<P> CursorStream for GenericCursor<P>
where
P: GetMoreProvider,
T: DeserializeOwned + Unpin,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
// If there is a get more in flight, check on its status.
Expand Down Expand Up @@ -300,18 +281,6 @@ where
}
}

impl<P, T> Stream for GenericCursor<P, T>
where
P: GetMoreProvider,
T: DeserializeOwned + Unpin,
{
type Item = Result<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
stream_poll_next(Pin::into_inner(self), cx)
}
}

/// A trait implemented by objects that can provide batches of documents to a cursor via the getMore
/// command.
pub(super) trait GetMoreProvider: Unpin {
Expand Down
14 changes: 7 additions & 7 deletions src/cursor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,11 +104,11 @@ pub struct Cursor<T> {
client: Client,
// `wrapped_cursor` is an `Option` so that it can be `None` for the `drop` impl for a cursor
// that's had `with_type` called; in all other circumstances it will be `Some`.
wrapped_cursor: Option<ImplicitSessionCursor<T>>,
wrapped_cursor: Option<ImplicitSessionCursor>,
drop_address: Option<ServerAddress>,
#[cfg(test)]
kill_watcher: Option<oneshot::Sender<()>>,
_phantom: std::marker::PhantomData<T>,
_phantom: std::marker::PhantomData<fn() -> T>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity, why was this change needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a gnarly little corner of the Rust typesystem: https://doc.rust-lang.org/nomicon/phantom-data.html

TL;DR changing it from PhantomData<T> to PhantomData<fn() -> T> means the compiler will no longer consider dropping Cursor<T> to drop a T, which can relax restrictions in some hypothetical situations.

}

impl<T> Cursor<T> {
Expand Down Expand Up @@ -271,7 +271,7 @@ impl<T> Cursor<T> {
{
Cursor {
client: self.client.clone(),
wrapped_cursor: self.wrapped_cursor.take().map(|c| c.with_type()),
wrapped_cursor: self.wrapped_cursor.take(),
drop_address: self.drop_address.take(),
#[cfg(test)]
kill_watcher: self.kill_watcher.take(),
Expand Down Expand Up @@ -301,7 +301,7 @@ impl<T> Cursor<T> {

impl<T> CursorStream for Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
self.wrapped_cursor.as_mut().unwrap().poll_next_in_batch(cx)
Expand All @@ -310,13 +310,13 @@ where

impl<T> Stream for Cursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// This `unwrap` is safe because `wrapped_cursor` is always `Some` outside of `drop`.
Pin::new(self.wrapped_cursor.as_mut().unwrap()).poll_next(cx)
stream_poll_next(self.wrapped_cursor.as_mut().unwrap(), cx)
}
}

Expand Down Expand Up @@ -344,7 +344,7 @@ impl<T> Drop for Cursor<T> {

/// A `GenericCursor` that optionally owns its own sessions.
/// This is to be used by cursors associated with implicit sessions.
type ImplicitSessionCursor<T> = GenericCursor<ImplicitSessionGetMoreProvider, T>;
type ImplicitSessionCursor = GenericCursor<ImplicitSessionGetMoreProvider>;

struct ImplicitSessionGetMoreResult {
get_more_result: Result<GetMoreResult>,
Expand Down
16 changes: 8 additions & 8 deletions src/cursor/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::{
GetMoreProviderResult,
PinnedConnection,
},
stream_poll_next,
BatchValue,
CursorStream,
};
Expand Down Expand Up @@ -108,7 +109,7 @@ impl<T> SessionCursor<T> {

impl<T> SessionCursor<T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
/// Retrieves a [`SessionCursorStream`] to iterate this cursor. The session provided must be the
/// same session used to create the cursor.
Expand Down Expand Up @@ -379,8 +380,7 @@ impl<T> Drop for SessionCursor<T> {

/// A `GenericCursor` that borrows its session.
/// This is to be used with cursors associated with explicit sessions borrowed from the user.
type ExplicitSessionCursor<'session, T> =
GenericCursor<ExplicitSessionGetMoreProvider<'session>, T>;
type ExplicitSessionCursor<'session> = GenericCursor<ExplicitSessionGetMoreProvider<'session>>;

/// A type that implements [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) which can be used to
/// stream the results of a [`SessionCursor`]. Returned from [`SessionCursor::stream`].
Expand All @@ -389,12 +389,12 @@ type ExplicitSessionCursor<'session, T> =
/// any further streams created from [`SessionCursor::stream`] will pick up where this one left off.
pub struct SessionCursorStream<'cursor, 'session, T = Document> {
session_cursor: &'cursor mut SessionCursor<T>,
generic_cursor: ExplicitSessionCursor<'session, T>,
generic_cursor: ExplicitSessionCursor<'session>,
}

impl<'cursor, 'session, T> SessionCursorStream<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
pub(crate) fn post_batch_resume_token(&self) -> Option<&ResumeToken> {
self.generic_cursor.post_batch_resume_token()
Expand All @@ -407,18 +407,18 @@ where

impl<'cursor, 'session, T> Stream for SessionCursorStream<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
type Item = Result<T>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Pin::new(&mut self.generic_cursor).poll_next(cx)
stream_poll_next(&mut self.generic_cursor, cx)
}
}

impl<'cursor, 'session, T> CursorStream for SessionCursorStream<'cursor, 'session, T>
where
T: DeserializeOwned + Unpin + Send + Sync,
T: DeserializeOwned,
{
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
self.generic_cursor.poll_next_in_batch(cx)
Expand Down