-
Notifications
You must be signed in to change notification settings - Fork 0
Dead letter queue #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
WalkthroughThe update enhances the Changes
Entelligence.ai can learn from your feedback. Simply add 👍 / 👎 emojis to teach it your preferences. More shortcuts belowEmoji Descriptions:
Interact with the Bot:
|
| else: | ||
| raise ApiServerException(f"API server: {result.body}") | ||
| if result.code == 500: | ||
| HttpClient._handle_failed_request( | ||
| url, None, api_key, None, jwt, "ServerError" | ||
| ) | ||
| raise ApiServerException("API server: - internal server error") | ||
|
|
||
| return result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Bug Fix:
Clarify Parameters for Error Handling
Verify _handle_failed_request parameters and document their purpose for handling 500 errors.
🔧 Suggested Code Diff:
- url, None, api_key, None, jwt, "ServerError"
+ url, None, api_key, None, token, "ServerError"📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| else: | |
| raise ApiServerException(f"API server: {result.body}") | |
| if result.code == 500: | |
| HttpClient._handle_failed_request( | |
| url, None, api_key, None, jwt, "ServerError" | |
| ) | |
| raise ApiServerException("API server: - internal server error") | |
| return result | |
| else: | |
| raise ApiServerException(f"API server: {result.body}") | |
| if result.code == 500: | |
| HttpClient._handle_failed_request( | |
| url, None, api_key, None, token, "ServerError" | |
| ) | |
| raise ApiServerException("API server: internal server error") |
| @staticmethod | ||
| def _retry_dlq_requests(): | ||
| """Retry requests in the DLQ""" | ||
| for failed_request in dead_letter_queue.get_all(): | ||
| dead_letter_queue.clear() | ||
| try: | ||
| if "payload" in failed_request: | ||
| # Retry POST request from DLQ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Bug Fix:
Fix DLQ Clearing Logic
Move dead_letter_queue.clear() after a successful retry to prevent data loss.
🔧 Suggested Code Diff:
@staticmethod
def _retry_dlq_requests():
"""Retry requests in the DLQ"""
for failed_request in dead_letter_queue.get_all():
try:
if "payload" in failed_request:
# Retry POST request from DLQ
HttpClient.post(
failed_request["url"],
json.dumps(filter_unjsonable(failed_request["payload"])).encode(
"utf-8"
),
failed_request["api_key"],
failed_request["parent_key"],
failed_request["jwt"],
)
else:
# Retry GET request from DLQ
HttpClient.get(
failed_request["url"],
failed_request["api_key"],
failed_request["jwt"],
)
dead_letter_queue.clear() # Clear after successful retry
except ApiServerException as e:
dead_letter_queue.add(failed_request)
# If it still fails, keep it in the DLQ
except Exception as e:
dead_letter_queue.add(failed_request)📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| @staticmethod | |
| def _retry_dlq_requests(): | |
| """Retry requests in the DLQ""" | |
| for failed_request in dead_letter_queue.get_all(): | |
| dead_letter_queue.clear() | |
| try: | |
| if "payload" in failed_request: | |
| # Retry POST request from DLQ | |
| @staticmethod | |
| def _retry_dlq_requests(): | |
| """Retry requests in the DLQ""" | |
| for failed_request in dead_letter_queue.get_all(): | |
| try: | |
| if "payload" in failed_request: | |
| # Retry POST request from DLQ | |
| HttpClient.post( | |
| failed_request["url"], | |
| json.dumps(filter_unjsonable(failed_request["payload"])).encode( | |
| "utf-8" | |
| ), | |
| failed_request["api_key"], | |
| failed_request["parent_key"], | |
| failed_request["jwt"], | |
| ) | |
| else: | |
| # Retry GET request from DLQ | |
| HttpClient.get( | |
| failed_request["url"], | |
| failed_request["api_key"], | |
| failed_request["jwt"], | |
| ) | |
| dead_letter_queue.remove(failed_request) # Remove after successful retry | |
| except ApiServerException as e: | |
| dead_letter_queue.add(failed_request) | |
| # If it still fails, keep it in the DLQ | |
| except Exception as e: | |
| dead_letter_queue.add(failed_request) | |
| # Log the exception for debugging purposes | |
| logging.error(f"Failed to retry request: {e}") |
| HttpClient.post( | ||
| failed_request["url"], | ||
| json.dumps(filter_unjsonable(failed_request["payload"])).encode( | ||
| "utf-8" | ||
| ), | ||
| failed_request["api_key"], | ||
| failed_request["parent_key"], | ||
| failed_request["jwt"], | ||
| ) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Bug Fix:
Add Exception Handling for Retry Logic
Add exception handling to log and manage retry failures.
🔧 Suggested Code Diff:
+ try:
+ if "payload" in failed_request:
+ # Retry POST request from DLQ
+ HttpClient.post(
+ failed_request["url"],
+ json.dumps(filter_unjsonable(failed_request["payload"])).encode(
+ "utf-8"
+ ),
+ failed_request["api_key"],
+ failed_request["parent_key"],
+ failed_request["jwt"],
+ )
+ else:
+ # Retry GET request from DLQ
+ HttpClient.get(
+ failed_request["url"],
+ failed_request["api_key"],
+ failed_request["jwt"],
+ )
+ except ApiServerException as e:
+ dead_letter_queue.add(failed_request)
+ # Log the exception
+ logger.error(f"ApiServerException occurred: {e}")
+ except Exception as e:
+ dead_letter_queue.add(failed_request)
+ # Log the exception
+ logger.error(f"Unexpected error occurred: {e}")📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| HttpClient.post( | |
| failed_request["url"], | |
| json.dumps(filter_unjsonable(failed_request["payload"])).encode( | |
| "utf-8" | |
| ), | |
| failed_request["api_key"], | |
| failed_request["parent_key"], | |
| failed_request["jwt"], | |
| ) | |
| try: | |
| if "payload" in failed_request: | |
| # Retry POST request from DLQ | |
| HttpClient.post( | |
| failed_request["url"], | |
| json.dumps(filter_unjsonable(failed_request["payload"])).encode( | |
| "utf-8" | |
| ), | |
| failed_request["api_key"], | |
| failed_request["parent_key"], | |
| failed_request["jwt"], | |
| ) | |
| else: | |
| # Retry GET request from DLQ | |
| HttpClient.get( | |
| failed_request["url"], | |
| failed_request["api_key"], | |
| failed_request["jwt"], | |
| ) | |
| except ApiServerException as e: | |
| dead_letter_queue.add(failed_request) | |
| # Log the exception | |
| logger.error(f"ApiServerException occurred: {e}") | |
| except Exception as e: | |
| dead_letter_queue.add(failed_request) | |
| # Log the exception | |
| logger.error(f"Unexpected error occurred: {e}") |
| @patch("requests.Session") | ||
| def test_post_timeout(self, mock_session, mock_open_file): | ||
| # Mock a timeout exception | ||
| mock_session_instance = mock_session.return_value | ||
| mock_session_instance.post.side_effect = requests.exceptions.Timeout | ||
|
|
||
| url = "https://api.agentops.ai/health" | ||
| payload = {"key": "value"} | ||
|
|
||
| with self.assertRaises(ApiServerException) as context: | ||
| HttpClient.post(url, payload) | ||
|
|
||
| self.assertIn("timeout", str(context.exception).lower()) | ||
| self.assertEqual(len(dead_letter_queue.get_all()), 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Bug Fix:
Enhance Timeout Test Coverage
Verify DLQ contents and retry mechanism in the timeout test.
🔧 Suggested Code Diff:
@patch("requests.Session")
def test_post_timeout(self, mock_session, mock_open_file):
# Mock a timeout exception
mock_session_instance = mock_session.return_value
mock_session_instance.post.side_effect = requests.exceptions.Timeout
url = "https://api.agentops.ai/health"
payload = {"key": "value"}
with self.assertRaises(ApiServerException) as context:
HttpClient.post(url, payload)
self.assertIn("timeout", str(context.exception).lower())
self.assertEqual(len(dead_letter_queue.get_all()), 1)
failed_request = dead_letter_queue.get_all()[0]
self.assertEqual(failed_request["url"], url)
self.assertEqual(failed_request["error_type"], "Timeout")
# Simulate retry and check if DLQ is cleared
mock_session_instance.post.side_effect = None
mock_session_instance.post.return_value = mock_response_success
HttpClient.retry_dlq()
self.assertEqual(len(dead_letter_queue.get_all()), 0)📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| @patch("requests.Session") | |
| def test_post_timeout(self, mock_session, mock_open_file): | |
| # Mock a timeout exception | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.post.side_effect = requests.exceptions.Timeout | |
| url = "https://api.agentops.ai/health" | |
| payload = {"key": "value"} | |
| with self.assertRaises(ApiServerException) as context: | |
| HttpClient.post(url, payload) | |
| self.assertIn("timeout", str(context.exception).lower()) | |
| self.assertEqual(len(dead_letter_queue.get_all()), 1) | |
| @patch("requests.Session") | |
| @patch("builtins.open", new_callable=mock_open) | |
| def test_post_timeout(self, mock_open_file, mock_session): | |
| # Mock a timeout exception | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.post.side_effect = requests.exceptions.Timeout | |
| url = "https://api.agentops.ai/health" | |
| payload = {"key": "value"} | |
| with self.assertRaises(ApiServerException) as context: | |
| HttpClient.post(url, payload) | |
| self.assertIn("timeout", str(context.exception).lower()) | |
| self.assertEqual(len(dead_letter_queue.get_all()), 1) | |
| failed_request = dead_letter_queue.get_all()[0] | |
| self.assertEqual(failed_request["url"], url) | |
| self.assertEqual(failed_request["error_type"], "Timeout") | |
| # Simulate retry and check if DLQ is cleared | |
| mock_session_instance.post.side_effect = None | |
| mock_response_success = mock.Mock() | |
| mock_response_success.status_code = 200 | |
| mock_session_instance.post.return_value = mock_response_success | |
| HttpClient.retry_dlq() | |
| self.assertEqual(len(dead_letter_queue.get_all()), 0) |
| @patch("requests.Session") | ||
| def test_get_http_error(self, mock_session, mock_open_file): | ||
| # Mock an HTTPError | ||
| mock_response = Mock() | ||
| mock_response.status_code = 500 | ||
| mock_response.json.return_value = {"error": "Internal Server Error"} | ||
|
|
||
| mock_session_instance = mock_session.return_value | ||
| mock_session_instance.get.side_effect = requests.exceptions.HTTPError( | ||
| response=mock_response | ||
| ) | ||
|
|
||
| url = "https://api.agentops.ai/health" | ||
|
|
||
| with self.assertRaises(ApiServerException) as context: | ||
| HttpClient.get(url) | ||
|
|
||
| self.assertEqual(len(dead_letter_queue.get_all()), 1) | ||
| failed_request = dead_letter_queue.get_all()[0] | ||
| self.assertEqual(failed_request["url"], url) | ||
| self.assertEqual(failed_request["error_type"], "HTTPError") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Enhancement:
Expand DLQ Test Coverage
Add tests for HTTP errors and network failures to verify DLQ reliability and retry mechanism.
🔧 Suggested Code Diff:
+ @patch("requests.Session")
+ def test_dlq_handles_various_errors(self, mock_session, mock_open_file):
+ # Mock different HTTP errors
+ mock_response_404 = Mock()
+ mock_response_404.status_code = 404
+ mock_response_404.json.return_value = {"error": "Not Found"}
+
+ mock_response_500 = Mock()
+ mock_response_500.status_code = 500
+ mock_response_500.json.return_value = {"error": "Internal Server Error"}
+
+ mock_session_instance = mock_session.return_value
+ mock_session_instance.get.side_effect = [
+ requests.exceptions.HTTPError(response=mock_response_404),
+ requests.exceptions.HTTPError(response=mock_response_500)
+ ]
+
+ url = "https://api.agentops.ai/health"
+
+ with self.assertRaises(ApiServerException):
+ HttpClient.get(url)
+ self.assertEqual(len(dead_letter_queue.get_all()), 1)
+
+ with self.assertRaises(ApiServerException):
+ HttpClient.get(url)
+ self.assertEqual(len(dead_letter_queue.get_all()), 2)📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| @patch("requests.Session") | |
| def test_get_http_error(self, mock_session, mock_open_file): | |
| # Mock an HTTPError | |
| mock_response = Mock() | |
| mock_response.status_code = 500 | |
| mock_response.json.return_value = {"error": "Internal Server Error"} | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.get.side_effect = requests.exceptions.HTTPError( | |
| response=mock_response | |
| ) | |
| url = "https://api.agentops.ai/health" | |
| with self.assertRaises(ApiServerException) as context: | |
| HttpClient.get(url) | |
| self.assertEqual(len(dead_letter_queue.get_all()), 1) | |
| failed_request = dead_letter_queue.get_all()[0] | |
| self.assertEqual(failed_request["url"], url) | |
| self.assertEqual(failed_request["error_type"], "HTTPError") | |
| @patch("requests.Session") | |
| def test_dlq_handles_various_errors(self, mock_session, mock_open_file): | |
| # Mock different HTTP errors | |
| mock_response_404 = Mock() | |
| mock_response_404.status_code = 404 | |
| mock_response_404.json.return_value = {"error": "Not Found"} | |
| mock_response_500 = Mock() | |
| mock_response_500.status_code = 500 | |
| mock_response_500.json.return_value = {"error": "Internal Server Error"} | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.get.side_effect = [ | |
| requests.exceptions.HTTPError(response=mock_response_404), | |
| requests.exceptions.HTTPError(response=mock_response_500) | |
| ] | |
| url = "https://api.agentops.ai/health" | |
| with self.assertRaises(ApiServerException): | |
| HttpClient.get(url) | |
| self.assertEqual(len(dead_letter_queue.get_all()), 1) | |
| with self.assertRaises(ApiServerException): | |
| HttpClient.get(url) | |
| self.assertEqual(len(dead_letter_queue.get_all()), 2) |
| @patch("requests.Session") | ||
| def test_dlq_retry_successfully_retries_post_and_get( | ||
| self, mock_session, mock_open_file | ||
| ): | ||
| # Mock successful POST and GET responses for DLQ retries | ||
| mock_response_success = Mock() | ||
| mock_response_success.status_code = 200 | ||
| mock_response_success.json.return_value = {"message": "Success"} | ||
|
|
||
| mock_session_instance = mock_session.return_value | ||
| mock_session_instance.post.side_effect = [ | ||
| mock_response_success, # The initial post succeeds | ||
| mock_response_success, # The DLQ post retry succeeds | ||
| ] | ||
| mock_session_instance.get.side_effect = [ | ||
| mock_response_success, # The DLQ get retry succeeds | ||
| ] | ||
|
|
||
| # Manually add failed POST and GET requests to the DLQ | ||
| failed_post_request = { | ||
| "url": "https://api.agentops.ai/health", | ||
| "payload": {"key": "value1"}, | ||
| "api_key": "API_KEY_1", | ||
| "parent_key": None, | ||
| "jwt": None, | ||
| "error_type": "Timeout", | ||
| } | ||
| failed_get_request = { | ||
| "url": "https://api.agentops.ai/health", | ||
| "payload": None, # GET request | ||
| "api_key": "API_KEY_2", | ||
| "parent_key": None, | ||
| "jwt": None, | ||
| "error_type": "Timeout", | ||
| } | ||
| dead_letter_queue.add(failed_post_request) | ||
| dead_letter_queue.add(failed_get_request) | ||
|
|
||
| # Perform an initial successful POST request | ||
| url = "https://api.agentops.ai/health" | ||
| payload = {"key": "value"} | ||
| HttpClient.post(url, payload) | ||
|
|
||
| # Check that both failed requests (POST and GET) in the DLQ were retried and removed | ||
| self.assertEqual(len(dead_letter_queue.get_all()), 0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤖 Enhancement:
Expand DLQ Retry Test Coverage
Add tests for DLQ retries with various error types and retry failures.
🔧 Suggested Code Diff:
+ @patch("requests.Session")
+ def test_dlq_retry_handles_various_errors(self, mock_session, mock_open_file):
+ # Mock different error responses
+ mock_response_timeout = requests.exceptions.Timeout()
+ mock_response_http_error = requests.exceptions.HTTPError()
+
+ mock_session_instance = mock_session.return_value
+ mock_session_instance.post.side_effect = [
+ mock_response_timeout, # First retry fails with timeout
+ mock_response_http_error, # Second retry fails with HTTP error
+ ]
+
+ # Add a failed request to the DLQ
+ failed_request = {
+ "url": "https://api.agentops.ai/health",
+ "payload": {"key": "value1"},
+ "api_key": "API_KEY_1",
+ "parent_key": None,
+ "jwt": None,
+ "error_type": "Timeout",
+ }
+ dead_letter_queue.add(failed_request)
+
+ # Perform an initial POST request
+ url = "https://api.agentops.ai/health"
+ payload = {"key": "value"}
+ HttpClient.post(url, payload)
+
+ # Check that the failed request is still in the DLQ
+ self.assertEqual(len(dead_letter_queue.get_all()), 1)📝 Committable Code Suggestion
‼️ Ensure you review the code suggestion before committing it to the branch. Make sure it replaces the highlighted code, contains no missing lines, and has no issues with indentation.
| @patch("requests.Session") | |
| def test_dlq_retry_successfully_retries_post_and_get( | |
| self, mock_session, mock_open_file | |
| ): | |
| # Mock successful POST and GET responses for DLQ retries | |
| mock_response_success = Mock() | |
| mock_response_success.status_code = 200 | |
| mock_response_success.json.return_value = {"message": "Success"} | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.post.side_effect = [ | |
| mock_response_success, # The initial post succeeds | |
| mock_response_success, # The DLQ post retry succeeds | |
| ] | |
| mock_session_instance.get.side_effect = [ | |
| mock_response_success, # The DLQ get retry succeeds | |
| ] | |
| # Manually add failed POST and GET requests to the DLQ | |
| failed_post_request = { | |
| "url": "https://api.agentops.ai/health", | |
| "payload": {"key": "value1"}, | |
| "api_key": "API_KEY_1", | |
| "parent_key": None, | |
| "jwt": None, | |
| "error_type": "Timeout", | |
| } | |
| failed_get_request = { | |
| "url": "https://api.agentops.ai/health", | |
| "payload": None, # GET request | |
| "api_key": "API_KEY_2", | |
| "parent_key": None, | |
| "jwt": None, | |
| "error_type": "Timeout", | |
| } | |
| dead_letter_queue.add(failed_post_request) | |
| dead_letter_queue.add(failed_get_request) | |
| # Perform an initial successful POST request | |
| url = "https://api.agentops.ai/health" | |
| payload = {"key": "value"} | |
| HttpClient.post(url, payload) | |
| # Check that both failed requests (POST and GET) in the DLQ were retried and removed | |
| self.assertEqual(len(dead_letter_queue.get_all()), 0) | |
| @patch("requests.Session") | |
| def test_dlq_retry_successfully_retries_post_and_get(self, mock_session, mock_open_file): | |
| # Mock successful POST and GET responses for DLQ retries | |
| mock_response_success = Mock() | |
| mock_response_success.status_code = 200 | |
| mock_response_success.json.return_value = {"message": "Success"} | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.post.side_effect = [ | |
| mock_response_success, # The initial post succeeds | |
| mock_response_success, # The DLQ post retry succeeds | |
| ] | |
| mock_session_instance.get.side_effect = [ | |
| mock_response_success, # The DLQ get retry succeeds | |
| ] | |
| # Manually add failed POST and GET requests to the DLQ | |
| failed_post_request = { | |
| "url": "https://api.agentops.ai/health", | |
| "payload": {"key": "value1"}, | |
| "api_key": "API_KEY_1", | |
| "parent_key": None, | |
| "jwt": None, | |
| "error_type": "Timeout", | |
| } | |
| failed_get_request = { | |
| "url": "https://api.agentops.ai/health", | |
| "payload": None, # GET request | |
| "api_key": "API_KEY_2", | |
| "parent_key": None, | |
| "jwt": None, | |
| "error_type": "Timeout", | |
| } | |
| dead_letter_queue.add(failed_post_request) | |
| dead_letter_queue.add(failed_get_request) | |
| # Perform an initial successful POST request | |
| url = "https://api.agentops.ai/health" | |
| payload = {"key": "value"} | |
| HttpClient.post(url, payload) | |
| # Check that both failed requests (POST and GET) in the DLQ were retried and removed | |
| self.assertEqual(len(dead_letter_queue.get_all()), 0) | |
| @patch("requests.Session") | |
| def test_dlq_retry_handles_various_errors(self, mock_session, mock_open_file): | |
| # Mock different error responses | |
| mock_response_timeout = requests.exceptions.Timeout() | |
| mock_response_http_error = requests.exceptions.HTTPError() | |
| mock_session_instance = mock_session.return_value | |
| mock_session_instance.post.side_effect = [ | |
| mock_response_timeout, # First retry fails with timeout | |
| mock_response_http_error, # Second retry fails with HTTP error | |
| ] | |
| # Add a failed request to the DLQ | |
| failed_request = { | |
| "url": "https://api.agentops.ai/health", | |
| "payload": {"key": "value1"}, | |
| "api_key": "API_KEY_1", | |
| "parent_key": None, | |
| "jwt": None, | |
| "error_type": "Timeout", | |
| } | |
| dead_letter_queue.add(failed_request) | |
| # Perform an initial POST request | |
| url = "https://api.agentops.ai/health" | |
| payload = {"key": "value"} | |
| HttpClient.post(url, payload) | |
| # Check that the failed request is still in the DLQ | |
| self.assertEqual(len(dead_letter_queue.get_all()), 1) |
🔍 Review Summary
Release Note
Purpose:
Changes:
Enhancement:
HttpClientclass to manage and retry failed HTTP requests, enhancing system robustness.Test:
Bug Fix:
multi_session_llm.pyfor accurate session ID display.Chore:
Impact:
Original Description
📥 Pull Request
📘 Description
Briefly describe the changes you've made.
🧪 Testing
Describe the tests you performed to validate your changes.