Skip to content

Commit 6e2d598

Browse files
committed
Update clickhouse dumping to use native clickhouse BACKUP command
1 parent 3b7d3c4 commit 6e2d598

3 files changed

Lines changed: 126 additions & 119 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ To use this image you need to set up all of its environment values. If one of it
99

1010
| Environment | Description | Required | Default |
1111
|--------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------|----------|--------------------|
12-
| CRON | Crontab syntax used to determine when to do a backup. You can use [crontab.guru](crontab.guru) website to get help with cron syntax || |
12+
| CRON | Crontab syntax used to determine when to do a backup. You can use [crontab.guru](https://crontab.guru) website to get help with cron syntax || |
1313
| START_MANUAL_MANAGEMENT_SERVER | When set to true, starts a server on specified port to which you can send a http request to manually trigger backup process | | `true` |
1414
| MANUAL_MANAGEMENT_PORT | Prot for manual management server | | 33399 |
1515
| DATABASE_TYPE | Type of the database you want to backup. Available values [MySQL, PostgreSQL, ClickHouse, MongoDB] || |

src/dump.py

Lines changed: 122 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -83,20 +83,8 @@ def dump_mongodb(environment, output_path):
8383

8484
return dump_path
8585

86-
8786
def dump_clickhouse(environment, output_path):
88-
dump_path = output_path + '.sql.gz'
89-
def stringify_row(row):
90-
new_row = []
91-
92-
for v in row:
93-
if type(v) == datetime:
94-
v: datetime
95-
new_row.append(v.strftime('%Y-%m-%d %H:%M:%S'))
96-
else:
97-
new_row.append(str(v))
98-
99-
return new_row
87+
filename = os.path.basename(output_path) + '.zip'
10088

10189
client = ClickhouseClient(
10290
host=environment.get('DATABASE_HOST'),
@@ -106,36 +94,101 @@ def stringify_row(row):
10694
port=environment.get('DATABASE_PORT'),
10795
)
10896

109-
available_tables = list(map(lambda x: x[0], client.execute(f'SHOW TABLES IN `{environment.get("DATABASE_NAME")}`')))
110-
111-
dump_output = ""
112-
113-
for table in available_tables:
114-
create_query = client.execute(f'SHOW CREATE TABLE `{table}`')[0][0]
115-
dump_output += create_query + ';\n'
116-
117-
dump_output += "\n\n"
118-
119-
for table in available_tables:
120-
insert_query = client.execute(f"SELECT * FROM {table}")
121-
122-
dump_output += f'INSERT INTO `{table}` FORMAT TSV'
123-
124-
for row in insert_query:
125-
dump_output += '\n' + '\t'.join(stringify_row(row))
126-
127-
dump_output += ';\n\n'
128-
129-
with gzip.GzipFile(dump_path, 'w+') as f:
130-
f.write(dump_output.encode())
97+
backup_query = f'''
98+
BACKUP DATABASE `{environment.get("DATABASE_NAME")}`
99+
TO S3(
100+
'https://{environment.get("GLACIER_BUCKET_NAME")}.s3.amazonaws.com/{filename}',
101+
'{environment.get("AWS_ACCESS_KEY_ID")}',
102+
'{environment.get("AWS_SECRET_ACCESS_KEY")}'
103+
)
104+
SETTINGS
105+
compression_method = 'lzma',
106+
compression_level = 4,
107+
s3_storage_class = 'STANDARD';
108+
'''
109+
client.execute(backup_query)
110+
111+
s3 = boto3.client('s3')
112+
s3.copy(
113+
CopySource={
114+
"Bucket": environment.get('GLACIER_BUCKET_NAME'),
115+
"Key": filename
116+
},
117+
Bucket=environment.get('GLACIER_BUCKET_NAME'),
118+
Key=filename,
119+
ExtraArgs={
120+
'StorageClass': storage_class_map[environment.get('GLACIER_STORAGE_CLASS')],
121+
'MetadataDirective': 'COPY',
122+
}
123+
)
131124

132-
return dump_path
125+
return None
133126

127+
def prepare_s3_bucket(environment):
128+
s3 = boto3.client('s3')
129+
try:
130+
region = environment.get('AWS_DEFAULT_REGION')
131+
s3.create_bucket(
132+
Bucket=environment.get('GLACIER_BUCKET_NAME'),
133+
**({'CreateBucketConfiguration': {'LocationConstraint': region}} if region != 'us-east-1' else {}),
134+
)
135+
except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou):
136+
pass
134137

135-
def dump_database(environment):
136-
logger.info(f'{datetime.now()}: Creating backup')
138+
try:
139+
configuration = s3.get_bucket_lifecycle_configuration(
140+
Bucket=environment.get('GLACIER_BUCKET_NAME')
141+
)
142+
except botocore.exceptions.ClientError as e:
143+
if e.response.get('Error', {}).get('Code') == 'NoSuchLifecycleConfiguration':
144+
configuration = {}
145+
else:
146+
raise
147+
148+
glacierizer_current_rule = next(
149+
filter(lambda r: r['ID'] == 'GLACIERIZER_EXPIRE_AFTER', configuration.get('Rules', [])), None)
150+
existing_rules = list(filter(lambda r: r['ID'] != 'GLACIERIZER_EXPIRE_AFTER', configuration.get('Rules', [])))
151+
152+
glacierizer_current_expire = glacierizer_current_rule.get('Expire', {}).get('Days',
153+
None) if glacierizer_current_rule else None
154+
glacierizer_new_expire = environment.get('GLACIER_EXPIRE_AFTER')
155+
156+
glacierizer_rule_enabled = glacierizer_new_expire > 0
157+
glacierizer_rule_changed = glacierizer_current_expire != glacierizer_new_expire
158+
159+
glacierizer_new_rule = None
160+
if glacierizer_rule_enabled and glacierizer_rule_changed:
161+
glacierizer_new_rule = {
162+
'ID': 'GLACIERIZER_EXPIRE_AFTER',
163+
'Status': 'Enabled',
164+
'Expiration': {
165+
'Days': glacierizer_new_expire,
166+
},
167+
'Filter': {},
168+
}
169+
170+
if glacierizer_new_rule:
171+
s3.put_bucket_lifecycle_configuration(
172+
Bucket=environment.get('GLACIER_BUCKET_NAME'),
173+
LifecycleConfiguration={
174+
'Rules': [*existing_rules, glacierizer_new_rule]
175+
}
176+
)
177+
elif glacierizer_rule_enabled is False and glacierizer_current_rule is not None:
178+
if len(existing_rules) == 0:
179+
s3.delete_bucket_lifecycle(
180+
Bucket=environment.get('GLACIER_BUCKET_NAME')
181+
)
182+
else:
183+
s3.put_bucket_lifecycle_configuration(
184+
Bucket=environment.get('GLACIER_BUCKET_NAME'),
185+
LifecycleConfiguration={
186+
'Rules': existing_rules
187+
}
188+
)
137189

138-
filename = f'{environment.get("DATABASE_TYPE")}_{environment.get("DATABASE_NAME")}_{datetime.now().strftime("%Y_%m_%d")}'
190+
def create_dump(environment):
191+
filename = f'{environment.get("DATABASE_TYPE")}_{environment.get("DATABASE_NAME")}_{datetime.now().strftime("%Y_%m_%d_%H_%M")}'
139192
dump_path = os.path.join('/tmp', filename)
140193

141194
dump_database_methods = {
@@ -148,93 +201,47 @@ def dump_database(environment):
148201
database_type = environment.get('DATABASE_TYPE').lower()
149202
dump_database_method = dump_database_methods.get(database_type)
150203

151-
if dump_database_method:
152-
dump_path = dump_database_method(environment, dump_path)
204+
if not dump_database_method:
205+
raise NotImplemented(f"database of type {database_type} is not supported")
206+
207+
dump_path = dump_database_method(environment, dump_path)
208+
return dump_path
153209

154-
file_size = 0
155-
try:
156-
file_size = os.path.getsize(dump_path)
157-
except Exception as e:
158-
logger.error("Failed to get size of file.")
159-
logger.exception(e)
160210

161-
logger.info(f'{datetime.now()}: Backup created. Uploading to S3')
162-
try:
211+
def dump_database(environment):
212+
logger.info(f'Creating backup')
213+
214+
file_size = 0
215+
try:
216+
prepare_s3_bucket(environment)
217+
dump_path = create_dump(environment)
218+
logger.info(f'Backup created {dump_path=}')
219+
220+
if dump_path:
221+
logger.info(f'Uploading to S3')
163222
s3 = boto3.client('s3')
164-
try:
165-
region = environment.get('AWS_DEFAULT_REGION')
166-
s3.create_bucket(
167-
Bucket=environment.get('GLACIER_BUCKET_NAME'),
168-
**({'CreateBucketConfiguration': {'LocationConstraint': region}} if region != 'us-east-1' else {}),
169-
)
170-
except (s3.exceptions.BucketAlreadyExists, s3.exceptions.BucketAlreadyOwnedByYou):
171-
pass
172223

173224
try:
174-
configuration = s3.get_bucket_lifecycle_configuration(
175-
Bucket=environment.get('GLACIER_BUCKET_NAME')
176-
)
177-
except botocore.exceptions.ClientError as e:
178-
if e.response.get('Error', {}).get('Code') == 'NoSuchLifecycleConfiguration':
179-
configuration = {}
180-
else:
181-
raise
182-
183-
glacierizer_current_rule = next(filter(lambda r: r['ID'] == 'GLACIERIZER_EXPIRE_AFTER', configuration.get('Rules', [])), None)
184-
existing_rules = list(filter(lambda r: r['ID'] != 'GLACIERIZER_EXPIRE_AFTER', configuration.get('Rules', [])))
185-
186-
glacierizer_current_expire = glacierizer_current_rule.get('Expire', {}).get('Days', None) if glacierizer_current_rule else None
187-
glacierizer_new_expire = environment.get('GLACIER_EXPIRE_AFTER')
188-
189-
glacierizer_rule_enabled = glacierizer_new_expire > 0
190-
glacierizer_rule_changed = glacierizer_current_expire != glacierizer_new_expire
191-
192-
glacierizer_new_rule = None
193-
if glacierizer_rule_enabled and glacierizer_rule_changed:
194-
glacierizer_new_rule = {
195-
'ID': 'GLACIERIZER_EXPIRE_AFTER',
196-
'Status': 'Enabled',
197-
'Expiration': {
198-
'Days': glacierizer_new_expire,
199-
},
200-
'Filter': {},
201-
}
202-
203-
if glacierizer_new_rule:
204-
s3.put_bucket_lifecycle_configuration(
205-
Bucket=environment.get('GLACIER_BUCKET_NAME'),
206-
LifecycleConfiguration={
207-
'Rules': [*existing_rules, glacierizer_new_rule]
208-
}
209-
)
210-
elif glacierizer_rule_enabled is False and glacierizer_current_rule is not None:
211-
if len(existing_rules) == 0:
212-
s3.delete_bucket_lifecycle(
213-
Bucket=environment.get('GLACIER_BUCKET_NAME')
214-
)
215-
else:
216-
s3.put_bucket_lifecycle_configuration(
217-
Bucket=environment.get('GLACIER_BUCKET_NAME'),
218-
LifecycleConfiguration={
219-
'Rules': existing_rules
220-
}
221-
)
225+
file_size = os.path.getsize(dump_path)
226+
except Exception as e:
227+
logger.error("Failed to get size of file.")
228+
logger.exception(e)
222229

223230
with open(dump_path, 'rb') as file:
224231
logger.info(s3.put_object(
225232
Bucket=environment.get('GLACIER_BUCKET_NAME'),
226-
Key=filename,
233+
Key=os.path.basename(dump_path),
227234
Body=file,
228235
StorageClass=storage_class_map[environment.get('GLACIER_STORAGE_CLASS')]
229236
))
237+
230238
logger.info('Archive upload done.')
231239
send_slack_message(environment, f"Successfully created and uploaded DB dump ({sizeof_fmt(file_size)}).")
232-
except Exception as e:
233-
logger.exception(e)
234-
send_slack_message(
235-
environment,
236-
f"Failed to upload DB dump ({sizeof_fmt(file_size)}) to AWS S3 ({storage_class_map.get(environment.get('GLACIER_STORAGE_CLASS'), 'UNKNOWN')}). Please check the error in the container logs.",
237-
'FAIL'
238-
)
239-
else:
240-
logger.error(f'Database of type {database_type} is not supported. Supported types are: {dump_database_methods.keys()}')
240+
241+
except Exception as e:
242+
logger.exception(e)
243+
send_slack_message(
244+
environment,
245+
f"Failed to upload DB dump ({sizeof_fmt(file_size)}) to AWS S3 ({storage_class_map.get(environment.get('GLACIER_STORAGE_CLASS'), 'UNKNOWN')}). Please check the error in the container logs.",
246+
'FAIL'
247+
)

test/clickhouse_compose.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ version: "3.0"
22

33
services:
44
clickhouse_db:
5-
image: yandex/clickhouse-server
5+
image: clickhouse/clickhouse-server:24.5
66
ports:
77
- 8123:8123
88
- 9000:9000
@@ -12,12 +12,12 @@ services:
1212
CLICKHOUSE_DB: test_db
1313

1414
clickhouse_dumper:
15-
build: ../src
15+
build: ..
1616
restart: always
1717
ports:
1818
- 33399:33399
1919
environment:
20-
TEST: "false"
20+
TEST: "true"
2121
CRON: "* * * * *"
2222

2323
DATABASE_TYPE: "ClickHouse"

0 commit comments

Comments
 (0)