From b76b5d86c58543e05ff748117396d9c03d29a06b Mon Sep 17 00:00:00 2001
From: kuronyago
Date: Wed, 8 Apr 2020 11:02:03 +0300
Subject: [PATCH 01/70] sync README with rsocket v0.5.1
---
rsocket-transport-websocket/README.md | 67 ++++++++++++++++-----------
1 file changed, 41 insertions(+), 26 deletions(-)
diff --git a/rsocket-transport-websocket/README.md b/rsocket-transport-websocket/README.md
index e57f9cc..ed040f4 100644
--- a/rsocket-transport-websocket/README.md
+++ b/rsocket-transport-websocket/README.md
@@ -6,35 +6,41 @@ Add dependencies in your `Cargo.toml`.
```toml
[dependencies]
-tokio = "0.2.11"
-rsocket_rust = "0.5.0"
-
-# choose transport:
-# rsocket_rust_transport_tcp = "0.5.0"
-# rsocket_rust_transport_websocket = "0.5.0"
+tokio = "0.2.16"
+rsocket_rust = "0.5.1"
+rsocket_rust_transport_websocket = "0.5.1"
```
### Server
```rust
-use rsocket_rust::prelude::*;
+use log::info;
+use rsocket_rust::prelude::{EchoRSocket, RSocketFactory, ServerResponder};
use rsocket_rust_transport_websocket::WebsocketServerTransport;
-use std::env;
use std::error::Error;
#[tokio::main]
async fn main() -> Result<(), Box> {
+ let transport: WebsocketServerTransport = WebsocketServerTransport::from("127.0.0.1:8080");
+
+ let responder: ServerResponder = Box::new(|setup, _socket| {
+ info!("accept setup: {:?}", setup);
+ Ok(Box::new(EchoRSocket))
+ // Or you can reject setup
+ // Err(From::from("SETUP_NOT_ALLOW"))
+ });
+
+ let on_start: Box =
+ Box::new(|| info!("+++++++ echo server started! +++++++"));
+
RSocketFactory::receive()
- .transport(WebsocketServerTransport::from("127.0.0.1:8080"))
- .acceptor(|setup, _socket| {
- println!("accept setup: {:?}", setup)
- Ok(Box::new(EchoRSocket))
- // Or you can reject setup
- // Err(From::from("SETUP_NOT_ALLOW"))
- })
- .on_start(|| println!("+++++++ echo server started! +++++++"))
+ .transport(transport)
+ .acceptor(responder)
+ .on_start(on_start)
.serve()
- .await
+ .await?;
+
+ Ok(())
}
```
@@ -42,27 +48,36 @@ async fn main() -> Result<(), Box> {
### Client
```rust
-use rsocket_rust::prelude::*;
+use log::info;
+use rsocket_rust::prelude::{ClientResponder, EchoRSocket, Payload, RSocket, RSocketFactory};
use rsocket_rust_transport_websocket::WebsocketClientTransport;
+use std::error::Error;
#[tokio::main]
-#[test]
-async fn test() {
- let cli = RSocketFactory::connect()
- .acceptor(|| Box::new(EchoRSocket))
+async fn main() -> Result<(), Box> {
+ let responder: ClientResponder = Box::new(|| Box::new(EchoRSocket));
+
+ let client = RSocketFactory::connect()
+ .acceptor(responder)
.transport(WebsocketClientTransport::from("127.0.0.1:8080"))
.setup(Payload::from("READY!"))
.mime_type("text/plain", "text/plain")
.start()
.await
.unwrap();
- let req = Payload::builder()
+
+ let request_payload: Payload = Payload::builder()
.set_data_utf8("Hello World!")
.set_metadata_utf8("Rust")
.build();
- let res = cli.request_response(req).await.unwrap();
- println!("got: {:?}", res);
- cli.close();
+
+ let res = client.request_response(request_payload).await.unwrap();
+
+ info!("got: {:?}", res);
+
+ client.close();
+
+ Ok(())
}
```
From 6255e6fcdb763847b5205981f22954b5d29a3451 Mon Sep 17 00:00:00 2001
From: kuronyago
Date: Tue, 28 Apr 2020 14:23:55 -0700
Subject: [PATCH 02/70] fire and forget: naive implementation
---
rsocket-transport-wasm/src/misc.rs | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/rsocket-transport-wasm/src/misc.rs b/rsocket-transport-wasm/src/misc.rs
index 3147fc1..324790c 100644
--- a/rsocket-transport-wasm/src/misc.rs
+++ b/rsocket-transport-wasm/src/misc.rs
@@ -2,7 +2,7 @@ use super::client::WebsocketClientTransport;
use super::runtime::WASMSpawner;
use js_sys::{Promise, Uint8Array};
use rsocket_rust::prelude::*;
-use wasm_bindgen::prelude::*;
+use wasm_bindgen::prelude::{wasm_bindgen, JsValue};
use wasm_bindgen_futures::future_to_promise;
#[derive(Serialize, Deserialize)]
@@ -81,6 +81,16 @@ impl JsClient {
}
})
}
+
+ pub fn fire_and_forget(&self, request: &JsValue) -> Promise {
+ let inner = self.inner.clone();
+ let request: JsPayload = request.into_serde().unwrap();
+
+ future_to_promise(async move {
+ inner.fire_and_forget(request.into());
+ Ok(JsValue::NULL)
+ })
+ }
}
#[inline]
From 5c8a8eed6e54f98fc90042008b63d35b493fbe8f Mon Sep 17 00:00:00 2001
From: Jeffsky
Date: Sun, 12 Apr 2020 20:47:17 +0800
Subject: [PATCH 03/70] New feature: messaging.
---
Cargo.toml | 7 +-
examples/Cargo.toml | 2 +-
rsocket-messaging/Cargo.toml | 18 ++
rsocket-messaging/src/lib.rs | 7 +
rsocket-messaging/src/misc.rs | 38 +++
rsocket-messaging/src/requester.rs | 118 +++++++
rsocket-test/Cargo.toml | 7 +-
rsocket-test/tests/test_composite_metadata.rs | 19 +-
rsocket-test/tests/test_messaging.rs | 52 +++
rsocket-test/tests/test_mimes.rs | 15 +-
rsocket-transport-tcp/Cargo.toml | 4 +-
rsocket-transport-tcp/src/codec.rs | 3 +-
rsocket-transport-wasm/src/misc.rs | 2 +-
rsocket-transport-websocket/Cargo.toml | 2 +-
rsocket/Cargo.toml | 2 +-
rsocket/src/extension/composite.rs | 133 ++++----
rsocket/src/extension/mime.rs | 305 ++++++++++++++++++
rsocket/src/extension/mod.rs | 4 +-
rsocket/src/extension/routing.rs | 1 -
rsocket/src/frame/error.rs | 17 +-
rsocket/src/frame/keepalive.rs | 9 +-
rsocket/src/frame/lease.rs | 9 +-
rsocket/src/frame/metadata_push.rs | 9 +-
rsocket/src/frame/mod.rs | 4 -
rsocket/src/frame/payload.rs | 16 +-
rsocket/src/frame/request_channel.rs | 16 +-
rsocket/src/frame/request_fnf.rs | 16 +-
rsocket/src/frame/request_n.rs | 2 +-
rsocket/src/frame/request_response.rs | 16 +-
rsocket/src/frame/request_stream.rs | 16 +-
rsocket/src/frame/resume.rs | 2 +-
rsocket/src/frame/resume_ok.rs | 2 +-
rsocket/src/frame/setup.rs | 121 ++++---
rsocket/src/frame/utils.rs | 10 +-
rsocket/src/lib.rs | 1 -
rsocket/src/mime.rs | 256 ---------------
rsocket/src/payload/normal.rs | 32 +-
rsocket/src/payload/setup.rs | 32 +-
rsocket/src/transport/fragmentation.rs | 4 -
rsocket/src/transport/socket.rs | 54 +++-
40 files changed, 892 insertions(+), 491 deletions(-)
create mode 100644 rsocket-messaging/Cargo.toml
create mode 100644 rsocket-messaging/src/lib.rs
create mode 100644 rsocket-messaging/src/misc.rs
create mode 100644 rsocket-messaging/src/requester.rs
create mode 100644 rsocket-test/tests/test_messaging.rs
create mode 100644 rsocket/src/extension/mime.rs
delete mode 100644 rsocket/src/mime.rs
diff --git a/Cargo.toml b/Cargo.toml
index 064b121..97b5c80 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,12 +1,15 @@
[workspace]
members = [
+# core
"rsocket",
+# transports
"rsocket-transport-tcp",
"rsocket-transport-websocket",
"rsocket-transport-wasm",
-
-# Internal
+# extra
+"rsocket-messaging",
+# internal
"examples",
"rsocket-test",
]
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 1e6130e..390b270 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -21,7 +21,7 @@ path = "../rsocket-transport-tcp"
path = "../rsocket-transport-websocket"
[dev-dependencies.tokio]
-version = "0.2.16"
+version = "0.2.19"
default-features = false
features = ["full"]
diff --git a/rsocket-messaging/Cargo.toml b/rsocket-messaging/Cargo.toml
new file mode 100644
index 0000000..8782ca6
--- /dev/null
+++ b/rsocket-messaging/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "rsocket_rust_messaging"
+version = "0.1.0"
+authors = ["Jeffsky "]
+edition = "2018"
+
+[dependencies]
+log = "0.4.8"
+futures = "0.3.4"
+bytes = "0.5.4"
+serde = "1.0.106"
+serde_json = "1.0.52"
+serde_cbor = "0.11.1"
+hex = "0.4.2"
+
+[dependencies.rsocket_rust]
+path = "../rsocket"
+features = ["frame"]
\ No newline at end of file
diff --git a/rsocket-messaging/src/lib.rs b/rsocket-messaging/src/lib.rs
new file mode 100644
index 0000000..689dff2
--- /dev/null
+++ b/rsocket-messaging/src/lib.rs
@@ -0,0 +1,7 @@
+#[macro_use]
+extern crate log;
+
+mod misc;
+mod requester;
+
+pub use requester::{RequestSpec, Requester};
diff --git a/rsocket-messaging/src/misc.rs b/rsocket-messaging/src/misc.rs
new file mode 100644
index 0000000..a8e2bd5
--- /dev/null
+++ b/rsocket-messaging/src/misc.rs
@@ -0,0 +1,38 @@
+use bytes::{BufMut, BytesMut};
+use rsocket_rust::extension::{MimeType, MIME_APPLICATION_CBOR, MIME_APPLICATION_JSON};
+use serde::{Deserialize, Serialize};
+use std::error::Error;
+
+pub(crate) fn unmarshal<'a, T>(mime_type: &MimeType, raw: &'a [u8]) -> Result>
+where
+ T: Deserialize<'a>,
+{
+ match *mime_type {
+ MIME_APPLICATION_JSON => Ok(serde_json::from_slice(raw)?),
+ MIME_APPLICATION_CBOR => Ok(serde_cbor::from_slice(raw)?),
+ _ => panic!(""),
+ }
+}
+
+pub(crate) fn marshal(
+ mime_type: &MimeType,
+ bf: &mut BytesMut,
+ data: &T,
+) -> Result<(), Box>
+where
+ T: Sized + Serialize,
+{
+ match *mime_type {
+ MIME_APPLICATION_JSON => {
+ let raw = serde_json::to_vec(data)?;
+ bf.put_slice(&raw[..]);
+ Ok(())
+ }
+ MIME_APPLICATION_CBOR => {
+ let raw = serde_cbor::to_vec(data)?;
+ bf.put_slice(&raw[..]);
+ Ok(())
+ }
+ _ => panic!(""),
+ }
+}
diff --git a/rsocket-messaging/src/requester.rs b/rsocket-messaging/src/requester.rs
new file mode 100644
index 0000000..5218a3b
--- /dev/null
+++ b/rsocket-messaging/src/requester.rs
@@ -0,0 +1,118 @@
+use super::misc::{marshal, unmarshal};
+use bytes::{BufMut, BytesMut};
+use rsocket_rust::error::RSocketError;
+use rsocket_rust::extension::{
+ CompositeMetadata, MimeType, RoutingMetadata, MIME_APPLICATION_JSON,
+ MIME_MESSAGE_X_RSOCKET_ROUTING_V0,
+};
+use rsocket_rust::prelude::*;
+use rsocket_rust::utils::Writeable;
+use serde::{Deserialize, Serialize};
+use std::collections::LinkedList;
+use std::error::Error;
+
+pub struct Requester
+where
+ S: RSocket + Clone,
+{
+ rsocket: S,
+}
+
+pub struct RequestSpec
+where
+ S: RSocket + Clone,
+{
+ data_buf: BytesMut,
+ rsocket: S,
+ data_mime_type: MimeType,
+ metadatas: LinkedList<(MimeType, Vec)>,
+}
+
+impl Requester
+where
+ C: RSocket + Clone,
+{
+ pub fn new(rsocket: C) -> Requester {
+ Requester { rsocket }
+ }
+
+ pub fn route(&self, route: &str) -> RequestSpec {
+ let routing = RoutingMetadata::builder().push_str(route).build();
+ let mut buf = BytesMut::new();
+ routing.write_to(&mut buf);
+
+ let mut metadatas: LinkedList<(MimeType, Vec)> = Default::default();
+ metadatas.push_back((MIME_MESSAGE_X_RSOCKET_ROUTING_V0, buf.to_vec()));
+ RequestSpec {
+ data_buf: BytesMut::new(),
+ rsocket: self.rsocket.clone(),
+ data_mime_type: MIME_APPLICATION_JSON,
+ metadatas,
+ }
+ }
+}
+
+impl RequestSpec
+where
+ C: RSocket + Clone,
+{
+ pub fn metadata(&mut self, metadata: &T, mime_type: &str) -> Result<(), Box>
+ where
+ T: Sized + Serialize,
+ {
+ let mime_type = MimeType::from(mime_type);
+ let mut b = BytesMut::new();
+ marshal(&mime_type, &mut b, metadata)?;
+ self.metadatas.push_back((mime_type, b.to_vec()));
+ Ok(())
+ }
+
+ pub fn data(&mut self, data: &T) -> Result<(), Box>
+ where
+ T: Sized + Serialize,
+ {
+ marshal(&self.data_mime_type, &mut self.data_buf, data)
+ }
+
+ pub async fn retrieve_mono(&self) -> Unpacker {
+ let req = self.to_req();
+ let res = self.rsocket.request_response(req).await;
+ Unpacker {
+ mime_type: self.data_mime_type.clone(),
+ inner: res,
+ }
+ }
+
+ fn to_req(&self) -> Payload {
+ let mut b = BytesMut::new();
+ let mut c = CompositeMetadata::builder();
+ for (a, b) in self.metadatas.iter() {
+ c = c.push(a.clone(), b);
+ }
+ c.build().write_to(&mut b);
+ Payload::builder()
+ .set_metadata(b.to_vec())
+ .set_data(self.data_buf.to_vec())
+ .build()
+ }
+}
+
+pub struct Unpacker {
+ mime_type: MimeType,
+ inner: Result,
+}
+
+impl Unpacker {
+ pub fn block<'a, T>(&'a self) -> Result