Skip to content

Commit

Permalink
return KademliaPeers directly into the queue instead of exposing Anno…
Browse files Browse the repository at this point in the history
…uncement abstraction
  • Loading branch information
shyba committed Apr 5, 2022
1 parent 66551c8 commit eb0d481
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 27 deletions.
14 changes: 2 additions & 12 deletions lbry/stream/downloader.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import ipaddress
import typing
import logging
import binascii
Expand All @@ -9,10 +8,9 @@
from lbry.utils import lru_cache_concurrent
from lbry.stream.descriptor import StreamDescriptor
from lbry.blob_exchange.downloader import BlobDownloader
from lbry.torrent.tracker import subscribe_hash
from lbry.torrent.tracker import enqueue_tracker_search

if typing.TYPE_CHECKING:
from lbry.torrent.tracker import AnnounceResponse
from lbry.conf import Config
from lbry.dht.node import Node
from lbry.blob.blob_manager import BlobManager
Expand Down Expand Up @@ -66,13 +64,6 @@ def _add_fixed_peers(fixed_peers):
fixed_peers = await get_kademlia_peers_from_hosts(self.config.fixed_peers)
self.fixed_peers_handle = self.loop.call_later(self.fixed_peers_delay, _add_fixed_peers, fixed_peers)

async def _process_announcement(self, announcement: 'AnnounceResponse'):
peers = await get_kademlia_peers_from_hosts([
(str(ipaddress.ip_address(peer.address)), peer.port) for peer in announcement.peers if peer.port > 1024
])
log.info("Found %d peers from tracker for %s", len(peers), self.sd_hash[:8])
self.peer_queue.put_nowait(peers)

async def load_descriptor(self, connection_id: int = 0):
# download or get the sd blob
sd_blob = self.blob_manager.get_blob(self.sd_hash)
Expand Down Expand Up @@ -102,8 +93,7 @@ async def start(self, node: typing.Optional['Node'] = None, connection_id: int =
self.accumulate_task.cancel()
_, self.accumulate_task = self.node.accumulate_peers(self.search_queue, self.peer_queue)
await self.add_fixed_peers()
subscribe_hash(
bytes.fromhex(self.sd_hash), lambda result: asyncio.ensure_future(self._process_announcement(result)))
enqueue_tracker_search(bytes.fromhex(self.sd_hash), self.peer_queue)
# start searching for peers for the sd hash
self.search_queue.put_nowait(self.sd_hash)
log.info("searching for peers for stream %s", self.sd_hash)
Expand Down
22 changes: 15 additions & 7 deletions lbry/torrent/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,7 @@ async def get_peer_list(self, info_hash, stopped=False, on_announcement=None, no

async def get_kademlia_peer_list(self, info_hash):
responses = await self.get_peer_list(info_hash, no_port=True)
peers = [
(str(ipaddress.ip_address(peer.address)), peer.port)
for ann in responses for peer in ann.peers if peer.port > 1024 # filter out privileged and 0
]
return await get_kademlia_peers_from_hosts(peers)
return await announcement_to_kademlia_peers(*responses)

async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=False, no_port=False):
result = None
Expand All @@ -229,8 +225,20 @@ async def _probe_server(self, info_hash, tracker_host, tracker_port, stopped=Fal
return result


def subscribe_hash(info_hash: bytes, on_data):
TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_data))
def enqueue_tracker_search(info_hash: bytes, peer_q: asyncio.Queue):
async def on_announcement(announcement: AnnounceResponse):
peers = await announcement_to_kademlia_peers(announcement)
log.info("Found %d peers from tracker for %s", len(peers), info_hash.hex()[:8])
peer_q.put_nowait(peers)
TrackerClient.EVENT_CONTROLLER.add(('search', info_hash, on_announcement))


def announcement_to_kademlia_peers(*announcements: AnnounceResponse):
peers = [
(str(ipaddress.ip_address(peer.address)), peer.port)
for announcement in announcements for peer in announcement.peers if peer.port > 1024 # no privileged or 0
]
return get_kademlia_peers_from_hosts(peers)


class UDPTrackerServerProtocol(asyncio.DatagramProtocol): # for testing. Not suitable for production
Expand Down
16 changes: 8 additions & 8 deletions tests/unit/torrent/test_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import random

from lbry.testcase import AsyncioTestCase
from lbry.torrent.tracker import CompactIPv4Peer, TrackerClient, subscribe_hash, UDPTrackerServerProtocol, encode_peer
from lbry.dht.peer import KademliaPeer
from lbry.torrent.tracker import CompactIPv4Peer, TrackerClient, enqueue_tracker_search, UDPTrackerServerProtocol, encode_peer


class UDPTrackerClientTestCase(AsyncioTestCase):
Expand Down Expand Up @@ -46,10 +47,9 @@ async def test_announce_many_info_hashes_to_many_servers_with_bad_one_and_dns_er
async def test_announce_using_helper_function(self):
info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False)
queue = asyncio.Queue()
subscribe_hash(info_hash, queue.put)
announcement = await queue.get()
peers = announcement.peers
self.assertEqual(peers, [CompactIPv4Peer(int.from_bytes(bytes([127, 0, 0, 1]), "big", signed=False), 4444)])
enqueue_tracker_search(info_hash, queue)
peers = await queue.get()
self.assertEqual(peers, [KademliaPeer('127.0.0.1', None, None, 4444, allow_localhost=True)])

async def test_error(self):
info_hash = random.getrandbits(160).to_bytes(20, "big", signed=False)
Expand Down Expand Up @@ -85,8 +85,8 @@ async def test_multiple_servers_with_different_peers_across_helper_function(self
peer = (f"127.0.0.{random.randint(1, 255)}", random.randint(2000, 65500))
fake_peers.append(peer)
server.add_peer(info_hash, *peer)
response = []
subscribe_hash(info_hash, response.append)
peer_q = asyncio.Queue()
enqueue_tracker_search(info_hash, peer_q)
await asyncio.sleep(0)
await asyncio.gather(*self.client.tasks.values())
self.assertEqual(11, len(response))
self.assertEqual(11, peer_q.qsize())

0 comments on commit eb0d481

Please sign in to comment.