Skip to content

Commit f14031c

Browse files
committed
Merge remote-tracking branch 'upstream/main' into disable-log-order-no-range-index
2 parents 3038b47 + d8ac122 commit f14031c

File tree

14 files changed

+258
-119
lines changed

14 files changed

+258
-119
lines changed

components/core/src/clp_s/ArchiveWriter.cpp

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
#include <nlohmann/json.hpp>
88
#include <spdlog/spdlog.h>
99

10-
#include "../clp/streaming_archive/Constants.hpp"
1110
#include "archive_constants.hpp"
1211
#include "Defs.hpp"
1312
#include "SchemaTree.hpp"
@@ -58,7 +57,7 @@ void ArchiveWriter::open(ArchiveWriterOption const& option) {
5857
m_array_dict->open(array_dict_path, m_compression_level, UINT64_MAX);
5958
}
6059

61-
void ArchiveWriter::close() {
60+
auto ArchiveWriter::close(bool is_split) -> ArchiveStats {
6261
if (m_range_open) {
6362
if (auto const rc = close_current_range(); ErrorCodeSuccess != rc) {
6463
throw OperationFailed(rc, __FILENAME__, __LINE__);
@@ -87,15 +86,16 @@ void ArchiveWriter::close() {
8786
offset += original_size;
8887
}
8988

89+
nlohmann::json archive_range_index;
9090
if (m_single_file_archive) {
91-
write_single_file_archive(files);
91+
archive_range_index = write_single_file_archive(files);
9292
} else {
9393
FileWriter header_and_metadata_writer;
9494
header_and_metadata_writer.open(
9595
m_archive_path + constants::cArchiveHeaderFile,
9696
FileWriter::OpenMode::CreateForWriting
9797
);
98-
write_archive_metadata(header_and_metadata_writer, files);
98+
archive_range_index = write_archive_metadata(header_and_metadata_writer, files);
9999
size_t metadata_size = header_and_metadata_writer.get_pos() - sizeof(ArchiveHeader);
100100

101101
m_compressed_size
@@ -107,8 +107,18 @@ void ArchiveWriter::close() {
107107
header_and_metadata_writer.close();
108108
}
109109

110+
ArchiveStats archive_stats{
111+
m_id,
112+
m_timestamp_dict.get_begin_timestamp(),
113+
m_timestamp_dict.get_end_timestamp(),
114+
m_uncompressed_size,
115+
m_compressed_size,
116+
archive_range_index,
117+
is_split
118+
};
110119
if (m_print_archive_stats) {
111-
print_archive_stats();
120+
std::cout << archive_stats.as_string() << '\n';
121+
std::cout << std::flush;
112122
}
113123

114124
m_id_to_schema_writer.clear();
@@ -123,14 +133,17 @@ void ArchiveWriter::close() {
123133
m_authoritative_timestamp_namespace.clear();
124134
m_matched_timestamp_prefix_length = 0ULL;
125135
m_matched_timestamp_prefix_node_id = constants::cRootNodeId;
136+
return archive_stats;
126137
}
127138

128-
void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const& files) {
139+
auto ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const& files)
140+
-> nlohmann::json {
129141
std::string single_file_archive_path = (std::filesystem::path(m_archives_dir) / m_id).string();
130142
FileWriter archive_writer;
131143
archive_writer.open(single_file_archive_path, FileWriter::OpenMode::CreateForWriting);
132144

133-
write_archive_metadata(archive_writer, files);
145+
// Avoid brace initialization to avoid wrapping nlohmann::json return value in a JSON array
146+
auto archive_range_index(write_archive_metadata(archive_writer, files));
134147
size_t metadata_section_size = archive_writer.get_pos() - sizeof(ArchiveHeader);
135148
write_archive_files(archive_writer, files);
136149
m_compressed_size = archive_writer.get_pos();
@@ -141,12 +154,13 @@ void ArchiveWriter::write_single_file_archive(std::vector<ArchiveFileInfo> const
141154
if (false == std::filesystem::remove(m_archive_path, ec)) {
142155
throw OperationFailed(ErrorCodeFileExists, __FILENAME__, __LINE__);
143156
}
157+
return archive_range_index;
144158
}
145159

146-
void ArchiveWriter::write_archive_metadata(
160+
auto ArchiveWriter::write_archive_metadata(
147161
FileWriter& archive_writer,
148162
std::vector<ArchiveFileInfo> const& files
149-
) {
163+
) -> nlohmann::json {
150164
archive_writer.seek_from_begin(sizeof(ArchiveHeader));
151165

152166
ZstdCompressor compressor;
@@ -184,11 +198,15 @@ void ArchiveWriter::write_archive_metadata(
184198
compressor.write(encoded_timestamp_dict.data(), encoded_timestamp_dict.size());
185199

186200
// Write range index
187-
if (auto rc = m_range_index_writer.write(compressor); ErrorCodeSuccess != rc) {
201+
nlohmann::json archive_range_index;
202+
if (auto rc = m_range_index_writer.write(compressor, archive_range_index);
203+
ErrorCodeSuccess != rc)
204+
{
188205
throw OperationFailed(rc, __FILENAME__, __LINE__);
189206
}
190207

191208
compressor.close();
209+
return archive_range_index;
192210
}
193211

194212
void ArchiveWriter::write_archive_files(
@@ -441,15 +459,4 @@ std::pair<size_t, size_t> ArchiveWriter::store_tables() {
441459

442460
return {table_metadata_compressed_size, table_compressed_size};
443461
}
444-
445-
auto ArchiveWriter::print_archive_stats() const -> void {
446-
namespace Archive = clp::streaming_archive::cMetadataDB::Archive;
447-
nlohmann::json json_msg
448-
= {{Archive::Id, m_id},
449-
{Archive::BeginTimestamp, m_timestamp_dict.get_begin_timestamp()},
450-
{Archive::EndTimestamp, m_timestamp_dict.get_end_timestamp()},
451-
{Archive::UncompressedSize, m_uncompressed_size},
452-
{Archive::Size, m_compressed_size}};
453-
std::cout << json_msg.dump(-1, ' ', true, nlohmann::json::error_handler_t::ignore) << std::endl;
454-
}
455462
} // namespace clp_s

components/core/src/clp_s/ArchiveWriter.hpp

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@
88

99
#include <boost/uuid/uuid.hpp>
1010
#include <boost/uuid/uuid_io.hpp>
11+
#include <nlohmann/json.hpp>
1112

13+
#include "../clp/streaming_archive/Constants.hpp"
1214
#include "archive_constants.hpp"
1315
#include "DictionaryWriter.hpp"
1416
#include "RangeIndexWriter.hpp"
@@ -31,6 +33,70 @@ struct ArchiveWriterOption {
3133
std::string authoritative_timestamp_namespace;
3234
};
3335

36+
class ArchiveStats {
37+
public:
38+
// Constructors
39+
explicit ArchiveStats(
40+
std::string id,
41+
epochtime_t begin_timestamp,
42+
epochtime_t end_timestamp,
43+
size_t uncompressed_size,
44+
size_t compressed_size,
45+
nlohmann::json range_index,
46+
bool is_split
47+
)
48+
: m_id{id},
49+
m_begin_timestamp{begin_timestamp},
50+
m_end_timestamp{end_timestamp},
51+
m_uncompressed_size{uncompressed_size},
52+
m_compressed_size{compressed_size},
53+
m_range_index(std::move(range_index)), // Avoid {} to prevent wrapping in JSON array.
54+
m_is_split{is_split} {}
55+
56+
// Methods
57+
/**
58+
* @return The contents of `ArchiveStats` as a JSON object in a string.
59+
*/
60+
[[nodiscard]] auto as_string() const -> std::string {
61+
namespace Archive = clp::streaming_archive::cMetadataDB::Archive;
62+
namespace File = clp::streaming_archive::cMetadataDB::File;
63+
constexpr std::string_view cRangeIndex{"range_index"};
64+
65+
nlohmann::json json_msg
66+
= {{Archive::Id, m_id},
67+
{Archive::BeginTimestamp, m_begin_timestamp},
68+
{Archive::EndTimestamp, m_end_timestamp},
69+
{Archive::UncompressedSize, m_uncompressed_size},
70+
{Archive::Size, m_compressed_size},
71+
{File::IsSplit, m_is_split},
72+
{cRangeIndex, m_range_index}};
73+
return json_msg.dump(-1, ' ', false, nlohmann::json::error_handler_t::ignore);
74+
}
75+
76+
auto get_id() const -> std::string const& { return m_id; }
77+
78+
auto get_begin_timestamp() const -> epochtime_t { return m_begin_timestamp; }
79+
80+
auto get_end_timestamp() const -> epochtime_t { return m_end_timestamp; }
81+
82+
auto get_uncompressed_size() const -> size_t { return m_uncompressed_size; }
83+
84+
auto get_compressed_size() const -> size_t { return m_compressed_size; }
85+
86+
auto get_range_index() const -> nlohmann::json const& { return m_range_index; }
87+
88+
auto get_is_split() const -> bool { return m_is_split; }
89+
90+
private:
91+
std::string m_id;
92+
epochtime_t m_begin_timestamp{};
93+
epochtime_t m_end_timestamp{};
94+
size_t m_uncompressed_size{};
95+
size_t m_compressed_size{};
96+
nlohmann::json m_range_index;
97+
bool m_is_split{};
98+
};
99+
34100
class ArchiveWriter {
35101
public:
36102
class OperationFailed : public TraceableException {
@@ -80,9 +146,11 @@ class ArchiveWriter {
80146
void open(ArchiveWriterOption const& option);
81147

82148
/**
83-
* Closes the archive writer
149+
* Closes the archive writer.
150+
* @param is_split Whether the last file ingested into the archive is split.
151+
* @return Statistics for the newly-written archive.
84152
*/
85-
void close();
153+
[[nodiscard]] auto close(bool is_split = false) -> ArchiveStats;
86154

87155
/**
88156
* Appends a message to the archive writer
@@ -220,16 +288,20 @@ class ArchiveWriter {
220288
/**
221289
* Writes the archive to a single file
222290
* @param files
291+
* @return The archive range index as a JSON object.
223292
*/
224-
void write_single_file_archive(std::vector<ArchiveFileInfo> const& files);
293+
[[nodiscard]] auto write_single_file_archive(std::vector<ArchiveFileInfo> const& files)
294+
-> nlohmann::json;
225295

226296
/**
227297
* Writes the metadata section of an archive.
228298
* @param archive_writer
229299
* @param files
300+
* @return The archive range index as a JSON object.
230301
*/
231-
void
232-
write_archive_metadata(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files);
302+
[[nodiscard]] auto
303+
write_archive_metadata(FileWriter& archive_writer, std::vector<ArchiveFileInfo> const& files)
304+
-> nlohmann::json;
233305

234306
/**
235307
* Writes the file section of the single file archive
@@ -245,11 +317,6 @@ class ArchiveWriter {
245317
*/
246318
void write_archive_header(FileWriter& archive_writer, size_t metadata_section_size);
247319

248-
/**
249-
* Prints the archive's statistics (id, uncompressed size, compressed size, etc.)
250-
*/
251-
auto print_archive_stats() const -> void;
252-
253320
static constexpr size_t cReadBlockSize = 4 * 1024;
254321

255322
size_t m_encoded_message_size{};

components/core/src/clp_s/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ target_link_libraries(
232232
PUBLIC
233233
absl::flat_hash_map
234234
clp_s::io
235+
nlohmann_json::nlohmann_json
235236
simdjson::simdjson
236237
PRIVATE
237238
Boost::url
@@ -240,7 +241,6 @@ target_link_libraries(
240241
${CURL_LIBRARIES}
241242
fmt::fmt
242243
msgpack-cxx
243-
nlohmann_json::nlohmann_json
244244
spdlog::spdlog
245245
)
246246

0 commit comments

Comments
 (0)