@@ -29,7 +29,7 @@ export interface PubSubComponents {
2929 * PubSubBaseProtocol handles the peers and connections logic for pubsub routers
3030 * and specifies the API that pubsub routers should have.
3131 */
32- export abstract class PubSubBaseProtocol < Events extends { [ s : string ] : any } = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > {
32+ export abstract class PubSubBaseProtocol < Events extends Record < string , any > = PubSubEvents > extends EventEmitter < Events > implements PubSub < Events > {
3333 public started : boolean
3434 /**
3535 * Map of topics to which peers are subscribed to
@@ -108,10 +108,8 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
108108
109109 /**
110110 * Register the pubsub protocol onto the libp2p node.
111- *
112- * @returns {void }
113111 */
114- async start ( ) {
112+ async start ( ) : Promise < void > {
115113 if ( this . started || ! this . enabled ) {
116114 return
117115 }
@@ -121,10 +119,12 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
121119 const registrar = this . components . registrar
122120 // Incoming streams
123121 // Called after a peer dials us
124- await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . handle ( multicodec , this . _onIncomingStream , {
125- maxInboundStreams : this . maxInboundStreams ,
126- maxOutboundStreams : this . maxOutboundStreams
127- } ) ) )
122+ await Promise . all ( this . multicodecs . map ( async multicodec => {
123+ await registrar . handle ( multicodec , this . _onIncomingStream , {
124+ maxInboundStreams : this . maxInboundStreams ,
125+ maxOutboundStreams : this . maxOutboundStreams
126+ } )
127+ } ) )
128128
129129 // register protocol with topology
130130 // Topology callbacks called on connection manager changes
@@ -141,7 +141,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
141141 /**
142142 * Unregister the pubsub protocol and the streams with other peers will be closed.
143143 */
144- async stop ( ) {
144+ async stop ( ) : Promise < void > {
145145 if ( ! this . started || ! this . enabled ) {
146146 return
147147 }
@@ -150,10 +150,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
150150
151151 // unregister protocol and handlers
152152 if ( this . _registrarTopologyIds != null ) {
153- this . _registrarTopologyIds ?. map ( id => registrar . unregister ( id ) )
153+ this . _registrarTopologyIds ?. forEach ( id => {
154+ registrar . unregister ( id )
155+ } )
154156 }
155157
156- await Promise . all ( this . multicodecs . map ( async multicodec => await registrar . unhandle ( multicodec ) ) )
158+ await Promise . all ( this . multicodecs . map ( async multicodec => {
159+ await registrar . unhandle ( multicodec )
160+ } ) )
157161
158162 log ( 'stopping' )
159163 for ( const peerStreams of this . peers . values ( ) ) {
@@ -166,14 +170,14 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
166170 log ( 'stopped' )
167171 }
168172
169- isStarted ( ) {
173+ isStarted ( ) : boolean {
170174 return this . started
171175 }
172176
173177 /**
174178 * On an inbound stream opened
175179 */
176- protected _onIncomingStream ( data : IncomingStreamData ) {
180+ protected _onIncomingStream ( data : IncomingStreamData ) : void {
177181 const { stream, connection } = data
178182 const peerId = connection . remotePeer
179183
@@ -186,13 +190,13 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
186190 const inboundStream = peer . attachInboundStream ( stream )
187191
188192 this . processMessages ( peerId , inboundStream , peer )
189- . catch ( err => log ( err ) )
193+ . catch ( err => { log ( err ) } )
190194 }
191195
192196 /**
193197 * Registrar notifies an established connection with pubsub protocol
194198 */
195- protected _onPeerConnected ( peerId : PeerId , conn : Connection ) {
199+ protected _onPeerConnected ( peerId : PeerId , conn : Connection ) : void {
196200 log ( 'connected %p' , peerId )
197201
198202 void Promise . resolve ( ) . then ( async ( ) => {
@@ -221,7 +225,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
221225 /**
222226 * Registrar notifies a closing connection with pubsub protocol
223227 */
224- protected _onPeerDisconnected ( peerId : PeerId , conn ?: Connection ) {
228+ protected _onPeerDisconnected ( peerId : PeerId , conn ?: Connection ) : void {
225229 const idB58Str = peerId . toString ( )
226230
227231 log ( 'connection ended' , idB58Str )
@@ -258,7 +262,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
258262 /**
259263 * Notifies the router that a peer has been disconnected
260264 */
261- protected _removePeer ( peerId : PeerId ) {
265+ protected _removePeer ( peerId : PeerId ) : PeerStreams | undefined {
262266 const peerStreams = this . peers . get ( peerId )
263267 if ( peerStreams == null ) {
264268 return
@@ -284,7 +288,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
284288 /**
285289 * Responsible for processing each RPC message received by other peers.
286290 */
287- async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) {
291+ async processMessages ( peerId : PeerId , stream : AsyncIterable < Uint8ArrayList > , peerStreams : PeerStreams ) : Promise < void > {
288292 try {
289293 await pipe (
290294 stream ,
@@ -320,7 +324,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
320324 } ) ) ,
321325 messages
322326 } )
323- . catch ( err => log ( err ) )
327+ . catch ( err => { log ( err ) } )
324328 }
325329 }
326330 )
@@ -378,7 +382,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
378382 log . error ( err )
379383 }
380384 } ) )
381- . catch ( err => log ( err ) )
385+ . catch ( err => { log ( err ) } )
382386 }
383387
384388 return true
@@ -387,7 +391,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
387391 /**
388392 * Handles a subscription change from a peer
389393 */
390- processRpcSubOpt ( id : PeerId , subOpt : PubSubRPCSubscription ) {
394+ processRpcSubOpt ( id : PeerId , subOpt : PubSubRPCSubscription ) : void {
391395 const t = subOpt . topic
392396
393397 if ( t == null ) {
@@ -412,7 +416,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
412416 /**
413417 * Handles a message from a peer
414418 */
415- async processMessage ( from : PeerId , msg : Message ) {
419+ async processMessage ( from : PeerId , msg : Message ) : Promise < void > {
416420 if ( this . components . peerId . equals ( from ) && ! this . emitSelf ) {
417421 return
418422 }
@@ -442,7 +446,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
442446 * The default msgID implementation
443447 * Child class can override this.
444448 */
445- getMsgId ( msg : Message ) {
449+ getMsgId ( msg : Message ) : Promise < Uint8Array > | Uint8Array {
446450 const signaturePolicy = this . globalSignaturePolicy
447451 switch ( signaturePolicy ) {
448452 case 'StrictSign' :
@@ -470,7 +474,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
470474 * Whether to accept a message from a peer
471475 * Override to create a graylist
472476 */
473- acceptFrom ( id : PeerId ) {
477+ acceptFrom ( id : PeerId ) : boolean {
474478 return true
475479 }
476480
@@ -495,10 +499,10 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
495499 /**
496500 * Send an rpc object to a peer
497501 */
498- send ( peer : PeerId , data : { messages ?: Message [ ] , subscriptions ?: string [ ] , subscribe ?: boolean } ) {
502+ send ( peer : PeerId , data : { messages ?: Message [ ] , subscriptions ?: string [ ] , subscribe ?: boolean } ) : void {
499503 const { messages, subscriptions, subscribe } = data
500504
501- return this . sendRpc ( peer , {
505+ this . sendRpc ( peer , {
502506 subscriptions : ( subscriptions ?? [ ] ) . map ( str => ( { topic : str , subscribe : Boolean ( subscribe ) } ) ) ,
503507 messages : ( messages ?? [ ] ) . map ( toRpcMessage )
504508 } )
@@ -507,7 +511,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
507511 /**
508512 * Send an rpc object to a peer
509513 */
510- sendRpc ( peer : PeerId , rpc : PubSubRPC ) {
514+ sendRpc ( peer : PeerId , rpc : PubSubRPC ) : void {
511515 const peerStreams = this . peers . get ( peer )
512516
513517 if ( peerStreams == null || ! peerStreams . isWritable ) {
@@ -523,7 +527,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
523527 * Validates the given message. The signature will be checked for authenticity.
524528 * Throws an error on invalid messages
525529 */
526- async validate ( from : PeerId , message : Message ) { // eslint-disable-line require-await
530+ async validate ( from : PeerId , message : Message ) : Promise < void > { // eslint-disable-line require-await
527531 const signaturePolicy = this . globalSignaturePolicy
528532 switch ( signaturePolicy ) {
529533 case 'StrictNoSign' :
@@ -671,7 +675,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
671675 /**
672676 * Subscribes to a given topic.
673677 */
674- subscribe ( topic : string ) {
678+ subscribe ( topic : string ) : void {
675679 if ( ! this . started ) {
676680 throw new Error ( 'Pubsub has not started' )
677681 }
@@ -690,7 +694,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
690694 /**
691695 * Unsubscribe from the given topic
692696 */
693- unsubscribe ( topic : string ) {
697+ unsubscribe ( topic : string ) : void {
694698 if ( ! this . started ) {
695699 throw new Error ( 'Pubsub is not started' )
696700 }
@@ -713,7 +717,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
713717 /**
714718 * Get the list of topics which the peer is subscribed to.
715719 */
716- getTopics ( ) {
720+ getTopics ( ) : string [ ] {
717721 if ( ! this . started ) {
718722 throw new Error ( 'Pubsub is not started' )
719723 }
0 commit comments