@@ -13,12 +13,14 @@ import { pEvent } from 'p-event'
1313import Sinon from 'sinon'
1414import { stubInterface } from 'sinon-ts'
1515import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
16- import { Upgrader , type UpgraderInit } from '../../src/upgrader.js'
16+ import { Upgrader } from '../../src/upgrader.js'
1717import { createDefaultUpgraderComponents } from './utils.js'
18+ import type { UpgraderComponents , UpgraderInit } from '../../src/upgrader.js'
1819import type { ConnectionEncrypter , StreamMuxerFactory , MultiaddrConnection , StreamMuxer , ConnectionProtector , PeerId , SecuredConnection , Stream , StreamMuxerInit , Connection } from '@libp2p/interface'
1920import type { ConnectionManager , Registrar } from '@libp2p/interface-internal'
2021
2122describe ( 'upgrader' , ( ) => {
23+ let components : UpgraderComponents
2224 let init : UpgraderInit
2325 const encrypterProtocol = '/test-encrypter'
2426 const muxerProtocol = '/test-muxer'
@@ -36,6 +38,7 @@ describe('upgrader', () => {
3638 beforeEach ( async ( ) => {
3739 remotePeer = peerIdFromPrivateKey ( await generateKeyPair ( 'Ed25519' ) )
3840 remoteAddr = multiaddr ( `/ip4/123.123.123.123/tcp/1234/p2p/${ remotePeer } ` )
41+ components = await createDefaultUpgraderComponents ( )
3942
4043 init = {
4144 connectionEncrypters : [
@@ -76,7 +79,7 @@ describe('upgrader', () => {
7679 } )
7780
7881 it ( 'should upgrade outbound with valid muxers and crypto' , async ( ) => {
79- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , init )
82+ const upgrader = new Upgrader ( components , init )
8083 const conn = await upgrader . upgradeOutbound ( maConn , {
8184 signal : AbortSignal . timeout ( 5_000 )
8285 } )
@@ -85,7 +88,7 @@ describe('upgrader', () => {
8588 } )
8689
8790 it ( 'should upgrade outbound with only crypto' , async ( ) => {
88- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
91+ const upgrader = new Upgrader ( components , {
8992 ...init ,
9093 streamMuxers : [ ]
9194 } )
@@ -102,9 +105,10 @@ describe('upgrader', () => {
102105 const connectionProtector = stubInterface < ConnectionProtector > ( )
103106 connectionProtector . protect . callsFake ( async ( conn ) => conn )
104107
105- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( {
108+ const upgrader = new Upgrader ( {
109+ ...components ,
106110 connectionProtector
107- } ) , init )
111+ } , init )
108112
109113 await upgrader . upgradeInbound ( maConn , {
110114 signal : AbortSignal . timeout ( 5_000 )
@@ -117,9 +121,10 @@ describe('upgrader', () => {
117121 const connectionProtector = stubInterface < ConnectionProtector > ( )
118122 connectionProtector . protect . callsFake ( async ( conn ) => conn )
119123
120- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( {
124+ const upgrader = new Upgrader ( {
125+ ...components ,
121126 connectionProtector
122- } ) , init )
127+ } , init )
123128
124129 await upgrader . upgradeOutbound ( maConn , {
125130 signal : AbortSignal . timeout ( 5_000 )
@@ -129,7 +134,7 @@ describe('upgrader', () => {
129134 } )
130135
131136 it ( 'should fail inbound if crypto fails' , async ( ) => {
132- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
137+ const upgrader = new Upgrader ( components , {
133138 ...init ,
134139 connectionEncrypters : [
135140 new BoomCrypto ( )
@@ -143,7 +148,7 @@ describe('upgrader', () => {
143148 } )
144149
145150 it ( 'should fail outbound if crypto fails' , async ( ) => {
146- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
151+ const upgrader = new Upgrader ( components , {
147152 ...init ,
148153 connectionEncrypters : [
149154 new BoomCrypto ( )
@@ -157,7 +162,7 @@ describe('upgrader', () => {
157162 } )
158163
159164 it ( 'should abort if inbound upgrade is slow' , async ( ) => {
160- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
165+ const upgrader = new Upgrader ( components , {
161166 ...init ,
162167 inboundUpgradeTimeout : 100
163168 } )
@@ -174,7 +179,7 @@ describe('upgrader', () => {
174179 } )
175180
176181 it ( 'should abort by signal if inbound upgrade is slow' , async ( ) => {
177- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
182+ const upgrader = new Upgrader ( components , {
178183 ...init ,
179184 inboundUpgradeTimeout : 10000
180185 } )
@@ -212,7 +217,7 @@ describe('upgrader', () => {
212217 } )
213218
214219 it ( 'should not abort if outbound upgrade is successful' , async ( ) => {
215- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
220+ const upgrader = new Upgrader ( components , {
216221 ...init ,
217222 inboundUpgradeTimeout : 100
218223 } )
@@ -248,7 +253,7 @@ describe('upgrader', () => {
248253 } )
249254
250255 it ( 'should not abort by signal if outbound upgrade is successful' , async ( ) => {
251- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
256+ const upgrader = new Upgrader ( components , {
252257 ...init ,
253258 inboundUpgradeTimeout : 10000
254259 } )
@@ -263,7 +268,7 @@ describe('upgrader', () => {
263268 } )
264269
265270 it ( 'should abort protocol selection for slow outbound stream creation' , async ( ) => {
266- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
271+ const upgrader = new Upgrader ( components , {
267272 ...init ,
268273 streamMuxers : [
269274 stubInterface < StreamMuxerFactory > ( {
@@ -298,7 +303,7 @@ describe('upgrader', () => {
298303 it ( 'should abort stream when protocol negotiation fails on outbound stream' , async ( ) => {
299304 let stream : Stream | undefined
300305
301- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( ) , {
306+ const upgrader = new Upgrader ( components , {
302307 ...init ,
303308 streamMuxers : [
304309 stubInterface < StreamMuxerFactory > ( {
@@ -376,9 +381,10 @@ describe('upgrader', () => {
376381 protocol : encrypterProtocol
377382 } )
378383
379- const upgrader = new Upgrader ( await createDefaultUpgraderComponents ( {
384+ const upgrader = new Upgrader ( {
385+ ...components ,
380386 connectionProtector
381- } ) , {
387+ } , {
382388 ...init ,
383389 connectionEncrypters : [
384390 connectionEncrypter
@@ -617,4 +623,84 @@ describe('upgrader', () => {
617623 await expect ( conn . newStream ( protocol , opts ) ) . to . eventually . be . rejected
618624 . with . property ( 'name' , 'TooManyOutboundProtocolStreamsError' )
619625 } )
626+
627+ describe ( 'early muxer selection' , ( ) => {
628+ let earlyMuxerProtocol : string
629+ let streamMuxerFactory : StreamMuxerFactory
630+ let upgrader : Upgrader
631+ let maConn : MultiaddrConnection
632+ let encrypterProtocol : string
633+
634+ beforeEach ( async ( ) => {
635+ encrypterProtocol = '/test-encrypt-with-early'
636+ earlyMuxerProtocol = '/early-muxer'
637+ streamMuxerFactory = stubInterface < StreamMuxerFactory > ( {
638+ protocol : earlyMuxerProtocol ,
639+ createStreamMuxer : ( ) => stubInterface < StreamMuxer > ( {
640+ protocol : earlyMuxerProtocol ,
641+ sink : async ( source ) => drain ( source ) ,
642+ source : ( async function * ( ) { } ) ( )
643+ } )
644+ } )
645+
646+ upgrader = new Upgrader ( components , {
647+ connectionEncrypters : [
648+ stubInterface < ConnectionEncrypter > ( {
649+ protocol : encrypterProtocol ,
650+ secureOutbound : async ( connection ) => ( {
651+ conn : connection ,
652+ remotePeer,
653+ streamMuxer : streamMuxerFactory
654+ } ) ,
655+ secureInbound : async ( connection ) => ( {
656+ conn : connection ,
657+ remotePeer,
658+ streamMuxer : streamMuxerFactory
659+ } )
660+ } )
661+ ] ,
662+ streamMuxers : [
663+ stubInterface < StreamMuxerFactory > ( {
664+ protocol : '/late-muxer' ,
665+ createStreamMuxer : ( ) => stubInterface < StreamMuxer > ( {
666+ protocol : '/late-muxer' ,
667+ sink : async ( source ) => drain ( source ) ,
668+ source : ( async function * ( ) { } ) ( )
669+ } )
670+ } )
671+ ]
672+ } )
673+
674+ maConn = stubInterface < MultiaddrConnection > ( {
675+ remoteAddr,
676+ log : logger ( 'test' ) ,
677+ sink : async ( source ) => drain ( source ) ,
678+ source : map ( ( async function * ( ) {
679+ yield '/multistream/1.0.0\n'
680+ yield `${ encrypterProtocol } \n`
681+ } ) ( ) , str => encode . single ( uint8ArrayFromString ( str ) ) )
682+ } )
683+ } )
684+
685+ it ( 'should allow early muxer selection on inbound connection' , async ( ) => {
686+ const connectionPromise = pEvent < 'connection:open' , CustomEvent < Connection > > ( components . events , 'connection:open' )
687+
688+ await upgrader . upgradeInbound ( maConn , {
689+ signal : AbortSignal . timeout ( 5_000 )
690+ } )
691+
692+ const event = await connectionPromise
693+ const conn = event . detail
694+
695+ expect ( conn . multiplexer ) . to . equal ( earlyMuxerProtocol )
696+ } )
697+
698+ it ( 'should allow early muxer selection on outbound connection' , async ( ) => {
699+ const conn = await upgrader . upgradeOutbound ( maConn , {
700+ signal : AbortSignal . timeout ( 5_000 )
701+ } )
702+
703+ expect ( conn . multiplexer ) . to . equal ( earlyMuxerProtocol )
704+ } )
705+ } )
620706} )
0 commit comments