Skip to content

Commit

Permalink
fix: remove old relay tags on start
Browse files Browse the repository at this point in the history
To ensure we don't think we have a reserved slot on a relay, remove
the old tags on startup.
  • Loading branch information
achingbrain committed Oct 9, 2024
1 parent 29b47ad commit c91d6c2
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 26 deletions.
30 changes: 24 additions & 6 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,10 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
'peer:connect': CustomEvent<PeerId>

/**
* This event will be triggered any time we are disconnected from another peer, regardless of
* the circumstances of that disconnection. If we happen to have multiple connections to a
* peer, this event will **only** be triggered when the last connection is closed.
* This event will be triggered any time we are disconnected from another
* peer, regardless of the circumstances of that disconnection. If we happen
* to have multiple connections to a peer, this event will **only** be
* triggered when the last connection is closed.
*
* @example
*
Expand All @@ -177,9 +178,26 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
'peer:disconnect': CustomEvent<PeerId>

/**
* This event is dispatched after a remote peer has successfully responded to the identify
* protocol. Note that for this to be emitted, both peers must have an identify service
* configured.
* When a peer tagged with `keep-alive` disconnects, we will make multiple
* attempts to reconnect to it with a backoff factor (see the connection
* manager settings for details). If these all fail, the `keep-alive` tag will
* be removed and this event will be emitted.
*
* @example
*
* ```TypeScript
* libp2p.addEventListener('peer:reconnect-failure', (event) => {
* const peerId = event.detail
* // ...
* })
* ```
*/
'peer:reconnect-failure': CustomEvent<PeerId>

/**
* This event is dispatched after a remote peer has successfully responded to
* the identify protocol. Note that for this to be emitted, both peers must
* have an identify service configured.
*
* @example
*
Expand Down
10 changes: 8 additions & 2 deletions packages/libp2p/src/connection-manager/reconnect-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class ReconnectQueue implements Startable {
private readonly retryInterval?: number
private readonly backoffFactor?: number
private readonly connectionManager: ConnectionManager
private readonly events: TypedEventTarget<Libp2pEvents>

constructor (components: ReconnectQueueComponents, init: ReconnectQueueInit = {}) {
this.log = components.logger.forComponent('libp2p:reconnect-queue')
Expand All @@ -47,11 +48,12 @@ export class ReconnectQueue implements Startable {
this.retries = init.retries ?? 5
this.backoffFactor = init.backoffFactor
this.retryInterval = init.retryInterval
this.events = components.events

components.events.addEventListener('peer:disconnect', (evt) => {
this.maybeReconnect(evt.detail)
.catch(err => {
this.log.error('failed to maybe reconnect to %p', evt.detail, err)
this.log.error('failed to maybe reconnect to %p - %e', evt.detail, err)

Check warning on line 56 in packages/libp2p/src/connection-manager/reconnect-queue.ts

View check run for this annotation

Codecov / codecov/patch

packages/libp2p/src/connection-manager/reconnect-queue.ts#L56

Added line #L56 was not covered by tests
})
})
}
Expand Down Expand Up @@ -82,7 +84,7 @@ export class ReconnectQueue implements Startable {
signal: options?.signal
})
} catch (err) {
this.log('reconnecting to %p attempt %d of %d failed', peerId, attempt, this.retries, err)
this.log('reconnecting to %p attempt %d of %d failed - %e', peerId, attempt, this.retries, err)
throw err
}
}, {
Expand All @@ -108,6 +110,10 @@ export class ReconnectQueue implements Startable {
await this.peerStore.merge(peerId, {
tags
})

this.events.safeDispatchEvent('peer:reconnect-failure', {
detail: peerId
})
})
.catch(async err => {
this.log.error('failed to remove keep-alive tag from %p - %e', peerId, err)
Expand Down
4 changes: 2 additions & 2 deletions packages/transport-circuit-relay-v2/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ export interface CircuitRelayService extends TypedEventEmitter<CircuitRelayServi

export { circuitRelayServer } from './server/index.js'
export type { CircuitRelayServerInit, CircuitRelayServerComponents } from './server/index.js'
export type { ReservationStoreInit } from './server/reservation-store.js'
export type { ReservationStoreInit as ServerReservationStoreInit } from './server/reservation-store.js'
export { circuitRelayTransport } from './transport/index.js'
export type { RelayDiscoveryComponents } from './transport/discovery.js'
export type { RelayStoreInit } from './transport/reservation-store.js'
export type { ReservationStoreInit as TransportReservationStoreInit } from './transport/reservation-store.js'
export type { CircuitRelayTransportInit, CircuitRelayTransportComponents } from './transport/index.js'

export {
Expand Down
4 changes: 2 additions & 2 deletions packages/transport-circuit-relay-v2/src/transport/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { CircuitRelayTransport } from './transport.js'
import type { RelayDiscoveryComponents } from './discovery.js'
import type { RelayStoreInit } from './reservation-store.js'
import type { ReservationStoreInit } from './reservation-store.js'
import type { Transport, Upgrader, Libp2pEvents, ConnectionGater, TypedEventTarget, PeerId, TopologyFilter } from '@libp2p/interface'
import type { AddressManager, Registrar } from '@libp2p/interface-internal'

Expand All @@ -16,7 +16,7 @@ export interface CircuitRelayTransportComponents extends RelayDiscoveryComponent
/**
* RelayConfig configures the circuit v2 relay transport.
*/
export interface CircuitRelayTransportInit extends RelayStoreInit {
export interface CircuitRelayTransportInit extends ReservationStoreInit {
/**
* The number of peers running diable relays to search for and connect to
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { DEFAULT_MAX_RESERVATION_QUEUE_LENGTH, DEFAULT_RESERVATION_COMPLETION_TI
import { HopMessage, Status } from '../pb/index.js'
import { getExpirationMilliseconds } from '../utils.js'
import type { Reservation } from '../pb/index.js'
import type { TypedEventTarget, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, PeerId, PeerStore, Startable, Metrics } from '@libp2p/interface'
import type { TypedEventTarget, Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, PeerId, PeerStore, Startable, Metrics, Peer } from '@libp2p/interface'
import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal'
import type { Filter } from '@libp2p/utils/filters'

Expand All @@ -22,7 +22,7 @@ const REFRESH_TIMEOUT = (60 * 1000) * 5
// minimum duration before which a reservation must not be refreshed
const REFRESH_TIMEOUT_MIN = 30 * 1000

export interface RelayStoreComponents {
export interface ReservationStoreComponents {
peerId: PeerId
connectionManager: ConnectionManager
transportManager: TransportManager
Expand All @@ -32,7 +32,7 @@ export interface RelayStoreComponents {
metrics?: Metrics
}

export interface RelayStoreInit {
export interface ReservationStoreInit {
/**
* Multiple relays may be discovered simultaneously - to prevent listening
* on too many relays, this value controls how many to attempt to reserve a
Expand Down Expand Up @@ -94,7 +94,7 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
private readonly log: Logger
private readonly relayFilter: Filter

constructor (components: RelayStoreComponents, init?: RelayStoreInit) {
constructor (components: ReservationStoreComponents, init?: ReservationStoreInit) {
super()

this.log = components.logger.forComponent('libp2p:circuit-relay:transport:reservation-store')
Expand All @@ -120,7 +120,7 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
// When a peer disconnects, if we had a reservation on that peer
// remove the reservation and multiaddr and maybe trigger search
// for new relays
this.events.addEventListener('peer:disconnect', (evt) => {
this.events.addEventListener('peer:reconnect-failure', (evt) => {
this.#removeRelay(evt.detail)
})
}
Expand All @@ -134,10 +134,37 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
}

afterStart (): void {
if (this.reservations.size < this.maxDiscoveredRelays) {
this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}
// remove old relay tags
void Promise.resolve()
.then(async () => {
const relayPeers: Peer[] = await this.peerStore.all({
filters: [(peer) => {
return peer.tags.has(RELAY_TAG)

Check warning on line 142 in packages/transport-circuit-relay-v2/src/transport/reservation-store.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/reservation-store.ts#L142

Added line #L142 was not covered by tests
}]
})

this.log('removing tag from %d old relays', relayPeers.length)

// remove old relay tag and redial
await Promise.all(
relayPeers.map(async peer => {
await this.peerStore.merge(peer.id, {
tags: {
[RELAY_TAG]: undefined,
[KEEP_ALIVE_TAG]: undefined
}
})
})
)

if (this.reservations.size < this.maxDiscoveredRelays) {
this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}

Check warning on line 163 in packages/transport-circuit-relay-v2/src/transport/reservation-store.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/reservation-store.ts#L161-L163

Added lines #L161 - L163 were not covered by tests
})
.catch(err => {
this.log.error(err)
})
}

stop (): void {
Expand Down Expand Up @@ -360,11 +387,23 @@ export class ReservationStore extends TypedEventEmitter<ReservationStoreEvents>
clearTimeout(existingReservation.timeout)
this.reservations.delete(peerId)

this.safeDispatchEvent('relay:removed', { detail: peerId })
// ensure we don't close the connection to the relay
this.peerStore.merge(peerId, {
tags: {
[RELAY_TAG]: undefined,
[KEEP_ALIVE_TAG]: undefined
}
})
.then(() => {
this.safeDispatchEvent('relay:removed', { detail: peerId })

Check warning on line 398 in packages/transport-circuit-relay-v2/src/transport/reservation-store.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/reservation-store.ts#L390-L398

Added lines #L390 - L398 were not covered by tests

if (this.reservations.size < this.maxDiscoveredRelays) {
this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}
if (this.reservations.size < this.maxDiscoveredRelays) {
this.log('not enough relays %d/%d', this.reservations.size, this.maxDiscoveredRelays)
this.safeDispatchEvent('relay:not-enough-relays', {})
}
})
.catch(err => {
this.log('could not update tags for relay %p - %e', peerId, err)
})

Check warning on line 407 in packages/transport-circuit-relay-v2/src/transport/reservation-store.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-circuit-relay-v2/src/transport/reservation-store.ts#L400-L407

Added lines #L400 - L407 were not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { generateKeyPair } from '@libp2p/crypto/keys'
import { TypedEventEmitter, start } from '@libp2p/interface'
import { defaultLogger } from '@libp2p/logger'
import { peerIdFromPrivateKey } from '@libp2p/peer-id'
import { expect } from 'aegir/chai'
import delay from 'delay'
import { stubInterface } from 'sinon-ts'
import { KEEP_ALIVE_TAG, RELAY_TAG } from '../../src/constants.js'
import { ReservationStore } from '../../src/transport/reservation-store.js'
import type { ComponentLogger, Libp2pEvents, Peer, PeerId, PeerStore, TypedEventTarget } from '@libp2p/interface'
import type { ConnectionManager, TransportManager } from '@libp2p/interface-internal'
import type { StubbedInstance } from 'sinon-ts'

export interface StubbedReservationStoreComponents {
peerId: PeerId
connectionManager: StubbedInstance<ConnectionManager>
transportManager: StubbedInstance<TransportManager>
peerStore: StubbedInstance<PeerStore>
events: TypedEventTarget<Libp2pEvents>
logger: ComponentLogger
}

describe('transport reservation-store', () => {
let store: ReservationStore
let components: StubbedReservationStoreComponents

beforeEach(async () => {
const privateKey = await generateKeyPair('Ed25519')

components = {
peerId: peerIdFromPrivateKey(privateKey),
connectionManager: stubInterface(),
transportManager: stubInterface(),
peerStore: stubInterface(),
events: new TypedEventEmitter(),
logger: defaultLogger()
}

store = new ReservationStore(components)
})

it('should remove relay tags on start', async () => {
const peer: Peer = {
id: peerIdFromPrivateKey(await generateKeyPair('Ed25519')),
addresses: [],
metadata: new Map(),
tags: new Map([[RELAY_TAG, { value: 1 }]]),
protocols: []
}

components.peerStore.all.resolves([peer])

await start(store)

await delay(100)

expect(components.peerStore.merge.calledWith(peer.id, {
tags: {
[RELAY_TAG]: undefined,
[KEEP_ALIVE_TAG]: undefined
}
})).to.be.true()
})
})

0 comments on commit c91d6c2

Please sign in to comment.