Skip to content

Commit 12a5f8d

Browse files
gibber9809wraymo
andauthored
feat(clp-s): Record log-order at compression time. (#584)
Co-authored-by: wraymo <37269683+wraymo@users.noreply.github.com>
1 parent d969aaf commit 12a5f8d

26 files changed

+377
-91
lines changed

components/core/src/clp_s/ArchiveReader.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) {
2727
m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str);
2828
m_schema_map = ReaderUtils::read_schemas(archive_path_str);
2929

30+
m_log_event_idx_column_id = m_schema_tree->get_metadata_field_id(constants::cLogEventIdxName);
31+
3032
m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile);
3133
m_stream_reader.open_packed_streams(archive_path_str + constants::cArchiveTablesFile);
3234
}
@@ -310,6 +312,12 @@ void ArchiveReader::initialize_schema_reader(
310312
}
311313
BaseColumnReader* column_reader = append_reader_column(reader, column_id);
312314

315+
if (column_id == m_log_event_idx_column_id
316+
&& nullptr != dynamic_cast<Int64ColumnReader*>(column_reader))
317+
{
318+
reader.mark_column_as_log_event_idx(static_cast<Int64ColumnReader*>(column_reader));
319+
}
320+
313321
if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)
314322
{
315323
reader.mark_column_as_timestamp(column_reader);
@@ -346,6 +354,7 @@ void ArchiveReader::close() {
346354
m_cur_stream_id = 0;
347355
m_stream_buffer.reset();
348356
m_stream_buffer_size = 0ULL;
357+
m_log_event_idx_column_id = -1;
349358
}
350359

351360
std::shared_ptr<char[]> ArchiveReader::read_stream(size_t stream_id, bool reuse_buffer) {

components/core/src/clp_s/ArchiveReader.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,11 @@ class ArchiveReader {
142142
m_projection = projection;
143143
}
144144

145+
/**
146+
* @return true if this archive has log ordering information, and false otherwise.
147+
*/
148+
bool has_log_order() { return m_log_event_idx_column_id >= 0; }
149+
145150
private:
146151
/**
147152
* Initializes a schema reader passed by reference to become a reader for a given schema.
@@ -214,6 +219,7 @@ class ArchiveReader {
214219
std::shared_ptr<char[]> m_stream_buffer{};
215220
size_t m_stream_buffer_size{0ULL};
216221
size_t m_cur_stream_id{0ULL};
222+
int32_t m_log_event_idx_column_id{-1};
217223
};
218224
} // namespace clp_s
219225

components/core/src/clp_s/ArchiveWriter.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ void ArchiveWriter::close() {
6868
m_encoded_message_size = 0UL;
6969
m_uncompressed_size = 0UL;
7070
m_compressed_size = 0UL;
71+
m_next_log_event_id = 0;
7172
}
7273

7374
void ArchiveWriter::append_message(
@@ -86,6 +87,7 @@ void ArchiveWriter::append_message(
8687
}
8788

8889
m_encoded_message_size += schema_writer->append_message(message);
90+
++m_next_log_event_id;
8991
}
9092

9193
size_t ArchiveWriter::get_data_size() {

components/core/src/clp_s/ArchiveWriter.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#ifndef CLP_S_ARCHIVEWRITER_HPP
22
#define CLP_S_ARCHIVEWRITER_HPP
33

4+
#include <string_view>
45
#include <utility>
56

67
#include <boost/filesystem.hpp>
@@ -93,10 +94,15 @@ class ArchiveWriter {
9394
* @param key
9495
* @return the node id
9596
*/
96-
int32_t add_node(int parent_node_id, NodeType type, std::string const& key) {
97+
int32_t add_node(int parent_node_id, NodeType type, std::string_view const key) {
9798
return m_schema_tree.add_node(parent_node_id, type, key);
9899
}
99100

101+
/**
102+
* @return The Id that will be assigned to the next log event when appended to the archive.
103+
*/
104+
int64_t get_next_log_event_id() const { return m_next_log_event_id; }
105+
100106
/**
101107
* Return a schema's Id and add the schema to the
102108
* schema map if it does not already exist.
@@ -174,6 +180,7 @@ class ArchiveWriter {
174180
size_t m_encoded_message_size{};
175181
size_t m_uncompressed_size{};
176182
size_t m_compressed_size{};
183+
int64_t m_next_log_event_id{};
177184

178185
std::string m_id;
179186

components/core/src/clp_s/CommandLineArguments.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
194194
"structurize-arrays",
195195
po::bool_switch(&m_structurize_arrays),
196196
"Structurize arrays instead of compressing them as clp strings."
197+
)(
198+
"disable-log-order",
199+
po::bool_switch(&m_disable_log_order),
200+
"Do not record log order at ingestion time."
197201
);
198202
// clang-format on
199203

@@ -296,13 +300,13 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
296300
decompression_options.add_options()(
297301
"ordered",
298302
po::bool_switch(&m_ordered_decompression),
299-
"Enable decompression in ascending timestamp order for this archive"
303+
"Enable decompression in log order for this archive"
300304
)(
301305
"ordered-chunk-size",
302306
po::value<size_t>(&m_ordered_chunk_size)
303307
->default_value(m_ordered_chunk_size),
304308
"Number of records to include in each output file when decompressing records "
305-
"in ascending timestamp order"
309+
"in log order"
306310
);
307311
// clang-format on
308312
extraction_options.add(decompression_options);

components/core/src/clp_s/CommandLineArguments.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ class CommandLineArguments {
112112

113113
std::vector<std::string> const& get_projection_columns() const { return m_projection_columns; }
114114

115+
bool get_record_log_order() const { return false == m_disable_log_order; }
116+
115117
private:
116118
// Methods
117119
/**
@@ -178,6 +180,7 @@ class CommandLineArguments {
178180
bool m_ordered_decompression{false};
179181
size_t m_ordered_chunk_size{0};
180182
size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB
183+
bool m_disable_log_order{false};
181184

182185
// Metadata db variables
183186
std::optional<clp::GlobalMetadataDBConfig> m_metadata_db_config;

components/core/src/clp_s/JsonConstructor.cpp

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,13 @@ void JsonConstructor::store() {
4848
m_archive_reader = std::make_unique<ArchiveReader>();
4949
m_archive_reader->open(m_option.archives_dir, m_option.archive_id);
5050
m_archive_reader->read_dictionaries_and_metadata();
51-
if (false == m_option.ordered) {
51+
52+
if (m_option.ordered && false == m_archive_reader->has_log_order()) {
53+
SPDLOG_WARN("This archive is missing ordering information and can not be decompressed in "
54+
"log order. Falling back to out of order decompression.");
55+
}
56+
57+
if (false == m_option.ordered || false == m_archive_reader->has_log_order()) {
5258
FileWriter writer;
5359
writer.open(
5460
m_option.output_dir + "/original",
@@ -68,15 +74,15 @@ void JsonConstructor::construct_in_order() {
6874
auto tables = m_archive_reader->read_all_tables();
6975
using ReaderPointer = std::shared_ptr<SchemaReader>;
7076
auto cmp = [](ReaderPointer& left, ReaderPointer& right) {
71-
return left->get_next_timestamp() > right->get_next_timestamp();
77+
return left->get_next_log_event_idx() > right->get_next_log_event_idx();
7278
};
7379
std::priority_queue record_queue(tables.begin(), tables.end(), cmp);
7480
// Clear tables vector so that memory gets deallocated after we have marshalled all records for
7581
// a given table
7682
tables.clear();
7783

78-
epochtime_t first_timestamp{0};
79-
epochtime_t last_timestamp{0};
84+
int64_t first_idx{0};
85+
int64_t last_idx{0};
8086
size_t num_records_marshalled{0};
8187
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
8288
FileWriter writer;
@@ -97,9 +103,11 @@ void JsonConstructor::construct_in_order() {
97103

98104
std::vector<bsoncxx::document::value> results;
99105
auto finalize_chunk = [&](bool open_new_writer) {
106+
// Add one to last_idx to match clp's behaviour of having the end index be exclusive
107+
++last_idx;
100108
writer.close();
101-
std::string new_file_name = src_path.string() + "_" + std::to_string(first_timestamp) + "_"
102-
+ std::to_string(last_timestamp) + ".jsonl";
109+
std::string new_file_name = src_path.string() + "_" + std::to_string(first_idx) + "_"
110+
+ std::to_string(last_idx) + ".jsonl";
103111
auto new_file_path = std::filesystem::path(new_file_name);
104112
std::error_code ec;
105113
std::filesystem::rename(src_path, new_file_path, ec);
@@ -119,11 +127,11 @@ void JsonConstructor::construct_in_order() {
119127
),
120128
bsoncxx::builder::basic::kvp(
121129
constants::results_cache::decompression::cBeginMsgIx,
122-
static_cast<int64_t>(first_timestamp)
130+
first_idx
123131
),
124132
bsoncxx::builder::basic::kvp(
125133
constants::results_cache::decompression::cEndMsgIx,
126-
static_cast<int64_t>(last_timestamp)
134+
last_idx
127135
),
128136
bsoncxx::builder::basic::kvp(
129137
constants::results_cache::decompression::cIsLastIrChunk,
@@ -140,9 +148,9 @@ void JsonConstructor::construct_in_order() {
140148
while (false == record_queue.empty()) {
141149
ReaderPointer next = record_queue.top();
142150
record_queue.pop();
143-
last_timestamp = next->get_next_timestamp();
151+
last_idx = next->get_next_log_event_idx();
144152
if (0 == num_records_marshalled) {
145-
first_timestamp = last_timestamp;
153+
first_idx = last_idx;
146154
}
147155
next->get_next_message(buffer);
148156
if (false == next->done()) {

components/core/src/clp_s/JsonConstructor.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class JsonConstructor {
6666
private:
6767
/**
6868
* Reads all of the tables from m_archive_reader and writes all of the records
69-
* they contain to writer in timestamp order.
69+
* they contain to writer in log order.
7070
*/
7171
void construct_in_order();
7272

components/core/src/clp_s/JsonParser.cpp

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ JsonParser::JsonParser(JsonParserOption const& option)
1515
m_target_encoded_size(option.target_encoded_size),
1616
m_max_document_size(option.max_document_size),
1717
m_timestamp_key(option.timestamp_key),
18-
m_structurize_arrays(option.structurize_arrays) {
18+
m_structurize_arrays(option.structurize_arrays),
19+
m_record_log_order(option.record_log_order) {
1920
if (false == FileUtils::validate_path(option.file_paths)) {
2021
exit(1);
2122
}
@@ -447,6 +448,16 @@ bool JsonParser::parse() {
447448
m_num_messages = 0;
448449
size_t bytes_consumed_up_to_prev_archive = 0;
449450
size_t bytes_consumed_up_to_prev_record = 0;
451+
452+
int32_t log_event_idx_node_id{};
453+
auto add_log_event_idx_node = [&]() {
454+
if (m_record_log_order) {
455+
log_event_idx_node_id
456+
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
457+
}
458+
};
459+
add_log_event_idx_node();
460+
450461
while (json_file_iterator.get_json(json_it)) {
451462
m_current_schema.clear();
452463

@@ -467,11 +478,20 @@ bool JsonParser::parse() {
467478
return false;
468479
}
469480

481+
// Add log_event_idx field to metadata for record
482+
if (m_record_log_order) {
483+
m_current_parsed_message.add_value(
484+
log_event_idx_node_id,
485+
m_archive_writer->get_next_log_event_id()
486+
);
487+
m_current_schema.insert_ordered(log_event_idx_node_id);
488+
}
489+
470490
// Some errors from simdjson are latent until trying to access invalid JSON fields.
471491
// Instead of checking for an error every time we access a JSON field in parse_line we
472492
// just catch simdjson_error here instead.
473493
try {
474-
parse_line(ref.value(), -1, "");
494+
parse_line(ref.value(), constants::cRootNodeId, constants::cRootNodeName);
475495
} catch (simdjson::simdjson_error& error) {
476496
SPDLOG_ERROR(
477497
"Encountered error - {} - while trying to parse {} after parsing {} bytes",
@@ -496,6 +516,7 @@ bool JsonParser::parse() {
496516
);
497517
bytes_consumed_up_to_prev_archive = bytes_consumed_up_to_prev_record;
498518
split_archive();
519+
add_log_event_idx_node();
499520
}
500521

501522
m_current_parsed_message.clear();
@@ -526,6 +547,15 @@ bool JsonParser::parse() {
526547
return true;
527548
}
528549

550+
int32_t JsonParser::add_metadata_field(std::string_view const field_name, NodeType type) {
551+
auto metadata_subtree_id = m_archive_writer->add_node(
552+
constants::cRootNodeId,
553+
NodeType::Metadata,
554+
constants::cMetadataSubtreeName
555+
);
556+
return m_archive_writer->add_node(metadata_subtree_id, type, field_name);
557+
}
558+
529559
void JsonParser::store() {
530560
m_archive_writer->close();
531561
}

components/core/src/clp_s/JsonParser.hpp

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
#include <map>
55
#include <string>
6+
#include <string_view>
67
#include <variant>
78
#include <vector>
89

@@ -30,12 +31,13 @@ struct JsonParserOption {
3031
std::vector<std::string> file_paths;
3132
std::string timestamp_key;
3233
std::string archives_dir;
33-
size_t target_encoded_size;
34-
size_t max_document_size;
35-
size_t min_table_size;
36-
int compression_level;
37-
bool print_archive_stats;
38-
bool structurize_arrays;
34+
size_t target_encoded_size{};
35+
size_t max_document_size{};
36+
size_t min_table_size{};
37+
int compression_level{};
38+
bool print_archive_stats{};
39+
bool structurize_arrays{};
40+
bool record_log_order{true};
3941
std::shared_ptr<clp::GlobalMySQLMetadataDB> metadata_db;
4042
};
4143

@@ -94,6 +96,14 @@ class JsonParser {
9496
*/
9597
void split_archive();
9698

99+
/**
100+
* Adds an internal field to the MPT and get its Id.
101+
*
102+
* Note: this method should be called before parsing a record so that internal fields come first
103+
* in each table. This isn't strictly necessary, but it is a nice convention.
104+
*/
105+
int32_t add_metadata_field(std::string_view const field_name, NodeType type);
106+
97107
int m_num_messages;
98108
std::vector<std::string> m_file_paths;
99109

@@ -109,6 +119,7 @@ class JsonParser {
109119
size_t m_target_encoded_size;
110120
size_t m_max_document_size;
111121
bool m_structurize_arrays{false};
122+
bool m_record_log_order{true};
112123
};
113124
} // namespace clp_s
114125

0 commit comments

Comments
 (0)