From c628c44c588ad7102ce9522594ac888e751f35ba Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Sun, 22 Sep 2024 19:22:23 +0200 Subject: [PATCH] fix: allow multiple ping messages (#2711) Read incoming ping messges in 32 byte chunks as per the spec and echo them back to the sending peer. If the remove fails to send us 32 bytes, reset the stream with a `TimeoutError`. --- packages/protocol-ping/package.json | 4 +- packages/protocol-ping/src/ping.ts | 69 ++++++++--------------- packages/protocol-ping/test/index.spec.ts | 29 ++++++---- 3 files changed, 43 insertions(+), 59 deletions(-) diff --git a/packages/protocol-ping/package.json b/packages/protocol-ping/package.json index 41e6b19a38..398faccf32 100644 --- a/packages/protocol-ping/package.json +++ b/packages/protocol-ping/package.json @@ -54,15 +54,13 @@ "@libp2p/interface": "^2.0.1", "@libp2p/interface-internal": "^2.0.1", "@multiformats/multiaddr": "^12.2.3", - "it-first": "^3.0.6", - "it-pipe": "^3.0.1", + "it-byte-stream": "^1.1.0", "uint8arrays": "^5.1.0" }, "devDependencies": { "@libp2p/logger": "^5.0.1", "@libp2p/peer-id": "^5.0.1", "aegir": "^44.0.1", - "it-byte-stream": "^1.0.10", "it-pair": "^2.0.6", "p-defer": "^4.0.1", "sinon-ts": "^2.0.0" diff --git a/packages/protocol-ping/src/ping.ts b/packages/protocol-ping/src/ping.ts index 5efabeeb4e..1e2a783cd4 100644 --- a/packages/protocol-ping/src/ping.ts +++ b/packages/protocol-ping/src/ping.ts @@ -1,7 +1,6 @@ import { randomBytes } from '@libp2p/crypto' -import { AbortError, InvalidMessageError, ProtocolError, TimeoutError } from '@libp2p/interface' -import first from 'it-first' -import { pipe } from 'it-pipe' +import { ProtocolError, TimeoutError } from '@libp2p/interface' +import { byteStream } from 'it-byte-stream' import { equals as uint8ArrayEquals } from 'uint8arrays/equals' import { PROTOCOL_PREFIX, PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION, TIMEOUT, MAX_INBOUND_STREAMS, MAX_OUTBOUND_STREAMS } from './constants.js' import type { PingServiceComponents, PingServiceInit, PingService as PingServiceInterface } from './index.js' @@ -60,37 +59,29 @@ export class PingService implements Startable, PingServiceInterface { const { stream } = data const start = Date.now() - - const signal = AbortSignal.timeout(this.timeout) - signal.addEventListener('abort', () => { - stream?.abort(new TimeoutError('ping timeout')) + const bytes = byteStream(stream) + + Promise.resolve().then(async () => { + while (true) { + const signal = AbortSignal.timeout(this.timeout) + signal.addEventListener('abort', () => { + stream?.abort(new TimeoutError('ping timeout')) + }) + + const buf = await bytes.read(PING_LENGTH, { + signal + }) + await bytes.write(buf, { + signal + }) + } }) - - void pipe( - stream, - async function * (source) { - let received = 0 - - for await (const buf of source) { - received += buf.byteLength - - if (received > PING_LENGTH) { - stream?.abort(new InvalidMessageError('Too much data received')) - return - } - - yield buf - } - }, - stream - ) .catch(err => { this.log.error('incoming ping from %p failed with error', data.connection.remotePeer, err) stream?.abort(err) }) .finally(() => { const ms = Date.now() - start - this.log('incoming ping from %p complete in %dms', data.connection.remotePeer, ms) }) } @@ -105,7 +96,6 @@ export class PingService implements Startable, PingServiceInterface { const data = randomBytes(PING_LENGTH) const connection = await this.components.connectionManager.openConnection(peer, options) let stream: Stream | undefined - let onAbort = (): void => {} if (options.signal == null) { const signal = AbortSignal.timeout(this.timeout) @@ -122,25 +112,15 @@ export class PingService implements Startable, PingServiceInterface { runOnLimitedConnection: this.runOnLimitedConnection }) - onAbort = () => { - stream?.abort(new AbortError()) - } - - // make stream abortable - options.signal?.addEventListener('abort', onAbort, { once: true }) + const bytes = byteStream(stream) - const result = await pipe( - [data], - stream, - async (source) => first(source) - ) + const [, result] = await Promise.all([ + bytes.write(data, options), + bytes.read(PING_LENGTH, options) + ]) const ms = Date.now() - start - if (result == null) { - throw new ProtocolError(`Did not receive a ping ack after ${ms}ms`) - } - if (!uint8ArrayEquals(data, result.subarray())) { throw new ProtocolError(`Received wrong ping ack after ${ms}ms`) } @@ -155,9 +135,8 @@ export class PingService implements Startable, PingServiceInterface { throw err } finally { - options.signal?.removeEventListener('abort', onAbort) if (stream != null) { - await stream.close() + await stream.close(options) } } } diff --git a/packages/protocol-ping/test/index.spec.ts b/packages/protocol-ping/test/index.spec.ts index 6ab997bf29..0e4902dbe8 100644 --- a/packages/protocol-ping/test/index.spec.ts +++ b/packages/protocol-ping/test/index.spec.ts @@ -43,7 +43,9 @@ describe('ping', () => { logger: defaultLogger() } - ping = new PingService(components) + ping = new PingService(components, { + timeout: 50 + }) await start(ping) }) @@ -105,17 +107,20 @@ describe('ping', () => { connection: stubInterface() }) - const input = Uint8Array.from([0, 1, 2, 3, 4]) - const b = byteStream(outgoingStream) - void b.write(input) + const input = new Uint8Array(32).fill(1) + void b.write(input) const output = await b.read() - expect(output).to.equalBytes(input) + + const input2 = new Uint8Array(32).fill(2) + void b.write(input2) + const output2 = await b.read() + expect(output2).to.equalBytes(input2) }) - it('should abort stream if too much ping data received', async () => { + it('should abort stream if sending stalls', async () => { const deferred = pDefer() const duplex = duplexPair() @@ -135,14 +140,16 @@ describe('ping', () => { connection: stubInterface() }) - const input = new Uint8Array(100) const b = byteStream(outgoingStream) - void b.read(100) - void b.write(input) + // send a ping message plus a few extra bytes + void b.write(new Uint8Array(35)) - const err = await deferred.promise + const pong = await b.read() + expect(pong).to.have.lengthOf(32) - expect(err).to.have.property('name', 'InvalidMessageError') + // never send the remaining 29 bytes (e.g. 64 - 35) + const err = await deferred.promise + expect(err).to.have.property('name', 'TimeoutError') }) })