forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmutation_partition_serializer.cc
142 lines (124 loc) · 4.99 KB
/
mutation_partition_serializer.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
#include "mutation_partition_serializer.hh"
#include "db/serializer.hh"
//
// Representation layout:
//
// <partition> ::= <tombstone> <static-row> <range-tombstones> <row>*
// <range-tombstones> ::= <n-tombstones> <range-tombstone>*
// <range-tombstone> ::= <clustering-key-prefix> <tombstone>
// <row> ::= <clustering-key> <created-at-timestamp> <tombstone> <cells>
// <static-row> ::= <cells>
// <cells> ::= <n-cells> <cell>*
// <cell> ::= <column-id> <value>
// <value> ::= <atomic-cell> | <collection-mutation>
//
// Notes:
// - <cell>s in <cells> are sorted by <column-id>
// - The count of <row>s is inferred from framing.
//
// FIXME: We could make this format more compact by using bitfields
// to mark absence of elements like:
// - range tombstones (they should be rare)
// - static row (maybe rare)
// - row's tombstone (present only when deleting rows) and timestamp
//
// Also atomic cells could be stored more compactly. Right now
// each cell has a one-byte flags field which tells if it's live or not
// and if it has ttl or not. We could condense these flags in a per-row bitmap.
using namespace db;
// Builds a serializer for partition `p` described by `schema`.
// The serialized payload size is computed once here, via size(), and kept in
// _size so the write() overloads can emit it as the frame length.
mutation_partition_serializer::mutation_partition_serializer(const schema& schema, const mutation_partition& p)
    : _schema(schema)
    , _p(p)
    , _size(size(schema, p))
{
}
// Returns the byte count of the unframed serialized form of `p`.
// The constructor stores this value in _size, which the write() overloads
// emit as the frame length.
//
// The walk below adds up one contribution per element, in this order:
// partition tombstone, static row, row tombstones, clustered rows.
size_t
mutation_partition_serializer::size(const schema& schema, const mutation_partition& p) {
    // Partition tombstone, sized from a default-constructed tombstone.
    size_t total = 0;
    total += tombstone_serializer(tombstone()).size();

    // Static row: a cell count, then a column id plus the serialized value
    // bytes for each cell.
    total += sizeof(count_type);
    for (auto&& static_cell : p.static_row()) {
        total += sizeof(column_id);
        total += bytes_view_serializer(static_cell.second.serialize()).size();
    }

    // Row tombstones: an entry count, then prefix + tombstone per entry.
    total += sizeof(count_type);
    for (const row_tombstones_entry& rt : p.row_tombstones()) {
        total += clustering_key_prefix_view_serializer(rt.prefix()).size();
        total += tombstone_serializer(rt.t()).size();
    }

    // Clustered rows: no leading count is accounted for here.
    for (const rows_entry& row_entry : p.clustered_rows()) {
        total += clustering_key_view_serializer(row_entry.key()).size();
        total += sizeof(api::timestamp_type); // created_at
        total += tombstone_serializer(row_entry.row().deleted_at()).size();
        total += sizeof(count_type); // number of cells in this row
        for (auto&& cell : row_entry.row().cells()) {
            total += sizeof(column_id);
            // The column definition decides how the cell value is encoded.
            const column_definition& column = schema.regular_column_at(cell.first);
            total += column.is_atomic()
                ? atomic_cell_view_serializer(cell.second.as_atomic_cell()).size()
                : collection_mutation_view_serializer(cell.second.as_collection_mutation()).size();
        }
    }

    return total;
}
// Writes the framed form to `out`: the precomputed payload size as a
// size_type, immediately followed by the payload itself.
void
mutation_partition_serializer::write(data_output& out) const {
    // Length prefix first, so a reader knows how many bytes follow.
    out.write<size_type>(_size);
    this->write_without_framing(out);
}
// Serializes the partition payload into `out` without the leading size_type
// length prefix (the write() overloads emit that prefix themselves).
//
// Emitted in order:
//   - the partition tombstone;
//   - the static row: a count_type cell count, then for each cell its
//     column id and the bytes of its serialized value;
//   - the row tombstones: a count_type entry count, then for each entry its
//     clustering key prefix and its tombstone;
//   - every clustered row: clustering key, created-at timestamp, row
//     tombstone, a count_type cell count, then for each cell its column id
//     followed by either an atomic cell or a collection mutation, as chosen
//     by the column definition looked up in _schema.
//
// No row count is written before the clustered rows.
//
// Every element count is asserted to survive the conversion to count_type,
// so an oversized container aborts instead of emitting a truncated count
// that no longer matches the number of elements that follow it.
void
mutation_partition_serializer::write_without_framing(data_output& out) const {
    tombstone_serializer::write(out, _p.partition_tombstone());

    // static row
    auto n_static_columns = _p.static_row().size();
    assert(n_static_columns == static_cast<count_type>(n_static_columns));
    out.write<count_type>(n_static_columns);
    for (auto&& e : _p.static_row()) {
        out.write(e.first);
        bytes_view_serializer::write(out, e.second.serialize());
    }

    // row tombstones
    auto n_tombstones = _p.row_tombstones().size();
    assert(n_tombstones == static_cast<count_type>(n_tombstones));
    out.write<count_type>(n_tombstones);
    for (const row_tombstones_entry& e : _p.row_tombstones()) {
        clustering_key_prefix_view_serializer::write(out, e.prefix());
        tombstone_serializer::write(out, e.t());
    }

    // rows
    for (const rows_entry& e : _p.clustered_rows()) {
        clustering_key_view_serializer::write(out, e.key());
        out.write(e.row().created_at());
        tombstone_serializer::write(out, e.row().deleted_at());

        // Same narrowing guard as for the static row and row tombstones.
        auto n_cells = e.row().cells().size();
        assert(n_cells == static_cast<count_type>(n_cells));
        out.write<count_type>(n_cells);
        for (auto&& cell_entry : e.row().cells()) {
            out.write(cell_entry.first);
            const column_definition& def = _schema.regular_column_at(cell_entry.first);
            if (def.is_atomic()) {
                atomic_cell_view_serializer::write(out, cell_entry.second.as_atomic_cell());
            } else {
                collection_mutation_view_serializer::write(out, cell_entry.second.as_collection_mutation());
            }
        }
    }
}
// Writes the framed form to a bytes_ostream: the payload size as a size_type,
// then the payload, which is serialized directly into a placeholder region
// of _size bytes obtained from the stream.
void
mutation_partition_serializer::write(bytes_ostream& out) const {
    out.write<size_type>(_size);

    // Obtain the payload region first, then fill it in place through a
    // data_output that wraps it.
    auto payload = out.write_place_holder(_size);
    data_output payload_out((char*)payload, _size);
    write_without_framing(payload_out);
}
// Reads one framed partition from `in` and returns it as a view.
// The size_type length prefix is consumed first; the view is then built
// from a read_view() of exactly that many bytes.
mutation_partition_view
mutation_partition_serializer::read_as_view(data_input& in) {
    const auto payload_size = in.read<size_type>();
    return mutation_partition_view::from_bytes(in.read_view(payload_size));
}
// Reads one framed partition from `in` and returns it as a
// mutation_partition constructed from schema `s`.
// The result starts as a fresh partition and has the view read from `in`
// applied to it.
mutation_partition
mutation_partition_serializer::read(data_input& in, schema_ptr s) {
    mutation_partition result(s);
    result.apply(*s, read_as_view(in));
    return result;
}