Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix nullable fields and add clickhouse_datetime_timezone #375

Merged
merged 2 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions sink-connector/python/db_load/clickhouse_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def find_partitioning_options(source):
return partitioning_options


def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support):
def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone):

# do we have a table in the source

Expand Down Expand Up @@ -265,23 +265,23 @@ def convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete
return (res, columns)


def convert_to_clickhouse_table(user_name, table_name, source, rmt_delete_support, use_regexp_parser):
def convert_to_clickhouse_table(user_name, table_name, source, rmt_delete_support, use_regexp_parser, datetime_timezone):
# do we have a table in the source
if not find_create_table(source):
return ('', [])

src = source
if use_regexp_parser:
return convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support)
#if use_regexp_parser == True:
# return convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone)
# the progressive grammar trims the comment
partition_options = find_partitioning_options(source)

try:
return convert_to_clickhouse_table_antlr(src, rmt_delete_support, partition_options)
return convert_to_clickhouse_table_antlr(src, rmt_delete_support, partition_options, datetime_timezone)
except Exception as ex:
logging.info(f"Use regexp DDL converter")
logging.info(f"{ex}")
return convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support)
return convert_to_clickhouse_table_regexp(user_name, table_name, source, rmt_delete_support, datetime_timezone)


def get_unix_timezone_from_mysql_timezone(timezone):
Expand All @@ -304,10 +304,10 @@ def get_unix_timezone_from_mysql_timezone(timezone):
return tz


def load_schema(args, dry_run=False):
def load_schema(args, dry_run=False, datetime_timezone=None):

if args.mysqlshell:
return load_schema_mysqlshell(args, dry_run=dry_run)
return load_schema_mysqlshell(args, dry_run=dry_run, datetime_timezone=datetime_timezone)

schema_map = {}
# create database
Expand All @@ -334,7 +334,7 @@ def load_schema(args, dry_run=False):
with gzip.open(file, "r") as schema_file:
source = schema_file.read().decode('UTF-8')
logging.info(source)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone)
logging.info(table_source)
timezone = find_dump_timezone(source)
logging.info(f"Timezone {timezone}")
Expand All @@ -349,7 +349,7 @@ def load_schema(args, dry_run=False):
return (tz, schema_map)


def load_schema_mysqlshell(args, dry_run=False):
def load_schema_mysqlshell(args, dry_run=False, datetime_timezone=None):

schema_map = {}
# create database
Expand Down Expand Up @@ -380,7 +380,7 @@ def load_schema_mysqlshell(args, dry_run=False):
with open(file, "r") as schema_file:
source = schema_file.read()
logging.info(source)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser)
(table_source, columns) = convert_to_clickhouse_table(db, table, source, args.rmt_delete_support, args.use_regexp_parser, datetime_timezone)
logging.info(table_source)
# timezone = find_dump_timezone(source)
logging.info(f"Timezone {timezone}")
Expand Down Expand Up @@ -562,7 +562,8 @@ def main():
action='store_true', default=False)
parser.add_argument('--rmt_delete_support', help='Use RMT deletes', dest='rmt_delete_support',
action='store_true', default=False)

parser.add_argument('--clickhouse_datetime_timezone',
help='Timezone for CH date times', required=False, default=None)
args = parser.parse_args()
schema = not args.data_only
data = not args.schema_only
Expand All @@ -576,7 +577,7 @@ def main():
'zstd'), "zstd should be in the PATH for util.dumpSchemas load"

if schema:
(timezone, schema_map) = load_schema(args, dry_run=args.dry_run)
(timezone, schema_map) = load_schema(args, dry_run=args.dry_run, datetime_timezone = args.clickhouse_datetime_timezone)
if data:
if timezone is None:
(timezone, schema_map) = load_schema(args, dry_run=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


class CreateTableMySQLParserListener(MySqlParserListener):
def __init__(self, rmt_delete_support, partition_options):
def __init__(self, rmt_delete_support, partition_options, datetime_timezone=None):
self.buffer = ""
self.columns = ""
self.primary_key = ""
Expand All @@ -16,14 +16,18 @@ def __init__(self, rmt_delete_support, partition_options):
self.rename_list = []
self.rmt_delete_support = rmt_delete_support
self.partition_options = partition_options

self.datatime_timezone = datetime_timezone

def extract_original_text(self, ctx):
    """Return the source text covered by the parse-tree context ``ctx``.

    The text is read back from the input stream behind the context's first
    token, using the start offset of the first token and the stop offset of
    the last token as the bounds passed to ``getText``.
    """
    first_token = ctx.start
    char_stream = first_token.getTokenSource().inputStream
    begin_offset = first_token.start
    end_offset = ctx.stop.stop
    return char_stream.getText(begin_offset, end_offset)

def add_timezone(self, dataTypeText):
    """Append the configured timezone as a trailing argument of a type.

    For example ``DateTime64(0)`` becomes ``DateTime64(0,'UTC')`` when the
    listener's ``datatime_timezone`` attribute is ``'UTC'``.

    Args:
        dataTypeText: Type text that is expected to end with a closing
            parenthesis, e.g. ``DateTime64(3)``.

    Returns:
        The type text with ``,'<timezone>'`` inserted before the closing
        parenthesis. The input is returned unchanged when no timezone is
        configured or when the text has no closing parenthesis to extend.
    """
    # The attribute keeps the spelling "datatime" used where it is assigned.
    timezone = self.datatime_timezone
    if timezone is None:
        return dataTypeText
    if not dataTypeText.endswith(")"):
        # Without a trailing ")" there is no argument list to extend, and
        # slicing off the last character would corrupt the type name.
        return dataTypeText
    return f"{dataTypeText[:-1]},'{timezone}')"

def convertDataType(self, dataType):
dataTypeText = self.extract_original_text(dataType)
Expand All @@ -36,8 +40,10 @@ def convertDataType(self, dataType):
if isinstance(dataType, MySqlParser.DimensionDataTypeContext):
if dataType.DATETIME() or dataType.TIMESTAMP():
dataTypeText = 'DateTime64(0)'
dataTypeText = self.add_timezone(dataTypeText)
if dataType.lengthOneDimension():
dataTypeText = 'DateTime64'+dataType.lengthOneDimension().getText()
dataTypeText = self.add_timezone(dataTypeText)
elif dataType.TIME():
dataTypeText = "String"

Expand Down
4 changes: 2 additions & 2 deletions sink-connector/python/db_load/mysql_parser/mysql_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
raise Exception(f"Syntax error at line {line} column {column}")


def convert_to_clickhouse_table_antlr(source, rmt_delete_support, partition_options=''):
def convert_to_clickhouse_table_antlr(source, rmt_delete_support, partition_options='', datetime_timezone=None):
columns = []
input_stream = InputStream(source)
lexer = MySqlLexer(input_stream)
stream = CommonTokenStream(lexer)
parser = MySqlParser(stream)
parser.addErrorListener( MyErrorListener() )
tree = parser.sqlStatements()
listener = CreateTableMySQLParserListener(rmt_delete_support, partition_options)
listener = CreateTableMySQLParserListener(rmt_delete_support, partition_options, datetime_timezone=datetime_timezone)
walker = ParseTreeWalker()
walker.walk(listener, tree)
logging.debug(Trees.toStringTree(tree, None, parser))
Expand Down
Loading