@@ -7,8 +7,8 @@ use byteorder::{ReadBytesExt, WriteBytesExt};
7
7
use nix:: sys:: signal:: { sigaction, SaFlags , SigAction , SigHandler , SigSet , Signal } ;
8
8
use std:: collections:: HashMap ;
9
9
use std:: net:: TcpListener ;
10
+ use std:: os:: unix:: net:: UnixListener ;
10
11
use std:: sync:: { Arc , Mutex } ;
11
- use std:: { process, thread} ;
12
12
13
13
use crate :: error;
14
14
use crate :: error:: errno:: * ;
@@ -381,6 +381,37 @@ fn dispatch_once<FsFid>(
381
381
Ok ( ( response, msg. tag ) )
382
382
}
383
383
384
+ // Just for ReadBytesExt and WriteBytesExt
385
+ trait ReadWriteBytesExt : std:: io:: Read + std:: io:: Write { }
386
+
387
+ impl < T > ReadWriteBytesExt for T where T : std:: io:: Read + std:: io:: Write { }
388
+
389
+ trait SocketListener {
390
+ fn accept_client ( & self ) -> Result < ( Box < dyn ReadWriteBytesExt + Send > , String ) > ;
391
+ }
392
+
393
+ impl SocketListener for TcpListener {
394
+ fn accept_client ( & self ) -> Result < ( Box < dyn ReadWriteBytesExt + Send > , String ) > {
395
+ let ( stream, remote) = self . accept ( ) ?;
396
+ utils:: setup_tcp_stream ( & stream) ?;
397
+
398
+ return Ok ( ( Box :: new ( stream) , remote. to_string ( ) ) ) ;
399
+ }
400
+ }
401
+
402
+ impl SocketListener for UnixListener {
403
+ fn accept_client ( & self ) -> Result < ( Box < dyn ReadWriteBytesExt + Send > , String ) > {
404
+ let ( stream, remote) = self . accept ( ) ?;
405
+ let remote = remote
406
+ . as_pathname ( )
407
+ . and_then ( std:: path:: Path :: to_str)
408
+ . unwrap_or ( ":unnamed:" )
409
+ . to_owned ( ) ;
410
+
411
+ return Ok ( ( Box :: new ( stream) , remote) ) ;
412
+ }
413
+ }
414
+
384
415
/// Start the 9P filesystem (fork child processes).
385
416
///
386
417
/// This function forks a child process to handle its 9P messages
@@ -389,12 +420,16 @@ pub fn srv<Fs: Filesystem>(filesystem: Fs, addr: &str) -> Result<()> {
389
420
let ( proto, sockaddr) =
390
421
utils:: parse_proto ( addr) . ok_or ( io_err ! ( InvalidInput , "Invalid protocol or address" ) ) ?;
391
422
392
- if proto != "tcp" {
393
- return res ! ( io_err!(
394
- InvalidInput ,
395
- format!( "Unsupported protocol: {}" , proto)
396
- ) ) ;
397
- }
423
+ let listener: Box < dyn SocketListener > = match proto {
424
+ "tcp" => Box :: new ( TcpListener :: bind ( & sockaddr[ ..] ) ?) ,
425
+ "unix" => Box :: new ( UnixListener :: bind ( & sockaddr[ ..] ) ?) ,
426
+ _ => {
427
+ return res ! ( io_err!(
428
+ InvalidInput ,
429
+ format!( "Unsupported protocol: {}" , proto)
430
+ ) ) ;
431
+ }
432
+ } ;
398
433
399
434
// Do not wait for child processes
400
435
unsafe {
@@ -404,21 +439,18 @@ pub fn srv<Fs: Filesystem>(filesystem: Fs, addr: &str) -> Result<()> {
404
439
) ?;
405
440
}
406
441
407
- let listener = TcpListener :: bind ( & sockaddr[ ..] ) ?;
408
-
409
442
loop {
410
- let ( stream, remote) = listener. accept ( ) ?;
443
+ let ( stream, remote) = listener. accept_client ( ) ?;
411
444
412
445
match nix:: unistd:: fork ( ) ? {
413
446
nix:: unistd:: ForkResult :: Parent { .. } => { }
414
447
nix:: unistd:: ForkResult :: Child => {
415
448
info ! ( "ServerProcess={} starts" , remote) ;
416
449
417
- utils:: setup_tcp_stream ( & stream) ?;
418
450
let result = ServerInstance :: new ( filesystem, stream) ?. dispatch ( ) ;
419
451
420
452
info ! ( "ServerProcess={} finished: {:?}" , remote, result) ;
421
- process:: exit ( 1 ) ;
453
+ std :: process:: exit ( 1 ) ;
422
454
}
423
455
}
424
456
}
@@ -432,30 +464,29 @@ pub fn srv_spawn<Fs: Filesystem + Send + 'static>(filesystem: Fs, addr: &str) ->
432
464
let ( proto, sockaddr) =
433
465
utils:: parse_proto ( addr) . ok_or ( io_err ! ( InvalidInput , "Invalid protocol or address" ) ) ?;
434
466
435
- if proto != "tcp" {
436
- return res ! ( io_err!(
437
- InvalidInput ,
438
- format!( "Unsupported protocol: {}" , proto)
439
- ) ) ;
440
- }
467
+ let listener: Box < dyn SocketListener > = match proto {
468
+ "tcp" => Box :: new ( TcpListener :: bind ( & sockaddr[ ..] ) ?) ,
469
+ "unix" => Box :: new ( UnixListener :: bind ( & sockaddr[ ..] ) ?) ,
470
+ _ => {
471
+ return res ! ( io_err!(
472
+ InvalidInput ,
473
+ format!( "Unsupported protocol: {}" , proto)
474
+ ) )
475
+ }
476
+ } ;
441
477
442
478
let arc_fs = Arc :: new ( Mutex :: new ( filesystem) ) ;
443
- let listener = TcpListener :: bind ( & sockaddr[ ..] ) ?;
444
479
445
480
loop {
446
- let ( stream, remote) = listener. accept ( ) ?;
447
- let ( fs , thread_name ) = ( arc_fs. clone ( ) , remote . to_string ( ) ) ;
481
+ let ( stream, remote) = listener. accept_client ( ) ?;
482
+ let fs = arc_fs. clone ( ) ;
448
483
449
- let _ = thread:: Builder :: new ( )
450
- . name ( thread_name. clone ( ) )
451
- . spawn ( move || {
452
- info ! ( "ServerThread={:?} started" , thread_name) ;
484
+ let _ = std:: thread:: Builder :: new ( ) . spawn ( move || {
485
+ info ! ( "ServerThread={:?} started" , remote) ;
453
486
454
- let result = utils:: setup_tcp_stream ( & stream)
455
- . map_err ( From :: from)
456
- . and_then ( |_| SpawnServerInstance :: new ( fs, stream) ?. dispatch ( ) ) ;
487
+ let result = SpawnServerInstance :: new ( fs, stream) . and_then ( |mut s| s. dispatch ( ) ) ;
457
488
458
- info ! ( "ServerThread={:? } finished: {:?}" , thread_name , result) ;
459
- } ) ;
489
+ info ! ( "ServerThread={} finished: {:?}" , remote , result) ;
490
+ } ) ;
460
491
}
461
492
}
0 commit comments