Skip to content

Commit

Permalink
Merge pull request #156 from jianzfb/application
Browse files Browse the repository at this point in the history
Application
  • Loading branch information
jianzfb authored Dec 30, 2024
2 parents 3752446 + dfb3d61 commit ff53e47
Show file tree
Hide file tree
Showing 15 changed files with 769 additions and 18 deletions.
Empty file.
Empty file.
140 changes: 140 additions & 0 deletions antgo/pipeline/application/create/cas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
# -*- coding: UTF-8 -*-
# @Time : 2024/11/27 22:42
# @File : user.py
# @Author : jian<jian@mltalker.com>
from __future__ import division
from __future__ import unicode_literals
from __future__ import print_function
from antvis.client.httprpc import *
from antgo.pipeline.functional.common.config import *
from antgo.pipeline.functional.common.env import *
from antgo.pipeline.functional.mixins.db import *
from urllib.parse import unquote_plus, quote_plus
import os
import cv2
import base64
import numpy as np
import re


# pattern for the authentication token header
auth_header_pat = re.compile(r'^token\s+([^\s]+)$')


class CasOp(object):
def __init__(self, cas_ip, cas_port, cas_proto='http', cas_prefix='antvis', server_router="/#/Login"):
self.cas_url = f'{cas_proto}://{cas_ip}:{cas_port}/{cas_prefix}'
self.cas_ip = cas_ip
self.cas_port = cas_port
self.cas_proto = cas_proto
self.cas_prefix = cas_prefix
self.server_router = server_router

def info(self):
return ['ST', 'token', 'username', 'password', 'session_id']

def get_current_user_token(self, db, token):
"""get_current_user from Authorization header token"""
if token is None:
return None
match = auth_header_pat.match(auth_header)
if not match:
return None
token = match.group(1)
orm_token = get_db_orm().APIToken.find(db, token)
if orm_token is None:
return None
else:
return orm_token.user

def get_current_user_token_from_argments(self, db, token):
if token is None:
return None

orm_token = get_db_orm().APIToken.find(db, token)
if orm_token is None:
return None

if orm_token is None:
return None
else:
return orm_token.user

def get_current_user(self, db, token, username, password):
"""get current username"""
# # 1.step 从cookie获得登录要换个户
# user = self.get_current_user_cookie()
# if user is not None:
# return user
# 2.step 从api_token获得登录用户
user = self.get_current_user_token(db, token)
if user is not None:
return user
# 3.step 从用户名密码获得登录用户
user = self.get_current_user_token_from_argments(db, token)
if user is not None:
return user
return user

def __call__(self, *args, ST=None, token=None, username=None, password=None, session_id=None):
with thread_session_context(get_db_session()) as db:
current_user = self.get_current_user(db, token, username, password)
if current_user is not None:
return True

if ST is None:
# 无票据信息,需要重新登录
re_server_router = self.server_router
if self.server_router.startswith('/'):
re_server_router = self.server_router[1:]

server_url = ''
if re_server_router.startswith('#'):
server_url = '{}/{}'.format(self.cas_url, quote_plus(re_server_router))

cas_url = '{}/cas/auth/?redirect={}'.format(self.cas_url, server_url)
set_context_redirect_info(session_id, cas_url)
set_context_exit_info(session_id, detail="login or re-auth user")
return None

if current_user is None or current_user.service_ticket != ST:
# 从CAS获得登录信息
response = HttpRpc('', self.cas_prefix, self.cas_ip, self.cas_port).cas.auth.post(ST=ST)
if response['status'] == 'OK':
# 当前用户登录成功
user_name = response['content']['user']
is_admin = response['content']['admin']
# 记录当前用户信息,如果不存在则创建
user = db.query(get_db_orm().User).filter(get_db_orm().User.name == user_name).one_or_none()

if user is None:
# 创建用户
user = get_db_orm().User(name=user_name, admin=is_admin)
db.add(user)
db.commit()

if user.admin != is_admin:
user.admin = is_admin

# 设置登录状态
user.cookie_id = str(uuid.uuid4()) + '/' + str(uuid.uuid4())
db.commit()
set_context_cookie_info(session_id, 'antgo-user', user.cookie_id)
return user
else:
# 票据失效,需要重新登录
s_server_router = self.server_router
if self.server_router.startswith('/'):
s_server_router = self.server_router[1:]

server_url = ''
if s_server_router.startswith('#'):
server_url = '{}/{}'.format(self.web_url, quote_plus(s_server_router))

cas_url = '{}/cas/auth/?redirect={}'.format(self.cas_url, server_url)

set_context_redirect_info(session_id, cas_url)
set_context_exit_info(session_id, detail="login or re-auth user")
return None

return current_user
Empty file.
77 changes: 77 additions & 0 deletions antgo/pipeline/application/execute/command.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: UTF-8 -*-
# @Time : 2024/11/27 22:42
# @File : command.py
# @Author : jian<jian@mltalker.com>
from __future__ import division
from __future__ import unicode_literals
from __future__ import print_function
from antvis.client.httprpc import *
from antgo.pipeline.functional.common.config import *
from antgo.pipeline.functional.mixins.db import *
from antgo import config
import threading
import os
import cv2
import base64
import numpy as np

# 配置数据库表示例
# get_table_info().update({
# 'application/execute/command': {
# 'table': 'task', # 构建新表名字
# 'links': ['user'], # 关联表
# 'fields': {
# 'task_name': 'str',
# 'task_progress': 'str',
# 'task_create_time': 'date',
# 'task_stop_time': 'date',
# 'task_is_finish': 'bool'
# }
# }
# })



class CommandOp(object):
def __init__(self, func):
self.func = func

def info(self):
# 设置需要使用隐信息(数据库、session_id)
return ['session_id']

def progress(self, user_name, task_name, task_progress, task_finish):
orm = get_db_orm()
with thread_session_context(get_db_session()) as db:
task = db.query(orm.Task).filter(orm.Task.task_name == task_name).one_or_none()
if task is None:
# no task
return

task.task_progress = task_progress
if task_finish:
task.task_is_finish = True
task.task_stop_time = datetime.datetime.now()

db.commit()

def __call__(self, *args, session_id):
# 启动新任务或查询进度
current_user, task_name = args[:2]
orm = get_db_orm()
with thread_session_context(get_db_session()) as db:
task = db.query(orm.Task).filter(orm.Task.user_id == current_user.id).one_or_none()
if task is not None:
# 返回任务进度
return {'status': 'running', 'progress': task.task_progress, 'create_time': task.task_create_time.strftime('%Y-%m-%d %H:%M:%S'), 'stop_time': task.task_stop_time.strftime('%Y-%m-%d %H:%M:%S') if task.task_is_finish else ''}

# 创建新任务
task = orm.Task(task_name=task_name, user=current_user)
db.add(task)
db.commit()

# 启动任务
thread = threading.Thread(target=self.func, args=(current_user.name, task_name, self.progress))
thread.start()
return {'status': 'start', 'progress': '', 'create_time': task.task_create_time.strftime('%Y-%m-%d %H:%M:%S')}

Empty file.
21 changes: 20 additions & 1 deletion antgo/pipeline/engine/execution/base_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from antgo.pipeline.deploy.cpp_op import CppOp
from antgo.pipeline.control.ifnotnone_op import IfNotNone
from antgo.pipeline.remote.remote_api import RemoteApiOp

from antgo.pipeline.functional.common.env import *

class BaseExecution:
"""
Expand All @@ -34,10 +34,24 @@ def __apply__(self, *arg, **kws):
self._op._index = self._index
if isinstance(self._op, RemoteApiOp):
self._op._index = self._index

if hasattr(self._op, 'info'):
info = self._op.info()
for key in self._op.info():
kws.update({key: getattr(arg[0], key, None)})
return self._op(*args, **kws)

def __is_need_exit__(self, session_id=None):
if session_id is None:
return None
exit_condition = get_context_exit_info(session_id, None)
return exit_condition

def __call__(self, *arg, **kws):
self.__check_init__()
if self.__is_need_exit__(arg[0].__dict__.get('session_id', None)):
return None

try:
if bool(self._index):
res = self.__apply__(*arg, **kws)
Expand Down Expand Up @@ -68,6 +82,11 @@ def __call__(self, *arg, **kws):
self._op._index = self._index
if isinstance(self._op, RemoteApiOp):
self._op._index = self._index

if hasattr(self._op, '__info'):
info = self._op.info()
for key in self._op.__info():
kws.update({key: getattr(arg[0], key, None)})
res = self._op(*arg, **kws)
return res
except Exception:
Expand Down
12 changes: 12 additions & 0 deletions antgo/pipeline/engine/operator_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,17 @@ def load_operator_from_remote(self, function: str, arg: List[Any], kws: Dict[str
})
return self.instance_operator(op, arg, kws) if op is not None else None

def load_operator_from_application(self, function: str, arg: List[Any], kws: Dict[str, Any], tag: str) -> Operator:
if not function.startswith('application'):
return None

module, action_name, obj_name = function.split('/')
op = getattr(importlib.import_module(f'antgo.pipeline.application.{action_name}.{obj_name}'), f'{obj_name.capitalize()}Op', None)
if op is None:
return None

return self.instance_operator(op, arg, kws) if op is not None else None

def load_operator(self, function: str, arg: List[Any], kws: Dict[str, Any], tag: str) -> Operator:
"""Attempts to load an operators from cache. If it does not exist, looks up the
operators in a remote location and downloads it to cache instead. By standard
Expand All @@ -347,6 +358,7 @@ def load_operator(self, function: str, arg: List[Any], kws: Dict[str, Any], tag:
for factory in [self.load_operator_from_internal,
self.load_operator_from_registry,
self.load_operator_from_remote,
self.load_operator_from_application,
self.load_operator_from_packages,
self.load_operator_from_eagleeye,
self.load_operator_from_deploy,
Expand Down
Loading

0 comments on commit ff53e47

Please sign in to comment.