forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquery-result-writer.hh
141 lines (121 loc) · 3.78 KB
/
query-result-writer.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
/*
* Copyright 2015 Cloudius Systems
*/
#pragma once
#include "types.hh"
#include "atomic_cell.hh"
#include "query-request.hh"
#include "query-result.hh"
// Refer to query-result.hh for the query result format
namespace query {
// Serializes the cells of one row into the shared output stream.
//
// The caller reserves a uint32_t slot in the stream and passes its
// place holder to the constructor. finish() patches that slot with the
// number of bytes appended to the stream since the writer was constructed.
// A writer must be finish()ed before it goes out of scope; the destructor
// asserts on it.
class result::row_writer {
    bytes_ostream& _w;
    const partition_slice& _slice;
    bytes_ostream::place_holder<uint32_t> _size_ph;
    size_t _start_pos;
    bool _finished = false;

    // Emits the one-byte marker which precedes every cell slot.
    // FIXME: store this in a bitmap
    void write_presence(bool present) {
        _w.write<int8_t>(present);
    }
public:
    // slice   - consulted for the options selecting what is written per cell
    // w       - stream the row is appended to; its current size is recorded
    //           as the start of the row
    // size_ph - slot which finish() fills in with the row size
    row_writer(
        const partition_slice& slice,
        bytes_ostream& w,
        bytes_ostream::place_holder<uint32_t> size_ph)
            : _w(w)
            , _slice(slice)
            , _size_ph(size_ph)
            , _start_pos(w.size())
    { }

    ~row_writer() {
        assert(_finished);
    }

    // Records an absent cell: only the marker byte is written.
    void add_empty() {
        write_presence(false);
    }

    // Records a live atomic cell. When the slice has the
    // send_timestamp_and_expiry option set, the timestamp and the expiry
    // are written before the value.
    void add(::atomic_cell_view c) {
        write_presence(true);
        assert(c.is_live());
        const bool with_metadata =
            _slice.options.contains<partition_slice::option::send_timestamp_and_expiry>();
        if (with_metadata) {
            _w.write(c.timestamp());
            // A cell without TTL is encoded with the largest representable expiry.
            gc_clock::rep expiry = std::numeric_limits<gc_clock::rep>::max();
            if (c.is_live_and_has_ttl()) {
                expiry = c.expiry().time_since_epoch().count();
            }
            _w.write<gc_clock::rep>(expiry);
        }
        _w.write_blob(c.value());
    }

    // Records a collection cell: marker byte followed by the serialized data.
    void add(collection_mutation::view v) {
        write_presence(true);
        _w.write_blob(v.data);
    }

    // Patches the size slot with the number of bytes written for this row
    // and marks the writer as complete.
    void finish() {
        const auto row_size = _w.size() - _start_pos;
        const auto encoded_size = static_cast<uint32_t>(row_size);
        // The size slot is 32 bits wide; the row must fit into it.
        assert(encoded_size == row_size);
        _w.set(_size_ph, encoded_size);
        _finished = true;
    }
};
// Serializes the rows of one partition into the shared output stream.
//
// The caller reserves a uint32_t slot for the row count and passes its
// place holder to the constructor. finish() patches that slot with the
// number of rows added through add_row(). A writer must be finish()ed
// before it goes out of scope; the destructor asserts on it.
class result::partition_writer {
    bytes_ostream& _w;
    const partition_slice& _slice;
    bytes_ostream::place_holder<uint32_t> _count_ph;
    uint32_t _row_count = 0;
    bool _static_row_added = false;
    bool _finished = false;
public:
    // slice    - consulted for the options selecting what is written per row
    // count_ph - slot which finish() fills in with the row count
    // w        - stream the partition is appended to
    partition_writer(
        const partition_slice& slice,
        bytes_ostream::place_holder<uint32_t> count_ph,
        bytes_ostream& w)
            : _w(w)
            , _slice(slice)
            , _count_ph(count_ph)
    { }

    ~partition_writer() {
        assert(_finished);
    }

    // Starts a new clustered row and returns the writer for its cells.
    // The clustering key is written first, but only when the slice has the
    // send_clustering_key option set.
    row_writer add_row(const clustering_key& key) {
        const bool send_key =
            _slice.options.contains<partition_slice::option::send_clustering_key>();
        if (send_key) {
            _w.write_blob(key);
        }
        ++_row_count;
        auto row_size_ph = _w.write_place_holder<uint32_t>();
        return row_writer(_slice, _w, row_size_ph);
    }

    // Starts the static row and returns the writer for its cells.
    // Call before any add_row()
    row_writer add_static_row() {
        assert(!_static_row_added); // Static row can be added only once
        assert(!_row_count); // Static row must be added before clustered rows
        _static_row_added = true;
        auto row_size_ph = _w.write_place_holder<uint32_t>();
        return row_writer(_slice, _w, row_size_ph);
    }

    // Returns the number of clustered rows added so far, or 1 when only the
    // static row has been added.
    uint32_t row_count() const {
        const uint32_t static_rows = _static_row_added ? 1 : 0;
        return std::max(_row_count, static_rows);
    }

    // Patches the count slot with the number of rows added through add_row()
    // (the static row is not included) and marks the writer as complete.
    void finish() {
        _w.set(_count_ph, _row_count);
        _finished = true;
    }
};
// Accumulates a serialized query result, one partition at a time.
//
// The builder owns the output stream. It keeps a reference to the slice,
// so the slice must stay alive for as long as the builder and the writers
// obtained from it are in use.
class result::builder {
    bytes_ostream _w;
    const partition_slice& _slice;
public:
    builder(const partition_slice& slice) : _slice(slice) { }

    // Starts new partition and returns a builder for its contents.
    // Invalidates all previously obtained builders
    //
    // A slot for the row count is reserved first; the partition key follows
    // it only when the slice has the send_partition_key option set.
    partition_writer add_partition(const partition_key& key) {
        auto row_count_ph = _w.write_place_holder<uint32_t>();
        const bool send_key =
            _slice.options.contains<partition_slice::option::send_partition_key>();
        if (send_key) {
            _w.write_blob(key);
        }
        return partition_writer(_slice, row_count_ph, _w);
    }

    // Moves the accumulated stream into a result object and returns it.
    result build() {
        return result(std::move(_w));
    }
};
}