From 57060b5dfe57bfd140365358700cb239a1c16d4d Mon Sep 17 00:00:00 2001 From: "Raphael S. Carvalho" Date: Wed, 20 May 2015 15:17:21 -0300 Subject: [PATCH] sstables: add initial support to generation of summary file Signed-off-by: Raphael S. Carvalho --- sstables/key.hh | 3 + sstables/sstables.cc | 54 ++++++++++++- sstables/types.hh | 4 + tests/urchin/sstable_datafile_test.cc | 104 ++++++++++++++++++++++---- 4 files changed, 148 insertions(+), 17 deletions(-) diff --git a/sstables/key.hh b/sstables/key.hh index a99d6215b69a..9f64175a1ebd 100644 --- a/sstables/key.hh +++ b/sstables/key.hh @@ -60,6 +60,9 @@ public: explicit operator bytes_view() const { return _bytes; } + bytes& get_bytes() { + return _bytes; + } }; bytes composite_from_clustering_key(const schema& s, const clustering_key& ck); diff --git a/sstables/sstables.cc b/sstables/sstables.cc index 62c33901b102..c75e36353ede 100644 --- a/sstables/sstables.cc +++ b/sstables/sstables.cc @@ -1043,20 +1043,66 @@ static future<> write_index_entry(file_writer& out, disk_string_view& return write(out, key, pos, promoted_index_size); } +static constexpr int BASE_SAMPLING_LEVEL = 128; + +static void prepare_summary(summary& s, const memtable& mt) { + auto all_partitions = mt.all_partitions(); + assert(all_partitions.size() >= 1); + + s.header.min_index_interval = BASE_SAMPLING_LEVEL; + s.header.sampling_level = BASE_SAMPLING_LEVEL; + + uint64_t max_expected_entries = all_partitions.size() / BASE_SAMPLING_LEVEL + 1; + // FIXME: handle case where max_expected_entries is greater than max value stored by uint32_t. + assert(max_expected_entries <= std::numeric_limits::max()); + s.header.size = max_expected_entries; + assert(s.header.size >= 1); + + // memory_size only accounts for the size of the positions vector at this point. 
+ s.header.memory_size = s.header.size * sizeof(uint32_t); + s.header.size_at_full_sampling = s.header.size; + + s.positions.reserve(s.header.size); + s.entries.reserve(s.header.size); + s.keys_written = 0; + + auto begin = all_partitions.begin(); + auto last = --all_partitions.end(); + + auto first_key = key::from_partition_key(*mt.schema(), begin->first._key); + s.first_key.value = std::move(first_key.get_bytes()); + + auto last_key = key::from_partition_key(*mt.schema(), last->first._key); + s.last_key.value = std::move(last_key.get_bytes()); +} + +static void maybe_add_summary_entry(summary& s, bytes_view key, uint64_t offset) { + if ((s.keys_written % s.header.min_index_interval) == 0) { + s.positions.push_back(s.header.memory_size); + s.entries.push_back({ bytes(key.data(), key.size()), offset }); + s.header.memory_size += key.size() + sizeof(uint64_t); + } + s.keys_written++; +} + future<> sstable::write_components(const memtable& mt) { return create_data().then([&mt, this] { // TODO: Add compression support by having a specialized output stream. auto w = make_shared(_data_file, 4096); auto index = make_shared(_index_file, 4096); + prepare_summary(_summary, mt); + // Iterate through CQL partitions, then CQL rows, then CQL columns. // Each mt.all_partitions() entry is a set of clustered rows sharing the same partition key. return do_for_each(mt.all_partitions(), - [w, index, &mt] (const std::pair& partition_entry) { - // TODO: Write summary file on-the-fly. + [w, index, &mt, this] (const std::pair& partition_entry) { return do_with(key::from_partition_key(*mt.schema(), partition_entry.first._key), - [w, index, &partition_entry] (auto& partition_key) { + [w, index, &partition_entry, this] (auto& partition_key) { + + // Maybe add summary entry into in-memory representation of summary file. 
+ maybe_add_summary_entry(_summary, bytes_view(partition_key), index->offset()); return do_with(disk_string_view(), [w, index, &partition_key] (auto& p_key) { p_key.value = bytes_view(partition_key); @@ -1103,6 +1149,8 @@ future<> sstable::write_components(const memtable& mt) { return w->close().then([w] {}); }).then([index] { return index->close().then([index] {}); + }).then([this] { + return write_summary(); }); }); } diff --git a/sstables/types.hh b/sstables/types.hh index d59f95b6b755..8952167a3ab9 100644 --- a/sstables/types.hh +++ b/sstables/types.hh @@ -113,6 +113,10 @@ struct summary_la { disk_string first_key; disk_string last_key; + // Used to determine when a summary entry should be added based on min_index_interval. + // NOTE: keys_written isn't part of on-disk format of summary. + size_t keys_written; + // NOTE4: There is a structure written by Cassandra into the end of the Summary // file, after the field last_key, that we haven't understand yet, but we know // that its content isn't related to the summary itself. 
diff --git a/tests/urchin/sstable_datafile_test.cc b/tests/urchin/sstable_datafile_test.cc index fcdc1583bd67..89035531b0e4 100644 --- a/tests/urchin/sstable_datafile_test.cc +++ b/tests/urchin/sstable_datafile_test.cc @@ -28,6 +28,12 @@ atomic_cell make_atomic_cell(bytes_view value, uint32_t ttl = 0, uint32_t expira } } +static inline future<> remove_files(sstring dir, unsigned long generation) { + return when_all(remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Data)), + remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Index)), + remove_file(sstable::filename(dir, la, generation, big, sstable::component_type::Summary))).then([] (auto t) {}); +} + SEASTAR_TEST_CASE(datafile_generation_01) { // Data file with clustering key // @@ -92,8 +98,7 @@ SEASTAR_TEST_CASE(datafile_generation_01) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 1, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 1, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 1); }); }); } @@ -164,8 +169,7 @@ SEASTAR_TEST_CASE(datafile_generation_02) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 2, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 2, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 2); }); }); } @@ -236,8 +240,7 @@ SEASTAR_TEST_CASE(datafile_generation_03) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 3, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 3, big, sstable::component_type::Index))).then([] (auto t) {}); + return 
remove_files("tests/urchin/sstables", 3); }); }); } @@ -314,8 +317,7 @@ SEASTAR_TEST_CASE(datafile_generation_04) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 4, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 4, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 4); }); }); } @@ -382,8 +384,7 @@ SEASTAR_TEST_CASE(datafile_generation_05) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 5, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 5, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 5); }); }); } @@ -457,8 +458,7 @@ SEASTAR_TEST_CASE(datafile_generation_06) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 6, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 6, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 6); }); }); } @@ -521,8 +521,84 @@ SEASTAR_TEST_CASE(datafile_generation_07) { BOOST_REQUIRE(size == offset); }); }).then([] { - return when_all(remove_file(sstable::filename("tests/urchin/sstables", la, 7, big, sstable::component_type::Data)), - remove_file(sstable::filename("tests/urchin/sstables", la, 7, big, sstable::component_type::Index))).then([] (auto t) {}); + return remove_files("tests/urchin/sstables", 7); + }); + }); +} + +SEASTAR_TEST_CASE(datafile_generation_08) { + // Data file with multiple rows. + // Only summary file is validated in this test case. 
+ // + // Respective CQL table and CQL insert: + // CREATE TABLE test ( + // p1 int, + // c1 text, + // r1 int, + // PRIMARY KEY (p1, c1) + // ) WITH compression = {}; + + auto s = make_lw_shared(schema({}, some_keyspace, some_column_family, + {{"p1", int32_type}}, {{"c1", utf8_type}}, {{"r1", int32_type}}, {}, utf8_type)); + + memtable mt(s); + + const column_definition& r1_col = *s->get_column_definition("r1"); + + // Create 150 partitions so that summary file stores 2 entries, assuming min index + // interval is 128. + for (int32_t i = 0; i < 150; i++) { + auto key = partition_key::from_exploded(*s, {int32_type->decompose(i)}); + auto c_key = clustering_key::from_exploded(*s, {to_bytes("abc")}); + + mutation m(key, s); + m.set_clustered_cell(c_key, r1_col, make_atomic_cell(int32_type->decompose(1))); + mt.apply(std::move(m)); + } + + auto mtp = make_shared(std::move(mt)); + auto sst = make_lw_shared("tests/urchin/sstables", 8, la, big); + + return sst->write_components(*mtp).then([mtp, sst, s] { + auto fname = sstable::filename("tests/urchin/sstables", la, 8, big, sstable::component_type::Summary); + return engine().open_file_dma(fname, open_flags::ro).then([] (file f) { + auto bufptr = allocate_aligned_buffer(4096, 4096); + + auto fut = f.dma_read(0, bufptr.get(), 4096); + return std::move(fut).then([f = std::move(f), bufptr = std::move(bufptr)] (size_t size) { + auto buf = bufptr.get(); + size_t offset = 0; + + std::vector header = { /* min_index_interval */ 0, 0, 0, 0x80, /* size */ 0, 0, 0, 2, + /* memory_size */ 0, 0, 0, 0, 0, 0, 0, 0x20, /* sampling_level */ 0, 0, 0, 0x80, + /* size_at_full_sampling */ 0, 0, 0, 2 }; + BOOST_REQUIRE(::memcmp(header.data(), &buf[offset], header.size()) == 0); + offset += header.size(); + + std::vector positions = { 0x8, 0, 0, 0, 0x14, 0, 0, 0 }; + BOOST_REQUIRE(::memcmp(positions.data(), &buf[offset], positions.size()) == 0); + offset += positions.size(); + + std::vector first_entry = { /* key */ 0, 0, 0, 0x17, /* position */ 
0, 0, 0, 0, 0, 0, 0, 0 }; + BOOST_REQUIRE(::memcmp(first_entry.data(), &buf[offset], first_entry.size()) == 0); + offset += first_entry.size(); + + std::vector second_entry = { /* key */ 0, 0, 0, 0x65, /* position */ 0, 0x9, 0, 0, 0, 0, 0, 0 }; + BOOST_REQUIRE(::memcmp(second_entry.data(), &buf[offset], second_entry.size()) == 0); + offset += second_entry.size(); + + std::vector first_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x17 }; + BOOST_REQUIRE(::memcmp(first_key.data(), &buf[offset], first_key.size()) == 0); + offset += first_key.size(); + + std::vector last_key = { 0, 0, 0, 0x4, 0, 0, 0, 0x67 }; + BOOST_REQUIRE(::memcmp(last_key.data(), &buf[offset], last_key.size()) == 0); + offset += last_key.size(); + + BOOST_REQUIRE(size == offset); + }); + }).then([] { + return remove_files("tests/urchin/sstables", 8); }); }); }