diff --git a/packages/protocol-echo/package.json b/packages/protocol-echo/package.json index 8a24e9ec7e..7d29a7bf78 100644 --- a/packages/protocol-echo/package.json +++ b/packages/protocol-echo/package.json @@ -53,6 +53,8 @@ "dependencies": { "@libp2p/interface": "^2.1.3", "@libp2p/interface-internal": "^2.0.8", + "@multiformats/multiaddr": "^12.3.1", + "it-byte-stream": "^1.1.0", "it-pipe": "^3.0.1" }, "devDependencies": { diff --git a/packages/protocol-echo/src/echo.ts b/packages/protocol-echo/src/echo.ts index 88461f966e..1a531f77a2 100644 --- a/packages/protocol-echo/src/echo.ts +++ b/packages/protocol-echo/src/echo.ts @@ -1,7 +1,9 @@ +import { byteStream } from 'it-byte-stream' import { pipe } from 'it-pipe' import { PROTOCOL_NAME, PROTOCOL_VERSION } from './constants.js' import type { Echo as EchoInterface, EchoComponents, EchoInit } from './index.js' -import type { Logger, Startable } from '@libp2p/interface' +import type { AbortOptions, Logger, PeerId, Startable } from '@libp2p/interface' +import type { Multiaddr } from '@multiformats/multiaddr' /** * A simple echo stream, any data received will be sent back to the sender @@ -31,7 +33,8 @@ export class Echo implements Startable, EchoInterface { }) }, { maxInboundStreams: this.init.maxInboundStreams, - maxOutboundStreams: this.init.maxOutboundStreams + maxOutboundStreams: this.init.maxOutboundStreams, + runOnLimitedConnection: this.init.runOnLimitedConnection }) this.started = true } @@ -44,4 +47,22 @@ export class Echo implements Startable, EchoInterface { isStarted (): boolean { return this.started } + + async echo (peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array, options?: AbortOptions): Promise { + const conn = await this.components.connectionManager.openConnection(peer, options) + const stream = await conn.newStream(this.protocol, { + ...this.init, + ...options + }) + const bytes = byteStream(stream) + + const [, output] = await Promise.all([ + bytes.write(buf, options), + bytes.read(buf.byteLength, options) + ]) + + await stream.close(options) + + return output.subarray() + } } diff --git a/packages/protocol-echo/src/index.ts b/packages/protocol-echo/src/index.ts index 5a2c3f605d..a8df2eac7f 100644 --- a/packages/protocol-echo/src/index.ts +++ b/packages/protocol-echo/src/index.ts @@ -43,13 +43,15 @@ */ import { Echo as EchoClass } from './echo.js' -import type { ComponentLogger } from '@libp2p/interface' +import type { ComponentLogger, PeerId } from '@libp2p/interface' import type { ConnectionManager, Registrar } from '@libp2p/interface-internal' +import type { Multiaddr } from '@multiformats/multiaddr' export interface EchoInit { protocolPrefix?: string maxInboundStreams?: number maxOutboundStreams?: number + runOnLimitedConnection?: boolean } export interface EchoComponents { @@ -60,6 +62,7 @@ export interface EchoComponents { export interface Echo { protocol: string + echo(peer: PeerId | Multiaddr | Multiaddr[], buf: Uint8Array): Promise } export function echo (init: EchoInit = {}): (components: EchoComponents) => Echo { diff --git a/packages/protocol-echo/test/index.spec.ts b/packages/protocol-echo/test/index.spec.ts index 32d9e52e90..4e0a788f08 100644 --- a/packages/protocol-echo/test/index.spec.ts +++ b/packages/protocol-echo/test/index.spec.ts @@ -2,6 +2,7 @@ import { start, stop } from '@libp2p/interface' import { defaultLogger } from '@libp2p/logger' +import { multiaddr } from '@multiformats/multiaddr' import { expect } from 'aegir/chai' import all from 'it-all' import { duplexPair } from 'it-pair/duplex' @@ -76,4 +77,35 @@ describe('echo', () => { expect(output).to.equalBytes(input) }) + + it('should echo data using method', async () => { + await start(echo) + + const duplex = duplexPair() + const outgoingStream = stubInterface() + outgoingStream.source = duplex[0].source + outgoingStream.sink.callsFake(async source => duplex[0].sink(source)) + + const incomingStream = stubInterface() + incomingStream.source = duplex[1].source + incomingStream.sink.callsFake(async source => duplex[1].sink(source)) + + const handler = components.registrar.handle.getCall(0).args[1] + handler({ + stream: incomingStream, + connection: stubInterface() + }) + + const ma = multiaddr('/ip4/123.123.123.123/tcp/1234') + + components.connectionManager.openConnection.withArgs(ma).resolves(stubInterface({ + newStream: async () => outgoingStream + })) + + const input = Uint8Array.from([0, 1, 2, 3]) + + const output = await echo.echo(ma, input) + + expect(output).to.equalBytes(output) + }) })