Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 55 additions & 54 deletions partitionmanager/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,56 @@ def migrate_cmd(args):
)
MIGRATE_PARSER.set_defaults(func=migrate_cmd)

def _partition_table(conf, log, table, metrics):
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.error(f"Cannot proceed: {table} {table_problems}")
return None

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = conf.partition_period
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
return None

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
log.info(f"{table} planned SQL: {composite_sql_command}")
return {"sql": composite_sql_command, "noop": True}

log.info(f"{table} running SQL: {composite_sql_command}")

time_start = datetime.utcnow()
output = conf.dbcmd.run(composite_sql_command)
time_end = datetime.utcnow()
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

log.info(f"{table} results: {output}")
return {"sql": composite_sql_command, "output": output}

def do_partition(conf):
"""Produces SQL statements to manage partitions per the supplied configuration.
Expand Down Expand Up @@ -306,56 +356,15 @@ def do_partition(conf):

all_results = dict()
for table in conf.tables:
time_start = None
try:
table_problems = pm_tap.get_table_compatibility_problems(conf.dbcmd, table)
if table_problems:
log.error(f"Cannot proceed: {table} {table_problems}")
continue

map_data = pm_tap.get_partition_map(conf.dbcmd, table)

duration = conf.partition_period
if table.partition_period:
duration = table.partition_period

log.info(f"Evaluating {table} (duration={duration})")
cur_pos = partitionmanager.database_helpers.get_position_of_table(
conf.dbcmd, table, map_data
)

sql_cmds = pm_tap.get_pending_sql_reorganize_partition_commands(
database=conf.dbcmd,
table=table,
partition_list=map_data["partitions"],
current_position=cur_pos,
allowed_lifespan=duration,
num_empty_partitions=conf.num_empty,
evaluation_time=conf.curtime,
)

if not sql_cmds:
log.debug(f"{table} has no pending SQL updates.")
continue

composite_sql_command = "\n".join(sql_cmds)

if conf.noop:
all_results[table.name] = {"sql": composite_sql_command, "noop": True}
log.info(f"{table} planned SQL: {composite_sql_command}")
continue

log.info(f"{table} running SQL: {composite_sql_command}")
time_start = datetime.utcnow()
output = conf.dbcmd.run(composite_sql_command)

all_results[table.name] = {"sql": composite_sql_command, "output": output}
log.info(f"{table} results: {output}")
results = _partition_table(conf, log, table, metrics)
if results:
all_results[table.name] = results

except partitionmanager.types.NoEmptyPartitionsAvailableException:
log.warning(
f"Unable to automatically handle {table}: No empty "
"partition is available."
"Unable to automatically handle %s: No empty "
"partition is available.", table
)
except partitionmanager.types.DatabaseCommandException as e:
log.warning("Failed to automatically handle %s: %s", table, e)
Expand All @@ -364,14 +373,6 @@ def do_partition(conf):
log.warning("Failed to handle %s: %s", table, e)
metrics.add("alter_errors", table.name, 1)

time_end = datetime.utcnow()
if time_start:
metrics.add(
"alter_time_seconds",
table.name,
(time_end - time_start).total_seconds(),
)

if conf.prometheus_stats_path:
do_stats(conf, metrics=metrics)
return all_results
Expand Down
3 changes: 1 addition & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ ignore = ["S101"] # Allow assert statements
max-complexity = 16 # default is 10

[tool.ruff.lint.per-file-ignores]
"partitionmanager/cli.py" = ["B008"] # TODO: Fix me
"partitionmanager/cli.py" = ["B008", "PERF203"] # TODO: Fix B008, upgrade to Py3.11 for PERF203
"partitionmanager/cli_test.py" = ["S608", "SIM115", "SIM117"] # TODO: Fix SIMs
"partitionmanager/sql.py" = ["B904", "S603"] # TODO: Fix S603
"partitionmanager/table_append_partition.py" = ["S608", "SIM102"] # TODO: Fix S608
Expand All @@ -116,4 +116,3 @@ max-complexity = 16 # default is 10
[tool.ruff.lint.pylint]
max-args = 7 # default is 5
max-branches = 15 # default is 12
max-statements = 52 # default is 50