diff --git a/go-peer/chatroom.go b/go-peer/chatroom.go index e2496044..f5fcd6a6 100644 --- a/go-peer/chatroom.go +++ b/go-peer/chatroom.go @@ -15,6 +15,12 @@ import ( // ChatRoomBufSize is the number of incoming messages to buffer for each topic. const ChatRoomBufSize = 128 +// Topic used to broadcast browser WebRTC addresses +const PubSubDiscoveryTopic string = "universal-connectivity-browser-peer-discovery" + +const ChatTopic string = "universal-connectivity" +const ChatFileTopic string = "universal-connectivity-file" + // ChatRoom represents a subscription to a single PubSub topic. Messages // can be published to the topic with ChatRoom.Publish, and received // messages are pushed to the Messages channel. @@ -23,13 +29,15 @@ type ChatRoom struct { Messages chan *ChatMessage SysMessages chan *ChatMessage - ctx context.Context - h host.Host - ps *pubsub.PubSub - chatTopic *pubsub.Topic - chatSub *pubsub.Subscription - fileTopic *pubsub.Topic - fileSub *pubsub.Subscription + ctx context.Context + h host.Host + ps *pubsub.PubSub + chatTopic *pubsub.Topic + chatSub *pubsub.Subscription + fileTopic *pubsub.Topic + fileSub *pubsub.Subscription + peerDiscoveryTopic *pubsub.Topic + peerDiscoverySub *pubsub.Subscription roomName string nick string @@ -44,9 +52,9 @@ type ChatMessage struct { // JoinChatRoom tries to subscribe to the PubSub topic for the room name, returning // a ChatRoom on success. -func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string, roomName string) (*ChatRoom, error) { +func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname string) (*ChatRoom, error) { // join the pubsub chatTopic - chatTopic, err := ps.Join(chatTopicName(roomName)) + chatTopic, err := ps.Join(ChatTopic) if err != nil { return nil, err } @@ -58,7 +66,7 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname } // join the pubsub fileTopic - fileTopic, err := ps.Join(fileTopicName(roomName)) + fileTopic, err := ps.Join(ChatFileTopic) if err != nil { return nil, err } @@ -69,18 +77,31 @@ func JoinChatRoom(ctx context.Context, h host.Host, ps *pubsub.PubSub, nickname return nil, err } + // join the pubsub peer disovery topic + peerDiscoveryTopic, err := ps.Join(PubSubDiscoveryTopic) + if err != nil { + return nil, err + } + + // and subscribe to it + peerDiscoverySub, err := peerDiscoveryTopic.Subscribe() + if err != nil { + return nil, err + } + cr := &ChatRoom{ - ctx: ctx, - h: h, - ps: ps, - chatTopic: chatTopic, - chatSub: chatSub, - fileTopic: fileTopic, - fileSub: fileSub, - nick: nickname, - roomName: roomName, - Messages: make(chan *ChatMessage, ChatRoomBufSize), - SysMessages: make(chan *ChatMessage, ChatRoomBufSize), + ctx: ctx, + h: h, + ps: ps, + chatTopic: chatTopic, + chatSub: chatSub, + fileTopic: fileTopic, + fileSub: fileSub, + peerDiscoveryTopic: peerDiscoveryTopic, + peerDiscoverySub: peerDiscoverySub, + nick: nickname, + Messages: make(chan *ChatMessage, ChatRoomBufSize), + SysMessages: make(chan *ChatMessage, ChatRoomBufSize), } // start reading messages from the subscription in a loop @@ -94,7 +115,7 @@ func (cr *ChatRoom) Publish(message string) error { } func (cr *ChatRoom) ListPeers() []peer.ID { - return cr.ps.ListPeers(chatTopicName(cr.roomName)) + return cr.ps.ListPeers(ChatTopic) } // readLoop pulls messages from the pubsub chat/file topic and handles them. @@ -187,13 +208,3 @@ func (cr *ChatRoom) requestFile(toPeer peer.ID, fileID []byte) ([]byte, error) { return fileBody, nil } - -// chatTopicName returns the name of the pubsub topic for the chat room. -func chatTopicName(roomName string) string { - return roomName -} - -// fileTopicName returns the name of the pubsub topic used for sending/recieving files in the chat room. -func fileTopicName(roomName string) string { - return fmt.Sprintf("%s-file", roomName) -} diff --git a/go-peer/main.go b/go-peer/main.go index 6734cf3b..c845963c 100644 --- a/go-peer/main.go +++ b/go-peer/main.go @@ -72,10 +72,12 @@ func NewDHT(ctx context.Context, host host.Host, bootstrapPeers []multiaddr.Mult } // Borrowed from https://medium.com/rahasak/libp2p-pubsub-peer-discovery-with-kademlia-dht-c8b131550ac7 -func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous string) { +// Only used by Go peer to find each other. +// TODO: since this isn't implemented on the Rust or the JS side, can probably be removed +func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT) { routingDiscovery := routing.NewRoutingDiscovery(dht) - discovery.Advertise(ctx, routingDiscovery, rendezvous) + discovery.Advertise(ctx, routingDiscovery, DiscoveryServiceTag) ticker := time.NewTicker(time.Second * 10) defer ticker.Stop() @@ -86,7 +88,7 @@ func Discover(ctx context.Context, h host.Host, dht *dht.IpfsDHT, rendezvous str return case <-ticker.C: - peers, err := discovery.FindPeers(ctx, routingDiscovery, rendezvous) + peers, err := discovery.FindPeers(ctx, routingDiscovery, DiscoveryServiceTag) if err != nil { panic(err) } @@ -115,7 +117,6 @@ func LogMsgf(f string, msg ...any) { func main() { // parse some flags to set our nickname and the room to join nickFlag := flag.String("nick", "", "nickname to use in chat. will be generated if empty") - roomFlag := flag.String("room", "universal-connectivity", "name of chat room to join") idPath := flag.String("identity", "identity.key", "path to the private key (PeerID) file") certPath := flag.String("tls-cert-path", "", "path to the tls cert file (for websockets)") keyPath := flag.String("tls-key-path", "", "path to the tls key file (for websockets") @@ -194,11 +195,8 @@ func main() { nick = defaultNick(h.ID()) } - // join the room from the cli flag, or the flag default - room := *roomFlag - // join the chat room - cr, err := JoinChatRoom(ctx, h, ps, nick, room) + cr, err := JoinChatRoom(ctx, h, ps, nick) if err != nil { panic(err) } @@ -213,7 +211,7 @@ func main() { } // setup peer discovery - go Discover(ctx, h, dht, "universal-connectivity") + go Discover(ctx, h, dht) // setup local mDNS discovery if err := setupDiscovery(h); err != nil { diff --git a/js-peer/package-lock.json b/js-peer/package-lock.json index 041a49e0..d20eab67 100644 --- a/js-peer/package-lock.json +++ b/js-peer/package-lock.json @@ -18,11 +18,11 @@ "@libp2p/identify": "^1.0.19", "@libp2p/interface-pubsub": "^4.0.1", "@libp2p/kad-dht": "^12.0.13", + "@libp2p/pubsub-peer-discovery": "^10.0.2", "@libp2p/webrtc": "^4.0.28", "@libp2p/websockets": "^8.0.20", "@libp2p/webtransport": "^4.0.28", "@multiformats/multiaddr": "^12.2.1", - "datastore-idb": "^2.1.9", "debug": "^4.3.4", "it-length-prefixed": "^9.0.4", "it-map": "^3.1.0", @@ -3115,6 +3115,37 @@ "uint8arrays": "^5.0.2" } }, + "node_modules/@libp2p/pubsub-peer-discovery": { + "version": "10.0.2", + "resolved": "https://registry.npmjs.org/@libp2p/pubsub-peer-discovery/-/pubsub-peer-discovery-10.0.2.tgz", + "integrity": "sha512-7DLasMSo443nxPJ+X95tXazXgO96K2/TafoexDxi4QVWIKgkmK+HyoFRcmwog2pjhA1/KQUsPu8S8wH6Ns9Oow==", + "dependencies": { + "@libp2p/interface": "^1.0.1", + "@libp2p/interface-internal": "^1.0.1", + "@libp2p/peer-id": "^4.0.1", + "@multiformats/multiaddr": "^12.0.0", + "protons-runtime": "^5.0.0", + "uint8arraylist": "^2.4.3", + "uint8arrays": "^4.0.9" + } + }, + "node_modules/@libp2p/pubsub-peer-discovery/node_modules/multiformats": { + "version": "12.1.3", + "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-12.1.3.tgz", + "integrity": "sha512-eajQ/ZH7qXZQR2AgtfpmSMizQzmyYVmCql7pdhldPuYQi4atACekbJaQplk6dWyIi10jCaFnd6pqvcEFXjbaJw==", + "engines": { + "node": ">=16.0.0", + "npm": ">=7.0.0" + } + }, + "node_modules/@libp2p/pubsub-peer-discovery/node_modules/uint8arrays": { + "version": "4.0.10", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-4.0.10.tgz", + "integrity": "sha512-AnJNUGGDJAgFw/eWu/Xb9zrVKEGlwJJCaeInlf3BkecE/zcTobk5YXYIPNQJO1q5Hh1QZrQQHf0JvcHqz2hqoA==", + "dependencies": { + "multiformats": "^12.0.1" + } + }, "node_modules/@libp2p/pubsub/node_modules/multiformats": { "version": "13.1.0", "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-13.1.0.tgz", @@ -5901,18 +5932,6 @@ "it-take": "^3.0.4" } }, - "node_modules/datastore-idb": { - "version": "2.1.9", - "resolved": "https://registry.npmjs.org/datastore-idb/-/datastore-idb-2.1.9.tgz", - "integrity": "sha512-o1LAE2VgVMeEWOP/zHYHV6MetGzbN+C72nRRUFwPXk0xLopsPN9wlH73D3De3pyDZKoKp2875XDFpRkDJ8mGWA==", - "dependencies": { - "datastore-core": "^9.0.0", - "idb": "^8.0.0", - "interface-datastore": "^8.0.0", - "it-filter": "^3.0.4", - "it-sort": "^3.0.4" - } - }, "node_modules/dayjs": { "version": "1.11.10", "resolved": "https://registry.npmjs.org/dayjs/-/dayjs-1.11.10.tgz", @@ -7660,11 +7679,6 @@ "node": ">=10.17.0" } }, - "node_modules/idb": { - "version": "8.0.0", - "resolved": "https://registry.npmjs.org/idb/-/idb-8.0.0.tgz", - "integrity": "sha512-l//qvlAKGmQO31Qn7xdzagVPPaHTxXx199MhrAFuVBTPqydcPYBWjkrbv4Y0ktB+GmWOiwHl237UUOrLmQxLvw==" - }, "node_modules/ieee754": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.2.1.tgz", diff --git a/js-peer/package.json b/js-peer/package.json index c89ee7a4..f2081041 100644 --- a/js-peer/package.json +++ b/js-peer/package.json @@ -19,11 +19,11 @@ "@libp2p/identify": "^1.0.19", "@libp2p/interface-pubsub": "^4.0.1", "@libp2p/kad-dht": "^12.0.13", + "@libp2p/pubsub-peer-discovery": "^10.0.2", "@libp2p/webrtc": "^4.0.28", "@libp2p/websockets": "^8.0.20", "@libp2p/webtransport": "^4.0.28", "@multiformats/multiaddr": "^12.2.1", - "datastore-idb": "^2.1.9", "debug": "^4.3.4", "it-length-prefixed": "^9.0.4", "it-map": "^3.1.0", diff --git a/js-peer/src/context/chat-ctx.tsx b/js-peer/src/context/chat-ctx.tsx index 2b189ac5..fcfb7959 100644 --- a/js-peer/src/context/chat-ctx.tsx +++ b/js-peer/src/context/chat-ctx.tsx @@ -1,7 +1,7 @@ import React, { createContext, useContext, useEffect, useState } from 'react'; import { useLibp2pContext } from './ctx'; import type { Message } from '@libp2p/interface' -import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL } from '@/lib/constants' +import { CHAT_FILE_TOPIC, CHAT_TOPIC, FILE_EXCHANGE_PROTOCOL, PUBSUB_PEER_DISCOVERY } from '@/lib/constants' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { pipe } from 'it-pipe' @@ -58,8 +58,11 @@ export const ChatProvider = ({ children }: any) => { chatFileMessageCB(evt, topic, data) break } + case PUBSUB_PEER_DISCOVERY: { + break + } default: { - throw new Error(`Unexpected gossipsub topic: ${topic}`) + console.error(`Unexpected event %o on gossipsub topic: ${topic}`, evt) } } } diff --git a/js-peer/src/context/peer-ctx.tsx b/js-peer/src/context/peer-ctx.tsx index 01c6e820..324ccb80 100644 --- a/js-peer/src/context/peer-ctx.tsx +++ b/js-peer/src/context/peer-ctx.tsx @@ -5,7 +5,6 @@ import { PeerId } from '@libp2p/interface' export interface PeerStats { peerIds: PeerId[] - connected: boolean connections: Connection[] latency: number } @@ -17,7 +16,6 @@ export interface PeerContextInterface { export const peerContext = createContext({ peerStats: { peerIds: [], - connected: true, connections: [], latency: 0 }, @@ -31,7 +29,6 @@ export const usePeerContext = () => { export const PeerProvider = ({ children }: { children: ReactNode }) => { const [peerStats, setPeerStats] = useState({ peerIds: [], - connected: false, connections: [], latency: 0 }); diff --git a/js-peer/src/lib/constants.ts b/js-peer/src/lib/constants.ts index 2bff356d..2f10d2ea 100644 --- a/js-peer/src/lib/constants.ts +++ b/js-peer/src/lib/constants.ts @@ -1,5 +1,6 @@ export const CHAT_TOPIC = "universal-connectivity" export const CHAT_FILE_TOPIC = "universal-connectivity-file" +export const PUBSUB_PEER_DISCOVERY = "universal-connectivity-browser-peer-discovery" export const FILE_EXCHANGE_PROTOCOL = "/universal-connectivity-file/1" export const CIRCUIT_RELAY_CODE = 290 diff --git a/js-peer/src/lib/libp2p.ts b/js-peer/src/lib/libp2p.ts index 217c68eb..ba025020 100644 --- a/js-peer/src/lib/libp2p.ts +++ b/js-peer/src/lib/libp2p.ts @@ -1,4 +1,3 @@ -import { IDBDatastore } from 'datastore-idb' import { createDelegatedRoutingV1HttpApiClient, DelegatedRoutingV1HttpApiClient, @@ -11,28 +10,24 @@ import { yamux } from '@chainsafe/libp2p-yamux' import { bootstrap } from '@libp2p/bootstrap' import { Multiaddr } from '@multiformats/multiaddr' import { sha256 } from 'multiformats/hashes/sha2' -import type { Message, SignedMessage, PeerId } from '@libp2p/interface' +import type { Connection, Message, SignedMessage, PeerId } from '@libp2p/interface' import { gossipsub } from '@chainsafe/libp2p-gossipsub' import { webSockets } from '@libp2p/websockets' import { webTransport } from '@libp2p/webtransport' import { webRTC, webRTCDirect } from '@libp2p/webrtc' -import { BOOTSTRAP_PEER_IDS, CHAT_FILE_TOPIC, CHAT_TOPIC } from './constants' import { circuitRelayTransport } from '@libp2p/circuit-relay-v2' +import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery' +import { BOOTSTRAP_PEER_IDS, CHAT_FILE_TOPIC, CHAT_TOPIC, PUBSUB_PEER_DISCOVERY } from './constants' import first from 'it-first' export async function startLibp2p() { // enable verbose logging in browser console to view debug logs - localStorage.debug = 'libp2p*,-*:trace' - - // application-specific data lives in the datastore - const datastore = new IDBDatastore('universal-connectivity') - await datastore.open() + localStorage.debug = 'libp2p*,-libp2p:connection-manager*,-*:trace' const delegatedClient = createDelegatedRoutingV1HttpApiClient('https://delegated-ipfs.dev') const { bootstrapAddrs, relayListenAddrs } = await getBootstrapMultiaddrs(delegatedClient) const libp2p = await createLibp2p({ - datastore, addresses: { listen: [ // 👇 Listen for webRTC connection @@ -54,6 +49,7 @@ export async function startLibp2p() { ], }, }), + // 👇 Required to estalbish connections with peers supporting WebRTC-direct, e.g. the Rust-peer webRTCDirect(), // 👇 Required to create circuit relay reservations in order to hole punch browser-to-browser WebRTC connections circuitRelayTransport({ @@ -62,8 +58,8 @@ export async function startLibp2p() { }), ], connectionManager: { - maxConnections: 10, - minConnections: 0, + maxConnections: 30, + minConnections: 5, }, connectionEncryption: [noise()], streamMuxers: [yamux()], @@ -71,6 +67,11 @@ export async function startLibp2p() { denyDialMultiaddr: async () => false, }, peerDiscovery: [ + pubsubPeerDiscovery({ + interval: 10_000, + topics: [PUBSUB_PEER_DISCOVERY], + listenOnly: false, + }), bootstrap({ // The app-specific go and rust bootstrappers use WebTransport and WebRTC-direct which have ephemeral multiadrrs // that are resolved above using the delegated routing API @@ -93,15 +94,24 @@ export async function startLibp2p() { libp2p.services.pubsub.subscribe(CHAT_TOPIC) libp2p.services.pubsub.subscribe(CHAT_FILE_TOPIC) - // .catch((e) => { - // console.log('woot', e) - // }) - libp2p.addEventListener('self:peer:update', ({ detail: { peer } }) => { const multiaddrs = peer.addresses.map(({ multiaddr }) => multiaddr) console.log(`changed multiaddrs: peer ${peer.id.toString()} multiaddrs: ${multiaddrs}`) }) + // 👇 explicitly dialling peers discovered via pubsub is only necessary + // when minConnections is set to 0 and the connection manager doesn't autodial + // libp2p.addEventListener('peer:discovery', (event) => { + // const { multiaddrs, id } = event.detail + + // if (libp2p.getConnections(id)?.length > 0) { + // console.log(`Already connected to peer %s. Will not try dialling`, id) + // return + // } + + // dialWebRTCMaddrs(libp2p, multiaddrs) + // }) + return libp2p } @@ -116,6 +126,23 @@ export async function msgIdFnStrictNoSign(msg: Message): Promise { return await sha256.encode(encodedSeqNum) } +// Function which dials one maddr at a time to avoid establishing multiple connections to the same peer +async function dialWebRTCMaddrs(libp2p: Libp2p, multiaddrs: Multiaddr[]): Promise { + // Filter webrtc (browser-to-browser) multiaddrs + const webRTCMadrs = multiaddrs.filter((maddr) => maddr.protoNames().includes('webrtc')) + console.log(`peer:discovery with maddrs: %o`, webRTCMadrs) + + for (const addr of webRTCMadrs) { + try { + console.log(`woot attempting to dial webrtc multiaddr: %o`, addr) + await libp2p.dial(addr) + return // if we succeed dialing the peer, no need to try another address + } catch (error) { + console.error(`woot failed to dial webrtc multiaddr: %o`, addr) + } + } +} + export const connectToMultiaddr = (libp2p: Libp2p) => async (multiaddr: Multiaddr) => { console.log(`dialling: ${multiaddr.toString()}`) try { @@ -133,7 +160,9 @@ export const connectToMultiaddr = (libp2p: Libp2p) => async (multiaddr: Multiadd async function getBootstrapMultiaddrs( client: DelegatedRoutingV1HttpApiClient, ): Promise { - const peers = await Promise.all(BOOTSTRAP_PEER_IDS.map(peerId => first(client.getPeers(peerIdFromString(peerId))))) + const peers = await Promise.all( + BOOTSTRAP_PEER_IDS.map((peerId) => first(client.getPeers(peerIdFromString(peerId)))), + ) const bootstrapAddrs = [] const relayListenAddrs = [] @@ -166,3 +195,9 @@ interface BootstrapsMultiaddrs { // Constructs a multiaddr string representing the circuit relay v2 listen address for a relayed connection to the given peer. const getRelayListenAddr = (maddr: Multiaddr, peer: PeerId): string => `${maddr.toString()}/p2p/${peer.toString()}/p2p-circuit` + +export const getFormattedConnections = (connections: Connection[]) => + connections.map((conn) => ({ + peerId: conn.remotePeer, + protocols: [...new Set(conn.remoteAddr.protoNames())], + })) diff --git a/js-peer/src/pages/index.tsx b/js-peer/src/pages/index.tsx index 96949f16..9778b823 100644 --- a/js-peer/src/pages/index.tsx +++ b/js-peer/src/pages/index.tsx @@ -8,7 +8,7 @@ import { usePeerContext } from '../context/peer-ctx' import { useCallback, useEffect, useState } from 'react' import Image from 'next/image' import { multiaddr } from '@multiformats/multiaddr' -import { connectToMultiaddr } from '../lib/libp2p' +import { connectToMultiaddr, getFormattedConnections } from '../lib/libp2p' import { useListenAddressesContext } from '../context/listen-addresses-ctx' import Spinner from '@/components/spinner' @@ -28,7 +28,6 @@ export default function Home() { ...peerStats, peerIds: connections.map((conn) => conn.remotePeer), connections: connections, - connected: connections.length > 0, }) }, 1000) @@ -42,7 +41,6 @@ export default function Home() { const multiaddrs = libp2p.getMultiaddrs() setListenAddresses({ - ...listenAddresses, multiaddrs, }) }, 1000) @@ -52,32 +50,6 @@ export default function Home() { } }, [libp2p, listenAddresses, setListenAddresses]) - type PeerProtoTuple = { - peerId: PeerId - protocols: string[] - } - - const getFormattedConnections = (connections: Connection[]): PeerProtoTuple[] => { - const protoNames: Map = new Map() - - connections.forEach((conn) => { - const exists = protoNames.get(conn.remotePeer) - const dedupedProtonames = [...new Set(conn.remoteAddr.protoNames())] - - if (exists?.length) { - const namesToAdd = dedupedProtonames.filter((name) => !exists.includes(name)) - // console.log('namesToAdd: ', namesToAdd) - protoNames.set(conn.remotePeer, [...exists, ...namesToAdd]) - } else { - protoNames.set(conn.remotePeer, dedupedProtonames) - } - }) - - return [...protoNames.entries()].map(([peerId, protocols]) => ({ - peerId, - protocols, - })) - } const handleConnectToMultiaddr = useCallback( async (e: React.MouseEvent) => { @@ -181,7 +153,7 @@ export default function Home() {
Connected:{' '} - {peerStats.connected ? ( + {peerStats.connections.length > 0 ? ( ) : ( @@ -198,7 +170,7 @@ export default function Home() {
    {getFormattedConnections(peerStats.connections).map((pair) => (
  • {`${pair.peerId.toString()} (${pair.protocols.join(', ')})`} diff --git a/rust-peer/src/main.rs b/rust-peer/src/main.rs index 38d7d7af..9f476f87 100644 --- a/rust-peer/src/main.rs +++ b/rust-peer/src/main.rs @@ -217,7 +217,8 @@ async fn main() -> Result<()> { { debug!("identify::Event::Received observed_addr: {}", observed_addr); - swarm.add_external_address(observed_addr); + // Disable to see if it's the cause of the wrong multiaddrs getting announced + // swarm.add_external_address(observed_addr); // TODO: The following should no longer be necessary after https://github.com/libp2p/rust-libp2p/pull/4371. if protocols.iter().any(|p| p == &KADEMLIA_PROTOCOL_NAME) {