forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_sys_binlog_restart.py
230 lines (212 loc) · 8.42 KB
/
test_sys_binlog_restart.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_binlog_restart.py
# @date 2021/11/02 14:37:00
# @brief This file is a test file for doris binlog load.
#
#############################################################################
"""
binlog load be/fe 异常测试
MySQL需开启binlog功能
"""
import os
import sys
import time
import pymysql
sys.path.append("../")
from lib import node_op
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
from data import binlog as DATA
client = None
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
canal_ip = config.canal_ip
WAIT_TIME = 20
def setup_module():
"""
setUp
"""
global node_operator
node_operator = node_op.Node()
global client
master = node_operator.get_master()
client = palo_client.get_client(master, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
global connect
connect = pymysql.connect(host=config.mysql_host, port=config.mysql_port, user=config.canal_user, \
passwd=config.canal_password)
# client.set_frontend_config('enable_create_sync_job', 'true')
global destination
destination = 'example_' + str(config.fe_query_port)
def check_fe_be():
"""检查fe和be,是否有false,如果有则拉起来"""
ret = client.get_backend_list()
be_list = util.get_attr_condition_list(ret, palo_job.BackendProcInfo.Alive,
'false', palo_job.BackendProcInfo.Host)
if be_list is not None:
for be_host in be_list:
node_operator.start_be(be_host)
assert node_operator.is_be_alive(be_host)
ret = client.get_fe_list()
fe_list = util.get_attr_condition_list(ret, palo_job.FrontendInfo.Alive,
'false', palo_job.FrontendInfo.Host)
if fe_list is not None:
for fe_host in fe_list:
node_operator.start_fe(fe_host)
assert node_operator.is_fe_alive(fe_host)
def mysql_execute(sql):
"""
连接mysql执行语句
"""
cursor = connect.cursor()
try:
LOG.info(L('mysql check sql', sql=sql))
cursor.execute(sql)
cursor.close()
connect.commit()
except Exception as error:
assert False, "execute error. %s" % str(error)
LOG.error(L("mysql execute error", error=str(error)))
time.sleep(1)
return cursor.fetchall()
def create_mysql_table(mysql_table_name, mysql_database_name, columns, key='PRIMARY KEY (k1)', new_database=True):
"""
创建MySQL数据库和表
"""
if new_database:
mysql_clean(mysql_database_name)
sql = "CREATE DATABASE %s" % mysql_database_name
try:
mysql_execute(sql)
except Exception as error:
LOG.error(L("CREATE database error", host=config.canal_host, database_name=mysql_database_name, \
error=error))
connect.select_db(mysql_database_name)
sql = "DROP TABLE IF EXISTS %s" % mysql_table_name
mysql_execute(sql)
sql = ''
for column in columns:
column_sql = '%s %s' % (column[0], column[1])
if len(column) > 2:
if column[2]:
column_sql = '%s %s' % (column_sql, column[2])
if len(column) > 3:
column_sql = '%s DEFAULT "%s"' % (column_sql, column[3])
sql = '%s %s,' % (sql, column_sql)
sql = "CREATE table %s (%s %s)" % (mysql_table_name, sql, key)
LOG.info(L('mysql check sql', sql=sql))
try:
mysql_execute(sql)
except Exception as error:
LOG.error(L("CREATE TABLE error", host=config.canal_host, table_name=mysql_table_name, error=error))
return False
return True
def mysql_clean(mysql_database_name):
"""
mysql drop database
"""
sql = "DROP DATABASE IF EXISTS %s" % mysql_database_name
mysql_execute(sql)
def test_restart_be():
"""
{
"title": "test_sys_binlog_restart:test_restart_be",
"describe": "创建binlog load任务,be宕机,mysql执行导入语句,验证数据导入到doris",
"tag": "p0,system,fuzz"
}
"""
check_fe_be()
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + '_' + str(config.fe_query_port)
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
job_name, canal_ip, destination=destination)
assert ret, 'create sync job failed'
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
be_id = client.get_backend_id_list()[0]
be_host = client.get_be_hostname_by_id(be_id)
ret = node_operator.stop_be(be_host)
assert ret, 'stop be failed'
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
assert client.verify(DATA.expected_file_3, table_name)
ret = node_operator.start_be(be_host)
assert ret, 'start be failed'
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
check_fe_be()
def test_restart_fe():
"""
{
"title": "test_sys_binlog_restart:test_restart_fe",
"describe": "创建binlog load任务,fe master宕机,mysql执行导入语句,验证数据导入到doris",
"tag": "p0,system,fuzz"
}
"""
check_fe_be()
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
client.create_table(table_name, DATA.column_1, set_null=True, keys_desc='UNIQUE KEY(k1)')
mysql_database_name = 'm_' + database_name + '_' + str(config.fe_query_port)
mysql_table_name = 'mysql_' + table_name
create_mysql_table(mysql_table_name, mysql_database_name, DATA.column_1)
job_name = 'job_' + table_name
ret = client.create_sync_job(table_name, database_name, mysql_table_name, mysql_database_name,
job_name, canal_ip, destination=destination)
assert ret, 'create sync job failed'
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
master_port = client.get_master()
master = master_port.split(':')[0]
ret = node_operator.stop_fe(master)
assert ret, 'stop fe failed'
time.sleep(WAIT_TIME)
sql = open(DATA.binlog_sql_4, 'r').read().format(mysql_table_name)
mysql_execute(sql)
time.sleep(WAIT_TIME)
ret = node_operator.start_fe(master)
assert ret, 'start fe failed'
time.sleep(WAIT_TIME)
client.connect()
client.use(database_name)
assert client.get_sync_job_state(job_name) == 'RUNNING', 'sync job state error'
assert client.verify(DATA.expected_file_3, table_name)
client.stop_sync_job(job_name)
client.clean(database_name)
mysql_clean(mysql_database_name)
check_fe_be()