@@ -14,15 +14,13 @@ import {
1414import type { PeerId } from '@libp2p/interface-peer-id'
1515import type { IncomingStreamData } from '@libp2p/interface-registrar'
1616import type { Connection } from '@libp2p/interface-connection'
17- import type { PubSub , Message , StrictNoSign , StrictSign , PubSubInit , PubSubEvents , PeerStreams , PubSubRPCMessage , PubSubRPC , PubSubRPCSubscription , SubscriptionChangeData , PublishResult } from '@libp2p/interface-pubsub'
17+ import { PubSub , Message , StrictNoSign , StrictSign , PubSubInit , PubSubEvents , PeerStreams , PubSubRPCMessage , PubSubRPC , PubSubRPCSubscription , SubscriptionChangeData , PublishResult , TopicValidatorFn , TopicValidatorResult } from '@libp2p/interface-pubsub'
1818import { PeerMap , PeerSet } from '@libp2p/peer-collections'
1919import { Components , Initializable } from '@libp2p/components'
2020import type { Uint8ArrayList } from 'uint8arraylist'
2121
2222const log = logger ( 'libp2p:pubsub' )
2323
24- export interface TopicValidator { ( topic : string , message : Message ) : Promise < void > }
25-
2624/**
2725 * PubSubBaseProtocol handles the peers and connections logic for pubsub routers
2826 * and specifies the API that pubsub routers should have.
@@ -59,7 +57,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
5957 * Keyed by topic
6058 * Topic validators are functions with the following input:
6159 */
62- public topicValidators : Map < string , TopicValidator >
60+ public topicValidators : Map < string , TopicValidatorFn >
6361 public queue : Queue
6462 public multicodecs : string [ ]
6563 public components : Components = new Components ( )
@@ -420,7 +418,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
420418
421419 // Ensure the message is valid before processing it
422420 try {
423- await this . validate ( msg )
421+ await this . validate ( from , msg )
424422 } catch ( err : any ) {
425423 log ( 'Message is invalid, dropping it. %O' , err )
426424 return
@@ -524,7 +522,7 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
524522 * Validates the given message. The signature will be checked for authenticity.
525523 * Throws an error on invalid messages
526524 */
527- async validate ( message : Message ) { // eslint-disable-line require-await
525+ async validate ( from : PeerId , message : Message ) { // eslint-disable-line require-await
528526 const signaturePolicy = this . globalSignaturePolicy
529527 switch ( signaturePolicy ) {
530528 case 'StrictNoSign' :
@@ -570,9 +568,11 @@ export abstract class PubSubBaseProtocol<Events extends { [s: string]: any } = P
570568 }
571569
572570 const validatorFn = this . topicValidators . get ( message . topic )
573-
574571 if ( validatorFn != null ) {
575- await validatorFn ( message . topic , message )
572+ const result = await validatorFn ( from , message )
573+ if ( result === TopicValidatorResult . Reject || result === TopicValidatorResult . Ignore ) {
574+ throw errcode ( new Error ( 'Message validation failed' ) , codes . ERR_TOPIC_VALIDATOR_REJECT )
575+ }
576576 }
577577 }
578578
0 commit comments