Skip to content

Commit

Permalink
sstables: add initial support to generation of summary file
Browse files Browse the repository at this point in the history
Signed-off-by: Raphael S. Carvalho <raphaelsc@cloudius-systems.com>
  • Loading branch information
raphaelsc committed May 20, 2015
1 parent 9e71f6d commit 57060b5
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 17 deletions.
3 changes: 3 additions & 0 deletions sstables/key.hh
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public:
explicit operator bytes_view() const {
return _bytes;
}
bytes& get_bytes() {
return _bytes;
}
};

bytes composite_from_clustering_key(const schema& s, const clustering_key& ck);
Expand Down
54 changes: 51 additions & 3 deletions sstables/sstables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1043,20 +1043,66 @@ static future<> write_index_entry(file_writer& out, disk_string_view<uint16_t>&
return write(out, key, pos, promoted_index_size);
}

static constexpr int BASE_SAMPLING_LEVEL = 128;

static void prepare_summary(summary& s, const memtable& mt) {
auto all_partitions = mt.all_partitions();
assert(all_partitions.size() >= 1);

s.header.min_index_interval = BASE_SAMPLING_LEVEL;
s.header.sampling_level = BASE_SAMPLING_LEVEL;

uint64_t max_expected_entries = all_partitions.size() / BASE_SAMPLING_LEVEL + 1;
// FIXME: handle case where max_expected_entries is greater than max value stored by uint32_t.
assert(max_expected_entries <= std::numeric_limits<uint32_t>::max());
s.header.size = max_expected_entries;
assert(s.header.size >= 1);

// memory_size only accounts size of vector positions at this point.
s.header.memory_size = s.header.size * sizeof(uint32_t);
s.header.size_at_full_sampling = s.header.size;

s.positions.reserve(s.header.size);
s.entries.reserve(s.header.size);
s.keys_written = 0;

auto begin = all_partitions.begin();
auto last = --all_partitions.end();

auto first_key = key::from_partition_key(*mt.schema(), begin->first._key);
s.first_key.value = std::move(first_key.get_bytes());

auto last_key = key::from_partition_key(*mt.schema(), last->first._key);
s.last_key.value = std::move(last_key.get_bytes());
}

static void maybe_add_summary_entry(summary& s, bytes_view key, uint64_t offset) {
if ((s.keys_written % s.header.min_index_interval) == 0) {
s.positions.push_back(s.header.memory_size);
s.entries.push_back({ bytes(key.data(), key.size()), offset });
s.header.memory_size += key.size() + sizeof(uint64_t);
}
s.keys_written++;
}

future<> sstable::write_components(const memtable& mt) {
return create_data().then([&mt, this] {
// TODO: Add compression support by having a specialized output stream.
auto w = make_shared<file_writer>(_data_file, 4096);
auto index = make_shared<file_writer>(_index_file, 4096);

prepare_summary(_summary, mt);

// Iterate through CQL partitions, then CQL rows, then CQL columns.
// Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key.
return do_for_each(mt.all_partitions(),
[w, index, &mt] (const std::pair<const dht::decorated_key, mutation_partition>& partition_entry) {
// TODO: Write summary file on-the-fly.
[w, index, &mt, this] (const std::pair<const dht::decorated_key, mutation_partition>& partition_entry) {

return do_with(key::from_partition_key(*mt.schema(), partition_entry.first._key),
[w, index, &partition_entry] (auto& partition_key) {
[w, index, &partition_entry, this] (auto& partition_key) {

// Maybe add summary entry into in-memory representation of summary file.
maybe_add_summary_entry(_summary, bytes_view(partition_key), index->offset());

return do_with(disk_string_view<uint16_t>(), [w, index, &partition_key] (auto& p_key) {
p_key.value = bytes_view(partition_key);
Expand Down Expand Up @@ -1103,6 +1149,8 @@ future<> sstable::write_components(const memtable& mt) {
return w->close().then([w] {});
}).then([index] {
return index->close().then([index] {});
}).then([this] {
return write_summary();
});
});
}
Expand Down
4 changes: 4 additions & 0 deletions sstables/types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ struct summary_la {
disk_string<uint32_t> first_key;
disk_string<uint32_t> last_key;

// Used to determine when a summary entry should be added based on min_index_interval.
// NOTE: keys_written isn't part of on-disk format of summary.
size_t keys_written;

// NOTE4: There is a structure written by Cassandra into the end of the Summary
// file, after the field last_key, that we haven't understand yet, but we know
// that its content isn't related to the summary itself.
Expand Down
104 changes: 90 additions & 14 deletions tests/urchin/sstable_datafile_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira
}
}

static inline future<> remove_files(sstring dir, unsigned long generation) {
return when_all(remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Data)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Index)),
remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Summary))).then([] (auto t) {});
}

SEASTAR_TEST_CASE(datafile_generation_01) {
// Data file with clustering key
//
Expand Down Expand Up @@ -92,8 +98,7 @@ SEASTAR_TEST_CASE(datafile_generation_01) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 1, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 1, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 1);
});
});
}
Expand Down Expand Up @@ -164,8 +169,7 @@ SEASTAR_TEST_CASE(datafile_generation_02) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 2, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 2, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 2);
});
});
}
Expand Down Expand Up @@ -236,8 +240,7 @@ SEASTAR_TEST_CASE(datafile_generation_03) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 3, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 3, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 3);
});
});
}
Expand Down Expand Up @@ -314,8 +317,7 @@ SEASTAR_TEST_CASE(datafile_generation_04) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 4, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 4, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 4);
});
});
}
Expand Down Expand Up @@ -382,8 +384,7 @@ SEASTAR_TEST_CASE(datafile_generation_05) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 5, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 5, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 5);
});
});
}
Expand Down Expand Up @@ -457,8 +458,7 @@ SEASTAR_TEST_CASE(datafile_generation_06) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 6, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 6, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 6);
});
});
}
Expand Down Expand Up @@ -521,8 +521,84 @@ SEASTAR_TEST_CASE(datafile_generation_07) {
BOOST_REQUIRE(size == offset);
});
}).then([] {
return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 7, big, sstable::component_type::Data)),
remove_file(sstable::filename("tests/urchin/sstables", la, 7, big, sstable::component_type::Index))).then([] (auto t) {});
return remove_files("tests/urchin/sstables", 7);
});
});
}

SEASTAR_TEST_CASE(datafile_generation_08) {
// Data file with multiple rows.
// Only summary file is validated in this test case.
//
// Respective CQL table and CQL insert:
// CREATE TABLE test (
// p1 int,
// c1 text,
// r1 int,
// PRIMARY KEY (p1, c1)
// ) WITH compression = {};

auto s = make_lw_shared(schema({}, some_keyspace, some_column_family,
{{"p1", int32_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type));

memtable mt(s);

const column_definition& r1_col = *s->get_column_definition("r1");

// Create 150 partitions so that summary file store 2 entries, assuming min index
// interval is 128.
for (int32_t i = 0; i < 150; i++) {
auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)});
auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")});

mutation m(key, s);
m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(1)));
mt.apply(std::move(m));
}

auto mtp = make_shared<memtable>(std::move(mt));
auto sst = make_lw_shared<sstable>("tests/urchin/sstables", 8, la, big);

return sst->write_components(*mtp).then([mtp, sst, s] {
auto fname = sstable::filename("tests/urchin/sstables", la, 8, big, sstable::component_type::Summary);
return engine().open_file_dma(fname, open_flags::ro).then([] (file f) {
auto bufptr = allocate_aligned_buffer<char>(4096, 4096);

auto fut = f.dma_read(0, bufptr.get(), 4096);
return std::move(fut).then([f = std::move(f), bufptr = std::move(bufptr)] (size_t size) {
auto buf = bufptr.get();
size_t offset = 0;

std::vector<uint8_t> header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2,
/* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80,
/* size_at_full_sampling */ 0, 0, 0, 2 };
BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0);
offset += header.size();

std::vector<uint8_t> positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0);
offset += positions.size();

std::vector<uint8_t> first_entry = { /* key */ 0, 0, 0, 0x17, /* position */ 0, 0, 0, 0, 0, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0);
offset += first_entry.size();

std::vector<uint8_t> second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 };
BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0);
offset += second_entry.size();

std::vector<uint8_t> first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 };
BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0);
offset += first_key.size();

std::vector<uint8_t> last_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x67 };
BOOST_REQUIRE(::memcmp(last_key.data(), &buf[offset], last_key.size()) == 0);
offset += last_key.size();

BOOST_REQUIRE(size == offset);
});
}).then([] {
return remove_files("tests/urchin/sstables", 8);
});
});
}

0 comments on commit 57060b5

Please sign in to comment.