Skip to content

Commit 24a6c5c

Browse files
max-ibragimowMax Ibragimov
andauthored
[TTDB-394] Added multiprocessing for dump mode (#25)
* [TTDB-394] Added multiprocessing for dump mode - Added `server_settings` for connections - Added docs for `db_utils` - Renamed CLI parameter `threads` to `db_connections_per_process` - Updated tests run arguments * [TTDB-394] Updated parameter `db_connections_per_process` in `README.md` * [TTDB-394] Added check exists schema with utils * [TTDB-394] Fixed check exists schema with utils * [TTDB-394] Fixed json output for "view" modes * [TTDB-394] Fixed escaping quotes --------- Co-authored-by: Max Ibragimov <maxim.ibragimov@tantorlabs.ru>
1 parent ce8ec84 commit 24a6c5c

File tree

14 files changed

+429
-229
lines changed

14 files changed

+429
-229
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -347,12 +347,12 @@ python pg_anon.py --help
347347

348348
Common pg_anon options:
349349

350-
| Option | Description |
351-
|---------------|----------------------------------------------------------------|
352-
| `--debug` | Enable debug mode (default false) |
353-
| `--verbose` | Configure verbose mode: [info, debug, error] (default info) |
354-
| `--threads` | Amount of threads for IO operations (default 4) |
355-
| `--processes` | Amount of processes for multiprocessing operations (default 4) |
350+
| Option | Description |
351+
|---------------------------------|----------------------------------------------------------------------|
352+
| `--debug` | Enable debug mode (default false) |
353+
| `--verbose` | Configure verbose mode: [info, debug, error] (default info) |
354+
| `--db-connections-per-process` | Amount of connections for IO operations for each process (default 4) |
355+
| `--processes` | Amount of processes for multiprocessing operations (default 4) |
356356

357357
Database configuration options:
358358

pg_anon/common/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
ANON_UTILS_DB_SCHEMA_NAME = 'anon_funcs'

pg_anon/common/db_queries.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22

3+
from pg_anon.common.constants import ANON_UTILS_DB_SCHEMA_NAME
34
from pg_anon.common.dto import FieldInfo
45

56

@@ -9,15 +10,15 @@ def get_query_limit(limit: int) -> str:
910

1011
def get_query_get_scan_fields(limit: int = None, count_only: bool = False):
1112
if not count_only:
12-
fields = """
13+
fields = f"""
1314
SELECT DISTINCT
1415
n.nspname,
1516
c.relname,
1617
a.attname AS column_name,
1718
format_type(a.atttypid, a.atttypmod) as type,
1819
c.oid, a.attnum,
19-
anon_funcs.digest(n.nspname || '.' || c.relname || '.' || a.attname, '', 'md5') as obj_id,
20-
anon_funcs.digest(n.nspname || '.' || c.relname, '', 'md5') as tbl_id
20+
{ANON_UTILS_DB_SCHEMA_NAME}.digest(n.nspname || '.' || c.relname || '.' || a.attname, '', 'md5') as obj_id,
21+
{ANON_UTILS_DB_SCHEMA_NAME}.digest(n.nspname || '.' || c.relname, '', 'md5') as tbl_id
2122
"""
2223
order_by = 'ORDER BY 1, 2, a.attnum' if count_only else ''
2324
else:

pg_anon/common/db_utils.py

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,28 +3,69 @@
33
import asyncpg
44
from asyncpg import Connection
55

6+
from pg_anon.common.constants import ANON_UTILS_DB_SCHEMA_NAME
67
from pg_anon.common.db_queries import get_query_get_scan_fields
78
from pg_anon.common.dto import FieldInfo
89

910

10-
async def get_scan_fields_list(connection_params: Dict, limit: int = None) -> List:
11-
db_conn = await asyncpg.connect(**connection_params)
11+
async def check_anon_utils_db_schema_exists(connection_params: Dict, server_settings: Dict = None) -> bool:
12+
"""
13+
Checks whether the db schema containing the predefined anonymization utils exists
14+
:param connection_params: Required connection parameters such as host, login, password and etc.
15+
:param server_settings: Optional server settings for the new connection. May consist of timeout settings, application name, etc.
16+
:return: Whether the schema exists
17+
"""
18+
query = f"""
19+
select exists (select schema_name FROM information_schema.schemata where "schema_name" = '{ANON_UTILS_DB_SCHEMA_NAME}');
20+
"""
21+
22+
db_conn = await asyncpg.connect(**connection_params, server_settings=server_settings)
23+
exists = await db_conn.fetchval(query)
24+
await db_conn.close()
25+
return exists
26+
27+
28+
async def get_scan_fields_list(connection_params: Dict, server_settings: Dict = None, limit: int = None) -> List:
29+
"""
30+
Get fields list for scanning sensitive data
31+
:param connection_params: Required connection parameters such as host, login, password and etc.
32+
:param server_settings: Optional server settings for new connection. Can consists of timeout settings, application name and etc.
33+
:param limit: Limit the number of results to return.
34+
:return: resulted fields list for processing
35+
"""
1236
query = get_query_get_scan_fields(limit=limit)
37+
38+
db_conn = await asyncpg.connect(**connection_params, server_settings=server_settings)
1339
fields_list = await db_conn.fetch(query)
1440
await db_conn.close()
1541
return fields_list
1642

1743

18-
async def get_scan_fields_count(connection_params: Dict) -> int:
19-
db_conn = await asyncpg.connect(**connection_params)
44+
async def get_scan_fields_count(connection_params: Dict, server_settings: Dict = None) -> int:
45+
"""
46+
Get count of fields for scanning sensitive data
47+
:param connection_params: Required connection parameters such as host, login, password and etc.
48+
:param server_settings: Optional server settings for the new connection. May consist of timeout settings, application name, etc.
49+
:return: count of resulted fields list for processing
50+
"""
2051
query = get_query_get_scan_fields(count_only=True)
52+
53+
db_conn = await asyncpg.connect(**connection_params, server_settings=server_settings)
2154
count = await db_conn.fetchval(query)
2255
await db_conn.close()
2356
return count
2457

2558

26-
async def get_fields_list(connection_params: Dict, table_schema: str, table_name: str) -> List:
27-
db_conn = await asyncpg.connect(**connection_params)
59+
async def get_fields_list(connection_params: Dict, table_schema: str, table_name: str, server_settings: Dict = None) -> List:
60+
"""
61+
Get fields list for dump
62+
:param connection_params: Required connection parameters such as host, login, password and etc.
63+
:param table_schema: Table schema name
64+
:param table_name: Table name
65+
:param server_settings: Optional server settings for new connection. Can consists of timeout settings, application name and etc.
66+
:return: fields list for dump
67+
"""
68+
db_conn = await asyncpg.connect(**connection_params, server_settings=server_settings)
2869
fields_list = await db_conn.fetch(
2970
"""
3071
SELECT column_name, udt_name FROM information_schema.columns
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import time
2+
from typing import List, Callable
3+
4+
import aioprocessing
5+
6+
7+
async def init_process(name: str, ctx, target_func: Callable, tasks: List, *args, **kwargs):
8+
from pg_anon.context import Context
9+
10+
ctx: Context
11+
start_t = time.time()
12+
ctx.logger.info(f"================> Process [{name}] started. Input items: {len(tasks)}")
13+
queue = aioprocessing.AioQueue()
14+
15+
p = aioprocessing.AioProcess(
16+
target=target_func,
17+
args=(name, ctx, queue, tasks, *args),
18+
kwargs=kwargs,
19+
)
20+
p.start()
21+
res = None
22+
while True:
23+
result = await queue.coro_get()
24+
if result is None:
25+
break
26+
res = result
27+
await p.coro_join()
28+
end_t = time.time()
29+
elapsed = round(end_t - start_t, 2)
30+
result_item_log = str(len(res)) if res is not None else "0"
31+
ctx.logger.info(
32+
f"<================ Process [{name}] finished, elapsed: {elapsed} sec. Result {result_item_log} item(s)"
33+
)
34+
return res

pg_anon/common/utils.py

Lines changed: 17 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ async def get_dump_query(ctx, table_schema: str, table_name: str, table_rule,
206206
(table_schema + "_" + table_name).encode()
207207
).hexdigest()
208208

209-
files["%s.bin.gz" % hashed_name] = {"schema": table_schema, "table": table_name}
209+
files[f"{hashed_name}.bin.gz"] = {"schema": table_schema, "table": table_name}
210210

211211
if not found_white_list:
212212
included_objs.append(
@@ -217,7 +217,6 @@ async def get_dump_query(ctx, table_schema: str, table_name: str, table_rule,
217217
or ctx.args.dbg_stage_2_validate_data
218218
or ctx.args.dbg_stage_3_validate_full):
219219
query = "SELECT * FROM %s %s" % (table_name_full, ctx.validate_limit)
220-
ctx.logger.info(str(query))
221220
return query
222221
else:
223222
query = f"SELECT * FROM {table_name_full}"
@@ -242,52 +241,42 @@ async def get_dump_query(ctx, table_schema: str, table_name: str, table_rule,
242241
# the table is transferred with the specific fields for anonymization
243242
fields_list = await get_fields_list(
244243
connection_params=ctx.conn_params,
244+
server_settings=ctx.server_settings,
245245
table_schema=table_schema,
246246
table_name=table_name
247247
)
248248

249249
sql_expr = ""
250250

251-
def check_fld(fld_name):
252-
if fld_name in table_rule["fields"]:
253-
return fld_name, table_rule["fields"][fld_name]
251+
def check_field(field_name: str):
252+
if field_name in table_rule["fields"]:
253+
return field_name, table_rule["fields"][field_name]
254254
return None, None
255255

256256
for cnt, column_info in enumerate(fields_list):
257257
column_name = column_info["column_name"]
258258
udt_name = column_info["udt_name"]
259-
fld_name, fld_val = check_fld(column_name)
260-
if fld_name:
261-
if fld_val.find("SQL:") == 0:
262-
sql_expr += f'({fld_val[4:]}) as "{fld_name}"'
259+
field_name, field_value = check_field(column_name)
260+
261+
if field_name:
262+
if field_value.find("SQL:") == 0:
263+
sql_expr += f'({field_value[4:]}) as "{field_name}"'
263264
else:
264-
sql_expr += f'{fld_val}::{udt_name} as "{fld_name}"'
265+
sql_expr += f'{field_value}::{udt_name} as "{field_name}"'
265266
else:
266267
# field "as is"
267-
if (
268-
not column_name.islower() and not column_name.isupper()
269-
) or column_name.isupper():
270-
sql_expr += f'"{column_name}" as "{column_name}"'
271-
else:
272-
sql_expr += f'"{column_name}" as "{column_name}"'
268+
sql_expr += f'"{column_name}" as "{column_name}"'
269+
273270
if cnt != len(fields_list) - 1:
274271
sql_expr += ",\n"
275272

273+
query = f"SELECT {sql_expr} FROM {table_name_full}"
276274
if (ctx.args.dbg_stage_1_validate_dict
277275
or ctx.args.dbg_stage_2_validate_data
278276
or ctx.args.dbg_stage_3_validate_full):
279-
query = "SELECT %s FROM %s %s" % (
280-
sql_expr,
281-
table_name_full,
282-
ctx.validate_limit,
283-
)
284-
return query
285-
else:
286-
query = "SELECT %s FROM %s" % (
287-
sql_expr,
288-
table_name_full,
289-
)
290-
return query
277+
query += f" {ctx.validate_limit}"
278+
279+
return query
291280

292281

293282
def get_file_name_from_path(path: str) -> str:

pg_anon/context.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import os
33
from typing import Dict, Optional
44

5+
from pg_anon.common.constants import ANON_UTILS_DB_SCHEMA_NAME
56
from pg_anon.common.enums import VerboseOptions, AnonMode, ScanMode
67
from pg_anon.common.utils import (
78
exception_handler,
@@ -24,12 +25,20 @@ def __init__(self, args):
2425
self.total_rows = 0
2526
self.create_dict_sens_matches = {} # for create-dict mode
2627
self.create_dict_no_sens_matches = {} # for create-dict mode
27-
self.exclude_schemas = ["anon_funcs", "columnar_internal"]
28+
self.exclude_schemas = [ANON_UTILS_DB_SCHEMA_NAME, "columnar_internal"]
2829
self.logger = None
2930

3031
if args.db_user_password == "" and os.environ.get("PGPASSWORD") is not None:
3132
args.db_user_password = os.environ["PGPASSWORD"]
3233

34+
self.server_settings = {
35+
"application_name": "pg_anon",
36+
"statement_timeout": "0",
37+
"lock_timeout": "0",
38+
"idle_in_transaction_session_timeout": "0",
39+
"idle_session_timeout": "0",
40+
}
41+
3342
self.conn_params = {
3443
"host": args.db_host,
3544
"database": args.db_name,
@@ -208,8 +217,8 @@ def read_prepared_dict(self, save_dict_file_name_for_each_rule: bool = False):
208217
dictionary_rule['dict_file_name'] = dict_file
209218
self.prepared_dictionary_obj["dictionary"].extend(dictionary_rules)
210219

211-
if dictionary := dict_data.get("dictionary_exclude", []):
212-
self.prepared_dictionary_obj["dictionary_exclude"].extend(dictionary)
220+
if dictionary_exclude_rules := dict_data.get("dictionary_exclude", []):
221+
self.prepared_dictionary_obj["dictionary_exclude"].extend(dictionary_exclude_rules)
213222

214223
if validate_tables := dict_data.get("validate_tables", []):
215224
self.prepared_dictionary_obj["validate_tables"].extend(validate_tables)
@@ -260,10 +269,10 @@ def get_arg_parser():
260269
help="In 'create-dict' mode input file or file list with scan rules of sensitive and not sensitive fields"
261270
)
262271
parser.add_argument(
263-
"--threads",
272+
"--db-connections-per-process",
264273
type=int,
265274
default=4,
266-
help="Amount of threads for IO operations.",
275+
help="Amount of db connections for each process for IO operations.",
267276
)
268277
parser.add_argument(
269278
"--processes",

0 commit comments

Comments
 (0)