@@ -15,10 +15,12 @@ import type {
1515import  type  {  CookieSerializeOptions  }  from  "cookie" ; 
1616import  type  {  CorsOptions ,  CorsOptionsDelegate  }  from  "cors" ; 
1717import  type  {  Duplex  }  from  "stream" ; 
18+ import  {  WebTransport  }  from  "./transports/webtransport" ; 
1819
1920const  debug  =  debugModule ( "engine" ) ; 
2021
2122const  kResponseHeaders  =  Symbol ( "responseHeaders" ) ; 
23+ const  TEXT_DECODER  =  new  TextDecoder ( ) ; 
2224
2325type  Transport  =  "polling"  |  "websocket" ; 
2426
@@ -78,7 +80,13 @@ export interface ServerOptions {
7880    fn : ( err : string  |  null  |  undefined ,  success : boolean )  =>  void 
7981  )  =>  void ; 
8082  /** 
81-    * the low-level transports that are enabled 
83+    * The low-level transports that are enabled. WebTransport is disabled by default and must be manually enabled: 
84+    * 
85+    * @example  
86+    * new Server({ 
87+    *   transports: ["polling", "websocket", "webtransport"] 
88+    * }); 
89+    * 
8290   * @default  ["polling", "websocket"] 
8391   */ 
8492  transports ?: Transport [ ] ; 
@@ -140,6 +148,17 @@ type Middleware = (
140148  next : ( err ?: any )  =>  void 
141149)  =>  void ; 
142150
151+ function  parseSessionId ( handshake : string )  { 
152+   if  ( handshake . startsWith ( "0{" ) )  { 
153+     try  { 
154+       const  parsed  =  JSON . parse ( handshake . substring ( 1 ) ) ; 
155+       if  ( typeof  parsed . sid  ===  "string" )  { 
156+         return  parsed . sid ; 
157+       } 
158+     }  catch  ( e )  { } 
159+   } 
160+ } 
161+ 
143162export  abstract  class  BaseServer  extends  EventEmitter  { 
144163  public  opts : ServerOptions ; 
145164
@@ -166,7 +185,7 @@ export abstract class BaseServer extends EventEmitter {
166185        pingInterval : 25000 , 
167186        upgradeTimeout : 10000 , 
168187        maxHttpBufferSize : 1e6 , 
169-         transports : Object . keys ( transports ) , 
188+         transports : [ "polling" ,   "websocket" ] ,   // WebTransport is disabled by default 
170189        allowUpgrades : true , 
171190        httpCompression : { 
172191          threshold : 1024 , 
@@ -245,7 +264,11 @@ export abstract class BaseServer extends EventEmitter {
245264  protected  verify ( req ,  upgrade ,  fn )  { 
246265    // transport check 
247266    const  transport  =  req . _query . transport ; 
248-     if  ( ! ~ this . opts . transports . indexOf ( transport ) )  { 
267+     // WebTransport does not go through the verify() method, see the onWebTransportSession() method 
268+     if  ( 
269+       ! ~ this . opts . transports . indexOf ( transport )  || 
270+       transport  ===  "webtransport" 
271+     )  { 
249272      debug ( 'unknown transport "%s"' ,  transport ) ; 
250273      return  fn ( Server . errors . UNKNOWN_TRANSPORT ,  {  transport } ) ; 
251274    } 
@@ -495,6 +518,85 @@ export abstract class BaseServer extends EventEmitter {
495518    return  transport ; 
496519  } 
497520
521+   public  async  onWebTransportSession ( session : any )  { 
522+     const  timeout  =  setTimeout ( ( )  =>  { 
523+       debug ( 
524+         "the client failed to establish a bidirectional stream in the given period" 
525+       ) ; 
526+       session . close ( ) ; 
527+     } ,  this . opts . upgradeTimeout ) ; 
528+ 
529+     const  streamReader  =  session . incomingBidirectionalStreams . getReader ( ) ; 
530+     const  result  =  await  streamReader . read ( ) ; 
531+ 
532+     if  ( result . done )  { 
533+       debug ( "session is closed" ) ; 
534+       return ; 
535+     } 
536+ 
537+     const  stream  =  result . value ; 
538+     const  reader  =  stream . readable . getReader ( ) ; 
539+ 
540+     // reading the first packet of the stream 
541+     const  {  value,  done }  =  await  reader . read ( ) ; 
542+     if  ( done )  { 
543+       debug ( "stream is closed" ) ; 
544+       return ; 
545+     } 
546+ 
547+     clearTimeout ( timeout ) ; 
548+     const  handshake  =  TEXT_DECODER . decode ( value ) ; 
549+ 
550+     // handshake is either 
551+     // "0" => new session 
552+     // '0{"sid":"xxxx"}' => upgrade 
553+     if  ( handshake  ===  "0" )  { 
554+       const  transport  =  new  WebTransport ( session ,  stream ,  reader ) ; 
555+ 
556+       // note: we cannot use "this.generateId()", because there is no "req" argument 
557+       const  id  =  base64id . generateId ( ) ; 
558+       debug ( 'handshaking client "%s" (WebTransport)' ,  id ) ; 
559+ 
560+       const  socket  =  new  Socket ( id ,  this ,  transport ,  null ,  4 ) ; 
561+ 
562+       this . clients [ id ]  =  socket ; 
563+       this . clientsCount ++ ; 
564+ 
565+       socket . once ( "close" ,  ( )  =>  { 
566+         delete  this . clients [ id ] ; 
567+         this . clientsCount -- ; 
568+       } ) ; 
569+ 
570+       this . emit ( "connection" ,  socket ) ; 
571+       return ; 
572+     } 
573+ 
574+     const  sid  =  parseSessionId ( handshake ) ; 
575+ 
576+     if  ( ! sid )  { 
577+       debug ( "invalid WebTransport handshake" ) ; 
578+       return  session . close ( ) ; 
579+     } 
580+ 
581+     const  client  =  this . clients [ sid ] ; 
582+ 
583+     if  ( ! client )  { 
584+       debug ( "upgrade attempt for closed client" ) ; 
585+       session . close ( ) ; 
586+     }  else  if  ( client . upgrading )  { 
587+       debug ( "transport has already been trying to upgrade" ) ; 
588+       session . close ( ) ; 
589+     }  else  if  ( client . upgraded )  { 
590+       debug ( "transport had already been upgraded" ) ; 
591+       session . close ( ) ; 
592+     }  else  { 
593+       debug ( "upgrading existing transport" ) ; 
594+ 
595+       const  transport  =  new  WebTransport ( session ,  stream ,  reader ) ; 
596+       client . maybeUpgrade ( transport ) ; 
597+     } 
598+   } 
599+ 
498600  protected  abstract  createTransport ( transportName ,  req ) ; 
499601
500602  /** 
0 commit comments