Skip to content

Commit

Permalink
feat(clp-s): Chunk output by size (in bytes) during ordered decompres…
Browse files Browse the repository at this point in the history
…sion. (#600)

Co-authored-by: haiqi96 <14502009+haiqi96@users.noreply.github.com>
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
  • Loading branch information
3 people authored Nov 20, 2024
1 parent 12a5f8d commit ee7e493
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,9 @@ def main(argv):
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size", default=100000
"--target-chunk-size",
type=int,
help="Target chunk size (B).",
)

parsed_args = args_parser.parse_args(argv[1:])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def main(argv):
json_extraction_parser = command_args_parser.add_parser(EXTRACT_JSON_CMD)
json_extraction_parser.add_argument("archive_id", type=str, help="Archive ID")
json_extraction_parser.add_argument(
"--target-chunk-size", type=int, help="Target chunk size.", required=True
"--target-chunk-size", type=int, help="Target chunk size (B)."
)

parsed_args = args_parser.parse_args(argv[1:])
Expand Down
16 changes: 9 additions & 7 deletions components/core/src/clp_s/CommandLineArguments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,11 +302,12 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
po::bool_switch(&m_ordered_decompression),
"Enable decompression in log order for this archive"
)(
"ordered-chunk-size",
po::value<size_t>(&m_ordered_chunk_size)
->default_value(m_ordered_chunk_size),
"Number of records to include in each output file when decompressing records "
"in log order"
"target-ordered-chunk-size",
po::value<size_t>(&m_target_ordered_chunk_size)
->default_value(m_target_ordered_chunk_size)
->value_name("SIZE"),
"Chunk size (B) for each output file when decompressing records in log order."
" When set to 0, no chunking is performed."
);
// clang-format on
extraction_options.add(decompression_options);
Expand Down Expand Up @@ -369,8 +370,9 @@ CommandLineArguments::parse_arguments(int argc, char const** argv) {
throw std::invalid_argument("No output directory specified");
}

if (0 != m_ordered_chunk_size && false == m_ordered_decompression) {
throw std::invalid_argument("ordered-chunk-size must be used with ordered argument"
if (0 != m_target_ordered_chunk_size && false == m_ordered_decompression) {
throw std::invalid_argument(
"target-ordered-chunk-size must be used with ordered argument"
);
}

Expand Down
4 changes: 2 additions & 2 deletions components/core/src/clp_s/CommandLineArguments.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ class CommandLineArguments {

bool get_ordered_decompression() const { return m_ordered_decompression; }

size_t get_ordered_chunk_size() const { return m_ordered_chunk_size; }
size_t get_target_ordered_chunk_size() const { return m_target_ordered_chunk_size; }

size_t get_minimum_table_size() const { return m_minimum_table_size; }

Expand Down Expand Up @@ -178,7 +178,7 @@ class CommandLineArguments {
size_t m_max_document_size{512ULL * 1024 * 1024}; // 512 MB
bool m_structurize_arrays{false};
bool m_ordered_decompression{false};
size_t m_ordered_chunk_size{0};
size_t m_target_ordered_chunk_size{};
size_t m_minimum_table_size{1ULL * 1024 * 1024}; // 1 MB
bool m_disable_log_order{false};

Expand Down
18 changes: 9 additions & 9 deletions components/core/src/clp_s/JsonConstructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ void JsonConstructor::construct_in_order() {
// a given table
tables.clear();

int64_t first_idx{0};
int64_t last_idx{0};
size_t num_records_marshalled{0};
int64_t first_idx{};
int64_t last_idx{};
size_t chunk_size{};
auto src_path = std::filesystem::path(m_option.output_dir) / m_option.archive_id;
FileWriter writer;
writer.open(src_path, FileWriter::OpenMode::CreateForWriting);
Expand Down Expand Up @@ -149,25 +149,25 @@ void JsonConstructor::construct_in_order() {
ReaderPointer next = record_queue.top();
record_queue.pop();
last_idx = next->get_next_log_event_idx();
if (0 == num_records_marshalled) {
if (0 == chunk_size) {
first_idx = last_idx;
}
next->get_next_message(buffer);
if (false == next->done()) {
record_queue.emplace(std::move(next));
}
writer.write(buffer.c_str(), buffer.length());
num_records_marshalled += 1;
chunk_size += buffer.length();

if (0 != m_option.ordered_chunk_size
&& num_records_marshalled >= m_option.ordered_chunk_size)
if (0 != m_option.target_ordered_chunk_size
&& chunk_size >= m_option.target_ordered_chunk_size)
{
finalize_chunk(true);
num_records_marshalled = 0;
chunk_size = 0;
}
}

if (num_records_marshalled > 0) {
if (chunk_size > 0) {
finalize_chunk(false);
} else {
writer.close();
Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp_s/JsonConstructor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ struct JsonConstructorOption {
std::string archive_id;
std::string output_dir;
bool ordered{false};
size_t ordered_chunk_size{0};
size_t target_ordered_chunk_size{};
std::optional<MetadataDbOption> metadata_db;
};

Expand Down
2 changes: 1 addition & 1 deletion components/core/src/clp_s/clp-s.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ int main(int argc, char const* argv[]) {
option.output_dir = command_line_arguments.get_output_dir();
option.ordered = command_line_arguments.get_ordered_decompression();
option.archives_dir = archives_dir;
option.ordered_chunk_size = command_line_arguments.get_ordered_chunk_size();
option.target_ordered_chunk_size = command_line_arguments.get_target_ordered_chunk_size();
if (false == command_line_arguments.get_mongodb_uri().empty()) {
option.metadata_db
= {command_line_arguments.get_mongodb_uri(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def make_command(
stream_collection_name,
]
if extract_json_config.target_chunk_size is not None:
command.append("--ordered-chunk-size")
command.append("--target-ordered-chunk-size")
command.append(str(extract_json_config.target_chunk_size))
else:
logger.error(f"Unsupported storage engine {storage_engine}")
Expand Down

0 comments on commit ee7e493

Please sign in to comment.