Copy of PR #385 - Dead letter queue #24
base: main
Walkthrough
This update enhances the AgentOps project by introducing a Dead Letter Queue (DLQ) mechanism to handle failed HTTP requests, ensuring they are retried.
Files selected (6)
Files ignored (8)
from uuid import UUID

from .log_config import logger
from .singleton import singleton


@singleton
class Configuration:
    def __init__(self):
        self.api_key: Optional[str] = None
ℹ️ Design Pattern Implementation
Ensure Thread-Safe Singleton Implementation
The `Configuration` class has been updated to use the singleton pattern, which ensures that only one instance of the class is created throughout the application. This is beneficial for managing configuration settings consistently across different parts of the application. However, ensure that the singleton implementation in `singleton.py` is thread-safe, especially if the application is multi-threaded.
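One way a class decorator like this could be made thread-safe is to guard instance creation with a lock and re-check inside it. The sketch below is an assumption about the general shape, not the contents of the project's `singleton.py`; the helper names `_instances` and `_lock` are illustrative.

```python
import threading
from typing import Any, Callable, Dict


def singleton(cls: type) -> Callable[..., Any]:
    """Class decorator that returns the same instance on every call."""
    _instances: Dict[type, Any] = {}
    _lock = threading.Lock()

    def get_instance(*args: Any, **kwargs: Any) -> Any:
        # Fast path: skip the lock once the instance exists.
        if cls not in _instances:
            with _lock:
                # Re-check inside the lock so only one thread constructs it.
                if cls not in _instances:
                    _instances[cls] = cls(*args, **kwargs)
        return _instances[cls]

    return get_instance


@singleton
class Configuration:
    def __init__(self) -> None:
        self.api_key = None
```

With this shape, concurrent first calls to `Configuration()` all receive the same object, because construction happens at most once under the lock.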
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not end session - {e}")
🔒 Security Suggestion
Ensure Secure Management of Token
The change from `jwt` to `token` in the `HttpClient` method calls is a critical update that enhances security by ensuring the correct parameter is used for authentication. However, ensure that the `token` is securely managed and refreshed as needed to prevent unauthorized access.
Committable Code Suggestion:
- jwt=self.jwt,
+ token=self.jwt,
     HttpClient.post(
         f"{self.config.endpoint}/v2/create_agent",
         serialized_payload,
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not create agent - {e}")
🔒 Security Suggestion
Secure Token Handling for Agent Creation
The update from `jwt` to `token` in the `create_agent` method is a significant security improvement. Ensure that the `token` is securely handled and refreshed to maintain secure agent creation processes.
Committable Code Suggestion:
def create_agent(self, name, agent_id):
    try:
        HttpClient.post(
            f"{self.config.endpoint}/v2/create_agent",
            serialized_payload,
            token=self.jwt,
        )
    except ApiServerException as e:
        return logger.error(f"Could not create agent - {e}")
     HttpClient.post(
         f"{self.config.endpoint}/v2/create_events",
         serialized_payload,
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not post events - {e}")
🔒 Security Suggestion
Secure Token Management for Event Posting
In the `_flush_queue` method, the change from `jwt` to `token` is important for secure event posting. Ensure that the `token` is valid and securely stored to prevent potential security breaches.
Committable Code Suggestion:
def _flush_queue(self) -> None:
    try:
        HttpClient.post(
            f"{self.config.endpoint}/v2/create_events",
            serialized_payload,
            token=self.jwt,
        )
    except ApiServerException as e:
        return logger.error(f"Could not post events - {e}")
 session_1 = agentops.start_session(tags=["multi-session-test-1"])
 session_2 = agentops.start_session(tags=["multi-session-test-2"])

-print("session_id_1: {}".format(session_1))
-print("session_id_2: {}".format(session_2))
+print("session_id_1: {}".format(session_1.session_id))
+print("session_id_2: {}".format(session_2.session_id))

 messages = [{"role": "user", "content": "Hello"}]
💻 Code Improvement
Ensure `session_id` is initialized in `start_session`
The change from accessing the session object directly to accessing the `session_id` attribute is a good improvement for clarity and correctness. However, ensure that the `session_id` attribute is always available and correctly initialized in the `start_session` method to prevent a potential `AttributeError`.
Committable Code Suggestion:
session_1 = agentops.start_session(tags=["multi-session-test-1"])
session_2 = agentops.start_session(tags=["multi-session-test-2"])
print("session_id_1: {}".format(session_1.session_id))
print("session_id_2: {}".format(session_2.session_id))
messages = [{"role": "user", "content": "Hello"}]
     res = HttpClient.post(
         f"{self.config.endpoint}/v2/update_session",
         json.dumps(filter_unjsonable(payload)).encode("utf-8"),
-        jwt=self.jwt,
+        token=self.jwt,
     )
 except ApiServerException as e:
     return logger.error(f"Could not update session - {e}")
🔒 Security Suggestion
Validate and Refresh Token for Secure Sessions
The replacement of `jwt` with `token` in the `_update_session` method is crucial for maintaining secure communication. Verify that the `token` is valid and refreshed appropriately to avoid session hijacking or unauthorized access.
Committable Code Suggestion:
def _update_session(self) -> None:
    try:
        res = HttpClient.post(
            f"{self.config.endpoint}/v2/update_session",
            json.dumps(filter_unjsonable(payload)).encode("utf-8"),
            token=self.jwt,
        )
    except ApiServerException as e:
        return logger.error(f"Could not update session - {e}")
### Purpose
# test an edge case where a request is retried after the jwt has expired
import time
from datetime import datetime

### SETUP
# Run the API server locally
# In utils.py -> generate_jwt -> set the jwt expiration to 0.001
# Run this script

### Plan
# The first request should succeed and return a JWT
# We'll manually add a failed request to the DLQ with the expired JWT
# When reattempting, the http_client should identify the expired jwt and reauthorize it before sending again

import agentops
from agentops import ActionEvent
from agentops.helpers import safe_serialize, get_ISO_time
from agentops.http_client import dead_letter_queue, HttpClient

api_key = "492f0ee6-0b7d-40a6-af86-22d89c7c5eea"
agentops.init(
    endpoint="http://localhost:8000",
    api_key=api_key,
    auto_start_session=False,
    default_tags=["dead-letter-queue-test"],
)

# create session
session = agentops.start_session()

# add failed request to DLQ
event = ActionEvent()
event.end_timestamp = get_ISO_time()

failed_request = {
    "url": "http://localhost:8000/v2/create_events",
    "payload": {"events": [event.__dict__]},
    "api_key": str(api_key),
    "parent_key": None,
    "jwt": session.jwt,
    "error_type": "Timeout",
}
# failed_request = safe_serialize(failed_request).encode("utf-8")

dead_letter_queue.add(failed_request)
assert len(dead_letter_queue.get_all()) == 1

# wait for the JWT to expire
time.sleep(3)

# retry
HttpClient()._retry_dlq_requests()
session.end_session(end_state="Success")

# check if the failed request is still in the DLQ
assert dead_letter_queue.get_all() == []
🔒 Security Suggestion
Avoid Hardcoding Sensitive Information
The test script includes a hardcoded API key, which poses a security risk if the code is shared or exposed. Consider using environment variables or a secure vault to manage sensitive information.
- api_key = "492f0ee6-0b7d-40a6-af86-22d89c7c5eea"
+ api_key = os.getenv('API_KEY')
Committable Code Suggestion:
import os

api_key = os.getenv('API_KEY')
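A slightly more defensive variant of the environment-variable approach is sketched here. The helper name `load_api_key` is hypothetical, and the variable name `API_KEY` is simply the one used in the suggestion above; note that a bare `os.getenv` returns `None` when the variable is unset, which would otherwise only surface later as a confusing authentication failure.

```python
import os


def load_api_key(env_var: str = "API_KEY") -> str:
    """Read the API key from the environment, failing loudly if it is unset."""
    api_key = os.getenv(env_var)
    if not api_key:
        raise RuntimeError(
            f"Environment variable {env_var} is not set; export it before running the test"
        )
    return api_key
```

The test script could then call `api_key = load_api_key()` before `agentops.init(...)`.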
ℹ️ Logic Error
Verify DLQ Request Removal Logic
The test script assumes that the failed request will be removed from the DLQ after retrying. Ensure that the `HttpClient()._retry_dlq_requests()` method correctly handles the removal of successfully retried requests from the DLQ.
@entelligence.ai how does the DLQ request logic actually work?
@EntelligenceAI how does the DLQ request logic work?
@Entelligence.AI how does the DLQ work?
@Entelligence.AI how does the DLQ work?
The DLQ works by capturing and storing requests that have failed to be processed. When reattempting, the http_client should identify the expired jwt and reauthorize it before sending again. The process ensures that failed requests are retried and processed successfully.
@Entelligence.AI how does the DLQ work?
The DLQ (Dead-Letter Queue) is used to handle failed requests in a system. When a request fails, it is added to the DLQ for later retry. The retry logic should identify and reauthorize expired JWTs before sending the request again. It's important to verify that the `HttpClient()._retry_dlq_requests()` method handles the removal of successfully retried requests from the DLQ.
@Entelligence.AI what is DLQ?
@Entelligence.AI what is DLQ?
The Dead Letter Queue (DLQ) is a mechanism used to store failed messages in a message queue system. It allows for the identification and handling of messages that were unable to be processed initially. If you have any further questions, feel free to ask.
@Entelligence.AI what is DLQ and how does it work?
@Entelligence.AI what is DLQ and how does it work?
A DLQ, or Dead Letter Queue, is a mechanism used to manage and handle messages or requests that have failed to be processed successfully. It acts as a holding area for these failed messages, allowing for further analysis and potential reprocessing. In the context of the code changes, it appears that the script is testing the handling of failed requests by adding them to a DLQ and then retrying them. The script assumes that the failed request will be removed from the DLQ after retrying. However, it's essential to verify that the retry logic correctly handles the removal of successfully retried requests from the DLQ.
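The retry-and-remove behaviour discussed in this thread could look roughly like the sketch below. It is not the PR's implementation: `add` and `get_all` mirror the method names used in the test script, while `remove`, `retry_dlq_requests`, and the `send` callback are assumptions introduced for illustration.

```python
import threading
from typing import Any, Callable, Dict, List


class DeadLetterQueue:
    """In-memory store for requests that failed to send."""

    def __init__(self) -> None:
        self._items: List[Dict[str, Any]] = []
        self._lock = threading.Lock()

    def add(self, request: Dict[str, Any]) -> None:
        with self._lock:
            self._items.append(request)

    def get_all(self) -> List[Dict[str, Any]]:
        with self._lock:
            return list(self._items)  # copy so callers can iterate safely

    def remove(self, request: Dict[str, Any]) -> None:
        with self._lock:
            if request in self._items:
                self._items.remove(request)


def retry_dlq_requests(
    queue: DeadLetterQueue, send: Callable[[Dict[str, Any]], bool]
) -> int:
    """Retry every queued request; drop the ones that `send` reports as delivered."""
    delivered = 0
    for request in queue.get_all():
        try:
            ok = send(request)
        except Exception:
            ok = False  # leave the request queued for a later attempt
        if ok:
            queue.remove(request)
            delivered += 1
    return delivered
```

The point the review raises is visible in the loop: a request leaves the queue only after `send` confirms delivery, so the test's final `get_all() == []` assertion holds only if every retried request succeeded.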
     if parent_key is not None:
         JSON_HEADER["X-Agentops-Parent-Key"] = parent_key

-    if jwt is not None:
-        JSON_HEADER["Authorization"] = f"Bearer {jwt}"
+    if token is not None:
+        decoded_jwt = jwt.decode(
+            token,
+            algorithms=["HS256"],
+            options={"verify_signature": False},
+        )
+
+        # if token is expired, reauth
+        if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
+            new_jwt = reauthorize_jwt(
+                token,
+                api_key,
+                decoded_jwt["session_id"],
+            )
+            token = new_jwt
+
+        JSON_HEADER["Authorization"] = f"Bearer {token}"

     res = request_session.post(
         url, data=payload, headers=JSON_HEADER, timeout=20
     )

     result.parse(res)
-except requests.exceptions.Timeout:
-    result.code = 408
-    result.status = HttpStatus.TIMEOUT
-    raise ApiServerException(
-        "Could not reach API server - connection timed out"
+
+    if result.code == 200:
+        HttpClient._retry_dlq_requests()
+
+except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
+    HttpClient._handle_failed_request(
+        url, payload, api_key, parent_key, token, type(e).__name__
     )
-except requests.exceptions.HTTPError as e:
-    try:
-        result.parse(e.response)
-    except Exception:
-        result = Response()
-        result.code = e.response.status_code
-        result.status = Response.get_status(e.response.status_code)
-        result.body = {"error": str(e)}
-        raise ApiServerException(f"HTTPError: {e}")
+    raise ApiServerException(f"{type(e).__name__}: {e}")
 except requests.exceptions.RequestException as e:
     result.body = {"error": str(e)}
+    HttpClient._handle_failed_request(
+        url, payload, api_key, parent_key, token, "RequestException"
+    )
     raise ApiServerException(f"RequestException: {e}")

 if result.code == 401:
+    if result.body.get("message") == "Expired Token":
+        raise ApiServerException(f"API server: jwt token expired.")
     raise ApiServerException(
         f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
     )
🔒 Security Suggestion
Verify JWT Signature for Security
The current implementation decodes the JWT without verifying its signature, which can lead to security vulnerabilities. It's crucial to verify the JWT signature to ensure the token's authenticity and integrity.
+ decoded_jwt = jwt.decode(
+ token,
+ algorithms=["HS256"],
+ options={"verify_signature": True},
+ )
Committable Code Suggestion:
if token is not None:
    decoded_jwt = jwt.decode(
        token,
        algorithms=["HS256"],
        options={"verify_signature": True},
    )
    # if token is expired, reauth
    if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
        new_jwt = reauthorize_jwt(
            token,
            api_key,
            decoded_jwt["session_id"],
        )
        token = new_jwt
    JSON_HEADER["Authorization"] = f"Bearer {token}"
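One caveat on the suggestion above: verifying an HS256 signature requires the signing secret, which a client SDK typically does not hold, so enabling `verify_signature` without passing a key to `jwt.decode` would presumably fail rather than add protection. If the client only needs the `exp` claim to decide whether to reauthorize (with the server remaining the authority on validity), the check can be sketched with the standard library alone. The helper names `decode_jwt_payload` and `is_expired` are hypothetical and not part of the PR.

```python
import base64
import json
import time
from typing import Any, Dict, Optional


def decode_jwt_payload(token: str) -> Dict[str, Any]:
    """Return the JWT claims WITHOUT verifying the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("token is not a three-part JWT")
    payload_b64 = parts[1]
    # base64url segments omit padding; restore it before decoding.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def is_expired(token: str, now: Optional[float] = None) -> bool:
    """True when the token's `exp` claim (seconds since epoch) is in the past."""
    claims = decode_jwt_payload(token)
    current = time.time() if now is None else now
    return claims["exp"] <= current
```

Comparing `exp` against `time.time()` keeps both sides in epoch seconds, which avoids mixing naive local `datetime` values.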
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
🔍 Code Review Summary
❗ Attention Required: This push has potential issues.
🚨 Critical Issues
security (2 issues)
1. Potential exposure of sensitive data in logs.
📁 File: agentops/http_client.py
💡 Solution:
Current Code: logger.warning(f"An error occurred while communicating with the server: {error_type}")
Suggested Code: logger.warning(f"An error occurred while communicating with the server: {error_type}. Check logs for details.")
2. Potential exposure of sensitive information in error messages.
📁 File: tests/test_http_client.py
💡 Solution:
Current Code: self.assertIn("invalid API key", str(context.exception))
Suggested Code: self.assertIn("invalid API key", str(context.exception).replace("API_KEY_1", "<REDACTED>"))
Consider implementing the following changes to improve the code.
    f"An error occurred while communicating with the server: {error_type}"
)
Comment: Potential exposure of sensitive data in logs.
Solution: Avoid logging sensitive data. Use placeholders or obfuscate sensitive information in logs.
!! Make sure the following suggestion is correct before committing it !!
- f"An error occurred while communicating with the server: {error_type}"
- )
+ logger.warning(f"An error occurred while communicating with the server: {error_type}. Check logs for details.")
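The advice to obfuscate sensitive values before logging can be illustrated with a small redaction helper. This is a sketch under stated assumptions: the `redact` function is hypothetical, and the patterns assume UUID-shaped API keys and `Bearer` tokens, which is the shape they take elsewhere in this PR.

```python
import re

# Assumed shapes: UUID-style API keys and bearer tokens.
_UUID_RE = re.compile(
    r"\b[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}\b"
)
_BEARER_RE = re.compile(r"(Bearer\s+)[A-Za-z0-9._\-]+")


def redact(message: str) -> str:
    """Mask UUID-like keys and bearer tokens before a message reaches the log."""
    message = _UUID_RE.sub("<REDACTED>", message)
    return _BEARER_RE.sub(r"\1<REDACTED>", message)
```

A call site would then wrap the message, for example `logger.warning(redact(text))`, so that the exception type is still logged while any embedded key or token is masked.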
with self.assertRaises(ApiServerException) as context:
    HttpClient.post(url, payload, api_key="INVALID_KEY")

self.assertIn("invalid API key", str(context.exception))
Comment: Potential exposure of sensitive information in error messages.
Solution: Sanitize error messages to remove any sensitive information before raising exceptions.
!! Make sure the following suggestion is correct before committing it !!
- self.assertIn("invalid API key", str(context.exception))
+ self.assertIn("invalid API key", str(context.exception).replace("API_KEY_1", "<REDACTED>"))
Update GitHub Actions Workflow
Update the Python testing GitHub Actions workflow to include additional Python versions and set the environment variable for testing.
Set the `ENVIRONMENT` environment variable to `test`.
The updated workflow will now run tests against a wider range of Python versions and indicate that the tests are being run in a test environment.
Original Description
Agentops PR AgentOps-AI#385 - Dead letter queue