forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpalo_task.py
280 lines (248 loc) · 9.24 KB
/
palo_task.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
一个Palo的Task,
用于测试两类任务的相互影响时,持续执行一类任务
Date: 2015/01/22 10:49:31
"""
import random
import time
import threading
from lib import palo_client
class PaloTask(object):
    """
    Base class for every task in this module.

    Subclasses override the hooks below:
        do_task   -- perform one unit of work.
        wait_task -- wait for a previously started asynchronous job to finish
                     (used to monitor async task state).
        clean     -- remove whatever the task left behind.
    All three hooks do nothing by default.
    """

    def __init__(self, client):
        # Store the client handle for use by the task hooks.
        self.client = client

    def do_task(self):
        """Run one iteration of the task; no-op here, implemented by subclasses."""

    def wait_task(self):
        """Wait for an asynchronous task to finish; no-op here, implemented by subclasses."""

    def clean(self):
        """Clean up leftovers produced by the task; no-op here."""
class TaskThread(threading.Thread):
    """
    Thread that runs a task in a loop until stop() is called.

    Every iteration calls ``task.wait_task()`` and then ``task.do_task()``.
    An exception raised by either call is not caught here, so it ends the
    thread. The thread is a daemon thread, so the program may exit while it
    is still running.
    """

    def __init__(self, task):
        threading.Thread.__init__(self)
        # Set by stop(); checked at the top of each loop iteration.
        self._exit_event = threading.Event()
        self.task = task
        # Assign the attribute instead of calling setDaemon(), which is
        # deprecated since Python 3.10 (the attribute works on 2.6+ as well).
        self.daemon = True

    def stop(self):
        """
        Ask this thread to stop.

        Only sets this thread's exit flag: an iteration already in progress
        (wait_task/do_task) runs to completion, and the loop exits before the
        next one starts. Call join() afterwards to wait for that.
        """
        self._exit_event.set()

    def run(self):
        """Repeat wait_task() then do_task() until stop() has been called."""
        while not self._exit_event.is_set():
            self.task.wait_task()
            self.task.do_task()
class SelectTask(PaloTask):
    """
    Query task: executes one SQL statement per do_task() call.
    """

    def __init__(self, host, port, sql, database_name=None, expected_file_path=None,
                 user="root", password="", charset="utf8", delay=None, interval=None):
        """
        Parameters:
            sql: SQL statement executed on every do_task() call.
            database_name: if not None, the client also switches to it via use().
            expected_file_path: stored, but not checked yet (see do_task).
            delay: if not None, a failing query is retried until the summed
                retry sleeps reach ``delay`` seconds; if None, a failure
                propagates immediately.
            interval: seconds to sleep between retries; defaults to 1.
        """
        self.client = palo_client.get_client(host, port, database_name=database_name, user=user,
                                             password=password, charset=charset)
        self.sql = sql
        if database_name is not None:
            self.client.use(database_name)
        self.expected_file_path = expected_file_path
        self.delay = delay
        self.interval = 1 if interval is None else interval

    def do_task(self):
        """
        Execute the query once (with retries when ``delay`` is set).

        Returns:
            The value returned by client.execute(), or None when ``delay`` is
            set and every attempt within it failed.
        """
        result = None
        if self.delay is not None:
            # NOTE(review): with interval <= 0, try_time never grows, so a
            # query that keeps failing is retried forever.
            try_time = 0
            while try_time < self.delay:
                try:
                    result = self.client.execute(self.sql)
                except Exception:
                    # Narrowed from a bare ``except:`` so KeyboardInterrupt and
                    # SystemExit still interrupt the retry loop.
                    try_time += self.interval
                    time.sleep(self.interval)
                else:
                    break
        else:
            result = self.client.execute(self.sql)
        if self.expected_file_path:
            # TODO: compare result with the contents of expected_file_path.
            pass
        return result
class BatchLoadTask(PaloTask):
    """
    Submits batch load jobs one after another, one per do_task() call.
    """

    def __init__(self, host, port, database_name, load_label, load_data_list,
                 max_filter_ratio=None, timeout=None, is_wait=False, interval=None,
                 user="root", password="", charset="utf8", broker=None):
        """
        Parameters:
            load_label: prefix of the generated labels "<load_label>_<n>",
                with n starting at 0.
            load_data_list, max_filter_ratio, timeout, is_wait, broker:
                forwarded unchanged to client.batch_load().
            interval: seconds to sleep after each successful submission;
                defaults to 0.
        """
        self.client = palo_client.get_client(host, port, database_name=database_name, user=user,
                                             password=password, charset=charset)
        self.client.use(database_name)
        self.load_label = load_label
        self.load_num = 0
        self.load_data_list = load_data_list
        self.max_filter_ratio = max_filter_ratio
        self.timeout = timeout
        self.is_wait = is_wait
        self.broker = broker
        self.interval = 0 if interval is None else interval

    def do_task(self):
        """
        Submit one batch load labelled "<load_label>_<load_num>".

        On success, load_num is incremented and the task sleeps ``interval``
        seconds. On failure, load_num is left unchanged.

        Raises:
            AssertionError: if client.batch_load() returns a falsy value.
        """
        load_label = "%s_%d" % (self.load_label, self.load_num)
        ret = self.client.batch_load(load_label, self.load_data_list,
                                     max_filter_ratio=self.max_filter_ratio,
                                     timeout=self.timeout, is_wait=self.is_wait,
                                     broker=self.broker)
        # Explicit raise rather than ``assert`` so the check survives ``python -O``.
        if not ret:
            raise AssertionError("batch_load returned %r for label %s" % (ret, load_label))
        self.load_num += 1
        time.sleep(self.interval)
class BulkLoadTask(PaloTask):
    """
    Submits small-batch (bulk) load jobs one after another, one per do_task() call.
    """

    def __init__(self, host, port, be_host, webserver_port, database_name, table_family_name,
                 load_label, data_file, max_filter_ratio=None, timeout=None, is_wait=False,
                 user="root", password="", be_user="root", be_password="", charset="utf8",
                 interval=0):
        """
        Parameters:
            be_host, webserver_port, be_user, be_password: forwarded to
                client.bulk_load() (be_user/be_password as user/password).
            load_label: prefix of the generated labels "<load_label>_<n>",
                with n starting at 1.
            interval: seconds to sleep after each successful submission.
        """
        self.client = palo_client.get_client(host, port, database_name=database_name, user=user,
                                             password=password, charset=charset)
        self.be_host = be_host
        self.webserver_port = webserver_port
        self.database_name = database_name
        self.table_family_name = table_family_name
        self.load_label = load_label
        self.load_num = 1
        self.data_file = data_file
        self.max_filter_ratio = max_filter_ratio
        self.timeout = timeout
        self.is_wait = is_wait
        self.be_user = be_user
        self.be_password = be_password
        self.interval = interval

    def do_task(self):
        """
        Submit one bulk load labelled "<load_label>_<load_num>".

        On success, load_num is incremented and the task sleeps ``interval``
        seconds. On failure, load_num is left unchanged.

        Raises:
            AssertionError: if client.bulk_load() returns a falsy value.
        """
        load_label = "%s_%d" % (self.load_label, self.load_num)
        ret = self.client.bulk_load(self.table_family_name, load_label, self.data_file,
                                    self.max_filter_ratio, self.timeout, self.database_name,
                                    self.be_host, self.webserver_port, self.is_wait,
                                    user=self.be_user, password=self.be_password)
        # Explicit raise rather than ``assert`` so the check survives ``python -O``.
        if not ret:
            raise AssertionError("bulk_load returned %r for label %s" % (ret, load_label))
        self.load_num += 1
        time.sleep(self.interval)
class RollupTask(PaloTask):
    """
    Repeatedly creates rollup tables.

    Each do_task() call creates one rollup named "<rollup_table_name>_<n>",
    with n = 1, 2, 3, ..., and waits for it (is_wait=True).
    """

    def __init__(self, host, port, database_name, table_family_name, rollup_table_name,
                 rollup_column_name_list, user="root", password="", charset="utf8", **kwargs):
        """
        Parameters:
            table_family_name: table passed as the first argument to
                create_rollup_table().
            rollup_table_name: prefix of the generated rollup names.
            rollup_column_name_list: column list passed to create_rollup_table().
            kwargs: stored on the instance; NOTE(review): do_task() does not
                currently forward them.
        """
        self.client = palo_client.get_client(
            host, port, database_name=database_name, user=user,
            password=password, charset=charset
        )
        self.client.use(database_name)
        self.kwargs = kwargs
        self.rollup_column_name_list = rollup_column_name_list
        self.rollup_num = 1
        self.rollup_table_name = rollup_table_name
        self.table_family_name = table_family_name

    def do_task(self):
        """
        Create the next rollup table and wait for it to finish.

        The counter advances before the create call, so a failed attempt
        still consumes its number.
        """
        current = self.rollup_num
        name = "%s_%d" % (self.rollup_table_name, current)
        self.rollup_num = current + 1
        self.client.create_rollup_table(self.table_family_name, name,
                                        self.rollup_column_name_list, is_wait=True)
class DeleteTask(PaloTask):
    """
    Data-deletion task.

    Cycles through the entries of delete_conditions_list and sends one delete
    command per do_task() call.
    """

    def __init__(self, host, port, database_name, table_family_name, delete_conditions_list,
                 user="root", password="", charset="utf8", **kwargs):
        """
        Parameters:
            delete_conditions_list: a list whose elements are
                delete_condition_list values (see PaloClient.delete).
            kwargs: forwarded to client.delete() on every call.
        """
        self.client = palo_client.get_client(
            host, port, database_name=database_name, user=user,
            password=password, charset=charset
        )
        self.client.use(database_name)
        self.kwargs = kwargs
        self.delete_conditions_index = 0
        self.delete_conditions_list = delete_conditions_list
        self.table_family_name = table_family_name

    def do_task(self):
        """
        Run one delete with the next condition list (round robin), then advance the index.

        PaloClientException is printed, not raised. An empty
        delete_conditions_list makes the modulo raise ZeroDivisionError.
        """
        conditions = self.delete_conditions_list
        chosen = conditions[self.delete_conditions_index % len(conditions)]
        self.delete_conditions_index += 1
        try:
            self.client.delete(self.table_family_name, chosen, **self.kwargs)
        except palo_client.PaloClientException as error:
            # TODO: errors are only printed for now.
            print(str(error))
class SyncTask(PaloTask):
    """
    Runs a synchronous operation, e.g. a query or a synchronous insert/delete/update.

    TODO: every call uses the same arguments, and results are not verified.
    """

    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs
        # Outcome counters updated by do_task().
        self.succ_count = 0
        self.error_count = 0
        # Seconds to sleep after each call; None means a random 0.0-1.0 s pause.
        self.interval = None

    def do_task(self):
        """
        Call func(*args, **kwargs) once, record the outcome, then pause.

        Any Exception is counted in error_count and printed, not re-raised.
        """
        try:
            self.func(*self.args, **self.kwargs)
        except Exception as err:
            self.error_count += 1
            print(str(err))
        else:
            self.succ_count += 1
        if self.interval is not None:
            pause = self.interval
        else:
            pause = random.randint(0, 10) / 10.0
        time.sleep(pause)