-
Notifications
You must be signed in to change notification settings - Fork 56
/
api.py
121 lines (94 loc) · 3.27 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# -*- coding: utf-8 -*-
import abc
from django.conf import settings
from django.utils.module_loading import import_string
class AbstractBkApi(metaclass=abc.ABCMeta):
    """Catalogue of the backend API calls used through ``BkApi``.

    Every entry is a static stub that accepts an optional ``params`` dict and
    raises ``NotImplementedError``; a concrete implementation has to supply
    the real call. None of the stubs is marked with ``abc.abstractmethod``,
    so partial implementations remain instantiable.
    """

    @staticmethod
    def search_cloud_area(params: dict = None):
        """Stub for ``search_cloud_area``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def search_business(params: dict = None):
        """Stub for ``search_business``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def search_biz_inst_topo(params: dict = None):
        """Stub for ``search_biz_inst_topo``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def get_biz_internal_module(params: dict = None):
        """Stub for ``get_biz_internal_module``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def find_host_topo_relation(params: dict = None):
        """Stub for ``find_host_topo_relation``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def list_biz_hosts(params: dict = None):
        """Stub for ``list_biz_hosts``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def list_host_total_mainline_topo(params: dict = None):
        """Stub for ``list_host_total_mainline_topo``; not implemented here."""
        raise NotImplementedError()

    @staticmethod
    def get_agent_status(params: dict = None):
        """
        GSE 1.0: query host agent status, with ``bk_cloud_id:ip`` as the
        unique host identifier.

        params:
            hosts: List[Dict[str, Any]], e.g. [{"bk_cloud_id": 0, "ip": ""}],
                cloud area ID and IP of each host
        """
        raise NotImplementedError()

    @staticmethod
    def get_agent_status_v2(params: dict = None):
        """
        GSE 2.0: query host agent status, with ``bk_agent_id`` as the unique
        host identifier.

        params:
            agent_id_list: List[str], e.g. ["010000000000001"],
                list of host agent IDs
        """
        raise NotImplementedError()

    @staticmethod
    def list_service_template(params: dict = None):
        """Query service templates."""
        raise NotImplementedError()

    @staticmethod
    def list_set_template(params: dict = None):
        """Query set (cluster) templates."""
        raise NotImplementedError()

    @staticmethod
    def search_set(params: dict = None):
        """Query sets (clusters)."""
        raise NotImplementedError()

    @staticmethod
    def search_module(params: dict = None):
        """Query modules."""
        raise NotImplementedError()

    @staticmethod
    def search_dynamic_group(params: dict = None):
        """Query dynamic groups."""
        raise NotImplementedError()

    @staticmethod
    def execute_dynamic_group(params: dict = None):
        """Execute a dynamic group."""
        raise NotImplementedError()

    @staticmethod
    def find_host_by_service_template(params: dict = None):
        """Query the hosts of service templates, page by page."""
        raise NotImplementedError()

    @staticmethod
    def find_host_by_set_template(params: dict = None):
        """Query the hosts of set (cluster) templates, page by page."""
        raise NotImplementedError()

    @staticmethod
    def find_topo_node_paths(params: dict = None):
        """Query the topology paths that topology nodes belong to."""
        raise NotImplementedError()

    @staticmethod
    def list_service_category(params: dict = None):
        """Query the list of service categories."""
        raise NotImplementedError()
class BkApiProxy:
    """Lazy proxy that forwards attribute access to the configured API class.

    The target is resolved on first use from the Django setting
    ``BKM_IPCHOOSER_BKAPI_CLASS`` (a dotted import path) and cached on the
    instance. The imported object is stored as-is (it is not instantiated),
    and attributes are looked up directly on it.
    """

    def __init__(self):
        # Filled in by init_api() the first time an attribute is requested.
        self._api = None

    def __getattr__(self, action):
        """Return attribute ``action`` of the backing API object.

        Only invoked when normal attribute lookup on the proxy fails.

        Raises:
            AttributeError: if ``action`` is ``"_api"`` (the proxy was created
                without running ``__init__``), or if the backing object has no
                such attribute.
        """
        # When the instance was built without __init__ (copy/pickle create a
        # bare object and probe it with hasattr), `_api` is missing and the
        # `self._api` read below would re-enter this method forever. Fail with
        # a regular AttributeError so hasattr()/getattr(default) keep working.
        if action == "_api":
            raise AttributeError(
                "{!r} object has no attribute {!r}".format(type(self).__name__, action)
            )
        if self._api is None:
            self.init_api()
        return getattr(self._api, action)

    def init_api(self):
        """Import the configured API class and cache it on the proxy.

        Falls back to ``bkm_ipchooser.api.AbstractBkApi`` when the setting is
        not defined.
        """
        api_class = getattr(settings, "BKM_IPCHOOSER_BKAPI_CLASS", "bkm_ipchooser.api.AbstractBkApi")
        self._api = import_string(api_class)
# Module-level entry point for API calls. The object is a BkApiProxy that
# forwards attribute access to the lazily imported implementation; it is
# annotated as AbstractBkApi so callers and type checkers see that interface.
BkApi: AbstractBkApi = BkApiProxy()