-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathsession.py
111 lines (88 loc) · 2.78 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
from typing import Any
import os
import sys
import asyncio
import docker
from contextvars import ContextVar
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.exc import PendingRollbackError, TimeoutError
import traceback
from . import daemon
from .business_objects import general
from .util import collect_engine_variables
from threading import Lock
import time
# Lock acquired by the session cleanup loop in this module while it removes
# sessions.
session_lock = Lock()
# Context variable named "request_id"; reads return None until a value is set.
request_id_ctx_var = ContextVar("request_id", default=None)


def get_request_id():
    """Return the value of ``request_id_ctx_var`` in the current context.

    Yields ``None`` (the variable's default) when nothing has been set.
    """
    return request_id_ctx_var.get()
# Pool settings supplied by the project helper. Note that pool_size and
# pool_max_overflow are unpacked here but NOT passed to create_engine below.
(
    pool_size,
    pool_max_overflow,
    pool_recycle,
    pool_use_lifo,
    pool_pre_ping,
) = collect_engine_variables()

# Module-level engine; the database URL is read from the POSTGRES environment
# variable (os.getenv returns None when it is unset).
engine = create_engine(
    os.getenv("POSTGRES"),
    pool_size=3,  # hard-coded; the collected pool_size is currently ignored
    max_overflow=0,  # hard-coded; the collected pool_max_overflow is ignored
    pool_recycle=pool_recycle,
    pool_use_lifo=pool_use_lifo,
    pool_pre_ping=pool_pre_ping,
    # Seconds to wait for a pooled connection before SQLAlchemy gives up and
    # raises its TimeoutError.
    pool_timeout=5,
)

# Scoped session registry: the current scope is whatever get_request_id()
# returns, instead of scoped_session's default thread-local scope.
session = scoped_session(
    sessionmaker(autocommit=False, autoflush=True, bind=engine),
    scopefunc=get_request_id,
)
## to enable db logging, remove the triple quotes wrapping the following lines
""" import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
"""
def exit_on_timeout(f):
    """
    Decorator that restarts the gateway container and exits on a TimeoutError.

    The wrapped callable is invoked with the given arguments and its return
    value is passed through. If it raises ``TimeoutError`` (the name imported
    at the top of this module), the error is logged, the asyncio event loop
    is stopped, the Docker container named "cognition-gateway" is restarted
    and the process exits with status 1. Any other exception propagates
    unchanged.

    Args:
        f: the callable to protect.

    Returns:
        A wrapper with the same call signature and metadata as ``f``.
    """
    # Function-scope import so this block does not depend on a file-level
    # import being added.
    from functools import wraps

    # Callables such as functools.partial objects have no __name__.
    name = getattr(f, "__name__", repr(f))

    @wraps(f)  # keep __name__/__doc__ of the wrapped function for logs/debugging
    def safe_execution(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except TimeoutError as e:
            # Log first, so the diagnostics are emitted even if the Docker or
            # event-loop lookups below fail.
            print(f"TimeoutError in {name}: {e}", flush=True)
            traceback.print_exc()
            client = docker.from_env()
            container = client.containers.get("cognition-gateway")
            loop = asyncio.get_event_loop()
            loop.stop()
            container.restart()
            sys.exit(1)

    return safe_execution
def check_session_and_rollback():
    """
    Probe the scoped session and roll it back if a rollback is pending.

    Requests the session's connection; when that raises
    ``PendingRollbackError`` the incident and its traceback are printed and
    ``session.rollback()`` is called repeatedly until the current registry
    session no longer reports being in a transaction. Other exceptions
    propagate to the caller.
    """
    try:
        session.connection()
    except PendingRollbackError:
        print("session issue detected, rollback initiated", flush=True)
        trace_text = traceback.format_exc()
        print(trace_text, flush=True)
        # Keep rolling back until no transaction is left open.
        while True:
            if not session.registry().in_transaction():
                break
            session.rollback()
def get_engine_dialect() -> Any:
    """Return the module-level engine's dialect, or None if the engine is falsy."""
    return engine.dialect if engine else None
def start_session_cleanup_thread():
    """
    Hand the session cleanup loop to ``daemon.run_without_db_token``.

    The loop (``__start_session_cleanup``) looks sessions up with
    ``exclude_last_x_seconds=20 * 60``, i.e. a 20-minute window (an earlier
    version of this docstring said 5 minutes).
    NOTE(review): presumably ``run_without_db_token`` runs the loop in a
    background thread - confirm against the daemon module.
    """
    daemon.run_without_db_token(__start_session_cleanup)
def __start_session_cleanup():
    """
    Endless worker loop that removes sessions returned by the session lookup.

    On each pass, while holding ``session_lock``, it asks
    ``general.get_session_lookup`` for sessions (excluding the last 20
    minutes) and calls ``general.force_remove_and_refresh_session_by_id`` for
    each entry's ``"session_id"``. Failures are printed and never stop the
    loop. The function does not return.
    """
    stale_after_seconds = 20 * 60
    pause_seconds = 10

    while True:
        with session_lock:
            try:
                stale_sessions = general.get_session_lookup(
                    exclude_last_x_seconds=stale_after_seconds
                )
            except Exception:
                # A failed lookup must not kill the cleanup loop for good;
                # report it and retry on the next pass.
                traceback.print_exc()
                stale_sessions = []

            # Named stale_session so the module-level scoped `session` is not
            # shadowed inside this function.
            for stale_session in stale_sessions:
                try:
                    general.force_remove_and_refresh_session_by_id(
                        stale_session["session_id"]
                    )
                    print("Session removed", stale_session, flush=True)
                except Exception:
                    traceback.print_exc()
        # NOTE(review): the source as received carries no indentation; the
        # pause is placed once per pass, outside the lock - confirm.
        time.sleep(pause_seconds)