-
Notifications
You must be signed in to change notification settings - Fork 441
Dendrite fixes #1561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Dendrite fixes #1561
Changes from all commits
7212185
0a231ad
59bba23
72d67a6
fc1c1c6
0b05b0a
df0e258
aab10ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,7 +26,7 @@ | |
| import aiohttp | ||
| import bittensor | ||
| from fastapi import Response | ||
| from typing import Union, Optional, List, Union, AsyncGenerator | ||
| from typing import Union, Optional, List, Union, AsyncGenerator, Any | ||
|
|
||
|
|
||
| class dendrite(torch.nn.Module): | ||
|
|
@@ -59,7 +59,8 @@ class dendrite(torch.nn.Module): | |
| Asynchronously sends a request to a specified Axon and processes the response. | ||
|
|
||
| call_stream(self, target_axon, synapse=bittensor.Synapse(), timeout=12.0, deserialize=True) -> AsyncGenerator[bittensor.Synapse, None]: | ||
| Sends a request to a specified Axon and yields streaming responses. | ||
| Sends a request to a specified Axon and yields an AsyncGenerator that contains streaming | ||
| response chunks before finally yielding the filled Synapse as the final element. | ||
|
|
||
| preprocess_synapse_for_request(self, target_axon_info, synapse, timeout=12.0) -> bittensor.Synapse: | ||
| Preprocesses the synapse for making a request, including building headers and signing. | ||
|
|
@@ -146,9 +147,10 @@ def close_session(self): | |
| Usage: | ||
| dendrite_instance.close_session() | ||
| """ | ||
| loop = asyncio.get_event_loop() | ||
| loop.run_until_complete(self._session.close()) | ||
| self._session = None | ||
| if self._session: | ||
| loop = asyncio.get_event_loop() | ||
| loop.run_until_complete(self._session.close()) | ||
| self._session = None | ||
|
|
||
| async def aclose_session(self): | ||
| """ | ||
|
|
@@ -164,9 +166,47 @@ async def aclose_session(self): | |
| await self._session.close() | ||
| self._session = None | ||
|
|
||
| def _get_endpoint_url(self, target_axon, request_name): | ||
| endpoint = ( | ||
| f"0.0.0.0:{str(target_axon.port)}" | ||
| if target_axon.ip == str(self.external_ip) | ||
| else f"{target_axon.ip}:{str(target_axon.port)}" | ||
| ) | ||
| return f"http://{endpoint}/{request_name}" | ||
|
|
||
| def _handle_request_errors(self, synapse, request_name, exception): | ||
| if isinstance(exception, aiohttp.ClientConnectorError): | ||
| synapse.dendrite.status_code = "503" | ||
| synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable." | ||
| elif isinstance(exception, asyncio.TimeoutError): | ||
| synapse.dendrite.status_code = "408" | ||
| synapse.dendrite.status_message = ( | ||
| f"Timedout after {synapse.timeout} seconds." | ||
| ) | ||
| else: | ||
| synapse.dendrite.status_code = "422" | ||
| synapse.dendrite.status_message = ( | ||
| f"Failed to parse response object with error: {str(exception)}" | ||
| ) | ||
|
|
||
| def _log_outgoing_request(self, synapse): | ||
| bittensor.logging.debug( | ||
| f"dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success" | ||
| ) | ||
|
|
||
| def _log_incoming_response(self, synapse): | ||
| bittensor.logging.debug( | ||
| f"dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}" | ||
| ) | ||
|
|
||
| def query( | ||
| self, *args, **kwargs | ||
| ) -> Union[bittensor.Synapse, List[bittensor.Synapse]]: | ||
| ) -> Union[ | ||
| bittensor.Synapse, | ||
| List[bittensor.Synapse], | ||
| bittensor.StreamingSynapse, | ||
| List[bittensor.StreamingSynapse], | ||
| ]: | ||
| """ | ||
| Makes a synchronous request to multiple target Axons and returns the server responses. | ||
|
|
||
|
|
@@ -207,7 +247,7 @@ async def forward( | |
| deserialize: bool = True, | ||
| run_async: bool = True, | ||
| streaming: bool = False, | ||
| ) -> bittensor.Synapse: | ||
| ) -> List[Union[AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse]]: | ||
| """ | ||
| Asynchronously sends requests to one or multiple Axons and collates their responses. | ||
|
|
||
|
|
@@ -216,6 +256,17 @@ async def forward( | |
| the requests, and then sends them off. After getting the responses, it processes and | ||
| collates them into a unified format. | ||
|
|
||
| When querying an Axon that sends back data in chunks using the Dednrite, this function | ||
| returns an AsyncGenerator that yields each chunk as it is received. The generator can be | ||
| iterated over to process each chunk individually. | ||
|
|
||
| For example: | ||
| >>> ... | ||
| >>> dendrte = bittensor.dendrite(wallet = wallet) | ||
| >>> async for chunk in dendrite.forward(axons, synapse, timeout, deserialize, run_async, streaming): | ||
| >>> # Process each chunk here | ||
| >>> print(chunk) | ||
|
|
||
| Args: | ||
| axons (Union[List[Union['bittensor.AxonInfo', 'bittensor.axon']], Union['bittensor.AxonInfo', 'bittensor.axon']]): | ||
| The target Axons to send requests to. Can be a single Axon or a list of Axons. | ||
|
|
@@ -226,7 +277,7 @@ async def forward( | |
| streaming (bool, optional): Indicates if the response is expected to be in streaming format. Defaults to False. | ||
|
|
||
| Returns: | ||
| Union[bittensor.Synapse, List[bittensor.Synapse]]: If a single Axon is targeted, returns its response. | ||
| Union[AsyncGenerator, bittensor.Synapse, List[bittensor.Synapse]]: If a single Axon is targeted, returns its response. | ||
| If multiple Axons are targeted, returns a list of their responses. | ||
| """ | ||
| is_list = True | ||
|
|
@@ -241,11 +292,13 @@ async def forward( | |
| ) | ||
| if streaming != is_streaming_subclass: | ||
| bittensor.logging.warning( | ||
| "Argument streaming is {streaming} while issubclass(synapse, StreamingSynapse) is {synapse.__class__.__name__}. This may cause unexpected behavior." | ||
| f"Argument streaming is {streaming} while issubclass(synapse, StreamingSynapse) is {synapse.__class__.__name__}. This may cause unexpected behavior." | ||
| ) | ||
| streaming = is_streaming_subclass or streaming | ||
|
|
||
| async def query_all_axons(is_stream: bool) -> List[bt.Synapse]: | ||
| async def query_all_axons( | ||
| is_stream: bool, | ||
| ) -> Union[AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse]: | ||
| """ | ||
| Handles requests for all axons, either in streaming or non-streaming mode. | ||
|
|
||
|
|
@@ -256,7 +309,11 @@ async def query_all_axons(is_stream: bool) -> List[bt.Synapse]: | |
| List of Synapse objects with responses. | ||
| """ | ||
|
|
||
| async def single_axon_response(target_axon) -> bt.Synapse: | ||
| async def single_axon_response( | ||
| target_axon, | ||
| ) -> Union[ | ||
| AsyncGenerator[Any], bittenst.Synapse, bittensor.StreamingSynapse | ||
| ]: | ||
| """ | ||
| Retrieve response for a single axon, either in streaming or non-streaming mode. | ||
|
|
||
|
|
@@ -267,17 +324,13 @@ async def single_axon_response(target_axon) -> bt.Synapse: | |
| A Synapse object with the response. | ||
| """ | ||
| if is_stream: | ||
| # If in streaming mode, we iterate over the streaming content | ||
| # and take the last item as the final result. | ||
| final_result = None | ||
| async for result in self.call_stream( | ||
| # If in streaming mode, return the async_generator | ||
| return self.call_stream( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If query multiple axons at the same time, this will return multiple async generators for us right? Would this have any issues on the bandwidth if we were to open 25-50 streams at once?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good question, I'll try this out on testnet |
||
| target_axon=target_axon, | ||
| synapse=synapse.copy(), | ||
| timeout=timeout, | ||
| deserialize=deserialize, | ||
| ): | ||
| final_result = result | ||
| return final_result | ||
| ) | ||
| else: | ||
| # If not in streaming mode, simply call the axon and get the response. | ||
| return await self.call( | ||
|
|
@@ -339,21 +392,14 @@ async def call( | |
|
|
||
| # Build request endpoint from the synapse class | ||
| request_name = synapse.__class__.__name__ | ||
| endpoint = ( | ||
| f"0.0.0.0:{str(target_axon.port)}" | ||
| if target_axon.ip == str(self.external_ip) | ||
| else f"{target_axon.ip}:{str(target_axon.port)}" | ||
| ) | ||
| url = f"http://{endpoint}/{request_name}" | ||
| url = self._get_endpoint_url(target_axon, request_name=request_name) | ||
|
|
||
| # Preprocess synapse for making a request | ||
| synapse = self.preprocess_synapse_for_request(target_axon, synapse, timeout) | ||
|
|
||
| try: | ||
| # Log outgoing request | ||
| bittensor.logging.debug( | ||
| f"dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success" | ||
| ) | ||
| self._log_outgoing_request(synapse) | ||
|
|
||
| # Make the HTTP POST request | ||
| async with (await self.session).post( | ||
|
|
@@ -370,24 +416,11 @@ async def call( | |
| # Set process time and log the response | ||
| synapse.dendrite.process_time = str(time.time() - start_time) | ||
|
|
||
| except aiohttp.ClientConnectorError as e: | ||
| synapse.dendrite.status_code = "503" | ||
| synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable." | ||
|
|
||
| except asyncio.TimeoutError as e: | ||
| synapse.dendrite.status_code = "408" | ||
| synapse.dendrite.status_message = f"Timedout after {timeout} seconds." | ||
|
|
||
| except Exception as e: | ||
| synapse.dendrite.status_code = "422" | ||
| synapse.dendrite.status_message = ( | ||
| f"Failed to parse response object with error: {str(e)}" | ||
| ) | ||
| self._handle_request_errors(synapse, request_name, e) | ||
|
|
||
| finally: | ||
| bittensor.logging.debug( | ||
| f"dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}" | ||
| ) | ||
| self._log_incoming_response(synapse) | ||
|
|
||
| # Log synapse event history | ||
| self.synapse_history.append( | ||
|
|
@@ -406,7 +439,7 @@ async def call_stream( | |
| synapse: bittensor.Synapse = bittensor.Synapse(), | ||
| timeout: float = 12.0, | ||
| deserialize: bool = True, | ||
| ) -> AsyncGenerator[bittensor.Synapse, None]: | ||
| ) -> AsyncGenerator[Any]: | ||
| """ | ||
| Sends a request to a specified Axon and yields streaming responses. | ||
|
|
||
|
|
@@ -422,7 +455,8 @@ async def call_stream( | |
| deserialize (bool, optional): Determines if each received chunk should be deserialized. Defaults to True. | ||
|
|
||
| Yields: | ||
| bittensor.Synapse: Each yielded Synapse object contains a chunk of the response data from the Axon. | ||
| object: Each yielded object contains a chunk of the arbitrary response data from the Axon. | ||
| bittensor.Synapse: After the AsyncGenerator has been exhausted, yields the final filled Synapse. | ||
| """ | ||
|
|
||
| # Record start time | ||
|
|
@@ -447,9 +481,7 @@ async def call_stream( | |
|
|
||
| try: | ||
| # Log outgoing request | ||
| bittensor.logging.debug( | ||
| f"stream dendrite | --> | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | 0 | Success" | ||
| ) | ||
| self._log_outgoing_request(synapse) | ||
|
|
||
| # Make the HTTP POST request | ||
| async with (await self.session).post( | ||
|
|
@@ -469,24 +501,11 @@ async def call_stream( | |
| # Set process time and log the response | ||
| synapse.dendrite.process_time = str(time.time() - start_time) | ||
|
|
||
| except aiohttp.ClientConnectorError as e: | ||
| synapse.dendrite.status_code = "503" | ||
| synapse.dendrite.status_message = f"Service at {synapse.axon.ip}:{str(synapse.axon.port)}/{request_name} unavailable." | ||
|
|
||
| except asyncio.TimeoutError as e: | ||
| synapse.dendrite.status_code = "408" | ||
| synapse.dendrite.status_message = f"Timedout after {timeout} seconds." | ||
|
|
||
| except Exception as e: | ||
| synapse.dendrite.status_code = "422" | ||
| synapse.dendrite.status_message = ( | ||
| f"Failed to parse response object with error: {str(e)}" | ||
| ) | ||
| self._handle_request_errors(synapse, request_name, e) | ||
|
|
||
| finally: | ||
| bittensor.logging.debug( | ||
| f"stream dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}" | ||
| ) | ||
| self._log_incoming_response(synapse) | ||
|
|
||
| # Log synapse event history | ||
| self.synapse_history.append( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really like this new functional design!