Skip to content

Commit f2deb21

Browse files
feat(clp-s): Add delta-encoding support for integer columns; Use it for the log_event_idx column. (#1021)
Co-authored-by: kirkrodrigues <2454684+kirkrodrigues@users.noreply.github.com>
1 parent bcae26c commit f2deb21

15 files changed

+253
-9
lines changed

components/core/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -679,6 +679,7 @@ set(SOURCE_FILES_unitTest
679679
tests/TestOutputCleaner.hpp
680680
tests/test-BoundedReader.cpp
681681
tests/test-BufferedFileReader.cpp
682+
tests/test-clp_s-delta-encode-log-order.cpp
682683
tests/test-clp_s-end_to_end.cpp
683684
tests/test-clp_s-range_index.cpp
684685
tests/test-clp_s-search.cpp

components/core/src/clp_s/ArchiveReader.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@ BaseColumnReader* ArchiveReader::append_reader_column(SchemaReader& reader, int3
188188
case NodeType::Integer:
189189
column_reader = new Int64ColumnReader(column_id);
190190
break;
191+
case NodeType::DeltaInteger:
192+
column_reader = new DeltaEncodedInt64ColumnReader(column_id);
193+
break;
191194
case NodeType::Float:
192195
column_reader = new FloatColumnReader(column_id);
193196
break;
@@ -238,6 +241,9 @@ void ArchiveReader::append_unordered_reader_columns(
238241
case NodeType::Integer:
239242
column_reader = new Int64ColumnReader(column_id);
240243
break;
244+
case NodeType::DeltaInteger:
245+
column_reader = new DeltaEncodedInt64ColumnReader(column_id);
246+
break;
241247
case NodeType::Float:
242248
column_reader = new FloatColumnReader(column_id);
243249
break;
@@ -324,10 +330,8 @@ void ArchiveReader::initialize_schema_reader(
324330
}
325331
BaseColumnReader* column_reader = append_reader_column(reader, column_id);
326332

327-
if (column_id == m_log_event_idx_column_id
328-
&& nullptr != dynamic_cast<Int64ColumnReader*>(column_reader))
329-
{
330-
reader.mark_column_as_log_event_idx(static_cast<Int64ColumnReader*>(column_reader));
333+
if (column_id == m_log_event_idx_column_id) {
334+
reader.mark_column_as_log_event_idx(column_reader);
331335
}
332336

333337
if (should_extract_timestamp && column_reader && timestamp_column_ids.count(column_id) > 0)

components/core/src/clp_s/ArchiveWriter.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,9 @@ void ArchiveWriter::initialize_schema_writer(SchemaWriter* writer, Schema const&
331331
case NodeType::DateString:
332332
writer->append_column(new DateStringColumnWriter(id));
333333
break;
334+
case NodeType::DeltaInteger:
335+
writer->append_column(new DeltaEncodedInt64ColumnWriter(id));
336+
break;
334337
case NodeType::Metadata:
335338
case NodeType::NullValue:
336339
case NodeType::Object:

components/core/src/clp_s/ColumnReader.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,36 @@ std::variant<int64_t, double, std::string, uint8_t> Int64ColumnReader::extract_v
1616
return m_values[cur_message];
1717
}
1818

19+
void DeltaEncodedInt64ColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
20+
m_values = reader.read_unaligned_span<int64_t>(num_messages);
21+
if (num_messages > 0) {
22+
m_cur_idx = 0;
23+
m_cur_value = m_values[0];
24+
}
25+
}
26+
27+
int64_t DeltaEncodedInt64ColumnReader::get_value_at_idx(size_t idx) {
28+
if (m_cur_idx == idx) {
29+
return m_cur_value;
30+
}
31+
if (idx > m_cur_idx) {
32+
for (; m_cur_idx < idx; ++m_cur_idx) {
33+
m_cur_value += m_values[m_cur_idx + 1];
34+
}
35+
return m_cur_value;
36+
}
37+
for (; m_cur_idx > idx; --m_cur_idx) {
38+
m_cur_value -= m_values[m_cur_idx];
39+
}
40+
return m_cur_value;
41+
}
42+
43+
std::variant<int64_t, double, std::string, uint8_t> DeltaEncodedInt64ColumnReader::extract_value(
44+
uint64_t cur_message
45+
) {
46+
return get_value_at_idx(cur_message);
47+
}
48+
1949
void FloatColumnReader::load(BufferViewReader& reader, uint64_t num_messages) {
2050
m_values = reader.read_unaligned_span<double>(num_messages);
2151
}
@@ -25,6 +55,13 @@ Int64ColumnReader::extract_string_value_into_buffer(uint64_t cur_message, std::s
2555
buffer.append(std::to_string(m_values[cur_message]));
2656
}
2757

58+
void DeltaEncodedInt64ColumnReader::extract_string_value_into_buffer(
59+
uint64_t cur_message,
60+
std::string& buffer
61+
) {
62+
buffer.append(std::to_string(get_value_at_idx(cur_message)));
63+
}
64+
2865
std::variant<int64_t, double, std::string, uint8_t> FloatColumnReader::extract_value(
2966
uint64_t cur_message
3067
) {

components/core/src/clp_s/ColumnReader.hpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,39 @@ class Int64ColumnReader : public BaseColumnReader {
9191
UnalignedMemSpan<int64_t> m_values;
9292
};
9393

94+
class DeltaEncodedInt64ColumnReader : public BaseColumnReader {
95+
public:
96+
// Constructor
97+
explicit DeltaEncodedInt64ColumnReader(int32_t id) : BaseColumnReader(id) {}
98+
99+
// Destructor
100+
~DeltaEncodedInt64ColumnReader() override = default;
101+
102+
// Methods inherited from BaseColumnReader
103+
void load(BufferViewReader& reader, uint64_t num_messages) override;
104+
105+
NodeType get_type() override { return NodeType::DeltaInteger; }
106+
107+
std::variant<int64_t, double, std::string, uint8_t> extract_value(
108+
uint64_t cur_message
109+
) override;
110+
111+
void extract_string_value_into_buffer(uint64_t cur_message, std::string& buffer) override;
112+
113+
private:
114+
/**
115+
* Gets the value stored at a given index by summing up the stored deltas between the requested
116+
* index and the last requested index.
117+
* @param idx
118+
* @return The value stored at the requested index.
119+
*/
120+
int64_t get_value_at_idx(size_t idx);
121+
122+
UnalignedMemSpan<int64_t> m_values;
123+
int64_t m_cur_value{};
124+
size_t m_cur_idx{};
125+
};
126+
94127
class FloatColumnReader : public BaseColumnReader {
95128
public:
96129
// Constructor

components/core/src/clp_s/ColumnWriter.cpp

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,23 @@ void Int64ColumnWriter::store(ZstdCompressor& compressor) {
1111
compressor.write(reinterpret_cast<char const*>(m_values.data()), size);
1212
}
1313

14+
size_t DeltaEncodedInt64ColumnWriter::add_value(ParsedMessage::variable_t& value) {
15+
if (0 == m_values.size()) {
16+
m_cur = std::get<int64_t>(value);
17+
m_values.push_back(m_cur);
18+
} else {
19+
auto next = std::get<int64_t>(value);
20+
m_values.push_back(next - m_cur);
21+
m_cur = next;
22+
}
23+
return sizeof(int64_t);
24+
}
25+
26+
void DeltaEncodedInt64ColumnWriter::store(ZstdCompressor& compressor) {
27+
size_t size = m_values.size() * sizeof(int64_t);
28+
compressor.write(reinterpret_cast<char const*>(m_values.data()), size);
29+
}
30+
1431
size_t FloatColumnWriter::add_value(ParsedMessage::variable_t& value) {
1532
m_values.push_back(std::get<double>(value));
1633
return sizeof(double);

components/core/src/clp_s/ColumnWriter.hpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,24 @@ class Int64ColumnWriter : public BaseColumnWriter {
6363
std::vector<int64_t> m_values;
6464
};
6565

66+
class DeltaEncodedInt64ColumnWriter : public BaseColumnWriter {
67+
public:
68+
// Constructor
69+
explicit DeltaEncodedInt64ColumnWriter(int32_t id) : BaseColumnWriter(id) {}
70+
71+
// Destructor
72+
~DeltaEncodedInt64ColumnWriter() override = default;
73+
74+
// Methods inherited from BaseColumnWriter
75+
size_t add_value(ParsedMessage::variable_t& value) override;
76+
77+
void store(ZstdCompressor& compressor) override;
78+
79+
private:
80+
std::vector<int64_t> m_values;
81+
int64_t m_cur{};
82+
};
83+
6684
class FloatColumnWriter : public BaseColumnWriter {
6785
public:
6886
// Constructor

components/core/src/clp_s/JsonParser.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ bool JsonParser::parse() {
513513
auto initialize_fields_for_archive = [&]() -> bool {
514514
if (m_record_log_order) {
515515
log_event_idx_node_id
516-
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
516+
= add_metadata_field(constants::cLogEventIdxName, NodeType::DeltaInteger);
517517
}
518518
if (auto const rc = m_archive_writer->add_field_to_current_range(
519519
std::string{constants::range_index::cFilename},
@@ -982,7 +982,7 @@ auto JsonParser::parse_from_ir() -> bool {
982982
auto initialize_fields_for_archive = [&]() -> bool {
983983
if (m_record_log_order) {
984984
log_event_idx_node_id
985-
= add_metadata_field(constants::cLogEventIdxName, NodeType::Integer);
985+
= add_metadata_field(constants::cLogEventIdxName, NodeType::DeltaInteger);
986986
}
987987
if (auto const rc = m_archive_writer->add_field_to_current_range(
988988
std::string{constants::range_index::cFilename},

components/core/src/clp_s/SchemaReader.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,11 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) {
2929
return std::get<int64_t>(static_cast<Int64ColumnReader*>(m_timestamp_column)
3030
->extract_value(m_cur_message));
3131
};
32+
} else if (m_timestamp_column->get_type() == NodeType::DeltaInteger) {
33+
m_get_timestamp = [this]() {
34+
return std::get<int64_t>(static_cast<DeltaEncodedInt64ColumnReader*>(m_timestamp_column)
35+
->extract_value(m_cur_message));
36+
};
3237
} else if (m_timestamp_column->get_type() == NodeType::Float) {
3338
m_get_timestamp = [this]() {
3439
return static_cast<epochtime_t>(
@@ -428,6 +433,7 @@ size_t SchemaReader::generate_structured_array_template(
428433
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
429434
break;
430435
}
436+
case NodeType::DeltaInteger:
431437
case NodeType::Integer: {
432438
m_json_serializer.add_op(JsonSerializer::Op::AddIntValue);
433439
m_reordered_columns.push_back(m_columns[column_idx++]);
@@ -512,6 +518,7 @@ size_t SchemaReader::generate_structured_object_template(
512518
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
513519
break;
514520
}
521+
case NodeType::DeltaInteger:
515522
case NodeType::Integer: {
516523
m_json_serializer.add_op(JsonSerializer::Op::AddIntField);
517524
m_reordered_columns.push_back(m_columns[column_idx++]);
@@ -620,6 +627,7 @@ void SchemaReader::generate_json_template(int32_t id) {
620627
m_json_serializer.add_op(JsonSerializer::Op::EndArray);
621628
break;
622629
}
630+
case NodeType::DeltaInteger:
623631
case NodeType::Integer: {
624632
m_json_serializer.add_op(JsonSerializer::Op::AddIntField);
625633
m_reordered_columns.push_back(m_column_map[child_global_id]);

components/core/src/clp_s/SchemaReader.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ class SchemaReader {
206206
/**
207207
* Marks a column as the log_event_idx column.
208208
*/
209-
void mark_column_as_log_event_idx(Int64ColumnReader* column_reader) {
209+
void mark_column_as_log_event_idx(BaseColumnReader* column_reader) {
210210
m_log_event_idx_column = column_reader;
211211
}
212212

@@ -321,7 +321,7 @@ class SchemaReader {
321321

322322
BaseColumnReader* m_timestamp_column;
323323
std::function<epochtime_t()> m_get_timestamp;
324-
Int64ColumnReader* m_log_event_idx_column{nullptr};
324+
BaseColumnReader* m_log_event_idx_column{nullptr};
325325

326326
std::shared_ptr<SchemaTree> m_global_schema_tree;
327327
SchemaTree m_local_schema_tree;

0 commit comments

Comments
 (0)