forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrow_cache.cc
132 lines (113 loc) · 3.85 KB
/
row_cache.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
* Copyright 2015 Cloudius Systems
*/
#include "row_cache.hh"
#include "core/memory.hh"
#include "core/do_with.hh"
#include "core/future-util.hh"
// Per-thread logger used by this file; it is constructed with the name "cache".
static thread_local logging::logger logger("cache");
// Returns the cache_tracker belonging to the calling thread.
//
// The tracker is a function-local thread_local object, so each thread gets
// its own instance, constructed the first time that thread calls this function.
cache_tracker& global_cache_tracker() {
    thread_local static cache_tracker per_thread_tracker;
    return per_thread_tracker;
}
// Constructs the tracker and hands _reclaimer a callback which logs a warning
// and then drops every tracked entry through clear().
// NOTE(review): presumably the callback is run by the memory reclaimer when
// memory is tight - confirm against the _reclaimer type.
cache_tracker::cache_tracker()
    : _reclaimer([this] {
        logger.warn("Clearing cache from reclaimer hook");
        // FIXME: evict incrementally rather than dropping everything at once.
        // That should wait until we use a compacting memory allocator, so that
        // partial eviction does not run into memory fragmentation problems.
        clear();
    })
{ }
// Destroys the tracker, disposing of every entry still linked into the LRU list.
cache_tracker::~cache_tracker() {
    this->clear();
}
// Unlinks every entry from the LRU list and deletes it.
void cache_tracker::clear() {
    // Each entry removed from _lru is handed to this disposer, which deletes it.
    std::default_delete<cache_entry> delete_entry;
    _lru.clear_and_dispose(delete_entry);
}
// Marks `e` as most recently used by moving it to the front of the LRU list.
// NOTE(review): `e` is assumed to be linked into _lru already (see insert()) -
// confirm that every caller guarantees this.
void cache_tracker::touch(cache_entry& e) {
    auto current_position = _lru.iterator_to(e);
    _lru.erase(current_position);
    _lru.push_front(e);
}
// Starts tracking `entry` by linking it at the front of the LRU list.
void cache_tracker::insert(cache_entry& entry) {
    _lru.push_front(entry);
}
// A reader that forwards every read to a delegate reader and, as a side
// effect, feeds each mutation the delegate produces into the row cache.
class populating_reader {
    row_cache& _cache;          // cache that receives the mutations read
    mutation_reader _delegate;  // source of the mutations
public:
    populating_reader(row_cache& cache, mutation_reader delegate)
        : _cache(cache)
        , _delegate(std::move(delegate))
    { }

    // Reads the next mutation from the delegate. When one is produced it is
    // passed to row_cache::populate() before being returned to the caller;
    // a disengaged result is passed through unchanged.
    //
    // NOTE(review): the continuation captures `this`, so this object presumably
    // has to stay alive and at the same address until the returned future
    // resolves - confirm against how mutation_reader stores it.
    future<mutation_opt> operator()() {
        return _delegate().then([this] (mutation_opt&& fetched) {
            if (fetched) {
                _cache.populate(*fetched);
            }
            return std::move(fetched);
        });
    }
};
// Creates a reader for `range`.
//
// Only a singular range whose position carries a key is looked up in the
// cache: a hit touches the entry, bumps the hit counter and returns a reader
// yielding a mutation built from the cached partition; a miss bumps the miss
// counter and reads through the underlying source. Every other range is
// reported as an unimplemented range query and also read through the
// underlying source. All read-through readers populate the cache as they go.
mutation_reader
row_cache::make_reader(const query::partition_range& range) {
    // Reader over the underlying source which populates this cache.
    auto read_through = [this, &range] () -> mutation_reader {
        return populating_reader(*this, _underlying(range));
    };

    if (!range.is_singular()) {
        warn(unimplemented::cause::RANGE_QUERIES);
        return read_through();
    }

    // Keep the position bound to a named reference: `key` below refers into it.
    const query::ring_position& pos = range.start_value();
    if (!pos.has_key()) {
        warn(unimplemented::cause::RANGE_QUERIES);
        return read_through();
    }

    const dht::decorated_key& key = pos.as_decorated_key();
    auto found = _partitions.find(key, cache_entry::compare(_schema));
    if (found == _partitions.end()) {
        ++_stats.misses;
        return read_through();
    }

    cache_entry& cached = *found;
    _tracker.touch(cached);
    ++_stats.hits;
    return make_reader_returning(mutation(_schema, key, cached.partition()));
}
// Destroys the cache, unlinking and deleting every entry held in _partitions.
row_cache::~row_cache() {
    std::default_delete<cache_entry> delete_entry;
    _partitions.clear_and_dispose(delete_entry);
}
void row_cache::populate(const mutation& m) {
populate(mutation(m));
}
// Populates the cache with the partition carried by `m`.
//
// If the cache has no entry for m's key, a new entry is created from the
// mutation's partition, registered with the tracker and linked into
// _partitions. Otherwise the existing entry is touched and the mutation's
// partition is applied on top of it.
void row_cache::populate(mutation&& m) {
    cache_entry::compare less(_schema);
    auto i = _partitions.lower_bound(m.decorated_key(), less);
    // lower_bound() yields the first entry whose key is not less than m's key,
    // which may be an entry for a different, greater key. The key is only
    // present when that entry is also not greater than m's key; without this
    // check the mutation would be merged into an unrelated partition.
    const bool absent = (i == _partitions.end()) || less(m.decorated_key(), *i);
    if (absent) {
        cache_entry* entry = new cache_entry(m.decorated_key(), std::move(m.partition()));
        _tracker.insert(*entry);
        // `i` is the position just past where the new key belongs, so it is
        // passed as the insertion hint.
        _partitions.insert(i, *entry);
    } else {
        cache_entry& entry = *i;
        _tracker.touch(entry);
        // `m` is ours to consume, so hand its partition over the same way update() does.
        entry.partition().apply(*m.schema(), std::move(m.partition()));
    }
}
// Applies every mutation produced by `reader` to the partitions that are
// already cached; mutations for keys absent from the cache are ignored.
//
// The reader is moved into do_with() so that it stays alive while consume()
// runs; the future returned is the one produced by that call chain.
future<> row_cache::update(mutation_reader reader) {
    return do_with(std::move(reader), [this] (mutation_reader& source) {
        return consume(source, [this] (mutation&& m) {
            auto found = _partitions.find(m.decorated_key(), cache_entry::compare(_schema));
            if (found == _partitions.end()) {
                // An entry cannot be created here because the mutation may be
                // incomplete, so it is skipped.
                return stop_iteration::no;
            }
            cache_entry& cached = *found;
            _tracker.touch(cached);
            cached.partition().apply(*m.schema(), std::move(m.partition()));
            return stop_iteration::no;
        });
    });
}
// Constructs an empty cache for schema `s`.
//
// `fallback_factory` becomes the underlying source that make_reader() reads
// from when it cannot serve a request from the cache, and `tracker` is the
// cache_tracker with which this cache's entries are registered.
row_cache::row_cache(schema_ptr s, mutation_source fallback_factory, cache_tracker& tracker)
    : _tracker(tracker)
    , _schema(std::move(s))
    // Built from _schema rather than `s`, which has been moved from above.
    // NOTE(review): relies on _schema being declared before _partitions - confirm in the header.
    , _partitions(cache_entry::compare(_schema))
    , _underlying(std::move(fallback_factory))
{ }