Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow for password less command lines #412

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions sink-connector/python/db/clickhouse.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging
import warnings
from clickhouse_driver import connect
import xml.etree.ElementTree as ET
import yaml
import os


def clickhouse_connection(host, database='default', user='default', password='', port=9000,
                          secure=False):
    """Open a connection to ClickHouse through ``clickhouse_driver.connect``.

    Every argument is forwarded to ``connect`` as a keyword argument under
    the same name; ``connect_timeout`` is always passed as 20.

    Returns:
        The connection object produced by ``connect``.
    """
    settings = {
        'host': host,
        'user': user,
        'password': password,
        'port': port,
        'database': database,
        'connect_timeout': 20,
        'secure': secure,
    }
    return connect(**settings)


def clickhouse_execute_conn(conn, sql):
    """Run ``sql`` on a fresh cursor obtained from ``conn`` and return all rows.

    Args:
        conn: An open DB-API connection (anything exposing ``cursor()``).
        sql: The statement text; it is logged at DEBUG level before running.

    Returns:
        Whatever ``cursor.fetchall()`` returns for the statement.

    Raises:
        Any exception raised by ``cursor.execute`` or ``cursor.fetchall`` is
        propagated unchanged, after the cursor has been closed.
    """
    logging.debug(sql)
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        # Always release the cursor, including on failure. Callers reuse one
        # connection for many statements, so unclosed cursors would pile up.
        # NOTE(review): clickhouse_driver cursors appear to own their own
        # client connection - confirm against the driver documentation.
        cursor.close()


def execute_sql(conn, strSql):
    """
    # -- =======================================================================
    # -- Run a statement on an already-open connection and return the tuple
    # -- (rowset, rowcount). Python warnings raised while the statement runs
    # -- are recorded; their count and the first one are logged as warnings.
    # -- =======================================================================
    """
    logging.debug("SQL="+strSql)
    with warnings.catch_warnings(record=True) as caught:
        # Record every warning instead of letting the default filters drop repeats.
        warnings.simplefilter('always')
        rowset = clickhouse_execute_conn(conn, strSql)
    rowcount = len(rowset)
    if caught:
        logging.warning("SQL warnings : "+str(len(caught)))
        logging.warning("first warning : "+str(caught[0].message))

    return rowset, rowcount


def resolve_credentials_from_config(config_file):
    """Read the ClickHouse user and password from a client configuration file.

    XML files are searched for ``user`` and ``password`` elements directly
    under the root element. YAML files are expected to hold both values under
    a top-level ``config`` mapping.

    Args:
        config_file: Path to a ``.xml``, ``.yml`` or ``.yaml`` file. A leading
            ``~`` is expanded to the user's home directory.

    Returns:
        The tuple ``(clickhouse_user, clickhouse_password)``. For XML input a
        missing element yields ``None`` for that value.

    Raises:
        AssertionError: ``config_file`` is ``None``, does not name an existing
            file, or has an unsupported extension. Raised explicitly (rather
            than via ``assert``) so the checks survive ``python -O`` while the
            exception type stays the same for callers.
        KeyError: a YAML file lacks ``config``, ``user`` or ``password``.
    """
    if config_file is None:
        raise AssertionError(
            "A config file --clickhouse_config_file must be passed if --password is not specified")
    config_file = os.path.expanduser(config_file)
    if not os.path.isfile(config_file):
        raise AssertionError(f"Path {config_file} must exist")
    if not config_file.endswith((".xml", ".yml", ".yaml")):
        raise AssertionError("Supported configuration extensions .xml or .yaml or .yml")

    if config_file.endswith(".xml"):
        root = ET.parse(config_file).getroot()
        clickhouse_user = root.findtext('user')
        clickhouse_password = root.findtext('password')
    else:
        with open(config_file, 'r') as f:
            # NOTE(review): FullLoader can still build some Python objects; if
            # this file may come from an untrusted source, use yaml.safe_load.
            valuesYaml = yaml.load(f, Loader=yaml.FullLoader)
        clickhouse_user = valuesYaml['config']['user']
        clickhouse_password = valuesYaml['config']['password']
    # Never write the password itself to the log, even at DEBUG level.
    logging.debug(f"clickhouse_user {clickhouse_user} (password read from {config_file}, not logged)")
    return (clickhouse_user, clickhouse_password)
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from sqlalchemy import create_engine
import logging
import warnings
import os
import configparser
config = configparser.ConfigParser()

# Data type names that are always treated as binary (compared in lower case).
binary_datatypes = ('blob', 'varbinary', 'point', 'geometry', 'bit', 'binary', 'linestring',
                    'geomcollection', 'multilinestring', 'multipolygon', 'multipoint', 'polygon')


def is_binary_datatype(datatype):
    """Tell whether a column data type name denotes a binary type.

    The check is case-insensitive. A type is binary when its name contains
    ``blob``, ``binary`` (which also covers ``varbinary``) or ``bit``, or when
    the whole name is listed in ``binary_datatypes``.

    Args:
        datatype: The data type name, e.g. ``"longblob"`` or ``"VARBINARY(16)"``.

    Returns:
        ``True`` for binary types, ``False`` otherwise.
    """
    # Lower-case once so the substring test and the tuple lookup agree;
    # previously only the tuple lookup was case-insensitive.
    lowered = datatype.lower()
    if any(marker in lowered for marker in ('blob', 'binary', 'bit')):
        return True
    return lowered in binary_datatypes


def get_mysql_connection(mysql_host, mysql_user, mysql_passwd, mysql_port, mysql_database):
url = 'mysql+pymysql://{user}:{passwd}@{host}:{port}/{db}?charset=utf8mb4'.format(
Expand All @@ -17,6 +23,7 @@ def get_mysql_connection(mysql_host, mysql_user, mysql_passwd, mysql_port, mysql
conn = engine.connect()
return conn


def execute_mysql(conn, strSql):
"""
# -- =======================================================================
Expand All @@ -34,3 +41,18 @@ def execute_mysql(conn, strSql):
logging.warning("first warning : "+str(w[0].message))

return (rowset, rowcount)


def resolve_credentials_from_config(config_file):
    """Read the MySQL user and password from the ``[client]`` section of a .cnf file.

    Args:
        config_file: Path to a ``.cnf`` option file. A leading ``~`` is
            expanded to the user's home directory.

    Returns:
        The tuple ``(mysql_user, mysql_password)``.

    Raises:
        AssertionError: ``config_file`` is ``None``, does not name an existing
            file, does not end in ``.cnf``, or has no ``[client]`` section.
            Raised explicitly (rather than via ``assert``) so the checks
            survive ``python -O`` while the exception type stays the same.
        KeyError: the ``[client]`` section lacks ``user`` or ``password``.
    """
    if config_file is None:
        raise AssertionError(
            "A config file --default_file must be passed if --password is not specified")
    config_file = os.path.expanduser(config_file)
    if not os.path.isfile(config_file):
        raise AssertionError(f"Path {config_file} must exist")
    if not config_file.endswith(".cnf"):
        raise AssertionError("Supported configuration extensions .cnf")
    # ini file read
    # interpolation=None: a '%' in a password must be taken literally, not as
    # configparser interpolation syntax. allow_no_value=True: tolerate
    # value-less options that may appear in other sections of the file.
    config = configparser.ConfigParser(interpolation=None, allow_no_value=True)
    config.read(config_file)
    if 'client' not in config:
        raise AssertionError(f"Expected a [client] section in {config_file}")
    mysql_user = config['client']['user']
    mysql_password = config['client']['password']
    # Never write the password itself to the log, even at DEBUG level.
    logging.debug(f"mysql_user {mysql_user} (password read from {config_file}, not logged)")
    return (mysql_user, mysql_password)
109 changes: 34 additions & 75 deletions sink-connector/python/db_compare/clickhouse_table_checksum.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,77 +16,24 @@
import re
import os
import hashlib
from clickhouse_driver import connect
import concurrent.futures

from db.clickhouse import *

runTime = datetime.datetime.now().strftime("%Y.%m.%d-%H.%M.%S")


def clickhouse_connection(host, database='default', user='default', port=9000, password='',
secure=False):
conn = connect(host=host,
user=user,
password=password,
port=port,
database=database,
connect_timeout=20,
secure=secure
)
return conn


def clickhouse_execute_conn(conn, sql):
logging.debug(sql)
cursor = conn.cursor()
cursor.execute(sql)
result = cursor.fetchall()
return result


def get_connection(clickhouse_user, clickhouse_password):
    """Open a ClickHouse connection for the given credentials.

    Host, database, port and the secure flag are taken from the ``args``
    object in scope; only the user and password are supplied by the caller.

    Returns:
        The connection returned by ``clickhouse_connection``.
    """
    return clickhouse_connection(
        args.clickhouse_host,
        database=args.clickhouse_database,
        user=clickhouse_user,
        password=clickhouse_password,
        port=args.clickhouse_port,
        secure=args.secure,
    )


def execute_sql(conn, strSql):
"""
# -- =======================================================================
# -- Connect to the SQL server and execute the command
# -- =======================================================================
"""
logging.debug("SQL="+strSql)
rowset = None
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter('always')
rowset = clickhouse_execute_conn(conn, strSql)
rowcount = len(rowset)
if len(w) > 0:
logging.warning("SQL warnings : "+str(len(w)))
logging.warning("first warning : "+str(w[0].message))
def compute_checksum(table, clickhouse_user, clickhouse_password, statements):

return (rowset, rowcount)


def execute_statement(strSql):
"""
# -- =======================================================================
# -- Connect to the SQL server and execute the command
# -- =======================================================================
"""
conn = get_connection()
(rowset, rowcount) = execute_sql(conn, strSql)
conn.close()
return (rowset, rowcount)


def compute_checksum(table, statements):

conn = get_connection()
conn = get_connection(clickhouse_user, clickhouse_password)

debug_out = None
if args.debug_output:
Expand Down Expand Up @@ -132,23 +79,23 @@ def compute_checksum(table, statements):
conn.close()


def get_primary_key_columns(conn, table_schema, table_name):
    """List the primary-key column names of a table, ordered by column position.

    The names are read from ``system.columns`` (rows with
    ``is_in_primary_key = 1``) over the supplied connection.

    Args:
        conn: Open connection handed to ``execute_sql``.
        table_schema: Database name, interpolated into the query text.
        table_name: Table name, interpolated into the query text.

    Returns:
        A list of column names; rows whose name is ``None`` are left out.
    """
    sql = """
SELECT
name
FROM system.columns
WHERE (database = '{table_schema}') AND (table = '{table_name}') AND (is_in_primary_key = 1)
ORDER BY position ASC
""".format(table_schema=table_schema, table_name=table_name)
    (rowset, _count) = execute_sql(conn, sql)
    return [row[0] for row in rowset if row[0] is not None]


def get_table_checksum_query(table):
def get_table_checksum_query(conn, table):
#logging.info(f"Excluded columns before join, {args.exclude_columns}")
excluded_columns = "','".join(args.exclude_columns)
excluded_columns = [f'{column}' for column in excluded_columns.split(',')]
Expand All @@ -157,7 +104,7 @@ def get_table_checksum_query(table):
excluded_columns_str = ','.join((f"'{col}'" for col in excluded_columns))
checksum_query="select name, type, if(match(type,'Nullable'),1,0) is_nullable, numeric_scale from system.columns where database='" + args.clickhouse_database+"' and table = '"+table+"' and name not in ("+ excluded_columns_str +") order by position"

(rowset, rowcount) = execute_statement(checksum_query)
(rowset, rowcount) = execute_sql(conn, checksum_query)
#logging.info(f"CHECKSUM QUERY: {checksum_query}")

select = ""
Expand Down Expand Up @@ -223,7 +170,7 @@ def get_table_checksum_query(table):
query = "select "+select+"||',' as query from " + \
args.clickhouse_database+"."+table

primary_key_columns = get_primary_key_columns(
primary_key_columns = get_primary_key_columns(conn,
args.clickhouse_database, table)
logging.debug(str(primary_key_columns))
order_by_columns = ""
Expand Down Expand Up @@ -284,20 +231,20 @@ def select_table_statements(table, query, select_query, order_by, external_colum
return statements


def get_tables_from_regex(conn, strDSN):
    """Return the tables to process as a list of one-element rows.

    When ``args.no_wc`` is set, ``args.tables_regex`` is returned as the only
    table without querying the server. Otherwise ``system.tables`` is queried
    for names in ``args.clickhouse_database`` that match ``args.tables_regex``.

    Args:
        conn: Open connection handed to ``execute_sql``.
        strDSN: Not used by this function.

    Returns:
        ``[[args.tables_regex]]`` or the rowset returned by ``execute_sql``.
    """
    if args.no_wc:
        return [[args.tables_regex]]

    lookup = "select name from system.tables where database = '{d}' and match(name,'{t}') order by 1".format(
        d=args.clickhouse_database, t=args.tables_regex)
    logging.info(f"REGEX QUERY: {lookup}")
    (rowset, _rowcount) = execute_sql(conn, lookup)
    return rowset


def calculate_checksum(table):
def calculate_checksum(table, clickhouse_user, clickhouse_password):
if args.ignore_tables_regex:
rex_ignore_tables = re.compile(args.ignore_tables_regex, re.IGNORECASE)
if rex_ignore_tables.match(table):
Expand All @@ -316,7 +263,8 @@ def calculate_checksum(table):
if args.where:
sql = sql + " where " + args.where

(rowset, rowcount) = execute_statement(sql)
conn = get_connection(clickhouse_user, clickhouse_password)
(rowset, rowcount) = execute_sql(conn, sql)
if rowcount == 0:
logging.info("No rows in ClickHouse. Nothing to sync.")
logging.info("Checksum for table {schema}.{table} = d41d8cd98f00b204e9800998ecf8427e count 0".format(
Expand All @@ -325,10 +273,10 @@ def calculate_checksum(table):

# generate the file from ClickHouse
(query, select_query, distributed_by,
external_table_types) = get_table_checksum_query(table)
external_table_types) = get_table_checksum_query(conn, table)
statements = select_table_statements(
table, query, select_query, distributed_by, external_table_types)
compute_checksum(table, statements)
compute_checksum(table, clickhouse_user, clickhouse_password, statements)


# hack to add the user to the logger, which needs it apparently
Expand Down Expand Up @@ -356,9 +304,11 @@ def main():
parser.add_argument('--clickhouse_host',
help='ClickHouse host', required=True)
parser.add_argument('--clickhouse_user',
help='ClickHouse user', required=True)
help='ClickHouse user', required=False)
parser.add_argument('--clickhouse_password',
help='ClickHouse password', required=True)
help='CH password (discouraged option use a configuration file)', required=False, default=None)
parser.add_argument('--clickhouse_config_file',
help='CH config file either xml or yaml, default is ./clickhouse-client.xml', required=False, default='./clickhouse-client.xml')
parser.add_argument('--clickhouse_database',
help='ClickHouse database', required=True)
parser.add_argument('--clickhouse_port',
Expand Down Expand Up @@ -405,17 +355,26 @@ def main():
root.setLevel(logging.DEBUG)
handler.setLevel(logging.DEBUG)

thisScript = argv[0]
clickhouse_user = args.clickhouse_user
clickhouse_password = args.clickhouse_password

# check parameters
if args.clickhouse_password:
logging.warning("Using password on the command line is not secure, please specify a config file ")
assert args.clickhouse_user is not None, "--clickhouse_user must be specified"
else:
config_file = args.clickhouse_config_file
(clickhouse_user, clickhouse_password) = resolve_credentials_from_config(config_file)
try:
tables = get_tables_from_regex(args.tables_regex)
conn = get_connection(clickhouse_user, clickhouse_password)
tables = get_tables_from_regex(conn, args.tables_regex)
# CH does not print decimal with trailing zero, we need a custom function
execute_statement(create_function_format_decimal)
execute_sql(conn, create_function_format_decimal)

with concurrent.futures.ThreadPoolExecutor(max_workers=args.threads) as executor:
futures = []
for table in tables:
futures.append(executor.submit(calculate_checksum, table[0]))
futures.append(executor.submit(calculate_checksum, table[0], clickhouse_user, clickhouse_password))
for future in concurrent.futures.as_completed(futures):
if future.exception() is not None:
raise future.exception()
Expand Down
Loading
Loading