Skip to content

Commit

Permalink
persistent dead letter queue
Browse files Browse the repository at this point in the history
  • Loading branch information
bboynton97 committed Sep 18, 2024
1 parent d5cc60f commit 0ed4260
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 29 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,4 +164,5 @@ cython_debug/
.DS_Store

agentops_time_travel.json
.agentops_time_travel.yaml
.agentops_time_travel.yaml
.agentops/
18 changes: 17 additions & 1 deletion agentops/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
import http.client
import json
from importlib.metadata import version, PackageNotFoundError

from .log_config import logger
from uuid import UUID
from importlib.metadata import version
import os


def get_ISO_time():
Expand Down Expand Up @@ -179,3 +179,19 @@ def wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)

return wrapper


def ensure_dead_letter_queue():
# Define file path
file_path = os.path.join(".agentops", "dead_letter_queue.json")

# Check if directory exists
if not os.path.exists(".agentops"):
os.makedirs(".agentops")

Check warning on line 190 in agentops/helpers.py

View check run for this annotation

Codecov / codecov/patch

agentops/helpers.py#L190

Added line #L190 was not covered by tests

# Check if file exists
if not os.path.isfile(file_path):
with open(file_path, "w") as f:
json.dump({"messages": []}, f)

Check warning on line 195 in agentops/helpers.py

View check run for this annotation

Codecov / codecov/patch

agentops/helpers.py#L195

Added line #L195 was not covered by tests

return file_path
25 changes: 22 additions & 3 deletions agentops/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from dotenv import load_dotenv
import os

from .singleton import singleton
from .helpers import ensure_dead_letter_queue
import json

load_dotenv()

Expand All @@ -28,25 +29,43 @@ class HttpStatus(Enum):
UNKNOWN = -1


# @singleton
class DeadLetterQueue:
def __init__(self):
self.queue: List[dict] = []
self.is_testing = os.environ.get("ENVIRONMENT") == "test"

# if not self.is_testing:
self.file_path = ensure_dead_letter_queue()

def read_queue(self):
if not self.is_testing:
with open(self.file_path, "r") as f:
return json.load(f)["messages"]

Check warning on line 43 in agentops/http_client.py

View check run for this annotation

Codecov / codecov/patch

agentops/http_client.py#L43

Added line #L43 was not covered by tests
else:
return []

Check warning on line 45 in agentops/http_client.py

View check run for this annotation

Codecov / codecov/patch

agentops/http_client.py#L45

Added line #L45 was not covered by tests

def write_queue(self):
if not self.is_testing:
with open(self.file_path, "w") as f:
json.dump({"messages": self.queue}, f)

def add(self, request_data: dict):
if not self.is_testing:
self.queue.append(request_data)
self.write_queue()

def get_all(self) -> List[dict]:
return self.queue

def remove(self, request_data: dict):
if not self.is_testing:
self.queue.remove(request_data)
if request_data in self.queue:
self.queue.remove(request_data)
self.write_queue()

Check warning on line 64 in agentops/http_client.py

View check run for this annotation

Codecov / codecov/patch

agentops/http_client.py#L63-L64

Added lines #L63 - L64 were not covered by tests

def clear(self):
self.queue.clear()
self.write_queue()


dead_letter_queue = DeadLetterQueue()
Expand Down
55 changes: 31 additions & 24 deletions tests/test_http_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import os
import unittest
from unittest.mock import patch, Mock
from unittest.mock import patch, Mock, mock_open
import requests

from agentops.http_client import (
Expand All @@ -11,6 +11,10 @@
)


@patch("builtins.open", new_callable=mock_open, read_data='{"messages": []}')
# @patch("os.path.exists", return_value=False)
# @patch("os.path.isfile", return_value=False)
# @patch("os.makedirs")
class TestHttpClient(unittest.TestCase):
MAX_RETRIES = 3
RETRY_DELAY = 1
Expand All @@ -19,13 +23,14 @@ def setUp(self):
# Clear DLQ before each test
dead_letter_queue.is_testing = False
dead_letter_queue.clear()
self.addCleanup(patch.stopall)

def tearDown(self):
dead_letter_queue.is_testing = True
dead_letter_queue.clear()

@patch("requests.Session")
def test_post_success(self, mock_session):
def test_post_success(self, mock_session, mock_open_file):
# Mock a successful response
mock_response = Mock()
mock_response.status_code = 200
Expand All @@ -35,7 +40,7 @@ def test_post_success(self, mock_session):
mock_session_instance.post.return_value = mock_response

url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}

response = HttpClient.post(url, payload)

Expand All @@ -44,13 +49,13 @@ def test_post_success(self, mock_session):
self.assertEqual(response.body, {"message": "Success"})

@patch("requests.Session")
def test_post_timeout(self, mock_session):
def test_post_timeout(self, mock_session, mock_open_file):
# Mock a timeout exception
mock_session_instance = mock_session.return_value
mock_session_instance.post.side_effect = requests.exceptions.Timeout

url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}

with self.assertRaises(ApiServerException) as context:
HttpClient.post(url, payload)
Expand All @@ -59,7 +64,7 @@ def test_post_timeout(self, mock_session):
self.assertEqual(len(dead_letter_queue.get_all()), 1)

@patch("requests.Session")
def test_post_http_error(self, mock_session):
def test_post_http_error(self, mock_session, mock_open_file):
# Mock an HTTPError
mock_response = Mock()
mock_response.status_code = 500
Expand All @@ -70,7 +75,7 @@ def test_post_http_error(self, mock_session):
)

url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}

with self.assertRaises(ApiServerException) as context:
HttpClient.post(url, payload)
Expand All @@ -82,7 +87,7 @@ def test_post_http_error(self, mock_session):
self.assertEqual(failed_request["error_type"], "HTTPError")

@patch("requests.Session")
def test_post_invalid_api_key(self, mock_session):
def test_post_invalid_api_key(self, mock_session, mock_open_file):
# Mock a response with invalid API key
mock_response = Mock()
mock_response.status_code = 401
Expand All @@ -92,7 +97,7 @@ def test_post_invalid_api_key(self, mock_session):
mock_session_instance.post.return_value = mock_response

url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}

with self.assertRaises(ApiServerException) as context:
HttpClient.post(url, payload, api_key="INVALID_KEY")
Expand All @@ -101,7 +106,7 @@ def test_post_invalid_api_key(self, mock_session):
self.assertEqual(len(dead_letter_queue.get_all()), 0)

@patch("requests.Session")
def test_get_success(self, mock_session):
def test_get_success(self, mock_session, mock_open_file):
# Mock a successful response
mock_response = Mock()
mock_response.status_code = 200
Expand All @@ -119,7 +124,7 @@ def test_get_success(self, mock_session):
self.assertEqual(response.body, {"message": "Success"})

@patch("requests.Session")
def test_get_timeout(self, mock_session):
def test_get_timeout(self, mock_session, mock_open_file):
# Mock a timeout exception
mock_session_instance = mock_session.return_value
mock_session_instance.get.side_effect = requests.exceptions.Timeout
Expand All @@ -133,7 +138,7 @@ def test_get_timeout(self, mock_session):
self.assertEqual(len(dead_letter_queue.get_all()), 1)

@patch("requests.Session")
def test_get_http_error(self, mock_session):
def test_get_http_error(self, mock_session, mock_open_file):
# Mock an HTTPError
mock_response = Mock()
mock_response.status_code = 500
Expand All @@ -154,7 +159,7 @@ def test_get_http_error(self, mock_session):
self.assertEqual(failed_request["url"], url)
self.assertEqual(failed_request["error_type"], "HTTPError")

def test_clear_dead_letter_queue(self):
def test_clear_dead_letter_queue(self, mock_open_file):
# Add a dummy request to DLQ and clear it
dead_letter_queue.add(
{"url": "https://api.agentops.ai/health", "error_type": "DummyError"}
Expand All @@ -165,7 +170,7 @@ def test_clear_dead_letter_queue(self):
self.assertEqual(len(dead_letter_queue.get_all()), 0)

@patch("requests.Session")
def test_post_success_triggers_dlq_retry(self, mock_session):
def test_post_success_triggers_dlq_retry(self, mock_session, mock_open_file):
# Mock successful POST response for the initial request
mock_response_success = Mock()
mock_response_success.status_code = 200
Expand All @@ -181,15 +186,15 @@ def test_post_success_triggers_dlq_retry(self, mock_session):
# Manually add failed requests to the DLQ
failed_request_1 = {
"url": "https://api.agentops.ai/health",
"payload": b'{"key": "value1"}',
"payload": {"key": "value1"},
"api_key": "API_KEY_1",
"parent_key": None,
"jwt": None,
"error_type": "Timeout",
}
failed_request_2 = {
"url": "https://api.agentops.ai/health",
"payload": b'{"key": "value2"}',
"payload": {"key": "value2"},
"api_key": "API_KEY_2",
"parent_key": None,
"jwt": None,
Expand All @@ -200,14 +205,14 @@ def test_post_success_triggers_dlq_retry(self, mock_session):

# Perform an initial successful POST request
url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}
HttpClient.post(url, payload)

# Check that both failed requests in the DLQ were retried and removed
self.assertEqual(0, len(dead_letter_queue.get_all()))

@patch("requests.Session")
def test_dlq_retry_fails_and_stays_in_queue(self, mock_session):
def test_dlq_retry_fails_and_stays_in_queue(self, mock_session, mock_open_file):
# Mock successful POST response for the initial request
mock_response_success = Mock()
mock_response_success.status_code = 200
Expand All @@ -229,7 +234,7 @@ def test_dlq_retry_fails_and_stays_in_queue(self, mock_session):
# Manually add a failed request to the DLQ
failed_request = {
"url": "https://api.agentops.ai/health",
"payload": b'{"key": "value1"}',
"payload": {"key": "value1"},
"api_key": "API_KEY_1",
"parent_key": None,
"jwt": None,
Expand All @@ -246,7 +251,9 @@ def test_dlq_retry_fails_and_stays_in_queue(self, mock_session):
self.assertEqual(len(dead_letter_queue.get_all()), 1)

@patch("requests.Session")
def test_dlq_retry_successfully_retries_post_and_get(self, mock_session):
def test_dlq_retry_successfully_retries_post_and_get(
self, mock_session, mock_open_file
):
# Mock successful POST and GET responses for DLQ retries
mock_response_success = Mock()
mock_response_success.status_code = 200
Expand All @@ -264,7 +271,7 @@ def test_dlq_retry_successfully_retries_post_and_get(self, mock_session):
# Manually add failed POST and GET requests to the DLQ
failed_post_request = {
"url": "https://api.agentops.ai/health",
"payload": b'{"key": "value1"}',
"payload": {"key": "value1"},
"api_key": "API_KEY_1",
"parent_key": None,
"jwt": None,
Expand All @@ -283,17 +290,17 @@ def test_dlq_retry_successfully_retries_post_and_get(self, mock_session):

# Perform an initial successful POST request
url = "https://api.agentops.ai/health"
payload = b'{"key": "value"}'
payload = {"key": "value"}
HttpClient.post(url, payload)

# Check that both failed requests (POST and GET) in the DLQ were retried and removed
self.assertEqual(len(dead_letter_queue.get_all()), 0)

def test_clear_dlq_after_success(self):
def test_clear_dlq_after_success(self, mock_open_file):
# Add requests to DLQ and ensure they are removed after retry success
failed_request = {
"url": "https://api.agentops.ai/health",
"payload": b'{"key": "value1"}',
"payload": {"key": "value1"},
"api_key": "API_KEY_1",
"parent_key": None,
"jwt": None,
Expand Down

0 comments on commit 0ed4260

Please sign in to comment.