Skip to content

ETL Token Limit #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Mar 14, 2025
108 changes: 108 additions & 0 deletions business_objects/personal_access_token_activity_log_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import datetime
from submodules.model.enums import TokenAction
from submodules.model.session import session
from submodules.model.business_objects import general
from submodules.model.models import (
PersonalAccessTokenActivityLogEtl,
)
from submodules.model.util import prevent_sql_injection

# Fallback rate-limit defaults, used when no per-organization override is given.
FILE_UPLOAD_INTERVAL = 3600  # 1 hour in seconds
FILE_UPLOAD_LIMIT = 50  # per interval

# Sums the upload quantities logged for an organization within the last
# {file_upload_interval} seconds.  Placeholders are filled via str.format after
# the values pass through prevent_sql_injection (see get_remaining_upload_limit).
TOKEN_LIMIT_BREACH_QUERY = """SELECT SUM(quantity)
FROM cognition.personal_access_token_activity_log_etl patale
WHERE patale.organization_id = '{org_id}'
AND patale.action = '{upload_action}'
AND patale.created_at > (SELECT NOW() - INTERVAL '{file_upload_interval} seconds')
"""


def get_remaining_upload_limit(
    org_id: str, file_upload_limit: int, file_upload_interval: int
) -> int:
    """Return how many more files the organization may upload right now.

    Counts the uploads logged within the last ``file_upload_interval`` seconds
    and subtracts that sum from ``file_upload_limit``.  The result may be
    negative when the limit has already been exceeded.
    """
    query = TOKEN_LIMIT_BREACH_QUERY.format(
        org_id=prevent_sql_injection(org_id, isinstance(org_id, str)),
        upload_action=TokenAction.FILE_UPLOAD.value,
        file_upload_interval=prevent_sql_injection(
            file_upload_interval, isinstance(file_upload_interval, int)
        ),
    )
    files_uploaded_no = general.execute_first(query)
    # SUM() yields NULL when nothing matched; also guard against an empty
    # result row, which would otherwise raise on the subscript below.
    if not files_uploaded_no or not files_uploaded_no[0]:
        return file_upload_limit
    return file_upload_limit - files_uploaded_no[0]


def is_limit_breached(
    org_id: str, file_upload_limit: int, file_upload_interval: int
) -> bool:
    """Whether the organization has exhausted its upload quota for the interval."""
    return (
        get_remaining_upload_limit(org_id, file_upload_limit, file_upload_interval)
        <= 0
    )


def get_retry_after(
    org_id: str, file_upload_limit: int = None, file_upload_interval: int = None
) -> int:
    """Return the number of seconds to wait before the next upload is allowed.

    Returns 0 when the limit is not breached.  ``file_upload_limit`` and
    ``file_upload_interval`` fall back to the module defaults when omitted.
    """
    file_upload_limit = int(file_upload_limit or FILE_UPLOAD_LIMIT)
    file_upload_interval = int(file_upload_interval or FILE_UPLOAD_INTERVAL)
    if not is_limit_breached(org_id, file_upload_limit, file_upload_interval):
        return 0

    latest_activity = (
        session.query(PersonalAccessTokenActivityLogEtl)
        .filter(
            PersonalAccessTokenActivityLogEtl.organization_id == org_id,
            PersonalAccessTokenActivityLogEtl.action == TokenAction.FILE_UPLOAD.value,
        )
        .order_by(PersonalAccessTokenActivityLogEtl.created_at.desc())
        .first()
    )
    if latest_activity is None:
        # A breach without any log row should not happen; fail open rather
        # than raise AttributeError on the subtraction below.
        return 0
    time_since_latest = (
        datetime.datetime.now(datetime.timezone.utc) - latest_activity.created_at
    )
    # BUG FIX: timedelta.seconds discards the .days component (it is only the
    # seconds *within* the current day), so long intervals or stale rows gave
    # wrong results.  total_seconds() is the full elapsed time; clamp at 0 so
    # callers never receive a negative wait.
    retry_after = file_upload_interval - int(time_since_latest.total_seconds())
    return max(retry_after, 0)


def create(
    org_id: str,
    action: str,
    quantity: int,
    endpoint: str,
    token_scope_id: str,
    with_commit: bool = True,
) -> PersonalAccessTokenActivityLogEtl:
    """Persist one token-activity log entry and return the new ORM object."""
    log_entry = PersonalAccessTokenActivityLogEtl(
        organization_id=org_id,
        action=action,
        quantity=quantity,
        endpoint=endpoint,
        token_scope_id=token_scope_id,
    )
    general.add(log_entry, with_commit)
    return log_entry


def delete_many(
    org_id: str = None,
    delete_after_days: int = 90,
    with_commit: bool = False,
) -> None:
    """Bulk-delete activity-log rows.

    When ``org_id`` is given, all rows of that organization are removed.
    Otherwise rows older than ``delete_after_days`` days are removed.
    Does nothing if neither criterion is set.
    """
    if org_id:
        condition = PersonalAccessTokenActivityLogEtl.organization_id == org_id
    elif delete_after_days:
        cutoff = datetime.datetime.now(
            datetime.timezone.utc
        ) - datetime.timedelta(days=delete_after_days)
        condition = PersonalAccessTokenActivityLogEtl.created_at < cutoff
    else:
        return
    session.query(PersonalAccessTokenActivityLogEtl).filter(condition).delete()
    general.flush_or_commit(with_commit)
6 changes: 3 additions & 3 deletions business_objects/personal_access_token_etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@

TOKEN_WITH_SCOPE_QUERY = """SELECT
pate.*,
token_scope.scopes
ts.scopes
FROM cognition.personal_access_token_etl pate
INNER JOIN (
SELECT patse.token_id, array_agg(row_to_json(patse)) AS scopes
FROM (SELECT * FROM cognition.personal_access_token_scope_etl) patse
GROUP BY patse.token_id
) token_scope
ON pate.id = token_scope.token_id
) ts
ON pate.id = ts.token_id
"""


Expand Down
13 changes: 13 additions & 0 deletions enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class Tablenames(Enum):
PERSONAL_ACCESS_TOKEN = "personal_access_token"
PERSONAL_ACCESS_TOKEN_ETL = "personal_access_token_etl"
PERSONAL_ACCESS_TOKEN_SCOPE_ETL = "personal_access_token_scope_etl"
PERSONAL_ACCESS_TOKEN_ACTIVITY_LOG_ETL = "personal_access_token_activity_log_etl"
ADMIN_MESSAGE = "admin_message"
TASK_QUEUE = "task_queue"
CONVERSATION = "conversation"
Expand Down Expand Up @@ -429,6 +430,18 @@ class TokenExpireAtValues(Enum):
NEVER = "NEVER"


class TokenAction(Enum):
    """Action types recorded in the personal access token activity log."""

    FILE_UPLOAD = "FILE_UPLOAD"


class TokenLimit(Enum):
    """Keys of the per-organization token rate-limit configuration."""

    FILE_UPLOAD_LIMIT = "FILE_UPLOAD_LIMIT"
    FILE_UPLOAD_INTERVAL = "FILE_UPLOAD_INTERVAL"

    def lowercase(self) -> str:
        # Lower-cased value; presumably used as JSON config keys — see the
        # Organization.token_limit default in models.py.
        return self.value.lower()


class TokenScope(Enum):
READ = "READ"
READ_WRITE = "READ_WRITE"
Expand Down
42 changes: 42 additions & 0 deletions models.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
SliceTypes,
StrategyComplexity,
Tablenames,
TokenLimit,
TokenScope,
TokenSubject,
UploadStates,
Expand All @@ -28,6 +29,7 @@
DateTime,
Float,
ForeignKey,
Index,
Integer,
JSON,
LargeBinary,
Expand Down Expand Up @@ -159,6 +161,13 @@ class Organization(Base):
log_admin_requests = Column(String, default=AdminLogLevel.NO_GET.value)
conversation_lifespan_days = Column(Integer)
file_lifespan_days = Column(Integer, default=14)
token_limit = Column(
JSON,
default={
TokenLimit.FILE_UPLOAD_LIMIT.lowercase(): 50,
TokenLimit.FILE_UPLOAD_INTERVAL.lowercase(): 3600,
},
) # per hour


class User(Base):
Expand Down Expand Up @@ -1473,6 +1482,39 @@ class PersonalAccessTokenScopeEtl(Base):
)


class PersonalAccessTokenActivityLogEtl(Base):
    """One row per logged token action, used for upload rate limiting."""

    __tablename__ = Tablenames.PERSONAL_ACCESS_TOKEN_ACTIVITY_LOG_ETL.value
    __table_args__ = (
        # BRIN index on created_at — suited to append-only, time-ordered rows
        # that are queried by time range (see the interval query in the ETL
        # business object).
        Index(
            f"idx_{Tablenames.PERSONAL_ACCESS_TOKEN_ACTIVITY_LOG_ETL.value}_created_at",
            "created_at",
            postgresql_using="brin",
        ),
        {"schema": "cognition"},
    )
    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    created_at = Column(DateTime(timezone=True), default=sql.func.now())
    # e.g. TokenAction.FILE_UPLOAD.value
    action = Column(String, index=True)
    # number of items covered by this entry; defaults to a single action
    quantity = Column(Integer, default=1)
    endpoint = Column(String)
    # rows are deleted together with their organization (ON DELETE CASCADE)
    organization_id = Column(
        UUID(as_uuid=True),
        ForeignKey(
            f"{Tablenames.ORGANIZATION.value}.id",
            ondelete="CASCADE",
        ),
        index=True,
    )
    # reference is nulled (ON DELETE SET NULL) if the token scope is removed,
    # so the log entry itself survives
    token_scope_id = Column(
        UUID(as_uuid=True),
        ForeignKey(
            f"cognition.{Tablenames.PERSONAL_ACCESS_TOKEN_SCOPE_ETL.value}.id",
            ondelete="SET NULL",
        ),
        index=True,
    )


class CognitionMarkdownDataset(Base):
__tablename__ = Tablenames.MARKDOWN_DATASET.value
__table_args__ = {"schema": "cognition"}
Expand Down