Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Copy of PR #385 - Dead letter queue #24

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/python-testing.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ jobs:
matrix:
python-version: [3.7,3.8,3.9,3.10,3.11,3.12]

env:
ENVIRONMENT: test

steps:
- uses: actions/checkout@v2
- name: Set up Python
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ cython_debug/
.DS_Store

agentops_time_travel.json
.agentops_time_travel.yaml
.agentops_time_travel.yaml
.agentops/
2 changes: 2 additions & 0 deletions agentops/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
from uuid import UUID

from .log_config import logger
from .singleton import singleton


@singleton
class Configuration:
def __init__(self):
self.api_key: Optional[str] = None
Comment on lines 2 to 11

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

ℹ️ Design Pattern Implementation

Ensure Thread-Safe Singleton Implementation

The Configuration class has been updated to use the singleton pattern, which ensures that only one instance of the class is created throughout the application. This is beneficial for managing configuration settings consistently across different parts of the application. However, ensure that the singleton implementation in singleton.py is thread-safe, especially if the application is multi-threaded.

Expand Down
18 changes: 17 additions & 1 deletion agentops/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import http.client
import json
from importlib.metadata import version, PackageNotFoundError

from .log_config import logger
from uuid import UUID
from importlib.metadata import version
import os


def get_ISO_time():
Expand Down Expand Up @@ -179,3 +179,19 @@ def wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)

return wrapper


def ensure_dead_letter_queue():
# Define file path
file_path = os.path.join(".agentops", "dead_letter_queue.json")

# Check if directory exists
if not os.path.exists(".agentops"):
os.makedirs(".agentops")

# Check if file exists
if not os.path.isfile(file_path):
with open(file_path, "w") as f:
json.dump({"messages": []}, f)

return file_path
223 changes: 176 additions & 47 deletions agentops/http_client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
from datetime import datetime
from enum import Enum
from typing import Optional
from typing import Optional, List, Union

import jwt
from requests.adapters import Retry, HTTPAdapter
import requests
from agentops.log_config import logger
from .config import Configuration

from .exceptions import ApiServerException
from dotenv import load_dotenv
import os

from .helpers import ensure_dead_letter_queue, filter_unjsonable, safe_serialize
import json

load_dotenv()

JSON_HEADER = {"Content-Type": "application/json; charset=UTF-8", "Accept": "*/*"}

Expand All @@ -21,8 +33,49 @@ class HttpStatus(Enum):
UNKNOWN = -1


class Response:
class DeadLetterQueue:
def __init__(self):
self.queue: List[dict] = []
self.is_testing = os.environ.get("ENVIRONMENT") == "test"

# if not self.is_testing:
self.file_path = ensure_dead_letter_queue()

def read_queue(self):
if not self.is_testing:
with open(self.file_path, "r") as f:
return json.load(f)["messages"]
else:
return []

def write_queue(self):
if not self.is_testing:
with open(self.file_path, "w") as f:
json.dump({"messages": safe_serialize(self.queue)}, f)
Comment on lines +44 to +54
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Optimize dead letter queue file I/O operations

Solution: Optimize the dead letter queue implementation by caching the queue in memory and only writing to the file when necessary (e.g., on application shutdown or when the queue reaches a certain size). This will reduce the number of file I/O operations and improve the overall performance of the system.
!! Make sure the following suggestion is correct before committing it !!

Suggested change
def read_queue(self):
if not self.is_testing:
with open(self.file_path, "r") as f:
return json.load(f)["messages"]
else:
return []
def write_queue(self):
if not self.is_testing:
with open(self.file_path, "w") as f:
json.dump({"messages": safe_serialize(self.queue)}, f)
['def __init__(self):', ' self.queue: List[dict] =[]', ' self.is_testing = os.environ.get("ENVIRONMENT") == "test"', ' if not self.is_testing:', ' self.file_path = ensure_dead_letter_queue()', ' self.queue = self._load_queue_from_file()', '', 'def _load_queue_from_file(self):', ' with open(self.file_path, "r") as f:', ' return json.load(f)["messages"]', '', 'def write_queue(self):', ' if not self.is_testing:', ' with open(self.file_path, "w") as f:', ' json.dump({"messages": safe_serialize(self.queue)}, f)', ' self.queue.clear()']


def add(self, request_data: dict):
if not self.is_testing:
self.queue.append(request_data)
self.write_queue()

def get_all(self) -> List[dict]:
return self.queue

def remove(self, request_data: dict):
if not self.is_testing:
if request_data in self.queue:
self.queue.remove(request_data)
self.write_queue()

def clear(self):
self.queue.clear()
self.write_queue()


dead_letter_queue = DeadLetterQueue()


class Response:
def __init__(
self, status: HttpStatus = HttpStatus.UNKNOWN, body: Optional[dict] = None
):
Expand Down Expand Up @@ -57,15 +110,13 @@ def get_status(code: int) -> HttpStatus:


class HttpClient:

@staticmethod
def post(
url: str,
payload: bytes,
api_key: Optional[str] = None,
parent_key: Optional[str] = None,
jwt: Optional[str] = None,
header=None,
token: Optional[str] = None,
) -> Response:
result = Response()
try:
Expand All @@ -79,53 +130,62 @@ def post(
if parent_key is not None:
JSON_HEADER["X-Agentops-Parent-Key"] = parent_key

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"
if token is not None:
decoded_jwt = jwt.decode(
token,
algorithms=["HS256"],
options={"verify_signature": False},
)

# if token is expired, reauth
if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
new_jwt = reauthorize_jwt(
token,
api_key,
decoded_jwt["session_id"],
)
token = new_jwt
Comment on lines +134 to +147
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Ensure JWT token is properly validated and refreshed

Solution: Implement a more robust JWT token validation and refresh mechanism. When the token is found to be expired, attempt to refresh the token using the reauthorize_jwt function. If the refresh fails, log the error and handle the situation appropriately (e.g., prompt the user to re-authenticate).
!! Make sure the following suggestion is correct before committing it !!

Suggested change
decoded_jwt = jwt.decode(
token,
algorithms=["HS256"],
options={"verify_signature": False},
)
# if token is expired, reauth
if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
new_jwt = reauthorize_jwt(
token,
api_key,
decoded_jwt["session_id"],
)
token = new_jwt
['try:', ' decoded_jwt = jwt.decode(', ' token,', ' algorithms=["HS256"],', ' options={"verify_signature": True},', ' )', 'except jwt.ExpiredSignatureError:', ' new_jwt = reauthorize_jwt(', ' token,', ' api_key,', ' decoded_jwt["session_id"],', ' )', ' if new_jwt is None:', ' # Handle token refresh failure (e.g., prompt user to re-authenticate)', ' raise ApiServerException("JWT token refresh failed.")', ' token = new_jwt', 'except jwt.InvalidTokenError:', ' # Handle invalid token (e.g., prompt user to re-authenticate)', ' raise ApiServerException("Invalid JWT token.")']


JSON_HEADER["Authorization"] = f"Bearer {token}"

res = request_session.post(
url, data=payload, headers=JSON_HEADER, timeout=20
)

result.parse(res)
except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException(
"Could not reach API server - connection timed out"

if result.code == 200:
HttpClient._retry_dlq_requests()

except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
HttpClient._handle_failed_request(
url, payload, api_key, parent_key, token, type(e).__name__
)
except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
except Exception:
result = Response()
result.code = e.response.status_code
result.status = Response.get_status(e.response.status_code)
result.body = {"error": str(e)}
raise ApiServerException(f"HTTPError: {e}")
raise ApiServerException(f"{type(e).__name__}: {e}")
except requests.exceptions.RequestException as e:
result.body = {"error": str(e)}
HttpClient._handle_failed_request(
url, payload, api_key, parent_key, token, "RequestException"
)
raise ApiServerException(f"RequestException: {e}")

if result.code == 401:
if result.body.get("message") == "Expired Token":
raise ApiServerException(f"API server: jwt token expired.")
raise ApiServerException(
f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
)
Comment on lines 130 to 176

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

🔒 Security Suggestion

Verify JWT Signature for Security

The current implementation decodes the JWT without verifying its signature, which can lead to security vulnerabilities. It's crucial to verify the JWT signature to ensure the token's authenticity and integrity.

+                decoded_jwt = jwt.decode(
+                    token,
+                    algorithms=["HS256"],
+                    options={"verify_signature": True},
+                )
Commitable Code Suggestion:
Suggested change
if parent_key is not None:
JSON_HEADER["X-Agentops-Parent-Key"] = parent_key
if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"
if token is not None:
decoded_jwt = jwt.decode(
token,
algorithms=["HS256"],
options={"verify_signature": False},
)
# if token is expired, reauth
if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
new_jwt = reauthorize_jwt(
token,
api_key,
decoded_jwt["session_id"],
)
token = new_jwt
JSON_HEADER["Authorization"] = f"Bearer {token}"
res = request_session.post(
url, data=payload, headers=JSON_HEADER, timeout=20
)
result.parse(res)
except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException(
"Could not reach API server - connection timed out"
if result.code == 200:
HttpClient._retry_dlq_requests()
except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
HttpClient._handle_failed_request(
url, payload, api_key, parent_key, token, type(e).__name__
)
except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
except Exception:
result = Response()
result.code = e.response.status_code
result.status = Response.get_status(e.response.status_code)
result.body = {"error": str(e)}
raise ApiServerException(f"HTTPError: {e}")
raise ApiServerException(f"{type(e).__name__}: {e}")
except requests.exceptions.RequestException as e:
result.body = {"error": str(e)}
HttpClient._handle_failed_request(
url, payload, api_key, parent_key, token, "RequestException"
)
raise ApiServerException(f"RequestException: {e}")
if result.code == 401:
if result.body.get("message") == "Expired Token":
raise ApiServerException(f"API server: jwt token expired.")
raise ApiServerException(
f"API server: invalid API key: {api_key}. Find your API key at https://app.agentops.ai/settings/projects"
)
if token is not None:
decoded_jwt = jwt.decode(
token,
algorithms=["HS256"],
options={"verify_signature": True},
)
# if token is expired, reauth
if datetime.fromtimestamp(decoded_jwt["exp"]) < datetime.now():
new_jwt = reauthorize_jwt(
token,
api_key,
decoded_jwt["session_id"],
)
token = new_jwt
JSON_HEADER["Authorization"] = f"Bearer {token}"

if result.code == 400:
if "message" in result.body:
raise ApiServerException(f"API server: {result.body['message']}")
else:
raise ApiServerException(f"API server: {result.body}")
if result.code == 500:
raise ApiServerException("API server: - internal server error")

return result
+ HttpClient._handle_failed_request(
+ url, payload, api_key, parent_key, token, "ServerError"
+ )

@staticmethod
def get(
url: str,
api_key: Optional[str] = None,
jwt: Optional[str] = None,
header=None,
token: Optional[str] = None,
) -> Response:
result = Response()
try:
Expand All @@ -136,29 +196,25 @@ def get(
if api_key is not None:
JSON_HEADER["X-Agentops-Api-Key"] = api_key

if jwt is not None:
JSON_HEADER["Authorization"] = f"Bearer {jwt}"
if token is not None:
JSON_HEADER["Authorization"] = f"Bearer {token}"

res = request_session.get(url, headers=JSON_HEADER, timeout=20)

result.parse(res)
except requests.exceptions.Timeout:
result.code = 408
result.status = HttpStatus.TIMEOUT
raise ApiServerException(
"Could not reach API server - connection timed out"

if result.code == 200:
HttpClient._retry_dlq_requests()

except (requests.exceptions.Timeout, requests.exceptions.HTTPError) as e:
HttpClient._handle_failed_request(
url, None, api_key, None, token, type(e).__name__
)
except requests.exceptions.HTTPError as e:
try:
result.parse(e.response)
except Exception:
result = Response()
result.code = e.response.status_code
result.status = Response.get_status(e.response.status_code)
result.body = {"error": str(e)}
raise ApiServerException(f"HTTPError: {e}")
raise ApiServerException(f"{type(e).__name__}: {e}")
except requests.exceptions.RequestException as e:
result.body = {"error": str(e)}
HttpClient._handle_failed_request(
url, None, api_key, None, token, "RequestException"
)
raise ApiServerException(f"RequestException: {e}")

if result.code == 401:
Expand All @@ -171,6 +227,79 @@ def get(
else:
raise ApiServerException(f"API server: {result.body}")
if result.code == 500:
HttpClient._handle_failed_request(
url, None, api_key, None, jwt, "ServerError"
)
raise ApiServerException("API server: - internal server error")

return result

@staticmethod
def _retry_dlq_requests():
"""Retry requests in the DLQ"""
for failed_request in dead_letter_queue.get_all():
dead_letter_queue.clear()
try:
if "payload" in failed_request:
# Retry POST request from DLQ
HttpClient.post(
failed_request["url"],
json.dumps(filter_unjsonable(failed_request["payload"])).encode(
"utf-8"
),
failed_request["api_key"],
failed_request["parent_key"],
failed_request["jwt"],
)
else:
# Retry GET request from DLQ
HttpClient.get(
failed_request["url"],
failed_request["api_key"],
failed_request["jwt"],
)
except ApiServerException as e:
dead_letter_queue.add(failed_request)
# If it still fails, keep it in the DLQ
except Exception as e:
dead_letter_queue.add(failed_request)

@staticmethod
def _handle_failed_request(
url: str,
payload: Optional[bytes],
api_key: Optional[str],
parent_key: Optional[str],
jwt: Optional[str],
error_type: str,
):
dead_letter_queue.add(
{
"url": url,
"payload": payload,
"api_key": api_key,
"parent_key": parent_key,
"jwt": jwt,
"error_type": error_type,
}
)

logger.warning(
f"An error occurred while communicating with the server: {error_type}"
)
Comment on lines +288 to +289
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment: Potential exposure of sensitive data in logs.

Solution: Avoid logging sensitive data. Use placeholders or obfuscate sensitive information in logs.
!! Make sure the following suggestion is correct before committing it !!

Suggested change
f"An error occurred while communicating with the server: {error_type}"
)
logger.warning(f"An error occurred while communicating with the server:{error_type}. Check logs for details.")



def reauthorize_jwt(old_jwt: str, api_key: str, session_id: str) -> Union[str, None]:
payload = {"jwt": old_jwt, "session_id": session_id}
serialized_payload = json.dumps(filter_unjsonable(payload)).encode("utf-8")
config = Configuration()
res = HttpClient.post(
f"{config.endpoint}/v2/reauthorize_jwt",
serialized_payload,
api_key,
)

if res.code != 200:
return None

return res.body.get("jwt", None)
8 changes: 4 additions & 4 deletions agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def format_duration(start_time, end_time):
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not end session - {e}")
Comment on lines 125 to 131

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

🔒 Security Suggestion

Ensure Secure Management of Token

The change from jwt to token in the HttpClient method calls is a critical update that enhances security by ensuring the correct parameter is used for authentication. However, ensure that the token is securely managed and refreshed as needed to prevent unauthorized access.

Commitable Code Suggestion:
Suggested change
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not end session - {e}")
- jwt=self.jwt,
+ token=self.jwt,

Expand Down Expand Up @@ -289,7 +289,7 @@ def _update_session(self) -> None:
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not update session - {e}")
Comment on lines 289 to 295

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

🔒 Security Suggestion

Validate and Refresh Token for Secure Sessions

The replacement of jwt with token in the _update_session method is crucial for maintaining secure communication. Verify that the token is valid and refreshed appropriately to avoid session hijacking or unauthorized access.

Commitable Code Suggestion:
Suggested change
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not update session - {e}")
def _update_session(self) -> None:
try:
res = HttpClient.post(
f"{self.config.endpoint}/v2/update_session",
json.dumps(filter_unjsonable(payload)).encode("utf-8"),
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not update session - {e}")

Expand All @@ -311,7 +311,7 @@ def _flush_queue(self) -> None:
HttpClient.post(
f"{self.config.endpoint}/v2/create_events",
serialized_payload,
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not post events - {e}")
Comment on lines 311 to 317

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

🔒 Security Suggestion

Secure Token Management for Event Posting

In the _flush_queue method, the change from jwt to token is important for secure event posting. Ensure that the token is valid and securely stored to prevent potential security breaches.

Commitable Code Suggestion:
Suggested change
HttpClient.post(
f"{self.config.endpoint}/v2/create_events",
serialized_payload,
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not post events - {e}")
def _flush_queue(self) -> None:
HttpClient.post(
f"{self.config.endpoint}/v2/create_events",
serialized_payload,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not post events - {e}")

Expand Down Expand Up @@ -360,7 +360,7 @@ def create_agent(self, name, agent_id):
HttpClient.post(
f"{self.config.endpoint}/v2/create_agent",
serialized_payload,
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not create agent - {e}")
Comment on lines 360 to 366

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entelligence AI Bot Icon Entelligence AI Bot v4

🔒 Security Suggestion

Secure Token Handling for Agent Creation

The update from jwt to token in the create_agent method is a significant security improvement. Ensure that the token is securely handled and refreshed to maintain secure agent creation processes.

Commitable Code Suggestion:
Suggested change
HttpClient.post(
f"{self.config.endpoint}/v2/create_agent",
serialized_payload,
jwt=self.jwt,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not create agent - {e}")
def create_agent(self, name, agent_id):
HttpClient.post(
f"{self.config.endpoint}/v2/create_agent",
serialized_payload,
token=self.jwt,
)
except ApiServerException as e:
return logger.error(f"Could not create agent - {e}")

Expand Down
Loading
Loading