forked from scylladb/scylladb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcompress.hh
130 lines (121 loc) · 5.15 KB
/
compress.hh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright (C) 2015 Cloudius Systems, Ltd.
*/
#pragma once
#include "exceptions/exceptions.hh"
enum class compressor {
none,
lz4,
snappy,
deflate,
};
class compression_parameters {
public:
static constexpr int32_t DEFAULT_CHUNK_LENGTH = 64 * 1024;
static constexpr double DEFAULT_CRC_CHECK_CHANCE = 1.0;
static constexpr auto SSTABLE_COMPRESSION = "sstable_compression";
static constexpr auto CHUNK_LENGTH_KB = "chunk_length_kb";
static constexpr auto CRC_CHECK_CHANCE = "crc_check_chance";
private:
compressor _compressor = compressor::none;
std::experimental::optional<int> _chunk_length;
std::experimental::optional<double> _crc_check_chance;
public:
compression_parameters() = default;
compression_parameters(compressor c) : _compressor(c) { }
compression_parameters(const std::map<sstring, sstring>& options) {
validate_options(options);
auto it = options.find(SSTABLE_COMPRESSION);
if (it == options.end() || it->second.empty()) {
return;
}
const auto& compressor_class = it->second;
if (is_compressor_class(compressor_class, "LZ4Compressor")) {
_compressor = compressor::lz4;
} else if (is_compressor_class(compressor_class, "SnappyCompressor")) {
_compressor = compressor::snappy;
} else if (is_compressor_class(compressor_class, "DeflateCompressor")) {
_compressor = compressor::deflate;
} else {
throw exceptions::configuration_exception(sstring("Unsupported compression class '") + compressor_class + "'.");
}
auto chunk_length = options.find(CHUNK_LENGTH_KB);
if (chunk_length != options.end()) {
try {
_chunk_length = std::stoi(chunk_length->second) * 1024;
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sstring("Invalid integer value ") + chunk_length->second + " for " + CHUNK_LENGTH_KB);
}
}
auto crc_chance = options.find(CRC_CHECK_CHANCE);
if (crc_chance != options.end()) {
try {
_crc_check_chance = std::stod(crc_chance->second);
} catch (const std::exception& e) {
throw exceptions::syntax_exception(sstring("Invalid double value ") + crc_chance->second + "for " + CRC_CHECK_CHANCE);
}
}
}
compressor get_compressor() const { return _compressor; }
int32_t chunk_length() const { return _chunk_length.value_or(int(DEFAULT_CHUNK_LENGTH)); }
double crc_check_chance() const { return _crc_check_chance.value_or(double(DEFAULT_CRC_CHECK_CHANCE)); }
void validate() {
if (_chunk_length) {
auto chunk_length = _chunk_length.value();
if (chunk_length <= 0) {
throw exceptions::configuration_exception(sstring("Invalid negative or null ") + CHUNK_LENGTH_KB);
}
// _chunk_length must be a power of two
if (chunk_length & (chunk_length - 1)) {
throw exceptions::configuration_exception(sstring(CHUNK_LENGTH_KB) + " must be a power of 2.");
}
}
if (_crc_check_chance && (_crc_check_chance.value() < 0.0 || _crc_check_chance.value() > 1.0)) {
throw exceptions::configuration_exception(sstring(CRC_CHECK_CHANCE) + " must be between 0.0 and 1.0.");
}
}
std::map<sstring, sstring> get_options() const {
if (_compressor == compressor::none) {
return std::map<sstring, sstring>();
}
std::map<sstring, sstring> opts;
opts.emplace(sstring(SSTABLE_COMPRESSION), compressor_name());
if (_chunk_length) {
opts.emplace(sstring(CHUNK_LENGTH_KB), std::to_string(_chunk_length.value() / 1024));
}
if (_crc_check_chance) {
opts.emplace(sstring(CRC_CHECK_CHANCE), std::to_string(_crc_check_chance.value()));
}
return opts;
}
private:
void validate_options(const std::map<sstring, sstring>& options) {
// currently, there are no options specific to a particular compressor
static std::set<sstring> keywords({
sstring(SSTABLE_COMPRESSION),
sstring(CHUNK_LENGTH_KB),
sstring(CRC_CHECK_CHANCE),
});
for (auto&& opt : options) {
if (!keywords.count(opt.first)) {
throw exceptions::configuration_exception(sprint("Unknown compression option '%s'.", opt.first));
}
}
}
bool is_compressor_class(const sstring& value, const sstring& class_name) {
static const sstring namespace_prefix = "org.apache.cassandra.io.compress.";
return value == class_name || value == namespace_prefix + class_name;
}
sstring compressor_name() const {
switch (_compressor) {
case compressor::lz4:
return "org.apache.cassandra.io.compress.LZ4Compressor";
case compressor::snappy:
return "org.apache.cassandra.io.compress.SnappyCompressor";
case compressor::deflate:
return "org.apache.cassandra.io.compress.DeflateCompressor";
default:
abort();
}
}
};