Skip to content

Commit

Permalink
revert session
Browse files Browse the repository at this point in the history
  • Loading branch information
areibman committed Oct 29, 2024
1 parent b03c83e commit 28ef4c7
Showing 1 changed file with 5 additions and 16 deletions.
21 changes: 5 additions & 16 deletions agentops/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from decimal import ROUND_HALF_UP, Decimal
from termcolor import colored
from typing import Optional, List, Union, Callable
from typing import Optional, List, Union
from uuid import UUID, uuid4
from datetime import datetime

Expand Down Expand Up @@ -40,7 +40,6 @@ def __init__(
config: Configuration,
tags: Optional[List[str]] = None,
host_env: Optional[dict] = None,
callback: Optional[Callable[["Session"], None]] = None,
):
self.end_timestamp = None
self.end_state: Optional[str] = None
Expand All @@ -61,8 +60,9 @@ def __init__(
"errors": 0,
"apis": 0,
}

self.stop_flag = threading.Event()
self.thread = threading.Thread(target=self._run, args=(callback,))
self.thread = threading.Thread(target=self._run)
self.thread.daemon = True
self.thread.start()

Expand Down Expand Up @@ -274,21 +274,16 @@ def _start_session(self):
self.config.parent_key,
)
except ApiServerException as e:
logger.error(f"Could not start session - {e}")
return False
return logger.error(f"Could not start session - {e}")

logger.debug(res.body)

if res.code != 200:
logger.error(f"Could not start session - server error")
return False

jwt = res.body.get("jwt", None)
self.jwt = jwt
if jwt is None:
logger.error(
f"Could not start session - server could not authenticate your API Key"
)
return False

session_url = res.body.get(
Expand Down Expand Up @@ -321,8 +316,6 @@ def _update_session(self) -> None:
return logger.error(f"Could not update session - {e}")

def _flush_queue(self) -> None:
import time
print("\n🔄 FLUSHING QUEUE 🔄", flush=True)
if not self.is_running:
return
with self.lock:
Expand All @@ -336,15 +329,11 @@ def _flush_queue(self) -> None:

serialized_payload = safe_serialize(payload).encode("utf-8")
try:
print("\n🚀 Starting post 🚀\n", serialized_payload)
start_time = time.time()
HttpClient.post(
f"{self.config.endpoint}/v2/create_events",
serialized_payload,
jwt=self.jwt,
)
end_time = time.time()
print(f"\n🚀 QUEUE FLUSHED SUCCESSFULLY! 🚀\nTime taken: {end_time - start_time:.2f} seconds\n")
except ApiServerException as e:
return logger.error(f"Could not post events - {e}")

Expand All @@ -370,7 +359,7 @@ def _flush_queue(self) -> None:
elif event_type == "apis":
self.event_counts["apis"] += 1

def _run(self, callback: Optional[Callable[["Session"], None]] = None) -> None:
def _run(self) -> None:
while not self.stop_flag.is_set():
time.sleep(self.config.max_wait_time / 1000)
if self.queue:
Expand Down

0 comments on commit 28ef4c7

Please sign in to comment.