Skip to content

Commit d22dbef

Browse files
authored
Merge pull request #72 from gianalarcon/dynamic-books
2 parents 08e6376 + ce66863 commit d22dbef

File tree

7 files changed

+351
-0
lines changed

7 files changed

+351
-0
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ members = [
1414
"poem/upload",
1515
"poem/dynamic-schema",
1616
"poem/dynamic-starwars",
17+
"poem/dynamic-books",
1718

1819
"actix-web/token-from-header",
1920
"actix-web/subscription",

models/dynamic-books/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "dynamic-books"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
async-graphql = { path = "../../..", features = ["dynamic-schema"] }
8+
slab = "0.4.2"
9+
futures-util = "0.3.0"
10+
futures-channel = "0.3.0"
11+
once_cell = "1.0"
12+
futures-timer = "3.0.2"
13+
async-stream = "0.3.0"

models/dynamic-books/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
mod model;
2+
mod simple_broker;
3+
use async_graphql::ID;
4+
5+
use std::str::FromStr;
6+
use futures_util::lock::Mutex;
7+
pub use model::schema;
8+
use slab::Slab;
9+
use std::sync::Arc;
10+
11+
#[derive(Clone)]
12+
pub struct Book {
13+
id: ID,
14+
name: String,
15+
author: String,
16+
}
17+
18+
type Storage = Arc<Mutex<Slab<Book>>>;
19+
20+
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
21+
pub enum MutationType {
22+
Created,
23+
Deleted,
24+
}
25+
26+
impl FromStr for MutationType {
27+
type Err = String; // Error type can be customized based on your needs
28+
29+
fn from_str(s: &str) -> Result<Self, Self::Err> {
30+
match s {
31+
"CREATED" => Ok(MutationType::Created),
32+
"DELETED" => Ok(MutationType::Deleted),
33+
_ => Err(format!("Invalid MutationType: {}", s)),
34+
}
35+
}
36+
}
37+
38+
#[derive(Clone)]
39+
struct BookChanged {
40+
mutation_type: MutationType,
41+
id: ID,
42+
}

models/dynamic-books/src/model.rs

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
use async_graphql::{dynamic::*, Value, ID};
2+
use futures_util::StreamExt;
3+
4+
use crate::{simple_broker::SimpleBroker, Book, BookChanged, MutationType, Storage};
5+
6+
impl<'a> From<MutationType> for FieldValue<'a> {
7+
fn from(value: MutationType) -> Self {
8+
match value {
9+
MutationType::Created => FieldValue::value("CREATED"),
10+
MutationType::Deleted => FieldValue::value("DELETED"),
11+
}
12+
}
13+
}
14+
15+
pub fn schema() -> Result<Schema, SchemaError> {
16+
let mutation_type = Enum::new("MutationType")
17+
.item(EnumItem::new("CREATED").description("New book created."))
18+
.item(EnumItem::new("DELETED").description("Current book deleted."));
19+
20+
let book = Object::new("Book")
21+
.description("A book that will be stored.")
22+
.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| {
23+
FieldFuture::new(async move {
24+
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
25+
Ok(Some(Value::from(book.id.to_owned())))
26+
})
27+
}))
28+
.field(Field::new(
29+
"name",
30+
TypeRef::named_nn(TypeRef::STRING),
31+
|ctx| {
32+
FieldFuture::new(async move {
33+
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
34+
Ok(Some(Value::from(book.name.to_owned())))
35+
})
36+
},
37+
))
38+
.field(Field::new(
39+
"author",
40+
TypeRef::named_nn(TypeRef::STRING),
41+
|ctx| {
42+
FieldFuture::new(async move {
43+
let book = ctx.parent_value.try_downcast_ref::<Book>()?;
44+
Ok(Some(Value::from(book.author.to_owned())))
45+
})
46+
},
47+
));
48+
let book_changed = Object::new("BookChanged")
49+
.field(Field::new(
50+
"mutationType",
51+
TypeRef::named_nn(mutation_type.type_name()),
52+
|ctx| {
53+
FieldFuture::new(async move {
54+
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
55+
Ok(Some(FieldValue::from(book_changed.mutation_type)))
56+
})
57+
},
58+
))
59+
.field(Field::new("id", TypeRef::named_nn(TypeRef::ID), |ctx| {
60+
FieldFuture::new(async move {
61+
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
62+
Ok(Some(Value::from(book_changed.id.to_owned())))
63+
})
64+
}))
65+
.field(Field::new(
66+
"book",
67+
TypeRef::named(book.type_name()),
68+
|ctx| {
69+
FieldFuture::new(async move {
70+
let book_changed = ctx.parent_value.try_downcast_ref::<BookChanged>()?;
71+
let id = book_changed.id.to_owned();
72+
let book_id = id.parse::<usize>()?;
73+
let store = ctx.data_unchecked::<Storage>().lock().await;
74+
let book = store.get(book_id).cloned();
75+
Ok(book.map(FieldValue::owned_any))
76+
})
77+
},
78+
));
79+
80+
let query_root = Object::new("Query")
81+
.field(Field::new(
82+
"getBooks",
83+
TypeRef::named_list(book.type_name()),
84+
|ctx| {
85+
FieldFuture::new(async move {
86+
let store = ctx.data_unchecked::<Storage>().lock().await;
87+
let books: Vec<Book> = store.iter().map(|(_, book)| book.clone()).collect();
88+
Ok(Some(FieldValue::list(
89+
books.into_iter().map(FieldValue::owned_any),
90+
)))
91+
})
92+
},
93+
))
94+
.field(
95+
Field::new("getBook", TypeRef::named(book.type_name()), |ctx| {
96+
FieldFuture::new(async move {
97+
let id = ctx.args.try_get("id")?;
98+
let book_id = match id.string() {
99+
Ok(id) => id.to_string(),
100+
Err(_) => id.u64()?.to_string(),
101+
};
102+
let book_id = book_id.parse::<usize>()?;
103+
let store = ctx.data_unchecked::<Storage>().lock().await;
104+
let book = store.get(book_id).cloned();
105+
Ok(book.map(FieldValue::owned_any))
106+
})
107+
})
108+
.argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))),
109+
);
110+
111+
let mutatation_root = Object::new("Mutation")
112+
.field(
113+
Field::new("createBook", TypeRef::named(TypeRef::ID), |ctx| {
114+
FieldFuture::new(async move {
115+
let mut store = ctx.data_unchecked::<Storage>().lock().await;
116+
let name = ctx.args.try_get("name")?;
117+
let author = ctx.args.try_get("author")?;
118+
let entry = store.vacant_entry();
119+
let id: ID = entry.key().into();
120+
let book = Book {
121+
id: id.clone(),
122+
name: name.string()?.to_string(),
123+
author: author.string()?.to_string(),
124+
};
125+
entry.insert(book);
126+
let book_mutated = BookChanged {
127+
mutation_type: MutationType::Created,
128+
id: id.clone(),
129+
};
130+
SimpleBroker::publish(book_mutated);
131+
Ok(Some(Value::from(id)))
132+
})
133+
})
134+
.argument(InputValue::new("name", TypeRef::named_nn(TypeRef::STRING)))
135+
.argument(InputValue::new(
136+
"author",
137+
TypeRef::named_nn(TypeRef::STRING),
138+
)),
139+
)
140+
.field(
141+
Field::new("deleteBook", TypeRef::named_nn(TypeRef::BOOLEAN), |ctx| {
142+
FieldFuture::new(async move {
143+
let mut store = ctx.data_unchecked::<Storage>().lock().await;
144+
let id = ctx.args.try_get("id")?;
145+
let book_id = match id.string() {
146+
Ok(id) => id.to_string(),
147+
Err(_) => id.u64()?.to_string(),
148+
};
149+
let book_id = book_id.parse::<usize>()?;
150+
if store.contains(book_id) {
151+
store.remove(book_id);
152+
let book_mutated = BookChanged {
153+
mutation_type: MutationType::Deleted,
154+
id: book_id.into(),
155+
};
156+
SimpleBroker::publish(book_mutated);
157+
Ok(Some(Value::from(true)))
158+
} else {
159+
Ok(Some(Value::from(false)))
160+
}
161+
})
162+
})
163+
.argument(InputValue::new("id", TypeRef::named_nn(TypeRef::ID))),
164+
);
165+
let subscription_root = Subscription::new("Subscription").field(SubscriptionField::new(
166+
"bookMutation",
167+
TypeRef::named_nn(book_changed.type_name()),
168+
|_| {
169+
SubscriptionFieldFuture::new(async {
170+
Ok(SimpleBroker::<BookChanged>::subscribe()
171+
.map(|book| Ok(FieldValue::owned_any(book))))
172+
})
173+
},
174+
));
175+
176+
Schema::build(
177+
query_root.type_name(),
178+
Some(mutatation_root.type_name()),
179+
Some(subscription_root.type_name()),
180+
)
181+
.register(mutation_type)
182+
.register(book)
183+
.register(book_changed)
184+
.register(query_root)
185+
.register(subscription_root)
186+
.register(mutatation_root)
187+
.data(Storage::default())
188+
.finish()
189+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
use std::{
2+
any::{Any, TypeId},
3+
collections::HashMap,
4+
marker::PhantomData,
5+
pin::Pin,
6+
sync::Mutex,
7+
task::{Context, Poll},
8+
};
9+
10+
use futures_channel::mpsc::{self, UnboundedReceiver, UnboundedSender};
11+
use futures_util::{Stream, StreamExt};
12+
use once_cell::sync::Lazy;
13+
use slab::Slab;
14+
15+
static SUBSCRIBERS: Lazy<Mutex<HashMap<TypeId, Box<dyn Any + Send>>>> = Lazy::new(Default::default);
16+
17+
struct Senders<T>(Slab<UnboundedSender<T>>);
18+
19+
struct BrokerStream<T: Sync + Send + Clone + 'static>(usize, UnboundedReceiver<T>);
20+
21+
fn with_senders<T, F, R>(f: F) -> R
22+
where
23+
T: Sync + Send + Clone + 'static,
24+
F: FnOnce(&mut Senders<T>) -> R,
25+
{
26+
let mut map = SUBSCRIBERS.lock().unwrap();
27+
let senders = map
28+
.entry(TypeId::of::<Senders<T>>())
29+
.or_insert_with(|| Box::new(Senders::<T>(Default::default())));
30+
f(senders.downcast_mut::<Senders<T>>().unwrap())
31+
}
32+
33+
impl<T: Sync + Send + Clone + 'static> Drop for BrokerStream<T> {
34+
fn drop(&mut self) {
35+
with_senders::<T, _, _>(|senders| senders.0.remove(self.0));
36+
}
37+
}
38+
39+
impl<T: Sync + Send + Clone + 'static> Stream for BrokerStream<T> {
40+
type Item = T;
41+
42+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
43+
self.1.poll_next_unpin(cx)
44+
}
45+
}
46+
47+
/// A simple broker based on memory
48+
pub struct SimpleBroker<T>(PhantomData<T>);
49+
50+
impl<T: Sync + Send + Clone + 'static> SimpleBroker<T> {
51+
/// Publish a message that all subscription streams can receive.
52+
pub fn publish(msg: T) {
53+
with_senders::<T, _, _>(|senders| {
54+
for (_, sender) in senders.0.iter_mut() {
55+
sender.start_send(msg.clone()).ok();
56+
}
57+
});
58+
}
59+
60+
/// Subscribe to the message of the specified type and returns a `Stream`.
61+
pub fn subscribe() -> impl Stream<Item = T> {
62+
with_senders::<T, _, _>(|senders| {
63+
let (tx, rx) = mpsc::unbounded();
64+
let id = senders.0.insert(tx);
65+
BrokerStream(id, rx)
66+
})
67+
}
68+
}

poem/dynamic-books/Cargo.toml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[package]
2+
name = "poem-dynamic-books"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
async-graphql = { path = "../../..", features = ["dynamic-schema"] }
8+
async-graphql-poem = { path = "../../../integrations/poem" }
9+
tokio = { version = "1.29", features = ["macros", "rt-multi-thread"] }
10+
dynamic-books = { path = "../../models/dynamic-books" }
11+
poem = "1.3.57"

poem/dynamic-books/src/main.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
use async_graphql::http::GraphiQLSource;
2+
use async_graphql_poem::{GraphQL, GraphQLSubscription};
3+
use poem::{get, handler, listener::TcpListener, web::Html, IntoResponse, Route, Server};
4+
5+
#[handler]
6+
async fn graphiql() -> impl IntoResponse {
7+
Html(
8+
GraphiQLSource::build()
9+
.endpoint("/")
10+
.subscription_endpoint("/ws")
11+
.finish(),
12+
)
13+
}
14+
15+
#[tokio::main]
16+
async fn main() {
17+
let schema = dynamic_books::schema().unwrap();
18+
let app = Route::new()
19+
.at("/", get(graphiql).post(GraphQL::new(schema.clone())))
20+
.at("/ws", get(GraphQLSubscription::new(schema)));
21+
22+
println!("GraphiQL IDE: http://localhost:8080");
23+
Server::new(TcpListener::bind("127.0.0.1:8080"))
24+
.run(app)
25+
.await
26+
.unwrap();
27+
}

0 commit comments

Comments
 (0)