
Commit 349b722

Merge pull request #7 from ssl-oyamata/workload_sampler#1
Workload sampler#1
2 parents 5ffbe07 + af5245d

15 files changed: +444, -48 lines

conf/postgres_opttune.conf (+19, -2)
@@ -16,15 +16,28 @@ pg_os_user = postgres # PostgrSQL ower user(OS user)
 # When tuning remote PostgreSQL, use an SSH connection to restart PostgreSQL or clear the cache.
 # Note: When using remote PostgreSQL, it is necessary to grant sudo permission without password to the remote os user.
 ssh_port = 22 # ssh port
-ssh_password = postgres # pg_os_user's ssh passwoed
+ssh_password = postgres # pg_os_user's ssh password
+
+[workload-sampling]
+workload_sampling_time_second = 300
+# Time (in seconds) to sample the workload running on the database in the [PostgreSQL] section
+my_workload_save_dir = ./workload_data/
+# Workload save database settings
+pghost = localhost # PostgreSQL server host
+pgport = 5432 # PostgreSQL server port
+pguser = postgres # PostgreSQL user name(Database user)
+pgpassword = postgres12 # PostgreSQL user password(Database user)
+pgdatabase = sampling # PostgreSQL Database
+# workload save directory
+
 
 [turning]
 study_name = pgbench_study # study name
 required_recovery_time_second = 0
 # The maximum recovery time allowed by the user in case of a PostgreSQL crash,
 # which is used to estimate the wax_wal_size parameter.
 # Note: The default value of 0 does not perform the estimation of the wax_wal_size parameter.
-benchmark = pgbench # Benchmark tool name('pgbench' or 'oltpbench' or 'star_schema_benchmark')
+benchmark = pgbench # Benchmark tool name('my_workload' or pgbench' or 'oltpbench' or 'star_schema_benchmark')
 parameter_json_dir = ./conf/
 number_trail = 100 # Number of benchmarks to run for turning
 data_load_interval = 10 # Specify the data load interval by the number of benchmarks
@@ -34,6 +47,10 @@ save_study_history = True # Whether to save study history
 load_study_history = True # Whether to load study history if a study name already exists.
 history_database_url = sqlite:///study-history.db # Example PostgreSQL. postgresql://postgres@localhost/study_history
 
+[my-workload]
+my_workload_save_file = workload_data/2020-07-05_180647.531417-2020-07-05_180657.531661.pkl
+# File saved using workload_sampler.py
+
 [pgbench]
 scale_factor = 10 # pgbench scale factor
 clients = 10 # Number of clients
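
Reviewer note: the new sections are plain INI, so they can be sanity-checked with Python's standard configparser before running the tuner. A minimal sketch, illustrative only; the project's own Config class presumably handles this via get_parameter_value():

    import configparser

    config = configparser.ConfigParser()
    config.read('./conf/postgres_opttune.conf')

    # Values come back as strings, and inline "# ..." comments stay attached to
    # the value unless inline_comment_prefixes is set, so strip before casting.
    raw = config.get('workload-sampling', 'workload_sampling_time_second')
    sampling_seconds = float(raw.split('#')[0].strip())
    save_file = config.get('my-workload', 'my_workload_save_file').split('#')[0].strip()
    print(sampling_seconds, save_file)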
pgopttune/config/my_workload_config.py (+20, new file)

@@ -0,0 +1,20 @@
+import os
+from pgopttune.config.config import Config
+
+
+class MyWorkloadConfig(Config):
+    def __init__(self, conf_path, section='my-workload'):
+        super().__init__(conf_path)
+        self.conf_path = conf_path
+        self.config_dict = dict(self.config.items(section))
+        self._check_is_exist_my_workload_save_file()
+
+    def _check_is_exist_my_workload_save_file(self):
+        if not os.path.exists(self.get_parameter_value('my_workload_save_file')):
+            raise ValueError("{} does not exist."
+                             "Check the my_workload_save_file parameter in {}."
+                             .format(self.get_parameter_value('my_workload_save_file'), self.conf_path))
+
+    @property
+    def my_workload_save_file(self):
+        return self.get_parameter_value('my_workload_save_file')
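
MyWorkloadConfig validates the recorded-workload file at construction time, so a bad path fails fast instead of midway through a tuning run. A minimal usage sketch, assuming get_parameter_value() is inherited from the project's Config base class:

    from pgopttune.config.my_workload_config import MyWorkloadConfig

    # Raises ValueError immediately if my_workload_save_file points at a missing file.
    my_workload_config = MyWorkloadConfig('./conf/postgres_opttune.conf')
    print(my_workload_config.my_workload_save_file)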
(new file)

@@ -0,0 +1,24 @@
+from pgopttune.config.config import Config
+from pgopttune.utils.pg_connect import get_pg_dsn
+
+
+class WorkloadSamplingConfig(Config):
+    def __init__(self, conf_path, section='workload-sampling'):
+        super().__init__(conf_path)
+        self.config_dict = dict(self.config.items(section))
+
+    @property
+    def workload_sampling_time_second(self):
+        return float(self.get_parameter_value('workload_sampling_time_second'))
+
+    @property
+    def my_workload_save_dir(self):
+        return self.get_parameter_value('my_workload_save_dir')
+
+    @property
+    def dsn(self):
+        return get_pg_dsn(pghost=self.get_parameter_value('pghost'),
+                          pgport=self.get_parameter_value('pgport'),
+                          pgdatabase=self.get_parameter_value('pgdatabase'),
+                          pguser=self.get_parameter_value('pguser'),
+                          pgpassword=self.get_parameter_value('pgpassword'))
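
The dsn property points the sampler at the separate [workload-sampling] database (the "sampling" database in the config above) rather than at the instance being tuned. get_pg_dsn() itself is not part of this diff; if it assembles a libpq-style keyword/value string for psycopg2, it would look roughly like this hypothetical sketch, not the project's actual helper:

    def get_pg_dsn(pghost='localhost', pgport=5432, pgdatabase='postgres',
                   pguser='postgres', pgpassword=None):
        # Keyword/value connection string accepted by psycopg2.connect().
        dsn = 'host={} port={} dbname={} user={}'.format(pghost, pgport, pgdatabase, pguser)
        if pgpassword is not None:
            dsn += ' password={}'.format(pgpassword)
        return dsn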

pgopttune/log/pg_csv_log.py (+41, -27)
@@ -11,12 +11,12 @@
 
 
 class PostgresCsvLog:
-    def __init__(self, postgres_server_config: PostgresServerConfig, csv_log_table_name="csv_log"):
+    def __init__(self, postgres_server_config: PostgresServerConfig):
         self._postgres_server_config = postgres_server_config
         self._postgres_parameter = PostgresParameter(postgres_server_config)
-        self.csv_log_table_name = csv_log_table_name
-        self.start_csv_log = None  # The time when the csv log started to be output(epoch time)
-        self.end_csv_log = None  # The time when the csv log output was finished(epoch time)
+        self._csv_log_table_name = "csv_log"
+        self.start_csv_log_unix_time = None  # The time when the csv log started to be output(epoch time)
+        self.end_csv_log_unix_time = None  # The time when the csv log output was finished(epoch time)
         self._csv_log_file_path = None
         self.csv_log_local_file_path = None  # File path when the csv log file is copied to localhost
 
@@ -26,7 +26,10 @@ def __init__(self, postgres_server_config: PostgresServerConfig, csv_log_table_n
                              The current setting is off; to turn it on, you need to restart PostgreSQL.")
         # get current settings
         self._current_log_destination = self._get_log_destination()
-        self._log_min_duration_statement = self._get_log_min_duration_statement()
+        self._current_log_min_duration_statement = self._get_log_min_duration_statement()
+
+    def __del__(self):
+        self.disable()
 
     def enable(self):
         # set log_destination = '[current_setting],csvlog'
@@ -36,33 +39,41 @@ def enable(self):
         else:
             log_destination_value = self._current_log_destination + ',csvlog'
         self._postgres_parameter.set_parameter(param_name="log_destination", param_value=log_destination_value)
+        logger.debug("Start outputting PostgreSQL log message to CSV file.\n"
+                     "log_destination = '{}'".format(log_destination_value))
         # set log_min_duration_statement = 0
         self._postgres_parameter.set_parameter(param_name="log_min_duration_statement", param_value=0, pg_reload=True)
-        self.start_csv_log = time.time()
+        logger.debug("Changed setting to output all executed SQL to PostgreSQL log file.\n"
+                     "log_min_duration_statement = '0'")
+        self.start_csv_log_unix_time = time.time()
         self._csv_log_file_path = self._get_csv_log_file_path()
 
     def disable(self):
         # set log_destination = '[current_setting]'
         self._postgres_parameter.set_parameter(param_name="log_destination", param_value=self._current_log_destination)
+        logger.debug("PostgreSQL log message output to CSV file is disabled.\n"
+                     "log_destination = '{}'".format(self._current_log_min_duration_statement))
         # set log_min_duration_statement = current_setting
         self._postgres_parameter.set_parameter(param_name="log_min_duration_statement",
-                                               param_value=self._log_min_duration_statement, pg_reload=True)
-        self.end_csv_log = time.time()
-
-    def load_csv_to_local_database(self):
-        self._copy_csv_logfile_to_local()  # copy logfile to /tmp directory(localhost)
-        self._create_csv_log_table()
-        self._truncate_csv_log_table()  # truncate csv log table
-        with get_pg_connection(dsn=self._postgres_server_config.dsn) as conn:
+                                               param_value=self._current_log_min_duration_statement, pg_reload=True)
+        logger.debug("The value of the log_min_duration_statement parameter has been restored to its original value.\n"
+                     "log_min_duration_statement = '{}'".format(self._current_log_min_duration_statement))
+        self.end_csv_log_unix_time = time.time()
+
+    def load_csv_to_database(self, copy_dir="/tmp", dsn=None):
+        self._copy_csv_logfile_to_local(copy_dir)  # copy logfile to directory(localhost)
+        self._create_csv_log_table(dsn)
+        self._truncate_csv_log_table(dsn)  # truncate csv log table
+        with get_pg_connection(dsn=dsn) as conn:
             conn.set_session(autocommit=True)
             with conn.cursor() as cur:
                 with open(self.csv_log_local_file_path) as f:
                     # cur.copy_from(f, self.csv_log_table_name, sep=',')
-                    cur.copy_expert("copy {} from stdin (format csv)".format(self.csv_log_table_name), f)
+                    cur.copy_expert("copy {} from stdin (format csv)".format(self._csv_log_table_name), f)
 
-    def _copy_csv_logfile_to_local(self):
+    def _copy_csv_logfile_to_local(self, copy_dir="/tmp"):
         file_name = os.path.basename(self._csv_log_file_path)
-        self.csv_log_local_file_path = os.path.join("/tmp", file_name)
+        self.csv_log_local_file_path = os.path.join(copy_dir, file_name)
         ssh = SSHCommandExecutor(user=self._postgres_server_config.os_user,
                                  password=self._postgres_server_config.ssh_password,
                                  hostname=self._postgres_server_config.host,
@@ -94,7 +105,7 @@ def _get_csv_log_file_path(self):
         csv_file_path = os.path.join(self._postgres_server_config.pgdata, csv_file_path)
         return csv_file_path
 
-    def _create_csv_log_table(self):
+    def _create_csv_log_table(self, dsn):
         create_table_sql = "CREATE TABLE IF NOT EXISTS {} (" \
                            "log_time timestamp(3) with time zone," \
                            "user_name text," \
@@ -119,15 +130,15 @@ def _create_csv_log_table(self):
                            "query_pos integer," \
                            "location text," \
                            "application_name text," \
-                           "PRIMARY KEY (session_id, session_line_num));".format(self.csv_log_table_name)
-        with get_pg_connection(dsn=self._postgres_server_config.dsn) as conn:  # FIXME
+                           "PRIMARY KEY (session_id, session_line_num));".format(self._csv_log_table_name)
+        with get_pg_connection(dsn=dsn) as conn:
             conn.set_session(autocommit=True)
             with conn.cursor() as cur:
                 cur.execute(create_table_sql)
 
-    def _truncate_csv_log_table(self):
-        truncate_table_sql = "TRUNCATE {}".format(self.csv_log_table_name)
-        with get_pg_connection(dsn=self._postgres_server_config.dsn) as conn:  # FIXME
+    def _truncate_csv_log_table(self, dsn):
+        truncate_table_sql = "TRUNCATE {}".format(self._csv_log_table_name)
+        with get_pg_connection(dsn=dsn) as conn:
             conn.set_session(autocommit=True)
             with conn.cursor() as cur:
                 cur.execute(truncate_table_sql)
@@ -136,12 +147,15 @@ def _truncate_csv_log_table(self):
 if __name__ == "__main__":
     from pgopttune.config.postgres_server_config import PostgresServerConfig
 
+    logging.basicConfig(level=logging.DEBUG)
     conf_path = './conf/postgres_opttune.conf'
     postgres_server_config_test = PostgresServerConfig(conf_path)  # PostgreSQL Server config
     csv_log = PostgresCsvLog(postgres_server_config_test)
     csv_log.enable()
-    print(csv_log._csv_log_file_path)
-    print("sleep...")
-    time.sleep(300)
+    # logging.debug(csv_log._csv_log_file_path)
+    logging.debug("sleep...")
+    time.sleep(60)
    csv_log.disable()
-    csv_log.load_csv_to_local_database()
+    csv_log.load_csv_to_database()
+    logging.debug(csv_log.start_csv_log_unix_time)
+    logging.debug(csv_log.end_csv_log_unix_time)
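
Reviewer note: the rename load_csv_to_local_database() to load_csv_to_database(copy_dir, dsn) is the key API change here; the captured CSV log can now be COPYed into any database, not only the tuned instance. A minimal end-to-end sketch (the DSN string is illustrative):

    from pgopttune.config.postgres_server_config import PostgresServerConfig
    from pgopttune.log.pg_csv_log import PostgresCsvLog

    postgres_server_config = PostgresServerConfig('./conf/postgres_opttune.conf')
    csv_log = PostgresCsvLog(postgres_server_config)
    csv_log.enable()   # log_destination gains 'csvlog'; log_min_duration_statement = 0
    # ... run the workload to be captured ...
    csv_log.disable()  # restore the original logging settings
    # Load the captured statements into the separate sampling database.
    csv_log.load_csv_to_database(copy_dir='/tmp',
                                 dsn='host=localhost port=5432 dbname=sampling user=postgres')

The new __del__ hook also restores log_destination and log_min_duration_statement if the caller forgets to call disable(), though relying on garbage collection for cleanup is best treated as a safety net. One nit carried in from the diff: the first debug message in disable() formats self._current_log_min_duration_statement into the log_destination line; self._current_log_destination looks like the intended value.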

pgopttune/objective/objective.py (+2, -2)
@@ -37,8 +37,8 @@ def run_workload(self, trial):
         if (int(trial.number) == 0) or (int(trial.number) % self.data_load_interval == 0):
             self.workload.data_load()  # data load
         self.params.reset_database()  # cache free and database restart
-        tps = self.workload.run()  # benchmark run
-        return tps
+        objective_value = self.workload.run()  # benchmark run
+        return objective_value
 
     def reset_param(self):
         # reset parameter value(reset postgresql.auto.conf)
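
Reviewer note: the rename from tps to objective_value is more than cosmetic; run() no longer always returns a throughput to maximize. The replayed my_workload benchmark returns summed statement execution time (see MyTransaction.run() further down), which should be minimized, and the create_study() change below makes the optimization direction configurable accordingly.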
(new file)

@@ -0,0 +1,19 @@
+import logging
+from pgopttune.workload.my_workload import MyWorkload
+from pgopttune.objective.objective import Objective
+from pgopttune.config.postgres_server_config import PostgresServerConfig
+from pgopttune.config.tune_config import TuneConfig
+from pgopttune.config.my_workload_config import MyWorkloadConfig
+
+logger = logging.getLogger(__name__)
+
+
+class ObjectiveMyWorkload(Objective):
+
+    def __init__(self,
+                 postgres_server_config: PostgresServerConfig,
+                 tune_config: TuneConfig,
+                 my_workload_config: MyWorkloadConfig):
+        super().__init__(postgres_server_config, tune_config)
+        self.workload = MyWorkload.load_my_workload(my_workload_config.my_workload_save_file,
+                                                    postgres_server_config=postgres_server_config)
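
A wiring sketch for the new objective. The module path for ObjectiveMyWorkload is assumed here, and TuneConfig is assumed to take the conf path like the other config classes:

    from pgopttune.config.postgres_server_config import PostgresServerConfig
    from pgopttune.config.tune_config import TuneConfig
    from pgopttune.config.my_workload_config import MyWorkloadConfig
    from pgopttune.objective.objective_my_workload import ObjectiveMyWorkload  # assumed module path

    conf_path = './conf/postgres_opttune.conf'
    objective = ObjectiveMyWorkload(PostgresServerConfig(conf_path),
                                    TuneConfig(conf_path),
                                    MyWorkloadConfig(conf_path))
    # The resulting objective is then handed to the Optuna study
    # created in pgopttune/study/study.py.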

pgopttune/recovery/pg_recovery.py (+7, -7)
@@ -42,7 +42,7 @@ def estimate_max_wal_size(self):
         # Two to three checkpoints are performed within the WAL size specified in the max_wal_size parameter.
         # Therefore, we divide by 2 assuming the worst case.
         estimate_max_wal_size_mb = str(math.floor(estimate_max_wal_size / (1024 * 1024))) + 'MB'
-        logging.info("The maximum value of max_wal_size estimated based on the measured values is {}.".format(
+        logger.info("The maximum value of max_wal_size estimated based on the measured values is {}.".format(
             estimate_max_wal_size_mb))
         return estimate_max_wal_size_mb
 
@@ -63,7 +63,7 @@ def measurement_recovery_time(self, measurement_rows_scale=100000, measurement_p
             self._crash_database()
             self._free_cache()
             recovery_time = self._measurement_recovery_database_time()
-            # logging.info('The wal size written after the checkpoint is {}B. '
+            # logger.info('The wal size written after the checkpoint is {}B. '
             #              'And crash recovery time is {}s'.format(recovery_wal_size, recovery_time))
             self.x_recovery_time = np.append(self.x_recovery_time, recovery_time)
             self.y_recovery_wal_size = np.append(self.y_recovery_wal_size, recovery_wal_size)
@@ -72,8 +72,8 @@ def measurement_recovery_time(self, measurement_rows_scale=100000, measurement_p
         self.reset_param()
         progress_bar.close()
         np.set_printoptions(precision=3)
-        logging.info("The wal size written after the checkpoint(Byte) : {}".format(self.y_recovery_wal_size))
-        logging.info("PostgreSQL Recovery time(Sec) : {}".format(self.x_recovery_time))
+        logger.info("The wal size written after the checkpoint(Byte) : {}".format(self.y_recovery_wal_size))
+        logger.info("PostgreSQL Recovery time(Sec) : {}".format(self.x_recovery_time))
 
     def _create_test_table(self):
         create_table_sql = "CREATE TABLE IF NOT EXISTS " + self._test_table_name + "(id INT, test TEXT)"
@@ -136,8 +136,8 @@ def _get_latest_checkpoint_lsn(self):
             cut_res = subprocess.Popen(cut_cmd, stdout=subprocess.PIPE, stdin=grep_res.stdout)
             latest_checkpoint_lsn = cut_res.communicate()[0].decode('utf-8')
         except ValueError:
-            logging.critical(traceback.format_exc())
-            logging.info('Failed Command: {} '.format(get_latest_checkpoint_cmd_str))
+            logger.critical(traceback.format_exc())
+            logger.info('Failed Command: {} '.format(get_latest_checkpoint_cmd_str))
             sys.exit(1)
         else:
             recovery_database_cmd = 'sudo -i -u {} {}/pg_controldata -D {} ' \
@@ -212,4 +212,4 @@ def random_string(length):
     conf_path = '../../conf/postgres_opttune.conf'
     postgres_server_config_test = PostgresServerConfig(conf_path)  # PostgreSQL Server config
     recovery = Recovery(postgres_server_config_test, required_recovery_time_second=300)
-    print(recovery.estimate_max_wal_size())
+    # print(recovery.estimate_max_wal_size())
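
Reviewer note: swapping logging.info() for logger.info() routes records through the module-level logger declared at the top of the file, so they carry the module's dotted name and obey per-module level configuration; bare logging.* calls go to the root logger and even auto-configure it on first use. A quick illustration:

    import logging

    logger = logging.getLogger('pgopttune.recovery.pg_recovery')

    logging.basicConfig(level=logging.INFO,
                        format='%(name)s %(levelname)s %(message)s')
    logger.info('estimated max_wal_size is %s', '512MB')
    # -> pgopttune.recovery.pg_recovery INFO estimated max_wal_size is 512MB
    logging.info('same message via the root logger')
    # -> root INFO same message via the root logger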

pgopttune/study/study.py (+2, -2)
@@ -4,7 +4,7 @@
 logger = logging.getLogger(__name__)
 
 
-def create_study(study_name, sampler, save_study_history=False, load_study_history=False,
+def create_study(study_name, sampler, save_study_history=False, load_study_history=False, direction='minimize',
                  history_database_url='postgresql://postgres@localhost:5432/study_history'):
     """
     create study.
@@ -15,7 +15,7 @@ def create_study(study_name, sampler, save_study_history=False, load_study_histo
     db_storage = optuna.storages.RDBStorage(history_database_url)
 
     try:
-        study = optuna.create_study(study_name=study_name, sampler=sampler, direction='maximize',
+        study = optuna.create_study(study_name=study_name, sampler=sampler, direction=direction,
                                     storage=db_storage)
     except optuna.exceptions.OptunaError:
         # If a study with the same name already exists, Load past history.
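
For context, direction is passed straight through to optuna.create_study(), which accepts 'minimize' or 'maximize'; exposing it lets the caller minimize elapsed time for a replayed workload while still maximizing TPS for pgbench-style runs. A self-contained toy sketch of the convention:

    import optuna

    def objective(trial):
        x = trial.suggest_uniform('x', -10, 10)  # suggest_float in newer Optuna releases
        return (x - 2) ** 2  # an elapsed-time-like value: lower is better

    study = optuna.create_study(study_name='toy_study', direction='minimize')
    study.optimize(objective, n_trials=20)
    print(study.best_params)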

pgopttune/utils/remote_command.py (+1)
@@ -1,6 +1,7 @@
 import paramiko
 import scp
 
+
 class SSHCommandExecutor:
     def __init__(self, user, password='postgres', hostname='localhost', port=22, timeout=15.0):
         self.hostname = hostname

pgopttune/workload/my_transaction.py (+43, new file)
@@ -0,0 +1,43 @@
+import time
+import logging
+from pgopttune.utils.pg_connect import get_pg_connection
+from pgopttune.config.postgres_server_config import PostgresServerConfig
+
+logger = logging.getLogger(__name__)
+
+
+class MyTransaction:
+    def __init__(self, session_id: str, query_start_time: list, statement: list):
+        self.session_id = session_id
+        self.query_start_time = query_start_time
+        self.statement = statement
+
+    def run(self, postgres_server_config: PostgresServerConfig):
+        start_time = time.time()
+        elapsed_times = 0
+
+        # sleep until first statement start
+        self._sleep_until_statement_start_time(start_time, self.query_start_time[0])
+
+        with get_pg_connection(dsn=postgres_server_config.dsn) as conn:
+            conn.autocommit = False
+            with conn.cursor() as cur:
+                for index in range(len(self.query_start_time)):
+                    self._sleep_until_statement_start_time(start_time, self.query_start_time[index])
+                    query_start_time = time.time()
+                    if "vacuum" in self.statement[index].lower():
+                        cur.execute("END;")
+                    cur.execute(self.statement[index])
+                    # logger.info("Execute Statement : {}".format(self.statement[index]))
+                    elapsed_times += (time.time() - query_start_time)
+        return elapsed_times
+
+    @staticmethod
+    def _sleep_until_statement_start_time(start_time, query_start_time):
+        while (time.time() - start_time) < query_start_time.total_seconds():
+            sleep_time = query_start_time.total_seconds() - (time.time() - start_time)
+            # logger.debug("sleep: {0:.4f}".format(sleep_time))
+            if sleep_time > 1:
+                time.sleep(sleep_time)
+            else:
+                time.sleep(0.1)  # FIXME
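
MyTransaction replays one captured session: _sleep_until_statement_start_time() compares the wall-clock time elapsed since run() began against query_start_time[index].total_seconds(), so the list evidently holds datetime.timedelta offsets relative to the start of the capture, and run() returns the summed statement execution time. A minimal replay sketch (offsets and SQL are illustrative):

    from datetime import timedelta
    from pgopttune.config.postgres_server_config import PostgresServerConfig
    from pgopttune.workload.my_transaction import MyTransaction

    postgres_server_config = PostgresServerConfig('./conf/postgres_opttune.conf')
    transaction = MyTransaction(session_id='5f01c0a1.1a2b',
                                query_start_time=[timedelta(seconds=0.0),
                                                  timedelta(seconds=0.5)],
                                statement=['SELECT 1;', 'SELECT pg_sleep(0.1);'])
    elapsed = transaction.run(postgres_server_config)
    print('statement execution time: {:.3f}s'.format(elapsed))

The END issued before any VACUUM statement is needed because the replay runs with autocommit off (one implicit transaction per session) and VACUUM cannot run inside a transaction block.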

0 commit comments
