Skip to content

Commit eeb0147

Browse files
authored
refactor(clp-package): Move archive metadata update from clp-s to compression task workers. (#819)
1 parent 18ea029 commit eeb0147

File tree

10 files changed

+36
-109
lines changed

10 files changed

+36
-109
lines changed

components/clp-py-utils/clp_py_utils/clp_config.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@
3636
QUERY_TASKS_TABLE_NAME = "query_tasks"
3737
COMPRESSION_JOBS_TABLE_NAME = "compression_jobs"
3838
COMPRESSION_TASKS_TABLE_NAME = "compression_tasks"
39+
ARCHIVES_TABLE_SUFFIX = "archives"
3940

4041
OS_RELEASE_FILE_PATH = pathlib.Path("etc") / "os-release"
4142

components/core/src/clp_s/ArchiveWriter.cpp

Lines changed: 9 additions & 27 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66

77
#include <json/single_include/nlohmann/json.hpp>
88

9+
#include "../clp/streaming_archive/Constants.hpp"
910
#include "archive_constants.hpp"
1011
#include "Defs.hpp"
1112
#include "SchemaTree.hpp"
@@ -100,10 +101,6 @@ void ArchiveWriter::close() {
100101
header_and_metadata_writer.close();
101102
}
102103

103-
if (m_metadata_db) {
104-
update_metadata_db();
105-
}
106-
107104
if (m_print_archive_stats) {
108105
print_archive_stats();
109106
}
@@ -430,29 +427,14 @@ std::pair<size_t, size_t> ArchiveWriter::store_tables() {
430427
return {table_metadata_compressed_size, table_compressed_size};
431428
}
432429

433-
void ArchiveWriter::update_metadata_db() {
434-
m_metadata_db->open();
435-
clp::streaming_archive::ArchiveMetadata metadata(
436-
cArchiveFormatDevelopmentVersionFlag,
437-
"",
438-
0ULL
439-
);
440-
metadata.increment_static_compressed_size(m_compressed_size);
441-
metadata.increment_static_uncompressed_size(m_uncompressed_size);
442-
metadata.expand_time_range(
443-
m_timestamp_dict.get_begin_timestamp(),
444-
m_timestamp_dict.get_end_timestamp()
445-
);
446-
447-
m_metadata_db->add_archive(m_id, metadata);
448-
m_metadata_db->close();
449-
}
450-
451-
void ArchiveWriter::print_archive_stats() {
452-
nlohmann::json json_msg;
453-
json_msg["id"] = m_id;
454-
json_msg["uncompressed_size"] = m_uncompressed_size;
455-
json_msg["size"] = m_compressed_size;
430+
auto ArchiveWriter::print_archive_stats() const -> void {
431+
namespace Archive = clp::streaming_archive::cMetadataDB::Archive;
432+
nlohmann::json json_msg
433+
= {{Archive::Id, m_id},
434+
{Archive::BeginTimestamp, m_timestamp_dict.get_begin_timestamp()},
435+
{Archive::EndTimestamp, m_timestamp_dict.get_end_timestamp()},
436+
{Archive::UncompressedSize, m_uncompressed_size},
437+
{Archive::Size, m_compressed_size}};
456438
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
457439
}
458440
} // namespace clp_s

components/core/src/clp_s/ArchiveWriter.hpp

Lines changed: 2 additions & 10 deletions
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,6 @@
77
#include <boost/uuid/uuid.hpp>
88
#include <boost/uuid/uuid_io.hpp>
99

10-
#include "../clp/GlobalMySQLMetadataDB.hpp"
1110
#include "archive_constants.hpp"
1211
#include "DictionaryWriter.hpp"
1312
#include "Schema.hpp"
@@ -66,8 +65,7 @@ class ArchiveWriter {
6665
};
6766

6867
// Constructor
69-
explicit ArchiveWriter(std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db)
70-
: m_metadata_db(std::move(metadata_db)) {}
68+
ArchiveWriter() = default;
7169

7270
// Destructor
7371
~ArchiveWriter() = default;
@@ -211,15 +209,10 @@ class ArchiveWriter {
211209
*/
212210
void write_archive_header(FileWriter& archive_writer, size_t metadata_section_size);
213211

214-
/**
215-
* Updates the metadata db with the archive's metadata (id, size, timestamp ranges, etc.)
216-
*/
217-
void update_metadata_db();
218-
219212
/**
220213
* Prints the archive's statistics (id, uncompressed size, compressed size, etc.)
221214
*/
222-
void print_archive_stats();
215+
auto print_archive_stats() const -> void;
223216

224217
static constexpr size_t cReadBlockSize = 4 * 1024;
225218

@@ -238,7 +231,6 @@ class ArchiveWriter {
238231
std::shared_ptr<LogTypeDictionaryWriter> m_log_dict;
239232
std::shared_ptr<LogTypeDictionaryWriter> m_array_dict; // log type dictionary for arrays
240233
TimestampDictionaryWriter m_timestamp_dict;
241-
std::shared_ptr<clp::GlobalMySQLMetadataDB> m_metadata_db;
242234
int m_compression_level{};
243235
bool m_print_archive_stats{};
244236
bool m_single_file_archive{};

components/core/src/clp_s/CMakeLists.txt

Lines changed: 1 addition & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,6 @@ set(
1818
../clp/CurlStringList.hpp
1919
../clp/cli_utils.cpp
2020
../clp/cli_utils.hpp
21-
../clp/database_utils.cpp
22-
../clp/database_utils.hpp
2321
../clp/Defs.h
2422
../clp/ErrorCode.hpp
2523
../clp/ffi/ir_stream/decoding_methods.cpp
@@ -42,23 +40,12 @@ set(
4240
../clp/FileDescriptor.hpp
4341
../clp/FileReader.cpp
4442
../clp/FileReader.hpp
45-
../clp/GlobalMetadataDB.hpp
46-
../clp/GlobalMetadataDBConfig.cpp
47-
../clp/GlobalMetadataDBConfig.hpp
48-
../clp/GlobalMySQLMetadataDB.cpp
49-
../clp/GlobalMySQLMetadataDB.hpp
5043
../clp/hash_utils.cpp
5144
../clp/hash_utils.hpp
5245
../clp/ir/EncodedTextAst.cpp
5346
../clp/ir/EncodedTextAst.hpp
5447
../clp/ir/parsing.cpp
5548
../clp/ir/parsing.hpp
56-
../clp/MySQLDB.cpp
57-
../clp/MySQLDB.hpp
58-
../clp/MySQLParamBindings.cpp
59-
../clp/MySQLParamBindings.hpp
60-
../clp/MySQLPreparedStatement.cpp
61-
../clp/MySQLPreparedStatement.hpp
6249
../clp/NetworkReader.cpp
6350
../clp/NetworkReader.hpp
6451
../clp/networking/socket_utils.cpp
@@ -70,6 +57,7 @@ set(
7057
../clp/spdlog_with_specializations.hpp
7158
../clp/streaming_archive/ArchiveMetadata.cpp
7259
../clp/streaming_archive/ArchiveMetadata.hpp
60+
../clp/streaming_archive/Constants.hpp
7361
../clp/streaming_compression/zstd/Decompressor.cpp
7462
../clp/streaming_compression/zstd/Decompressor.hpp
7563
../clp/Thread.cpp

components/core/src/clp_s/CommandLineArguments.cpp

Lines changed: 0 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -205,7 +205,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
205205
// clang-format on
206206

207207
po::options_description compression_options("Compression options");
208-
std::string metadata_db_config_file_path;
209208
std::string input_path_list_file_path;
210209
constexpr std::string_view cJsonFileType{"json"};
211210
constexpr std::string_view cKeyValueIrFileType{"kv-ir"};
@@ -238,11 +237,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
238237
po::value<std::string>(&m_timestamp_key)->value_name("TIMESTAMP_COLUMN_KEY")->
239238
default_value(m_timestamp_key),
240239
"Path (e.g. x.y) for the field containing the log event's timestamp."
241-
)(
242-
"db-config-file",
243-
po::value<std::string>(&metadata_db_config_file_path)->value_name("FILE")->
244-
default_value(metadata_db_config_file_path),
245-
"Global metadata DB YAML config"
246240
)(
247241
"files-from,f",
248242
po::value<std::string>(&input_path_list_file_path)
@@ -353,29 +347,6 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
353347
}
354348

355349
validate_network_auth(auth, m_network_auth);
356-
357-
// Parse and validate global metadata DB config
358-
if (false == metadata_db_config_file_path.empty()) {
359-
clp::GlobalMetadataDBConfig metadata_db_config;
360-
try {
361-
metadata_db_config.parse_config_file(metadata_db_config_file_path);
362-
} catch (std::exception& e) {
363-
SPDLOG_ERROR("Failed to validate metadata database config - {}.", e.what());
364-
return ParsingResult::Failure;
365-
}
366-
367-
if (clp::GlobalMetadataDBConfig::MetadataDBType::MySQL
368-
!= metadata_db_config.get_metadata_db_type())
369-
{
370-
SPDLOG_ERROR(
371-
"Invalid metadata database type for {}; only supported type is MySQL.",
372-
m_program_name
373-
);
374-
return ParsingResult::Failure;
375-
}
376-
377-
m_metadata_db_config = std::move(metadata_db_config);
378-
}
379350
} else if ((char)Command::Extract == command_input) {
380351
po::options_description extraction_options;
381352
std::string archive_path;

components/core/src/clp_s/CommandLineArguments.hpp

Lines changed: 0 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -9,7 +9,6 @@
99
#include <boost/program_options/options_description.hpp>
1010
#include <boost/program_options/variables_map.hpp>
1111

12-
#include "../clp/GlobalMetadataDBConfig.hpp"
1312
#include "../reducer/types.hpp"
1413
#include "Defs.hpp"
1514
#include "InputConfig.hpp"
@@ -94,10 +93,6 @@ class CommandLineArguments {
9493

9594
bool get_ignore_case() const { return m_ignore_case; }
9695

97-
std::optional<clp::GlobalMetadataDBConfig> const& get_metadata_db_config() const {
98-
return m_metadata_db_config;
99-
}
100-
10196
std::string const& get_reducer_host() const { return m_reducer_host; }
10297

10398
int get_reducer_port() const { return m_reducer_port; }
@@ -200,9 +195,6 @@ class CommandLineArguments {
200195
bool m_disable_log_order{false};
201196
FileType m_file_type{FileType::Json};
202197

203-
// Metadata db variables
204-
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;
205-
206198
// MongoDB configuration variables
207199
std::string m_mongodb_uri;
208200
std::string m_mongodb_collection;

components/core/src/clp_s/JsonParser.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -127,7 +127,7 @@ JsonParser::JsonParser(JsonParserOption const& option)
127127
m_archive_options.authoritative_timestamp = m_timestamp_column;
128128
m_archive_options.authoritative_timestamp_namespace = m_timestamp_namespace;
129129

130-
m_archive_writer = std::make_unique<ArchiveWriter>(option.metadata_db);
130+
m_archive_writer = std::make_unique<ArchiveWriter>();
131131
m_archive_writer->open(m_archive_options);
132132
}
133133

components/core/src/clp_s/JsonParser.hpp

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717
#include "../clp/ffi/KeyValuePairLogEvent.hpp"
1818
#include "../clp/ffi/SchemaTree.hpp"
1919
#include "../clp/ffi/Value.hpp"
20-
#include "../clp/GlobalMySQLMetadataDB.hpp"
2120
#include "../clp/ReaderInterface.hpp"
2221
#include "ArchiveWriter.hpp"
2322
#include "CommandLineArguments.hpp"
@@ -51,7 +50,6 @@ struct JsonParserOption {
5150
bool structurize_arrays{};
5251
bool record_log_order{true};
5352
bool single_file_archive{false};
54-
std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
5553
NetworkAuthOption network_auth{};
5654
};
5755

components/core/src/clp_s/clp-s.cpp

Lines changed: 0 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,6 @@
1313
#include <spdlog/spdlog.h>
1414

1515
#include "../clp/CurlGlobalInstance.hpp"
16-
#include "../clp/GlobalMySQLMetadataDB.hpp"
1716
#include "../clp/streaming_archive/ArchiveMetadata.hpp"
1817
#include "../reducer/network_utils.hpp"
1918
#include "CommandLineArguments.hpp"
@@ -102,19 +101,6 @@ bool compress(CommandLineArguments const& command_line_arguments) {
102101
option.structurize_arrays = command_line_arguments.get_structurize_arrays();
103102
option.record_log_order = command_line_arguments.get_record_log_order();
104103

105-
auto const& db_config_container = command_line_arguments.get_metadata_db_config();
106-
if (db_config_container.has_value()) {
107-
auto const& db_config = db_config_container.value();
108-
option.metadata_db = std::make_shared<clp::GlobalMySQLMetadataDB>(
109-
db_config.get_metadata_db_host(),
110-
db_config.get_metadata_db_port(),
111-
db_config.get_metadata_db_username(),
112-
db_config.get_metadata_db_password(),
113-
db_config.get_metadata_db_name(),
114-
db_config.get_metadata_table_prefix()
115-
);
116-
}
117-
118104
clp_s::JsonParser parser(option);
119105
if (CommandLineArguments::FileType::KeyValueIr == option.input_file_type) {
120106
if (false == parser.parse_from_ir()) {

components/job-orchestration/job_orchestration/executor/compress/compression_task.py

Lines changed: 22 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
from celery.app.task import Task
1111
from celery.utils.log import get_task_logger
1212
from clp_py_utils.clp_config import (
13+
ARCHIVES_TABLE_SUFFIX,
1314
COMPRESSION_JOBS_TABLE_NAME,
1415
COMPRESSION_TASKS_TABLE_NAME,
1516
Database,
@@ -82,6 +83,23 @@ def update_job_metadata_and_tags(db_cursor, job_id, table_prefix, tag_ids, archi
8283
)
8384

8485

86+
def update_archive_metadata(db_cursor, table_prefix, archive_stats):
    """
    Inserts one row describing an archive into the `<table_prefix><ARCHIVES_TABLE_SUFFIX>` table.

    Every key of `archive_stats` is used as a column name and its value as the column value. The
    columns `begin_timestamp`, `end_timestamp`, `creator_id` and `creation_ix` are filled with
    defaults (0, 0, "", 0) when `archive_stats` doesn't provide them.

    :param db_cursor: Cursor exposing `execute(query, params)` with `%s`-style placeholders.
    :param table_prefix: Prefix prepended to the archives table's name.
    :param archive_stats: Mapping of column name -> value for the archive. Not modified.
    :raises ValueError: If a key of `archive_stats` is not a plain identifier.
    """
    archive_stats_defaults = {
        "begin_timestamp": 0,
        "end_timestamp": 0,
        "creator_id": "",
        "creation_ix": 0,
    }

    # Work on a copy so the caller's dict (which is reused after this call) isn't altered.
    row = dict(archive_stats)
    for k, v in archive_stats_defaults.items():
        row.setdefault(k, v)

    # Column names can't be bound as query parameters, so they are interpolated into the
    # statement; refuse anything that isn't a plain identifier before doing so.
    for column in row:
        if not (isinstance(column, str) and column.isidentifier()):
            raise ValueError(f"Invalid archive metadata column name: {column!r}")

    keys = ", ".join(row.keys())
    value_placeholders = ", ".join(["%s"] * len(row))
    query = (
        f"INSERT INTO {table_prefix}{ARCHIVES_TABLE_SUFFIX} ({keys}) VALUES ({value_placeholders})"
    )
    db_cursor.execute(query, list(row.values()))
101+
102+
85103
def _generate_fs_logs_list(
86104
output_file_path: pathlib.Path,
87105
paths_to_compress: PathsToCompress,
@@ -161,15 +179,13 @@ def make_clp_s_command_and_env(
161179
clp_home: pathlib.Path,
162180
archive_output_dir: pathlib.Path,
163181
clp_config: ClpIoConfig,
164-
db_config_file_path: pathlib.Path,
165182
use_single_file_archive: bool,
166183
) -> Tuple[List[str], Optional[Dict[str, str]]]:
167184
"""
168185
Generates the command and environment variables for a clp_s compression job.
169186
:param clp_home:
170187
:param archive_output_dir:
171188
:param clp_config:
172-
:param db_config_file_path:
173189
:param use_single_file_archive:
174190
:return: Tuple of (compression_command, compression_env_vars)
175191
"""
@@ -182,7 +198,6 @@ def make_clp_s_command_and_env(
182198
"--target-encoded-size",
183199
str(clp_config.output.target_segment_size + clp_config.output.target_dictionaries_size),
184200
"--compression-level", str(clp_config.output.compression_level),
185-
"--db-config-file", str(db_config_file_path),
186201
]
187202
# fmt: on
188203

@@ -271,7 +286,6 @@ def run_clp(
271286
clp_home=clp_home,
272287
archive_output_dir=archive_output_dir,
273288
clp_config=clp_config,
274-
db_config_file_path=db_config_file_path,
275289
use_single_file_archive=enable_s3_write,
276290
)
277291
else:
@@ -347,10 +361,13 @@ def run_clp(
347361
with closing(sql_adapter.create_connection(True)) as db_conn, closing(
348362
db_conn.cursor(dictionary=True)
349363
) as db_cursor:
364+
table_prefix = clp_metadata_db_connection_config["table_prefix"]
365+
if StorageEngine.CLP_S == clp_storage_engine:
366+
update_archive_metadata(db_cursor, table_prefix, last_archive_stats)
350367
update_job_metadata_and_tags(
351368
db_cursor,
352369
job_id,
353-
clp_metadata_db_connection_config["table_prefix"],
370+
table_prefix,
354371
tag_ids,
355372
last_archive_stats,
356373
)

0 commit comments

Comments
 (0)