diff --git a/agentops/session.py b/agentops/session.py index 72c5bdff..6fd51e61 100644 --- a/agentops/session.py +++ b/agentops/session.py @@ -5,7 +5,7 @@ import time from decimal import ROUND_HALF_UP, Decimal from termcolor import colored -from typing import Optional, List, Union, Callable +from typing import Optional, List, Union from uuid import UUID, uuid4 from datetime import datetime @@ -40,7 +40,6 @@ def __init__( config: Configuration, tags: Optional[List[str]] = None, host_env: Optional[dict] = None, - callback: Optional[Callable[["Session"], None]] = None, ): self.end_timestamp = None self.end_state: Optional[str] = None @@ -61,8 +60,9 @@ def __init__( "errors": 0, "apis": 0, } + self.stop_flag = threading.Event() - self.thread = threading.Thread(target=self._run, args=(callback,)) + self.thread = threading.Thread(target=self._run) self.thread.daemon = True self.thread.start() @@ -274,21 +274,16 @@ def _start_session(self): self.config.parent_key, ) except ApiServerException as e: - logger.error(f"Could not start session - {e}") - return False + return logger.error(f"Could not start session - {e}") logger.debug(res.body) if res.code != 200: - logger.error(f"Could not start session - server error") return False jwt = res.body.get("jwt", None) self.jwt = jwt if jwt is None: - logger.error( - f"Could not start session - server could not authenticate your API Key" - ) return False session_url = res.body.get( @@ -321,8 +316,6 @@ def _update_session(self) -> None: return logger.error(f"Could not update session - {e}") def _flush_queue(self) -> None: - import time - print("\nšŸ”„ FLUSHING QUEUE šŸ”„", flush=True) if not self.is_running: return with self.lock: @@ -336,15 +329,11 @@ def _flush_queue(self) -> None: serialized_payload = safe_serialize(payload).encode("utf-8") try: - print("\nšŸš€ Starting post šŸš€\n", serialized_payload) - start_time = time.time() HttpClient.post( f"{self.config.endpoint}/v2/create_events", serialized_payload, jwt=self.jwt, ) - end_time = time.time() - print(f"\nšŸš€ QUEUE FLUSHED SUCCESSFULLY! šŸš€\nTime taken: {end_time - start_time:.2f} seconds\n") except ApiServerException as e: return logger.error(f"Could not post events - {e}") @@ -370,7 +359,7 @@ def _flush_queue(self) -> None: elif event_type == "apis": self.event_counts["apis"] += 1 - def _run(self, callback: Optional[Callable[["Session"], None]] = None) -> None: + def _run(self) -> None: while not self.stop_flag.is_set(): time.sleep(self.config.max_wait_time / 1000) if self.queue: