Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 50 additions & 28 deletions category/async/storage_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@

#include <stdlib.h>

#include <quill/Quill.h>

#include <asm-generic/ioctl.h>
#include <fcntl.h>
#include <linux/falloc.h>
Expand Down Expand Up @@ -80,6 +82,12 @@ size_t storage_pool::device_t::chunks() const
return metadata_->chunks(size_of_file_);
}

//! Returns the number of conventional (cnv) chunks on this device.
//!
//! Databases created before the `num_cnv_chunks` metadata field existed
//! store 0 in that slot; the rest of the pool (see `fill_chunks_`) treats
//! 0 as the legacy default of 3 conventional chunks, so this accessor must
//! apply the same mapping or it would misreport legacy databases.
size_t storage_pool::device_t::cnv_chunks() const
{
    MONAD_ASSERT(!is_zoned_device(), "zonefs support isn't implemented yet");
    // Backward compatibility: 0 means the pre-field legacy default of 3.
    return metadata_->num_cnv_chunks == 0 ? 3 : metadata_->num_cnv_chunks;
}

std::pair<file_offset_t, file_offset_t> storage_pool::device_t::capacity() const
{
switch (type_) {
Expand Down Expand Up @@ -454,6 +462,7 @@ storage_pool::device_t storage_pool::make_device_(
memcpy(metadata_footer->magic, "MND0", 4);
metadata_footer->chunk_capacity =
static_cast<uint32_t>(chunk_capacity);
metadata_footer->num_cnv_chunks = flags.num_cnv_chunks;
MONAD_ASSERT_PRINTF(
::pwrite(
readwritefd,
Expand All @@ -465,6 +474,17 @@ storage_pool::device_t storage_pool::make_device_(
}
total_size =
metadata_footer->total_size(static_cast<size_t>(stat.st_size));
if (flags.num_cnv_chunks > metadata_footer->num_cnv_chunks) {
LOG_WARNING(
"Flag-specified num_cnv_chunks ({}) is greater than the value "
"stored in metadata ({}). This setting will be ignored. "
"Existing databases cannot be reconfigured to use more chunks, "
"create a new database if you need a higher num_cnv_chunks.",
flags.num_cnv_chunks,
metadata_footer->num_cnv_chunks == 0
? 3
: metadata_footer->num_cnv_chunks);
}
}
size_t const offset = round_down_align<CPU_PAGE_BITS>(
static_cast<size_t>(stat.st_size) - total_size);
Expand Down Expand Up @@ -504,21 +524,29 @@ void storage_pool::fill_chunks_(creation_flags const &flags)
fnv1a_hash<uint32_t>::add(
hashshouldbe, uint32_t(device.unique_hash_ >> 32));
}
// Backward compatibility: databases created before `num_cnv_chunks` was
// added have this field set to 0. Treat 0 as the legacy default of 3
// chunks.
uint32_t const cnv_chunks_count =
devices_[0].metadata_->num_cnv_chunks == 0
? 3
: devices_[0].metadata_->num_cnv_chunks;
std::vector<size_t> chunks;
size_t total = 0;
chunks.reserve(devices_.size());
for (auto const &device : devices_) {
if (device.is_file() || device.is_block_device()) {
auto const devicechunks = device.chunks();
MONAD_ASSERT_PRINTF(
devicechunks >= 4,
"Device %s has %zu chunks the minimum allowed is four.",
devicechunks >= cnv_chunks_count + 1,
"Device %s has %zu chunks the minimum allowed is %u.",
device.current_path().c_str(),
devicechunks);
devicechunks,
cnv_chunks_count + 1);
MONAD_ASSERT(devicechunks <= std::numeric_limits<uint32_t>::max());
// Take off three for the cnv chunks
chunks.push_back(devicechunks - 3);
total += devicechunks - 3;
// Take off cnv_chunks_count for the cnv chunks
chunks.push_back(devicechunks - cnv_chunks_count);
total += devicechunks - cnv_chunks_count;
fnv1a_hash<uint32_t>::add(
hashshouldbe, static_cast<uint32_t>(devicechunks));
fnv1a_hash<uint32_t>::add(
Expand Down Expand Up @@ -560,22 +588,17 @@ void storage_pool::fill_chunks_(creation_flags const &flags)
auto const zone_id = [this](int const chunk_type) {
return static_cast<uint32_t>(chunks_[chunk_type].size());
};
// First three blocks of each device goes to conventional, remainder go to
// sequential
chunks_[cnv].reserve(devices_.size() * 3);
// First cnv_chunks_count blocks of each device goes to conventional,
// remainder go to sequential
chunks_[cnv].reserve(devices_.size() * cnv_chunks_count);
chunks_[seq].reserve(total);
if (flags.interleave_chunks_evenly) {
for (auto &device : devices_) {
chunks_[cnv].emplace_back(
activate_chunk(storage_pool::cnv, device, 0, zone_id(cnv)));
}
for (auto &device : devices_) {
chunks_[cnv].emplace_back(
activate_chunk(storage_pool::cnv, device, 1, zone_id(cnv)));
}
for (auto &device : devices_) {
chunks_[cnv].emplace_back(
activate_chunk(storage_pool::cnv, device, 2, zone_id(cnv)));
for (uint32_t chunk_idx = 0; chunk_idx < cnv_chunks_count;
++chunk_idx) {
for (auto &device : devices_) {
chunks_[cnv].emplace_back(activate_chunk(
storage_pool::cnv, device, chunk_idx, zone_id(cnv)));
}
}
// We now need to evenly spread the sequential chunks such that if
// device A has 20, device B has 10 and device C has 5, the interleaving
Expand All @@ -585,7 +608,7 @@ void storage_pool::fill_chunks_(creation_flags const &flags)
for (size_t n = 0; n < chunks.size(); n++) {
chunkratios[n] = double(total) / static_cast<double>(chunks[n]);
chunkcounts[n] = chunkratios[n];
chunks[n] = 3;
chunks[n] = cnv_chunks_count;
}
while (chunks_[seq].size() < chunks_[seq].capacity()) {
for (size_t n = 0; n < chunks.size(); n++) {
Expand All @@ -612,19 +635,18 @@ void storage_pool::fill_chunks_(creation_flags const &flags)
}
else {
for (auto &device : devices_) {
chunks_[cnv].emplace_back(
activate_chunk(cnv, device, 0, zone_id(cnv)));
chunks_[cnv].emplace_back(
activate_chunk(cnv, device, 1, zone_id(cnv)));
chunks_[cnv].emplace_back(
activate_chunk(cnv, device, 2, zone_id(cnv)));
for (uint32_t chunk_idx = 0; chunk_idx < cnv_chunks_count;
++chunk_idx) {
chunks_[cnv].emplace_back(
activate_chunk(cnv, device, chunk_idx, zone_id(cnv)));
}
}
for (size_t deviceidx = 0; deviceidx < chunks.size(); deviceidx++) {
for (size_t n = 0; n < chunks[deviceidx]; n++) {
chunks_[seq].emplace_back(activate_chunk(
seq,
devices_[deviceidx],
static_cast<uint32_t>(3 + n),
static_cast<uint32_t>(cnv_chunks_count + n),
zone_id(seq)));
}
}
Expand Down
10 changes: 9 additions & 1 deletion category/async/storage_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ class storage_pool
{
// Preceding this is an array of uint32_t of chunk bytes used

uint32_t spare_[13]; // set aside for flags later
uint32_t spare_[12]; // set aside for flags later
uint32_t num_cnv_chunks; // number of cnv chunks per device
uint32_t config_hash; // hash of this configuration
uint32_t chunk_capacity;
uint8_t magic[4]; // "MND0" for v1 of this metadata
Expand Down Expand Up @@ -178,6 +179,9 @@ class storage_pool

//! Returns the number of chunks on this device
size_t chunks() const;

//! Returns the number of cnv chunks on this device
size_t cnv_chunks() const;
//! Returns the capacity of the device, and how much of that is
//! currently filled with data, in that order.
std::pair<file_offset_t, file_offset_t> capacity() const;
Expand Down Expand Up @@ -321,12 +325,16 @@ class storage_pool
//! happily use any partition you feed it, including the system drive.
uint32_t disable_mismatching_storage_pool_check : 1;

//! Number of conventional chunks to allocate per device. Default is 3.
uint32_t num_cnv_chunks;

constexpr creation_flags()
: chunk_capacity(28)
, interleave_chunks_evenly(false)
, open_read_only(false)
, open_read_only_allow_dirty(false)
, disable_mismatching_storage_pool_check(false)
, num_cnv_chunks(3)
{
}
};
Expand Down
28 changes: 24 additions & 4 deletions category/mpt/cli_tool_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ struct impl_t
std::ostream &cerr;
MONAD_ASYNC_NAMESPACE::storage_pool::creation_flags flags;
uint8_t chunk_capacity = flags.chunk_capacity;
uint32_t root_offsets_chunk_count = 16;
bool allow_dirty = false;
bool no_prompt = false;
bool create_database = false;
Expand Down Expand Up @@ -728,7 +729,7 @@ struct impl_t
MONAD_ASYNC_NAMESPACE::AsyncIO::
MONAD_IO_BUFFERS_WRITE_SIZE);
auto io = MONAD_ASYNC_NAMESPACE::AsyncIO{*pool, rwbuf};
MONAD_MPT_NAMESPACE::UpdateAux<> aux(&io);
MONAD_MPT_NAMESPACE::UpdateAux<> aux(io);
for (;;) {
auto const *item = aux.db_metadata()->fast_list_begin();
if (item == nullptr) {
Expand Down Expand Up @@ -971,7 +972,7 @@ struct impl_t
2,
MONAD_ASYNC_NAMESPACE::AsyncIO::MONAD_IO_BUFFERS_READ_SIZE);
auto io = MONAD_ASYNC_NAMESPACE::AsyncIO{*pool, rwbuf};
MONAD_MPT_NAMESPACE::UpdateAux<> aux(&io);
MONAD_MPT_NAMESPACE::UpdateAux<> aux(io);
size_t slow_chunks_inserted = 0;
size_t fast_chunks_inserted = 0;
auto override_insertion_count =
Expand Down Expand Up @@ -1445,6 +1446,22 @@ opened.
"set chunk capacity during database creation (default is 28, "
"1<<28 "
"= 256Mb, max is 31).");
cli.add_option(
"--root-offsets-chunk-count",
impl.root_offsets_chunk_count,
"Number of chunks to allocate for storing root offsets. "
"Must be a positive number that is a power of 2. Default is "
"16. Each chunk holds approx 16.5M history entries.")
->check([](std::string const &s) {
auto const v = std::stoll(s);
if (v <= 0) {
return "Value must be positive";
}
if ((v & (v - 1)) != 0) {
return "Value must be a power of 2";
}
return "";
});
cli.add_flag(
"--chunk-increasing",
impl.create_chunk_increasing,
Expand Down Expand Up @@ -1478,6 +1495,9 @@ opened.
impl.flags.open_read_only = true;
impl.flags.open_read_only_allow_dirty =
impl.allow_dirty || !impl.archive_database.empty();
impl.flags.num_cnv_chunks =
impl.root_offsets_chunk_count +
monad::mpt::UpdateAuxImpl::cnv_chunks_for_db_metadata;
if (!impl.restore_database.empty()) {
if (!impl.archive_database.empty()) {
impl.cli_ask_question(
Expand Down Expand Up @@ -1558,7 +1578,7 @@ opened.
MONAD_ASYNC_NAMESPACE::AsyncIO::
MONAD_IO_BUFFERS_READ_SIZE);
auto io = MONAD_ASYNC_NAMESPACE::AsyncIO{*impl.pool, rwbuf};
MONAD_MPT_NAMESPACE::UpdateAux<> aux(&io);
MONAD_MPT_NAMESPACE::UpdateAux<> aux(io);

{
cout << R"(MPT database on storages:
Expand Down Expand Up @@ -1604,7 +1624,7 @@ opened.
impl.cli_ask_question(ss.str().c_str());
}
aux.unset_io();
aux.set_io(&io, impl.reset_history_length);
aux.set_io(io, impl.reset_history_length);
cout << "Success! Done resetting history to "
<< impl.reset_history_length.value() << ".\n";
impl.print_db_history_summary(aux);
Expand Down
15 changes: 9 additions & 6 deletions category/mpt/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,12 @@ AsyncIOContext::AsyncIOContext(ReadOnlyOnDiskDbConfig const &options)

AsyncIOContext::AsyncIOContext(OnDiskDbConfig const &options)
: pool{[&] -> async::storage_pool {
async::storage_pool::creation_flags pool_options;
pool_options.num_cnv_chunks = options.root_offsets_chunk_count + 1;
auto len = options.file_size_db * 1024 * 1024 * 1024 + 24576;
if (options.dbname_paths.empty()) {
return async::storage_pool{
async::use_anonymous_sized_inode_tag{}, len};
async::use_anonymous_sized_inode_tag{}, len, pool_options};
}
// initialize db file on disk
for (auto const &dbname_path : options.dbname_paths) {
Expand All @@ -145,7 +147,8 @@ AsyncIOContext::AsyncIOContext(OnDiskDbConfig const &options)
return async::storage_pool{
options.dbname_paths,
options.append ? async::storage_pool::mode::open_existing
: async::storage_pool::mode::truncate};
: async::storage_pool::mode::truncate,
pool_options};
}()}
, read_ring{{options.uring_entries, options.sq_thread_cpu}}
, write_ring{io::RingConfig{options.wr_buffers}}
Expand All @@ -166,7 +169,7 @@ class Db::ROOnDiskBlocking final : public Db::Impl

public:
explicit ROOnDiskBlocking(AsyncIOContext &io_ctx)
: aux_(&io_ctx.io)
: aux_(io_ctx.io)
{
}

Expand Down Expand Up @@ -257,7 +260,7 @@ class Db::InMemory final : public Db::Impl

public:
explicit InMemory(StateMachine &machine)
: aux_{nullptr}
: aux_{}
, machine_{machine}
{
}
Expand Down Expand Up @@ -402,7 +405,7 @@ struct OnDiskWithWorkerThreadImpl
ReadOnlyOnDiskDbConfig const &options)
: parent(parent)
, async_io(options)
, aux(&async_io.io)
, aux(async_io.io)
{
}

Expand All @@ -411,7 +414,7 @@ struct OnDiskWithWorkerThreadImpl
OnDiskDbConfig const &options)
: parent(parent)
, async_io(options)
, aux{&async_io.io, options.fixed_history_length}
, aux{async_io.io, options.fixed_history_length}
{
if (options.rewind_to_latest_finalized) {
auto const latest_block_id = aux.get_latest_finalized_version();
Expand Down
16 changes: 7 additions & 9 deletions category/mpt/find_notify_fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ namespace
{
static constexpr bool lifetime_managed_internally = true;

UpdateAuxImpl *aux;
inflight_map_t &inflights;
Node::SharedPtr parent;
chunk_offset_t rd_offset; // required for sender
Expand All @@ -62,10 +61,9 @@ namespace
unsigned const branch_index;

find_receiver(
UpdateAuxImpl &aux, inflight_map_t &inflights,
Node::SharedPtr parent_, unsigned char const branch)
: aux(&aux)
, inflights(inflights)
inflight_map_t &inflights, Node::SharedPtr parent_,
unsigned char const branch)
: inflights(inflights)
, parent(std::move(parent_))
, rd_offset(0, 0)
, branch_index(parent->to_child_index(branch))
Expand Down Expand Up @@ -112,7 +110,7 @@ namespace
{
static constexpr bool lifetime_managed_internally = true;

UpdateAuxImpl *aux;
UpdateAuxImpl &aux;
NodeCache &node_cache;
inflight_map_owning_t &inflights;
chunk_offset_t offset;
Expand All @@ -125,7 +123,7 @@ namespace
UpdateAuxImpl &aux, NodeCache &node_cache,
inflight_map_owning_t &inflights, chunk_offset_t const offset,
virtual_chunk_offset_t const virtual_offset)
: aux(&aux)
: aux(aux)
, node_cache(node_cache)
, inflights(inflights)
, offset(offset)
Expand Down Expand Up @@ -154,7 +152,7 @@ namespace
CacheNodeCursor start_cursor{};
// verify the offset it read is still valid and has not been reused
// to write new data.
auto const virtual_offset_after = aux->physical_to_virtual(offset);
auto const virtual_offset_after = aux.physical_to_virtual(offset);
if (virtual_offset_after == virtual_offset) {
{
NodeCache::ConstAccessor acc;
Expand Down Expand Up @@ -270,7 +268,7 @@ void find_notify_fiber_future(
return;
}
inflights[offset].emplace_back(cont);
find_receiver receiver(aux, inflights, std::move(node), branch);
find_receiver receiver(inflights, std::move(node), branch);
detail::initiate_async_read_update(
*aux.io, std::move(receiver), receiver.bytes_to_read);
}
Expand Down
4 changes: 4 additions & 0 deletions category/mpt/ondisk_db_config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ struct OnDiskDbConfig
// fixed history length if contains value, otherwise rely on db to adjust
// history length upon disk usage
std::optional<uint64_t> fixed_history_length{std::nullopt};
// Number of chunks to allocate for root offsets when initializing the disk.
// Each chunk can hold 1 << 24 = 16777216 historical entries.
// This field must be power of 2.
uint32_t root_offsets_chunk_count{2};
};

struct ReadOnlyOnDiskDbConfig
Expand Down
Loading
Loading