Skip to content
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ local_settings.py

seatable-python-runner/
seatable-python-runner.zip

.python-version
4 changes: 2 additions & 2 deletions scheduler/app/database/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, scoped_session

DB_ROOT_USER = os.getenv("DB_ROOT_USER", "root")
DB_ROOT_PASSWD = os.getenv("DB_ROOT_PASSWD")
Expand Down Expand Up @@ -37,4 +37,4 @@

engine = create_engine(db_url, **db_kwargs)
Base = declarative_base()
DBSession = sessionmaker(bind=engine)
DBSession = scoped_session(sessionmaker(bind=engine))
27 changes: 24 additions & 3 deletions scheduler/app/faas_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ class ScriptLog(Base):
return_code = Column(Integer, nullable=True)
output = Column(Text, nullable=True)
operate_from = Column(String(255))
state = Column(String(10))
created_at = Column(DateTime, index=True)

PENDING = "pending"
RUNNING = "running"
FINISHED = "finished"

def __init__(
self,
Expand All @@ -39,17 +45,28 @@ def __init__(
org_id,
script_name,
context_data,
started_at,
state,
created_at,
operate_from=None,
):
self.dtable_uuid = dtable_uuid
self.owner = owner
self.org_id = org_id
self.script_name = script_name
self.context_data = context_data
self.started_at = started_at
self.state = state
self.created_at = created_at
self.operate_from = operate_from

def get_info(self):
    """Return a short summary dict of this script log.

    The dict holds the ``id``, ``org_id``, ``owner``, ``dtable_uuid`` and
    ``script_name`` attributes of the instance, in that order.
    """
    summary_fields = ("id", "org_id", "owner", "dtable_uuid", "script_name")
    summary = {}
    for field_name in summary_fields:
        summary[field_name] = getattr(self, field_name)
    return summary

def to_dict(self):
from faas_scheduler.utils import datetime_to_isoformat_timestr

Expand All @@ -61,13 +78,17 @@ def to_dict(self):
"context_data": (
json.loads(self.context_data) if self.context_data else None
),
"started_at": datetime_to_isoformat_timestr(self.started_at),
"started_at": self.started_at
and datetime_to_isoformat_timestr(self.started_at),
"finished_at": self.finished_at
and datetime_to_isoformat_timestr(self.finished_at),
"success": self.success,
"return_code": self.return_code,
"output": self.output,
"operate_from": self.operate_from,
"state": self.state,
"created_at": self.created_at
and datetime_to_isoformat_timestr(self.created_at),
}


Expand Down
214 changes: 153 additions & 61 deletions scheduler/app/faas_scheduler/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@
import json
import logging
import requests
from datetime import datetime
from datetime import datetime, timedelta
from uuid import UUID

from tzlocal import get_localzone
from sqlalchemy import desc, text
from faas_scheduler.models import ScriptLog
from faas_scheduler.models import (
ScriptLog,
UserRunScriptStatistics,
OrgRunScriptStatistics,
DTableRunScriptStatistics,
)

import sys

Expand Down Expand Up @@ -172,6 +177,7 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
},
"context_data": context_data,
"script_id": script_id,
"timeout": int(SUB_PROCESS_TIMEOUT),
}
headers = {"User-Agent": "python-scheduler/" + VERSION}
logger.debug("I call starter at url %s", RUN_FUNC_URL)
Expand All @@ -196,66 +202,140 @@ def call_faas_func(script_url, temp_api_token, context_data, script_id=None):
return None


def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time):
if not spend_time:
return
username = owner

# dtable_run_script_statistics
sqls = [
"""
INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES
(:dtable_uuid, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]
def _bump_run_count(db_session, model, key_filter, run_date, now):
    """Count one more run on the ``model`` row for ``key_filter`` and ``run_date``.

    When the query finds no row, a new one is added with a run count of 1 and
    a run time of 0. Nothing is committed here; the caller owns the commit.

    :param db_session: SQLAlchemy session used for the query and the insert.
    :param model: statistics model class to query and instantiate.
    :param key_filter: dict of the column(s) identifying the row besides
        ``run_date`` (e.g. ``{"dtable_uuid": ...}``).
    :param run_date: day the run is counted on, as a ``YYYY-MM-DD`` string.
    :param now: timestamp stored in ``update_at``.
    """
    stats = db_session.query(model).filter_by(run_date=run_date, **key_filter).first()
    if stats is None:
        db_session.add(
            model(
                run_date=run_date,
                total_run_count=1,
                total_run_time=0,
                update_at=now,
                **key_filter,
            )
        )
        return
    stats.total_run_count += 1
    stats.update_at = now


def update_stats_run_count(db_session, dtable_uuid, owner, org_id):
    """Increment today's run counters for a script run.

    The per-dtable counter is always incremented. When ``org_id`` is ``-1``
    the per-user counter for ``owner`` is incremented, unless ``owner``
    contains ``"@seafile_group"``; for any other ``org_id`` the per-org
    counter is incremented instead. All changes are committed together.

    Errors are logged and swallowed (best effort), and the session is rolled
    back so it remains usable by the caller afterwards.

    :param db_session: SQLAlchemy session to work on.
    :param dtable_uuid: uuid of the dtable the script belongs to.
    :param owner: owner name of the script.
    :param org_id: organisation id; ``-1`` selects the per-user branch.
    """
    now = datetime.now()
    run_date = now.strftime("%Y-%m-%d")
    try:
        _bump_run_count(
            db_session,
            DTableRunScriptStatistics,
            {"dtable_uuid": dtable_uuid},
            run_date,
            now,
        )
        if org_id == -1:
            if "@seafile_group" not in owner:
                _bump_run_count(
                    db_session,
                    UserRunScriptStatistics,
                    {"username": owner},
                    run_date,
                    now,
                )
        else:
            _bump_run_count(
                db_session,
                OrgRunScriptStatistics,
                {"org_id": org_id},
                run_date,
                now,
            )
        db_session.commit()
    except Exception as e:
        logger.exception(
            "update stats for org_id %s owner %s dtable %s run count error %s",
            org_id,
            owner,
            dtable_uuid,
            e,
        )
        # A failed flush/commit leaves the session's transaction unusable
        # until it is rolled back; without this, the caller's next use of
        # the same session would raise as well.
        db_session.rollback()

# org_run_script_statistics
if org_id and org_id != -1:
sqls += [
"""
INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

# user_run_script_statistics
if "@seafile_group" not in username:
sqls += [
"""
INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES
(:username, :org_id, :run_date, 1, :spend_time, :update_at)
ON DUPLICATE KEY UPDATE
org_id=:org_id,
total_run_count=total_run_count+1,
total_run_time=total_run_time+:spend_time,
update_at=:update_at;
"""
]

def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time):
run_date = datetime.today().strftime("%Y-%m-%d")
try:
for sql in sqls:
db_session.execute(
text(sql),
{
"dtable_uuid": dtable_uuid,
"username": username,
"org_id": org_id,
"run_date": datetime.today(),
"spend_time": spend_time,
"update_at": datetime.now(),
},
dtable_stats = (
db_session.query(DTableRunScriptStatistics)
.filter_by(dtable_uuid=dtable_uuid, run_date=run_date)
.first()
)
if not dtable_stats:
dtable_stats = DTableRunScriptStatistics(
dtable_uuid=dtable_uuid,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(dtable_stats)
else:
dtable_stats.total_run_time += spend_time
dtable_stats.update_at = datetime.now()
if org_id == -1:
if "@seafile_group" not in owner:
user_stats = (
db_session.query(UserRunScriptStatistics)
.filter_by(username=owner, run_date=run_date)
.first()
)
if not user_stats:
user_stats = UserRunScriptStatistics(
username=owner,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(user_stats)
else:
user_stats.total_run_time += spend_time
user_stats.update_at = datetime.now()
else:
org_stats = (
db_session.query(OrgRunScriptStatistics)
.filter_by(org_id=org_id, run_date=run_date)
.first()
)
if not org_stats:
org_stats = OrgRunScriptStatistics(
org_id=org_id,
run_date=run_date,
total_run_count=1,
total_run_time=spend_time,
update_at=datetime.now(),
)
db_session.add(org_stats)
else:
org_stats.total_run_time += spend_time
org_stats.update_at = datetime.now()
db_session.commit()
except Exception as e:
logger.exception("update statistics sql error: %s", e)
logger.exception(
"update stats for org_id %s owner %s dtable %s run time error %s",
org_id,
owner,
dtable_uuid,
e,
)


# required to get "script logs" in dtable-web
Expand Down Expand Up @@ -379,20 +459,27 @@ def add_script(
org_id,
script_name,
context_data,
ScriptLog.PENDING,
datetime.now(),
operate_from,
)
db_session.add(script)
db_session.commit()

update_stats_run_count(db_session, dtable_uuid, owner, org_id)

return script


def update_script(db_session, script, success, return_code, output):
script.finished_at = datetime.now()
def update_script(
db_session, script, success, return_code, output, started_at, finished_at
):
script.started_at = started_at
script.finished_at = finished_at
script.success = success
script.return_code = return_code
script.output = output
script.state = ScriptLog.FINISHED
db_session.commit()

return script
Expand Down Expand Up @@ -420,11 +507,16 @@ def run_script(
return True


def hook_update_script(db_session, script_id, success, return_code, output, spend_time):
def hook_update_script(
db_session, script_id, success, return_code, output, started_at, spend_time
):
script = db_session.query(ScriptLog).filter_by(id=script_id).first()
if script:
update_script(db_session, script, success, return_code, output)
update_statistics(
finished_at = started_at + timedelta(seconds=spend_time)
update_script(
db_session, script, success, return_code, output, started_at, finished_at
)
update_stats_run_time(
db_session, script.dtable_uuid, script.owner, script.org_id, spend_time
)

Expand Down
Loading