forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchained_delegating_reader.hh
100 lines (81 loc) · 3.16 KB
/
chained_delegating_reader.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
/*
* Copyright 2020-present ScyllaDB
*/
/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
#pragma once
#include <seastar/core/shared_future.hh>
#include "flat_mutation_reader_v2.hh"
// A reader which allows to insert a deferring operation before reading.
// All calls will first wait for a future to resolve, then forward to a given underlying reader.
class chained_delegating_reader : public flat_mutation_reader_v2::impl {
std::unique_ptr<flat_mutation_reader_v2> _underlying;
std::function<future<flat_mutation_reader_v2>()> _populate_reader;
std::function<void()> _on_destroyed;
public:
chained_delegating_reader(schema_ptr s, std::function<future<flat_mutation_reader_v2>()>&& populate, reader_permit permit, std::function<void()> on_destroyed = []{})
: impl(s, std::move(permit))
, _populate_reader(std::move(populate))
, _on_destroyed(std::move(on_destroyed))
{ }
chained_delegating_reader(chained_delegating_reader&& rd) = delete;
~chained_delegating_reader() {
_on_destroyed();
}
virtual future<> fill_buffer() override {
if (!_underlying) {
return _populate_reader().then([this] (flat_mutation_reader_v2&& rd) {
_underlying = std::make_unique<flat_mutation_reader_v2>(std::move(rd));
return fill_buffer();
});
}
if (is_buffer_full()) {
return make_ready_future<>();
}
return _underlying->fill_buffer().then([this] {
_end_of_stream = _underlying->is_end_of_stream();
_underlying->move_buffer_content_to(*this);
});
}
virtual future<> fast_forward_to(position_range pr) override {
if (!_underlying) {
return _populate_reader().then([this, pr = std::move(pr)] (flat_mutation_reader_v2&& rd) mutable {
_underlying = std::make_unique<flat_mutation_reader_v2>(std::move(rd));
return fast_forward_to(pr);
});
}
_end_of_stream = false;
forward_buffer_to(pr.start());
return _underlying->fast_forward_to(std::move(pr));
}
virtual future<> next_partition() override {
if (!_underlying) {
return make_ready_future<>();
}
clear_buffer_to_next_partition();
auto f = make_ready_future<>();
if (is_buffer_empty()) {
f = _underlying->next_partition();
}
_end_of_stream = _underlying->is_end_of_stream() && _underlying->is_buffer_empty();
return f;
}
virtual future<> fast_forward_to(const dht::partition_range& pr) override {
if (!_underlying) {
return _populate_reader().then([this, &pr] (flat_mutation_reader_v2&& rd) mutable {
_underlying = std::make_unique<flat_mutation_reader_v2>(std::move(rd));
return fast_forward_to(pr);
});
}
_end_of_stream = false;
clear_buffer();
return _underlying->fast_forward_to(pr);
}
virtual future<> close() noexcept override {
if (_underlying) {
return _underlying->close();
}
return make_ready_future<>();
}
};