Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 81 additions & 62 deletions bittensor/dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import aiohttp
import bittensor
from fastapi import Response
from typing import Union, Optional, List, Union, AsyncGenerator
from typing import Union, Optional, List, Union, AsyncGenerator, Any


class dendrite(torch.nn.Module):
Expand Down Expand Up @@ -59,7 +59,8 @@ class dendrite(torch.nn.Module):
Asynchronously sends a request to a specified Axon and processes the response.

call_stream(self, target_axon, synapse=bittensor.Synapse(), timeout=12.0, deserialize=True) -> AsyncGenerator[bittensor.Synapse, None]:
Sends a request to a specified Axon and yields streaming responses.
Sends a request to a specified Axon and yields an AsyncGenerator that contains streaming
response chunks before finally yielding the filled Synapse as the final element.

preprocess_synapse_for_request(self, target_axon_info, synapse, timeout=12.0) -> bittensor.Synapse:
Preprocesses the synapse for making a request, including building headers and signing.
Expand Down Expand Up @@ -146,9 +147,10 @@ def close_session(self):
Usage:
dendrite_instance.close_session()
"""
loop = asyncio.get_event_loop()
loop.run_until_complete(self._session.close())
self._session = None
if self._session:
loop = asyncio.get_event_loop()
loop.run_until_complete(self._session.close())
self._session = None

async def aclose_session(self):
"""
Expand All @@ -164,9 +166,47 @@ async def aclose_session(self):
await self._session.close()
self._session = None

def _get_endpoint_url(self, target_axon, request_name):
endpoint = (
f"0.0.0.0:{str(target_axon.port)}"
if target_axon.ip == str(self.external_ip)
else f"{target_axon.ip}:{str(target_axon.port)}"
)
return f"http://{endpoint}/{request_name}"

def _handle_request_errors(self, synapse, request_name, exception):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really like this new functional design!

if isinstance(exception, aiohttp.ClientConnectorError):
synapse.dendrite.status_code = "503"
synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable."
elif isinstance(exception, asyncio.TimeoutError):
synapse.dendrite.status_code = "408"
synapse.dendrite.status_message = (
f"Timedout after {synapse.timeout} seconds."
)
else:
synapse.dendrite.status_code = "422"
synapse.dendrite.status_message = (
f"Failed to parse response object with error: {str(exception)}"
)

def _log_outgoing_request(self, synapse):
bittensor.logging.debug(
f"dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success"
)

def _log_incoming_response(self, synapse):
bittensor.logging.debug(
f"dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}"
)

def query(
self, *args, **kwargs
) -> Union[bittensor.Synapse, List[bittensor.Synapse]]:
) -> Union[
bittensor.Synapse,
List[bittensor.Synapse],
bittensor.StreamingSynapse,
List[bittensor.StreamingSynapse],
]:
"""
Makes a synchronous request to multiple target Axons and returns the server responses.

Expand Down Expand Up @@ -207,7 +247,7 @@ async def forward(
deserialize: bool = True,
run_async: bool = True,
streaming: bool = False,
) -> bittensor.Synapse:
) -> List[Union[AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse]]:
"""
Asynchronously sends requests to one or multiple Axons and collates their responses.

Expand All @@ -216,6 +256,17 @@ async def forward(
the requests, and then sends them off. After getting the responses, it processes and
collates them into a unified format.

When querying an Axon that sends back data in chunks using the Dednrite, this function
returns an AsyncGenerator that yields each chunk as it is received. The generator can be
iterated over to process each chunk individually.

For example:
>>> ...
>>> dendrte = bittensor.dendrite(wallet = wallet)
>>> async for chunk in dendrite.forward(axons, synapse, timeout, deserialize, run_async, streaming):
>>> # Process each chunk here
>>> print(chunk)

Args:
axons (Union[List[Union['bittensor.AxonInfo', 'bittensor.axon']], Union['bittensor.AxonInfo', 'bittensor.axon']]):
The target Axons to send requests to. Can be a single Axon or a list of Axons.
Expand All @@ -226,7 +277,7 @@ async def forward(
streaming (bool, optional): Indicates if the response is expected to be in streaming format. Defaults to False.

Returns:
Union[bittensor.Synapse, List[bittensor.Synapse]]: If a single Axon is targeted, returns its response.
Union[AsyncGenerator, bittensor.Synapse, List[bittensor.Synapse]]: If a single Axon is targeted, returns its response.
If multiple Axons are targeted, returns a list of their responses.
"""
is_list = True
Expand All @@ -241,11 +292,13 @@ async def forward(
)
if streaming != is_streaming_subclass:
bittensor.logging.warning(
"Argument streaming is {streaming} while issubclass(synapse, StreamingSynapse) is {synapse.__class__.__name__}. This may cause unexpected behavior."
f"Argument streaming is {streaming} while issubclass(synapse, StreamingSynapse) is {synapse.__class__.__name__}. This may cause unexpected behavior."
)
streaming = is_streaming_subclass or streaming

async def query_all_axons(is_stream: bool) -> List[bt.Synapse]:
async def query_all_axons(
is_stream: bool,
) -> Union[AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse]:
"""
Handles requests for all axons, either in streaming or non-streaming mode.

Expand All @@ -256,7 +309,11 @@ async def query_all_axons(is_stream: bool) -> List[bt.Synapse]:
List of Synapse objects with responses.
"""

async def single_axon_response(target_axon) -> bt.Synapse:
async def single_axon_response(
target_axon,
) -> Union[
AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse
]:
"""
Retrieve response for a single axon, either in streaming or non-streaming mode.

Expand All @@ -267,17 +324,13 @@ async def single_axon_response(target_axon) -> bt.Synapse:
A Synapse object with the response.
"""
if is_stream:
# If in streaming mode, we iterate over the streaming content
# and take the last item as the final result.
final_result = None
async for result in self.call_stream(
# If in streaming mode, return the async_generator
return self.call_stream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If query multiple axons at the same time, this will return multiple async generators for us right? Would this have any issues on the bandwidth if we were to open 25-50 streams at once?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question, I'll try this out on testnet

target_axon=target_axon,
synapse=synapse.copy(),
timeout=timeout,
deserialize=deserialize,
):
final_result = result
return final_result
)
else:
# If not in streaming mode, simply call the axon and get the response.
return await self.call(
Expand Down Expand Up @@ -339,21 +392,14 @@ async def call(

# Build request endpoint from the synapse class
request_name = synapse.__class__.__name__
endpoint = (
f"0.0.0.0:{str(target_axon.port)}"
if target_axon.ip == str(self.external_ip)
else f"{target_axon.ip}:{str(target_axon.port)}"
)
url = f"http://{endpoint}/{request_name}"
url = self._get_endpoint_url(target_axon, request_name=request_name)

# Preprocess synapse for making a request
synapse = self.preprocess_synapse_for_request(target_axon, synapse, timeout)

try:
# Log outgoing request
bittensor.logging.debug(
f"dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success"
)
self._log_outgoing_request(synapse)

# Make the HTTP POST request
async with (await self.session).post(
Expand All @@ -370,24 +416,11 @@ async def call(
# Set process time and log the response
synapse.dendrite.process_time = str(time.time() - start_time)

except aiohttp.ClientConnectorError as e:
synapse.dendrite.status_code = "503"
synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable."

except asyncio.TimeoutError as e:
synapse.dendrite.status_code = "408"
synapse.dendrite.status_message = f"Timedout after {timeout} seconds."

except Exception as e:
synapse.dendrite.status_code = "422"
synapse.dendrite.status_message = (
f"Failed to parse response object with error: {str(e)}"
)
self._handle_request_errors(synapse, request_name, e)

finally:
bittensor.logging.debug(
f"dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}"
)
self._log_incoming_response(synapse)

# Log synapse event history
self.synapse_history.append(
Expand All @@ -406,7 +439,7 @@ async def call_stream(
synapse: bittensor.Synapse = bittensor.Synapse(),
timeout: float = 12.0,
deserialize: bool = True,
) -> AsyncGenerator[bittensor.Synapse, None]:
) -> AsyncGenerator[Any]:
"""
Sends a request to a specified Axon and yields streaming responses.

Expand All @@ -422,7 +455,8 @@ async def call_stream(
deserialize (bool, optional): Determines if each received chunk should be deserialized. Defaults to True.

Yields:
bittensor.Synapse: Each yielded Synapse object contains a chunk of the response data from the Axon.
object: Each yielded object contains a chunk of the arbitrary response data from the Axon.
bittensor.Synapse: After the AsyncGenerator has been exhausted, yields the final filled Synapse.
"""

# Record start time
Expand All @@ -447,9 +481,7 @@ async def call_stream(

try:
# Log outgoing request
bittensor.logging.debug(
f"stream dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success"
)
self._log_outgoing_request(synapse)

# Make the HTTP POST request
async with (await self.session).post(
Expand All @@ -469,24 +501,11 @@ async def call_stream(
# Set process time and log the response
synapse.dendrite.process_time = str(time.time() - start_time)

except aiohttp.ClientConnectorError as e:
synapse.dendrite.status_code = "503"
synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable."

except asyncio.TimeoutError as e:
synapse.dendrite.status_code = "408"
synapse.dendrite.status_message = f"Timedout after {timeout} seconds."

except Exception as e:
synapse.dendrite.status_code = "422"
synapse.dendrite.status_message = (
f"Failed to parse response object with error: {str(e)}"
)
self._handle_request_errors(synapse, request_name, e)

finally:
bittensor.logging.debug(
f"stream dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}"
)
self._log_incoming_response(synapse)

# Log synapse event history
self.synapse_history.append(
Expand Down