Skip to content

Commit b4ca639

Browse files
committed
Add windows support on async
Add support for windows named pipes to the async codebase. This refactor the codes to use a transport abstraction instead of file descriptors. Signed-off-by: Jorge Prendes <jorge.prendes@gmail.com>
1 parent a9fde18 commit b4ca639

21 files changed

+535
-285
lines changed

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ log = "0.4"
1919
byteorder = "1.3.2"
2020
thiserror = "1.0"
2121
async-trait = { version = "0.1.31", optional = true }
22+
async-stream = { version = "0.3.6", optional = true }
2223
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
2324
futures = { version = "0.3", optional = true }
2425
crossbeam = "0.8.0"
@@ -27,7 +28,7 @@ crossbeam = "0.8.0"
2728
windows-sys = {version = "0.48", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}
2829

2930
[target.'cfg(any(target_os = "linux", target_os = "android"))'.dependencies]
30-
tokio-vsock = { version = "0.4.0", optional = true }
31+
tokio-vsock = { version = "0.7.0", optional = true }
3132

3233
[build-dependencies]
3334
# lock home to avoid conflict with latest version
@@ -36,7 +37,7 @@ protobuf-codegen = "3.1.0"
3637

3738
[features]
3839
default = ["sync"]
39-
async = ["async-trait", "tokio", "futures", "tokio-vsock"]
40+
async = ["async-trait", "async-stream", "tokio", "futures", "tokio-vsock"]
4041
sync = []
4142

4243
[package.metadata.docs.rs]

Makefile

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,7 @@ build: debug
2121

2222
.PHONY: test
2323
test:
24-
ifeq ($OS,Windows_NT)
25-
# async isn't enabled for windows, don't test that feature
26-
cargo test --verbose
27-
else
2824
cargo test --all-features --verbose
29-
endif
3025

3126
.PHONY: check
3227
check:

example/async-client.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,13 @@
55

66
mod protocols;
77
mod utils;
8-
#[cfg(unix)]
98
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc};
109
use ttrpc::context::{self, Context};
11-
#[cfg(unix)]
1210
use ttrpc::r#async::Client;
1311

14-
#[cfg(windows)]
15-
fn main() {
16-
println!("This example only works on Unix-like OSes");
17-
}
18-
19-
#[cfg(unix)]
2012
#[tokio::main(flavor = "current_thread")]
2113
async fn main() {
22-
let c = Client::connect(utils::SOCK_ADDR).unwrap();
14+
let c = Client::connect(utils::SOCK_ADDR).await.unwrap();
2315
let hc = health_ttrpc::HealthClient::new(c.clone());
2416
let ac = agent_ttrpc::AgentServiceClient::new(c);
2517

example/async-server.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,14 @@ use std::sync::Arc;
1010

1111
use log::LevelFilter;
1212

13-
#[cfg(unix)]
1413
use protocols::asynchronous::{agent, agent_ttrpc, health, health_ttrpc};
15-
#[cfg(unix)]
1614
use ttrpc::asynchronous::Server;
1715

18-
#[cfg(unix)]
1916
use async_trait::async_trait;
20-
#[cfg(unix)]
21-
use tokio::signal::unix::{signal, SignalKind};
2217
use tokio::time::sleep;
2318

2419
struct HealthService;
2520

26-
#[cfg(unix)]
2721
#[async_trait]
2822
impl health_ttrpc::Health for HealthService {
2923
async fn check(
@@ -46,7 +40,6 @@ impl health_ttrpc::Health for HealthService {
4640
}
4741

4842
struct AgentService;
49-
#[cfg(unix)]
5043
#[async_trait]
5144
impl agent_ttrpc::AgentService for AgentService {
5245
async fn list_interfaces(
@@ -58,12 +51,6 @@ impl agent_ttrpc::AgentService for AgentService {
5851
}
5952
}
6053

61-
#[cfg(windows)]
62-
fn main() {
63-
println!("This example only works on Unix-like OSes");
64-
}
65-
66-
#[cfg(unix)]
6754
#[tokio::main(flavor = "current_thread")]
6855
async fn main() {
6956
simple_logging::log_to_stderr(LevelFilter::Trace);
@@ -79,12 +66,10 @@ async fn main() {
7966
.register_service(hservice)
8067
.register_service(aservice);
8168

82-
let mut hangup = signal(SignalKind::hangup()).unwrap();
83-
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
8469
server.start().await.unwrap();
8570

8671
tokio::select! {
87-
_ = hangup.recv() => {
72+
_ = utils::hangup() => {
8873
// test stop_listen -> start
8974
println!("stop listen");
9075
server.stop_listen().await;
@@ -94,7 +79,7 @@ async fn main() {
9479
// hold some time for the new test connection.
9580
sleep(std::time::Duration::from_secs(100)).await;
9681
}
97-
_ = interrupt.recv() => {
82+
_ = utils::interrupt() => {
9883
// test graceful shutdown
9984
println!("graceful shutdown");
10085
server.shutdown().await.unwrap();

example/async-stream-client.rs

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,15 @@
55

66
mod protocols;
77
mod utils;
8-
#[cfg(unix)]
98
use protocols::asynchronous::{empty, streaming, streaming_ttrpc};
109
use ttrpc::context::{self, Context};
11-
#[cfg(unix)]
1210
use ttrpc::r#async::Client;
1311

14-
#[cfg(windows)]
15-
fn main() {
16-
println!("This example only works on Unix-like OSes");
17-
}
18-
19-
#[cfg(unix)]
2012
#[tokio::main(flavor = "current_thread")]
2113
async fn main() {
2214
simple_logging::log_to_stderr(log::LevelFilter::Info);
2315

24-
let c = Client::connect(utils::SOCK_ADDR).unwrap();
16+
let c = Client::connect(utils::SOCK_ADDR).await.unwrap();
2517
let sc = streaming_ttrpc::StreamingClient::new(c);
2618

2719
let _now = std::time::Instant::now();
@@ -75,7 +67,6 @@ fn default_ctx() -> Context {
7567
ctx
7668
}
7769

78-
#[cfg(unix)]
7970
async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
8071
let echo1 = streaming::EchoPayload {
8172
seq: 1,
@@ -87,7 +78,6 @@ async fn echo_request(cli: streaming_ttrpc::StreamingClient) {
8778
assert_eq!(resp.seq, echo1.seq + 1);
8879
}
8980

90-
#[cfg(unix)]
9181
async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
9282
let mut stream = cli.echo_stream(default_ctx()).await.unwrap();
9383

@@ -110,7 +100,6 @@ async fn echo_stream(cli: streaming_ttrpc::StreamingClient) {
110100
assert!(matches!(ret, Err(ttrpc::Error::Eof)));
111101
}
112102

113-
#[cfg(unix)]
114103
async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
115104
let mut stream = cli.sum_stream(default_ctx()).await.unwrap();
116105

@@ -138,7 +127,6 @@ async fn sum_stream(cli: streaming_ttrpc::StreamingClient) {
138127
assert_eq!(ssum.num, sum.num);
139128
}
140129

141-
#[cfg(unix)]
142130
async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
143131
let expected = streaming::Sum {
144132
sum: 392,
@@ -158,7 +146,6 @@ async fn divide_stream(cli: streaming_ttrpc::StreamingClient) {
158146
assert_eq!(actual.num, expected.num);
159147
}
160148

161-
#[cfg(unix)]
162149
async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
163150
let mut stream = cli.echo_null(default_ctx()).await.unwrap();
164151

@@ -174,7 +161,6 @@ async fn echo_null(cli: streaming_ttrpc::StreamingClient) {
174161
assert_eq!(res, empty::Empty::new());
175162
}
176163

177-
#[cfg(unix)]
178164
async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
179165
let stream = cli.echo_null_stream(default_ctx()).await.unwrap();
180166

@@ -206,7 +192,6 @@ async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
206192
.unwrap();
207193
}
208194

209-
#[cfg(unix)]
210195
async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
211196
let mut stream = cli
212197
.echo_default_value(default_ctx(), &Default::default()) // send default value to verify #208
@@ -219,7 +204,6 @@ async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
219204
assert_eq!(received.msg, "");
220205
}
221206

222-
#[cfg(unix)]
223207
async fn server_send_stream(cli: streaming_ttrpc::StreamingClient) {
224208
let mut stream = cli
225209
.server_send_stream(default_ctx(), &Default::default())

example/async-stream-server.rs

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -10,20 +10,14 @@ use std::sync::Arc;
1010

1111
use log::{info, LevelFilter};
1212

13-
#[cfg(unix)]
1413
use protocols::asynchronous::{empty, streaming, streaming_ttrpc};
15-
#[cfg(unix)]
1614
use ttrpc::{asynchronous::Server, Error};
1715

18-
#[cfg(unix)]
1916
use async_trait::async_trait;
20-
#[cfg(unix)]
21-
use tokio::signal::unix::{signal, SignalKind};
2217
use tokio::time::sleep;
2318

2419
struct StreamingService;
2520

26-
#[cfg(unix)]
2721
#[async_trait]
2822
impl streaming_ttrpc::Streaming for StreamingService {
2923
async fn echo(
@@ -173,12 +167,6 @@ impl streaming_ttrpc::Streaming for StreamingService {
173167
}
174168
}
175169

176-
#[cfg(windows)]
177-
fn main() {
178-
println!("This example only works on Unix-like OSes");
179-
}
180-
181-
#[cfg(unix)]
182170
#[tokio::main(flavor = "current_thread")]
183171
async fn main() {
184172
simple_logging::log_to_stderr(LevelFilter::Info);
@@ -190,12 +178,10 @@ async fn main() {
190178
.unwrap()
191179
.register_service(service);
192180

193-
let mut hangup = signal(SignalKind::hangup()).unwrap();
194-
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
195181
server.start().await.unwrap();
196182

197183
tokio::select! {
198-
_ = hangup.recv() => {
184+
_ = utils::hangup() => {
199185
// test stop_listen -> start
200186
info!("stop listen");
201187
server.stop_listen().await;
@@ -205,7 +191,7 @@ async fn main() {
205191
// hold some time for the new test connection.
206192
sleep(std::time::Duration::from_secs(100)).await;
207193
}
208-
_ = interrupt.recv() => {
194+
_ = utils::interrupt() => {
209195
// test graceful shutdown
210196
info!("graceful shutdown");
211197
server.shutdown().await.unwrap();

example/protocols/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,5 @@
22
//
33
// SPDX-License-Identifier: Apache-2.0
44
//
5-
#[cfg(unix)]
65
pub mod asynchronous;
76
pub mod sync;

example/utils.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ pub mod resp {
6565
}
6666
}
6767

68-
#[cfg(unix)]
6968
pub mod asynchronous {
7069
use crate::protocols::asynchronous::{
7170
agent::Interfaces, health::VersionCheckResponse, types::Interface,
@@ -96,3 +95,19 @@ pub mod resp {
9695
}
9796
}
9897
}
98+
99+
pub async fn hangup() {
100+
#[cfg(unix)]
101+
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::hangup())
102+
.unwrap()
103+
.recv()
104+
.await
105+
.unwrap();
106+
107+
#[cfg(not(unix))]
108+
std::future::pending::<()>().await;
109+
}
110+
111+
pub async fn interrupt() {
112+
tokio::signal::ctrl_c().await.unwrap();
113+
}

src/asynchronous/client.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66

77
use std::collections::HashMap;
88
use std::convert::TryInto;
9+
#[cfg(unix)]
910
use std::os::unix::io::RawFd;
1011
use std::sync::atomic::{AtomicU32, Ordering};
1112
use std::sync::{Arc, Mutex};
1213

1314
use async_trait::async_trait;
1415
use tokio::{self, sync::mpsc, task};
1516

16-
use crate::common::client_connect;
1717
use crate::error::{get_rpc_status, Error, Result};
1818
use crate::proto::{
1919
Code, Codec, GenMessage, Message, MessageHeader, Request, Response, FLAG_NO_DATA,
@@ -24,9 +24,9 @@ use crate::r#async::shutdown;
2424
use crate::r#async::stream::{
2525
Kind, MessageReceiver, MessageSender, ResultReceiver, ResultSender, StreamInner,
2626
};
27-
use crate::r#async::utils;
2827

2928
use super::stream::SendingMessage;
29+
use super::transport::Socket;
3030

3131
/// A ttrpc Client (async).
3232
#[derive(Clone)]
@@ -37,15 +37,23 @@ pub struct Client {
3737
}
3838

3939
impl Client {
40-
pub fn connect(sockaddr: &str) -> Result<Client> {
41-
let fd = unsafe { client_connect(sockaddr)? };
42-
Ok(Self::new(fd))
40+
pub async fn connect(sockaddr: &str) -> Result<Client> {
41+
let socket = Socket::connect(sockaddr)
42+
.await
43+
.map_err(err_to_others_err!(e, "Socket::connect error "))?;
44+
Ok(Self::new(socket))
4345
}
4446

45-
/// Initialize a new [`Client`].
46-
pub fn new(fd: RawFd) -> Client {
47-
let stream = utils::new_unix_stream_from_raw_fd(fd);
47+
#[cfg(unix)]
48+
/// # Safety
49+
/// The file descriptor must represent a unix socket.
50+
pub unsafe fn from_raw_unix_socket_fd(fd: RawFd) -> Client {
51+
let stream = unsafe { Socket::from_raw_unix_socket_fd(fd) }.unwrap();
52+
Self::new(stream)
53+
}
4854

55+
/// Initialize a new [`Client`].
56+
pub fn new(stream: Socket) -> Client {
4957
let (req_tx, rx): (MessageSender, MessageReceiver) = mpsc::channel(100);
5058

5159
let req_map = Arc::new(Mutex::new(HashMap::new()));

0 commit comments

Comments
 (0)