Skip to content

Add example dynamic subscription books #72

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Aug 31, 2023
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ members = [
"poem/upload",
"poem/dynamic-schema",
"poem/dynamic-starwars",
"poem/dynamic-books",

"actix-web/token-from-header",
"actix-web/subscription",
Expand Down
13 changes: 13 additions & 0 deletions models/dynamic-books/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "dynamic-books"
version = "0.1.0"
edition = "2021"

[dependencies]
async-graphql = { path = "../../..", features = ["dynamic-schema"] }
slab = "0.4.2"
futures-util = "0.3.0"
futures-channel = "0.3.0"
once_cell = "1.0"
futures-timer = "3.0.2"
async-stream = "0.3.0"
42 changes: 42 additions & 0 deletions models/dynamic-books/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
mod model;
mod simple_broker;
use async_graphql::ID;

use std::str::FromStr;
use futures_util::lock::Mutex;
pub use model::schema;
use slab::Slab;
use std::sync::Arc;

#[derive(Clone)]
pub struct Book {
id: ID,
name: String,
author: String,
}

type Storage = Arc<Mutex<Slab<Book>>>;

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum MutationType {
Created,
Deleted,
}

impl FromStr for MutationType {
type Err = String; // Error type can be customized based on your needs

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"CREATED" => Ok(MutationType::Created),
"DELETED" => Ok(MutationType::Deleted),
_ => Err(format!("Invalid MutationType: {}", s)),
}
}
}

#[derive(Clone)]
struct BookChanged {
mutation_type: MutationType,
id: ID,
}
189 changes: 189 additions & 0 deletions models/dynamic-books/src/model.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
use async_graphql::{dynamic::*, Value, ID};
use futures_util::StreamExt;

use crate::{simple_broker::SimpleBroker, Book, BookChanged, MutationType, Storage};

impl<'a> From<MutationType> for FieldValue<'a> {
fn from(value: MutationType) -> Self {
match value {
MutationType::Created => FieldValue::value("CREATED"),
MutationType::Deleted => FieldValue::value("DELETED"),
}
}
}

pub fn schema() -> Result<Schema, SchemaError> {
let mutation_type = Enum::new("MutationType")
.item(EnumItem::new("CREATED").description("New book created."))
.item(EnumItem::new("DELETED").description("Current book deleted."));

let book = Object::new("Book")
.description("A book that will be stored.")
.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| {
FieldFuture::new(async move {
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
Ok(Some(Value::from(book.id.to_owned())))
})
}))
.field(Field::new(
"name",
TypeRef::named_nn(TypeRef::STRING),
|ctx| {
FieldFuture::new(async move {
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
Ok(Some(Value::from(book.name.to_owned())))
})
},
))
.field(Field::new(
"author",
TypeRef::named_nn(TypeRef::STRING),
|ctx| {
FieldFuture::new(async move {
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
Ok(Some(Value::from(book.author.to_owned())))
})
},
));
let book_changed = Object::new("BookChanged")
.field(Field::new(
"mutationType",
TypeRef::named_nn(mutation_type.type_name()),
|ctx| {
FieldFuture::new(async move {
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
Ok(Some(FieldValue::from(book_changed.mutation_type)))
})
},
))
.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| {
FieldFuture::new(async move {
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
Ok(Some(Value::from(book_changed.id.to_owned())))
})
}))
.field(Field::new(
"book",
TypeRef::named(book.type_name()),
|ctx| {
FieldFuture::new(async move {
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
let id = book_changed.id.to_owned();
let book_id = id.parse::<usize>()?;
let store = ctx.data_unchecked::<Storage>().lock().await;
let book = store.get(book_id).cloned();
Ok(book.map(FieldValue::owned_any))
})
},
));

let query_root = Object::new("Query")
.field(Field::new(
"getBooks",
TypeRef::named_list(book.type_name()),
|ctx| {
FieldFuture::new(async move {
let store = ctx.data_unchecked::<Storage>().lock().await;
let books: Vec<Book> = store.iter().map(|(_, book)| book.clone()).collect();
Ok(Some(FieldValue::list(
books.into_iter().map(FieldValue::owned_any),
)))
})
},
))
.field(
Field::new("getBook", TypeRef::named(book.type_name()), |ctx| {
FieldFuture::new(async move {
let id = ctx.args.try_get("id")?;
let book_id = match id.string() {
Ok(id) => id.to_string(),
Err(_) => id.u64()?.to_string(),
};
let book_id = book_id.parse::<usize>()?;
let store = ctx.data_unchecked::<Storage>().lock().await;
let book = store.get(book_id).cloned();
Ok(book.map(FieldValue::owned_any))
})
})
.argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))),
);

let mutatation_root = Object::new("Mutation")
.field(
Field::new("createBook", TypeRef::named(TypeRef::ID), |ctx| {
FieldFuture::new(async move {
let mut store = ctx.data_unchecked::<Storage>().lock().await;
let name = ctx.args.try_get("name")?;
let author = ctx.args.try_get("author")?;
let entry = store.vacant_entry();
let id: ID = entry.key().into();
let book = Book {
id: id.clone(),
name: name.string()?.to_string(),
author: author.string()?.to_string(),
};
entry.insert(book);
let book_mutated = BookChanged {
mutation_type: MutationType::Created,
id: id.clone(),
};
SimpleBroker::publish(book_mutated);
Ok(Some(Value::from(id)))
})
})
.argument(InputValue::new("name", TypeRef::named_nn(TypeRef::STRING)))
.argument(InputValue::new(
"author",
TypeRef::named_nn(TypeRef::STRING),
)),
)
.field(
Field::new("deleteBook", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
FieldFuture::new(async move {
let mut store = ctx.data_unchecked::<Storage>().lock().await;
let id = ctx.args.try_get("id")?;
let book_id = match id.string() {
Ok(id) => id.to_string(),
Err(_) => id.u64()?.to_string(),
};
let book_id = book_id.parse::<usize>()?;
if store.contains(book_id) {
store.remove(book_id);
let book_mutated = BookChanged {
mutation_type: MutationType::Deleted,
id: book_id.into(),
};
SimpleBroker::publish(book_mutated);
Ok(Some(Value::from(true)))
} else {
Ok(Some(Value::from(false)))
}
})
})
.argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))),
);
let subscription_root = Subscription::new("Subscription").field(SubscriptionField::new(
"bookMutation",
TypeRef::named_nn(book_changed.type_name()),
|_| {
SubscriptionFieldFuture::new(async {
Ok(SimpleBroker::<BookChanged>::subscribe()
.map(|book| Ok(FieldValue::owned_any(book))))
})
},
));

Schema::build(
query_root.type_name(),
Some(mutatation_root.type_name()),
Some(subscription_root.type_name()),
)
.register(mutation_type)
.register(book)
.register(book_changed)
.register(query_root)
.register(subscription_root)
.register(mutatation_root)
.data(Storage::default())
.finish()
}
68 changes: 68 additions & 0 deletions models/dynamic-books/src/simple_broker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use std::{
any::{Any, TypeId},
collections::HashMap,
marker::PhantomData,
pin::Pin,
sync::Mutex,
task::{Context, Poll},
};

use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
use futures_util::{Stream, StreamExt};
use once_cell::sync::Lazy;
use slab::Slab;

static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);

struct Senders<T>(Slab<UnboundedSender<T>>);

struct BrokerStream<T: Sync + Send + Clone + 'static>(usize, UnboundedReceiver<T>);

fn with_senders<T, F, R>(f: F) -> R
where
T: Sync + Send + Clone + 'static,
F: FnOnce(&mut Senders<T>) -> R,
{
let mut map = SUBSCRIBERS.lock().unwrap();
let senders = map
.entry(TypeId::of::<Senders<T>>())
.or_insert_with(|| Box::new(Senders::<T>(Default::default())));
f(senders.downcast_mut::<Senders<T>>().unwrap())
}

impl<T: Sync + Send + Clone + 'static> Drop for BrokerStream<T> {
fn drop(&mut self) {
with_senders::<T, _, _>(|senders| senders.0.remove(self.0));
}
}

impl<T: Sync + Send + Clone + 'static> Stream for BrokerStream<T> {
type Item = T;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.1.poll_next_unpin(cx)
}
}

/// A simple broker based on memory
pub struct SimpleBroker<T>(PhantomData<T>);

impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
/// Publish a message that all subscription streams can receive.
pub fn publish(msg: T) {
with_senders::<T, _, _>(|senders| {
for (_, sender) in senders.0.iter_mut() {
sender.start_send(msg.clone()).ok();
}
});
}

/// Subscribe to the message of the specified type and returns a `Stream`.
pub fn subscribe() -> impl Stream<Item = T> {
with_senders::<T, _, _>(|senders| {
let (tx, rx) = mpsc::unbounded();
let id = senders.0.insert(tx);
BrokerStream(id, rx)
})
}
}
11 changes: 11 additions & 0 deletions poem/dynamic-books/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
[package]
name = "poem-dynamic-books"
version = "0.1.0"
edition = "2021"

[dependencies]
async-graphql = { path = "../../..", features = ["dynamic-schema"] }
async-graphql-poem = { path = "../../../integrations/poem" }
tokio = { version = "1.29", features = ["macros", "rt-multi-thread"] }
dynamic-books = { path = "../../models/dynamic-books" }
poem = "1.3.57"
27 changes: 27 additions & 0 deletions poem/dynamic-books/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use async_graphql::http::GraphiQLSource;
use async_graphql_poem::{GraphQL, GraphQLSubscription};
use poem::{get, handler, listener::TcpListener, web::Html, IntoResponse, Route, Server};

#[handler]
async fn graphiql() -> impl IntoResponse {
Html(
GraphiQLSource::build()
.endpoint("/")
.subscription_endpoint("/ws")
.finish(),
)
}

#[tokio::main]
async fn main() {
let schema = dynamic_books::schema().unwrap();
let app = Route::new()
.at("/", get(graphiql).post(GraphQL::new(schema.clone())))
.at("/ws", get(GraphQLSubscription::new(schema)));

println!("GraphiQL IDE: http://localhost:8080");
Server::new(TcpListener::bind("127.0.0.1:8080"))
.run(app)
.await
.unwrap();
}