1- import { createTopology } from '@libp2p/topology'
2- import { abortableSource } from 'abortable-iterator'
1+ import { CodeError } from '@libp2p/interface/errors'
32import drain from 'it-drain'
43import * as lp from 'it-length-prefixed'
54import map from 'it-map'
@@ -13,11 +12,12 @@ import { logger } from './utils/index.js'
1312import type { DefaultBitswap } from './bitswap.js'
1413import type { MultihashHasherLoader } from './index.js'
1514import type { Stats } from './stats/index.js'
16- import type { Connection } from '@libp2p/interface-connection'
17- import type { Libp2p } from '@libp2p/interface-libp2p'
18- import type { PeerId } from '@libp2p/interface-peer-id'
19- import type { PeerInfo } from '@libp2p/interface-peer-info'
20- import type { IncomingStreamData } from '@libp2p/interface-registrar'
15+ import type { Libp2p } from '@libp2p/interface'
16+ import type { Connection } from '@libp2p/interface/connection'
17+ import type { PeerId } from '@libp2p/interface/peer-id'
18+ import type { PeerInfo } from '@libp2p/interface/peer-info'
19+ import type { IncomingStreamData } from '@libp2p/interface/stream-handler'
20+ import type { Topology } from '@libp2p/interface/topology'
2121import type { AbortOptions } from '@libp2p/interfaces'
2222import type { Logger } from '@libp2p/logger'
2323import type { Multiaddr } from '@multiformats/multiaddr'
@@ -107,10 +107,10 @@ export class Network {
107107 } )
108108
109109 // register protocol with topology
110- const topology = createTopology ( {
110+ const topology : Topology = {
111111 onConnect : this . _onPeerConnect ,
112112 onDisconnect : this . _onPeerDisconnect
113- } )
113+ }
114114
115115 /** @type {string[] } */
116116 this . _registrarIds = [ ]
@@ -153,10 +153,16 @@ export class Network {
153153 const controller = new TimeoutController ( this . _incomingStreamTimeout )
154154
155155 Promise . resolve ( ) . then ( async ( ) => {
156- this . _log ( 'incoming new bitswap %s connection from %p' , stream . stat . protocol , connection . remotePeer )
156+ this . _log ( 'incoming new bitswap %s connection from %p' , stream . protocol , connection . remotePeer )
157+ const abortListener = ( ) : void => {
158+ stream . abort ( new CodeError ( 'Incoming Bitswap stream timed out' , 'ERR_TIMEOUT' ) )
159+ }
160+
161+ let signal = AbortSignal . timeout ( this . _incomingStreamTimeout )
162+ signal . addEventListener ( 'abort' , abortListener )
157163
158164 await pipe (
159- abortableSource ( stream . source , controller . signal ) ,
165+ stream ,
160166 ( source ) => lp . decode ( source ) ,
161167 async ( source ) => {
162168 for await ( const data of source ) {
@@ -169,18 +175,23 @@ export class Network {
169175 }
170176
171177 // we have received some data so reset the timeout controller
172- controller . reset ( )
178+ signal . removeEventListener ( 'abort' , abortListener )
179+ signal = AbortSignal . timeout ( this . _incomingStreamTimeout )
180+ signal . addEventListener ( 'abort' , abortListener )
173181 }
174182 }
175183 )
184+
185+ await stream . close ( {
186+ signal
187+ } )
176188 } )
177189 . catch ( err => {
178190 this . _log ( err )
179191 stream . abort ( err )
180192 } )
181193 . finally ( ( ) => {
182194 controller . clear ( )
183- stream . close ( )
184195 } )
185196 }
186197
@@ -273,7 +284,7 @@ export class Network {
273284 try {
274285 /** @type {Uint8Array } */
275286 let serialized
276- switch ( stream . stat . protocol ) {
287+ switch ( stream . protocol ) {
277288 case BITSWAP100 :
278289 serialized = msg . serializeToBitswap100 ( )
279290 break
@@ -282,19 +293,20 @@ export class Network {
282293 serialized = msg . serializeToBitswap110 ( )
283294 break
284295 default :
285- throw new Error ( `Unknown protocol: ${ stream . stat . protocol } ` )
296+ throw new Error ( `Unknown protocol: ${ stream . protocol } ` )
286297 }
287298
288299 await pipe (
289300 [ serialized ] ,
290301 ( source ) => lp . encode ( source ) ,
291302 stream
292303 )
304+
305+ await stream . close ( )
293306 } catch ( err : any ) {
294307 options . onProgress ?.( new CustomProgressEvent < { peer : PeerId , error : Error } > ( 'bitswap:network:send-wantlist:error' , { peer : peerId , error : err } ) )
295308 this . _log ( err )
296- } finally {
297- stream . close ( )
309+ stream . abort ( err )
298310 }
299311 }
300312}
0 commit comments