@@ -4,7 +4,7 @@ use crate::{error::RequestError, pubsub::PubSubConnection, PubSubRpcHandler};
44use  anvil_rpc:: request:: Request ; 
55use  bytes:: BytesMut ; 
66use  futures:: { ready,  Sink ,  Stream ,  StreamExt } ; 
7- use  parity_tokio_ipc :: Endpoint ; 
7+ use  interprocess :: local_socket :: { self   as  ls ,  tokio :: prelude :: * } ; 
88use  std:: { 
99    future:: Future , 
1010    io, 
@@ -18,14 +18,14 @@ use std::{
1818pub  struct  IpcEndpoint < Handler >  { 
1919    /// the handler for the websocket connection 
2020handler :  Handler , 
21-     /// The endpoint we listen for incoming transactions  
22- endpoint :   Endpoint , 
21+     /// The path to the socket  
22+ path :   String , 
2323} 
2424
2525impl < Handler :  PubSubRpcHandler >  IpcEndpoint < Handler >  { 
2626    /// Creates a new endpoint with the given handler 
27- pub  fn  new ( handler :  Handler ,  endpoint :  String )  -> Self  { 
28-         Self  {  handler,  endpoint :   Endpoint :: new ( endpoint )  } 
27+ pub  fn  new ( handler :  Handler ,  path :  String )  -> Self  { 
28+         Self  {  handler,  path  } 
2929    } 
3030
3131    /// Returns a stream of incoming connection handlers 
@@ -34,39 +34,43 @@ impl<Handler: PubSubRpcHandler> IpcEndpoint<Handler> {
3434/// connections, See [`PubSubConnection`] that should be spawned 
3535#[ instrument( target = "ipc" ,  skip_all) ]  
3636    pub  fn  incoming ( self )  -> io:: Result < impl  Stream < Item  = impl  Future < Output  = ( ) > > >  { 
37-         let  IpcEndpoint  {  handler,  endpoint }  = self ; 
38-         trace ! ( endpoint=?endpoint. path( ) ,  "starting IPC server"  ) ; 
37+         let  IpcEndpoint  {  handler,  path }  = self ; 
38+ 
39+         trace ! ( %path,  "starting IPC server" ) ; 
3940
4041        if  cfg ! ( unix)  { 
4142            // ensure the file does not exist 
42-             if  std:: fs:: remove_file ( endpoint . path ( ) ) . is_ok ( )  { 
43-                 warn ! ( endpoint=?endpoint . path( ) ,  "removed existing file" ) ; 
43+             if  std:: fs:: remove_file ( & path) . is_ok ( )  { 
44+                 warn ! ( % path,  "removed existing file" ) ; 
4445            } 
4546        } 
4647
47-         let  connections  = match  endpoint . incoming ( )   { 
48-              Ok ( connections )  => connections , 
49-              Err ( err )  =>  { 
50-                  error ! ( %err ,   "Failed to create IPC listener" ) ; 
51-                  return   Err ( err ) 
52-             } 
53-         } ; 
48+         let  name  = to_name ( path . as_ref ( ) ) ? ; 
49+         let  listener = ls :: ListenerOptions :: new ( ) . name ( name ) . create_tokio ( ) ? ; 
50+         // TODO: https://github.com/kotauskas/interprocess/issues/64 
51+         let  connections = futures :: stream :: unfold ( listener ,  |listener|  async   move   { 
52+             let  conn = listener . accept ( ) . await ; 
53+             Some ( ( conn ,  listener ) ) 
54+         } ) ; 
5455
5556        trace ! ( "established connection listener" ) ; 
5657
57-         let  connections =  connections. filter_map ( move  |stream| { 
58+         Ok ( connections. filter_map ( move  |stream| { 
5859            let  handler = handler. clone ( ) ; 
59-             Box :: pin ( async  move  { 
60-                 if  let  Ok ( stream)  = stream { 
61-                     trace ! ( "successful incoming IPC connection" ) ; 
62-                     let  framed = tokio_util:: codec:: Decoder :: framed ( JsonRpcCodec ,  stream) ; 
63-                     Some ( PubSubConnection :: new ( IpcConn ( framed) ,  handler) ) 
64-                 }  else  { 
65-                     None 
60+             async  move  { 
61+                 match  stream { 
62+                     Ok ( stream)  => { 
63+                         trace ! ( "successful incoming IPC connection" ) ; 
64+                         let  framed = tokio_util:: codec:: Decoder :: framed ( JsonRpcCodec ,  stream) ; 
65+                         Some ( PubSubConnection :: new ( IpcConn ( framed) ,  handler) ) 
66+                     } 
67+                     Err ( err)  => { 
68+                         trace ! ( %err,  "unsuccessful incoming IPC connection" ) ; 
69+                         None 
70+                     } 
6671                } 
67-             } ) 
68-         } ) ; 
69-         Ok ( connections) 
72+             } 
73+         } ) ) 
7074    } 
7175} 
7276
@@ -118,7 +122,7 @@ where
118122
119123struct  JsonRpcCodec ; 
120124
121- // Adapted from <https://github.dev /paritytech/jsonrpc/blob/38af3c9439aa75481805edf6c05c6622a5ab1e70/server-utils/src/stream_codec.rs#L47-L105> 
125+ // Adapted from <https://github.com /paritytech/jsonrpc/blob/38af3c9439aa75481805edf6c05c6622a5ab1e70/server-utils/src/stream_codec.rs#L47-L105> 
122126impl  tokio_util:: codec:: Decoder  for  JsonRpcCodec  { 
123127    type  Item  = String ; 
124128    type  Error  = io:: Error ; 
@@ -171,3 +175,11 @@ impl tokio_util::codec::Encoder<String> for JsonRpcCodec {
171175        Ok ( ( ) ) 
172176    } 
173177} 
178+ 
179+ fn  to_name ( path :  & std:: ffi:: OsStr )  -> io:: Result < ls:: Name < ' _ > >  { 
180+     if  cfg ! ( windows)  && !path. as_encoded_bytes ( ) . starts_with ( br"\\.\pipe\" )  { 
181+         ls:: ToNsName :: to_ns_name :: < ls:: GenericNamespaced > ( path) 
182+     }  else  { 
183+         ls:: ToFsName :: to_fs_name :: < ls:: GenericFilePath > ( path) 
184+     } 
185+ } 
0 commit comments