forked from apache/doris
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstart.py
243 lines (204 loc) · 9 KB
/
start.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
This module start Palo.
Date: 2015/10/07 17:23:06
"""
import os
import time
import threading
import socket
import env_config
import execute
import load_cluster
def start_one_fe(host_name):
"""start one fe
"""
cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --daemon' % \
(env_config.JAVA_HOME, env_config.fe_path)
status, output = execute.exe_cmd(cmd, host_name)
time.sleep(10)
def start_one_fe_with_helper(host_name, master_host_port=None):
"""start one fe with helper
"""
edit_log_port = env_config.fe_query_port - 20
if master_host_port is None:
master_host_port = '%s:%d' % (env_config.master, edit_log_port)
cmd = 'export JAVA_HOME=%s;cd %s/fe;sh bin/start_fe.sh --helper %s --daemon' % \
(env_config.JAVA_HOME, env_config.fe_path, master_host_port)
status, output = execute.exe_cmd(cmd, host_name)
time.sleep(10)
def start_one_be(host_name):
"""start one be
"""
cmd = 'export PATH=$PATH:/sbin; export JAVA_HOME=%s; cd %s/be; ' \
'sh bin/start_be.sh --daemon' % (env_config.JAVA_HOME, env_config.be_path)
status, output = execute.exe_cmd(cmd, host_name)
time.sleep(10)
def start_master():
"""start master
"""
start_one_fe(env_config.master)
def start_other_fe():
"""start fe
"""
start_fe_threads = []
for host_name in env_config.follower_list + \
env_config.observer_list + env_config.dynamic_add_fe_list:
t = threading.Thread(target=start_one_fe_with_helper, args=(host_name,))
t.start()
start_fe_threads.append(t)
for t in start_fe_threads:
t.join()
def start_be():
"""start be
"""
start_be_threads = []
for host_name in env_config.be_list + env_config.dynamic_add_be_list:
t = threading.Thread(target=start_one_be, args=(host_name,))
t.start()
start_be_threads.append(t)
for t in start_be_threads:
t.join()
def add_be():
"""add be
"""
for host in env_config.be_list:
sql = 'ALTER SYSTEM ADD BACKEND "%s:%d"' % (host, env_config.heartbeat_service_port)
cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql)
os.system(cmd)
def add_follower():
"""add follower
"""
for follower in env_config.follower_list:
sql = 'ALTER SYSTEM ADD FOLLOWER "%s:%d"' % (socket.gethostbyname(follower), env_config.edit_log_port)
cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql)
os.system(cmd)
def add_observer():
"""add observer
"""
edit_log_port = env_config.fe_query_port - 20
for observer in env_config.observer_list:
sql = 'ALTER SYSTEM ADD OBSERVER "%s:%d"' % (socket.gethostbyname(observer), env_config.edit_log_port)
cmd = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql)
os.system(cmd)
def add_load_cluster():
"""add load cluster
"""
sql_1, sql_2 = load_cluster.gen_add_load_cluster_sql('TestUser')
cmd_1 = 'mysql -h %s -P%s -u TestUser@test_cluster -e "%s"' % (env_config.master, env_config.fe_query_port, sql_1)
os.system(cmd_1)
cmd_2 = 'mysql -h %s -P%s -u TestUser@test_cluster -e "%s"' % (env_config.master, env_config.fe_query_port, sql_2)
os.system(cmd_2)
def create_test_cluster():
"""create test cluster
"""
sql_1 = 'CREATE CLUSTER test_cluster PROPERTIES("instance_num"="4") IDENTIFIED BY ""'
sql_2 = 'enter test_cluster;CREATE USER "TestUser" SUPERUSER'
cmd_1 = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql_1)
os.system(cmd_1)
cmd_2 = "mysql -h %s -P%s -u root -e '%s'" % (env_config.master, env_config.fe_query_port, sql_2)
os.system(cmd_2)
def add_default_load_cluster():
"""default load cluster for user root"""
sql_1, sql_2 = load_cluster.gen_add_load_cluster_sql('root')
cmd_1 = 'mysql -h %s -P%s -u root@default_cluster -p%s -e "%s"' % (env_config.master,
env_config.fe_query_port,
env_config.fe_password, sql_1)
os.system(cmd_1)
cmd_2 = 'mysql -h %s -P%s -u root@default_cluster -p%s -e "%s"' % (env_config.master,
env_config.fe_query_port,
env_config.fe_password, sql_2)
os.system(cmd_2)
def add_password():
"""add root user password"""
sql = "set password for 'root'@'%%' = PASSWORD('%s')" % env_config.fe_password
cmd = 'mysql -h %s -P%s -uroot -e "%s"' % (env_config.master, env_config.fe_query_port, sql)
os.system(cmd)
def add_brokers():
"""add broker"""
for broker_name, broker_node in env_config.broker_list.items():
sql = "ALTER SYSTEM ADD BROKER %s '%s'" % (broker_name, broker_node)
cmd = 'mysql -h %s -P%s -uroot -e "%s"' % (env_config.master, env_config.fe_query_port, sql)
os.system(cmd)
def add_auditload_plugin():
"""add audit load plugin"""
sql = """
create table doris_audit_db__.doris_audit_tbl__ \
( \
query_id varchar(48) comment 'Unique query id', \
\`time\` datetime not null comment 'Query start time', \
client_ip varchar(32) comment 'Client IP', \
user varchar(64) comment 'User name', \
db varchar(96) comment 'Database of this query', \
state varchar(8) comment 'Query result state. EOF, ERR, OK', \
query_time bigint comment 'Query execution time in millisecond', \
scan_bytes bigint comment 'Total scan bytes of this query', \
scan_rows bigint comment 'Total scan rows of this query', \
return_rows bigint comment 'Returned rows of this query', \
stmt_id int comment 'An incremental id of statement', \
is_query tinyint comment 'Is this statemt a query. 1 or 0', \
frontend_ip varchar(32) comment 'Frontend ip of executing this statement', \
cpu_time_ms bigint comment 'Total scan cpu time in millisecond of this query', \
sql_hash varchar(48) comment 'Hash value for this query', \
sql_digest varchar(48) comment 'Sql digest for this query', \
peak_memory_bytes bigint comment 'Peak memory bytes used on all backends of this query', \
stmt string comment 'The original statement, trimed if longer than 2G ' \
) engine=OLAP \
duplicate key(query_id, \`time\`, client_ip) \
partition by range(\`time\`) () \
distributed by hash(query_id) buckets 1 \
properties( \
'dynamic_partition.time_unit' = 'HOUR', \
'dynamic_partition.start' = '-48', \
'dynamic_partition.end' = '3', \
'dynamic_partition.prefix' = 'p', \
'dynamic_partition.buckets' = '1', \
'dynamic_partition.enable' = 'true', \
'replication_num' = '3' \
);
"""
cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port,
env_config.fe_password, "create database doris_audit_db__")
os.system(cmd)
cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port,
env_config.fe_password, sql)
os.system(cmd)
sql = "INSTALL PLUGIN FROM '%s/fe/plugin_auditloader'" % env_config.fe_path
cmd = 'mysql -h %s -P%s -uroot -p%s -e "%s"' % (env_config.master, env_config.fe_query_port,
env_config.fe_password, sql)
os.system(cmd)
def start_palo(init_state=False, deploy_audit=False):
"""start palo
"""
start_master()
time.sleep(30)
if init_state:
add_follower()
add_observer()
add_be()
add_brokers()
add_password()
start_other_fe()
start_be()
time.sleep(5)
if deploy_audit:
add_auditload_plugin()
if __name__ == '__main__':
start_palo()