Skip to content

Commit

Permalink
avoid unnecessary fsync in ps (#7616)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
lidezhu authored Jun 9, 2023
1 parent 5e429c1 commit 5b90c7e
Show file tree
Hide file tree
Showing 14 changed files with 408 additions and 75 deletions.
1 change: 1 addition & 0 deletions dbms/src/Common/CurrentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
M(PSMVCCSnapshotsList) \
M(PSMVCCNumDelta) \
M(PSMVCCNumBase) \
M(PSPendingWriterNum) \
M(RWLockWaitingReaders) \
M(RWLockWaitingWriters) \
M(RWLockActiveReaders) \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,9 @@ namespace DB
F(type_fullgc_commit, {{"type", "fullgc_commit"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_clean_external, {{"type", "clean_external"}}, ExpBuckets{0.0005, 2, 20}), \
F(type_v3, {{"type", "v3"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_storage_page_command_count, "Total number of PageStorage's command, such as write / read / scan / snapshot", Counter, \
F(type_write, {"type", "write"}), F(type_read, {"type", "read"}), \
F(type_scan, {"type", "scan"}), F(type_snapshot, {"type", "snapshot"})) \
M(tiflash_storage_page_write_batch_size, "The size of each write batch in bytes", Histogram, \
F(type_v3, {{"type", "v3"}}, ExpBuckets{4 * 1024, 4, 10})) \
M(tiflash_storage_page_write_duration_seconds, "The duration of each write batch", Histogram, \
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,7 @@ template <typename Trait>
typename BlobStore<Trait>::PageMap
BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
if (entries.empty())
{
return {};
Expand Down Expand Up @@ -985,6 +986,7 @@ BlobStore<Trait>::read(PageIdAndEntries & entries, const ReadLimiterPtr & read_l
template <typename Trait>
Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPtr & read_limiter)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
const auto & [page_id_v3, entry] = id_entry;
const size_t buf_size = entry.size;

Expand Down Expand Up @@ -1044,6 +1046,7 @@ Page BlobStore<Trait>::read(const PageIdAndEntry & id_entry, const ReadLimiterPt
template <typename Trait>
BlobFilePtr BlobStore<Trait>::read(const typename BlobStore<Trait>::PageId & page_id_v3, BlobFileId blob_id, BlobFileOffset offset, char * buffers, size_t size, const ReadLimiterPtr & read_limiter, bool background)
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
assert(buffers != nullptr);
BlobFilePtr blob_file = getBlobFile(blob_id);
try
Expand Down
57 changes: 33 additions & 24 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,33 +161,42 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(

// 2. For entry edits without the checkpoint info, or it is stored on an existing data file that needs compact,
// write the entry data to the data file, and assign a new checkpoint info.
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
// the page data size uploaded in this checkpoint
write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
current_write_size += data_location.size_in_file;
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = data_location,
.is_valid = true,
.is_local_data_reclaimed = is_local_data_reclaimed,
};
locked_files.emplace(*data_location.data_file_id);
if (is_compaction)
try
{
write_down_stats.compact_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_compact += 1;
auto page = data_source->read({rec_edit.page_id, rec_edit.entry});
RUNTIME_CHECK_MSG(page.isValid(), "failed to read page, record={}", rec_edit);
auto data_location = data_writer->write(
rec_edit.page_id,
rec_edit.version,
page.data.begin(),
page.data.size());
// the page data size uploaded in this checkpoint
write_down_stats.num_bytes[static_cast<size_t>(id_storage_type)] += rec_edit.entry.size;
current_write_size += data_location.size_in_file;
RUNTIME_CHECK(page.data.size() == rec_edit.entry.size, page.data.size(), rec_edit.entry.size);
bool is_local_data_reclaimed = rec_edit.entry.checkpoint_info.has_value() && rec_edit.entry.checkpoint_info.is_local_data_reclaimed;
rec_edit.entry.checkpoint_info = OptionalCheckpointInfo{
.data_location = data_location,
.is_valid = true,
.is_local_data_reclaimed = is_local_data_reclaimed,
};
locked_files.emplace(*data_location.data_file_id);
if (is_compaction)
{
write_down_stats.compact_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_compact += 1;
}
else
{
write_down_stats.incremental_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_incremental += 1;
}
}
else
catch (...)
{
write_down_stats.incremental_data_bytes += rec_edit.entry.size;
write_down_stats.num_pages_incremental += 1;
LOG_ERROR(log, "failed to read page, record={}", rec_edit);
tryLogCurrentException(__PRETTY_FUNCTION__);
throw;
}
}

Expand Down
4 changes: 0 additions & 4 deletions dbms/src/Storages/Page/V3/LogFile/LogReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
#include <Storages/Page/V3/WALStore.h>
#include <common/types.h>

namespace Poco
{
class Logger;
}
namespace DB
{
class ReadBuffer;
Expand Down
50 changes: 28 additions & 22 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ LogWriter::LogWriter(
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_)
bool manual_sync_)
: path(path_)
, file_provider(file_provider_)
, block_offset(0)
, log_number(log_number_)
, recycle_log_files(recycle_log_files_)
, manual_flush(manual_flush_)
, manual_sync(manual_sync_)
, write_buffer(nullptr, 0)
{
log_file = file_provider->newWritableFile(
Expand Down Expand Up @@ -69,26 +69,9 @@ size_t LogWriter::writtenBytes() const
return written_bytes;
}

void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
void LogWriter::sync()
{
if (write_buffer.offset() == 0)
{
return;
}

PageUtil::writeFile(log_file,
written_bytes,
write_buffer.buffer().begin(),
write_buffer.offset(),
write_limiter,
/*background=*/background,
/*truncate_if_failed=*/false,
/*enable_failpoint=*/false);
log_file->fsync();
written_bytes += write_buffer.offset();

// reset the write_buffer
resetBuffer();
}

void LogWriter::close()
Expand Down Expand Up @@ -158,9 +141,10 @@ void LogWriter::addRecord(ReadBuffer & payload, const size_t payload_size, const
begin = false;
} while (payload.hasPendingData());

if (!manual_flush)
flush(write_limiter, background);
if (!manual_sync)
{
flush(write_limiter, background);
sync();
}
}

Expand Down Expand Up @@ -216,4 +200,26 @@ void LogWriter::emitPhysicalRecord(Format::RecordType type, ReadBuffer & payload

block_offset += header_size + length;
}

// Write the bytes currently staged in `write_buffer` to `log_file` at offset
// `written_bytes`, then advance `written_bytes` and reset the buffer.
// Does nothing when the buffer is empty. This function does not call fsync
// on the log file.
void LogWriter::flush(const WriteLimiterPtr & write_limiter, bool background)
{
    const size_t pending_bytes = write_buffer.offset();
    if (pending_bytes == 0)
        return;

    char * const pending_begin = write_buffer.buffer().begin();
    PageUtil::writeFile(
        log_file,
        written_bytes,
        pending_begin,
        pending_bytes,
        write_limiter,
        /*background=*/background,
        /*truncate_if_failed=*/false,
        /*enable_failpoint=*/false);

    // The staged bytes are now part of the file; account for them and start
    // staging from an empty buffer again.
    written_bytes += pending_bytes;
    resetBuffer();
}
} // namespace DB::PS::V3
11 changes: 6 additions & 5 deletions dbms/src/Storages/Page/V3/LogFile/LogWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ class LogWriter final : private Allocator<false>
const FileProviderPtr & file_provider_,
Format::LogNumberType log_number_,
bool recycle_log_files_,
bool manual_flush_ = false);
bool manual_sync_ = false);

DISALLOW_COPY(LogWriter);

~LogWriter();

void addRecord(ReadBuffer & payload, size_t payload_size, const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);
void sync();

void close();

Expand All @@ -103,6 +103,8 @@ class LogWriter final : private Allocator<false>

void resetBuffer();

void flush(const WriteLimiterPtr & write_limiter = nullptr, bool background = false);

private:
String path;
FileProviderPtr file_provider;
Expand All @@ -112,9 +114,8 @@ class LogWriter final : private Allocator<false>
size_t block_offset; // Current offset in block
Format::LogNumberType log_number;
const bool recycle_log_files;
// If true, it does not flush after each write. Instead it relies on the upper
// layer to manually does the flush by calling ::flush()
const bool manual_flush;
// If true, the upper layer needs to manually sync the log file after writing by calling LogWriter::sync()
const bool manual_sync;

size_t written_bytes = 0;

Expand Down
13 changes: 11 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
namespace CurrentMetrics
{
extern const Metric PSMVCCSnapshotsList;
extern const Metric PSPendingWriterNum;
} // namespace CurrentMetrics

namespace DB
Expand Down Expand Up @@ -964,6 +965,7 @@ PageDirectory<Trait>::PageDirectory(String storage_name, WALStorePtr && wal_, UI
template <typename Trait>
PageDirectorySnapshotPtr PageDirectory<Trait>::createSnapshot(const String & tracing_id) const
{
GET_METRIC(tiflash_storage_page_command_count, type_snapshot).Increment();
auto snap = std::make_shared<PageDirectorySnapshot>(sequence.load(), tracing_id);
{
std::lock_guard snapshots_lock(snapshots_mutex);
Expand Down Expand Up @@ -1018,6 +1020,7 @@ SnapshotsStatistics PageDirectory<Trait>::getSnapshotsStat() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdAndEntry PageDirectory<Trait>::getByIDImpl(const PageId & page_id, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;

// After two write batches applied: [ver=1]{put 10}, [ver=2]{ref 11->10, del 10}, the `mvcc_table_directory` is:
Expand Down Expand Up @@ -1112,6 +1115,7 @@ template <typename Trait>
std::pair<typename PageDirectory<Trait>::PageIdAndEntries, typename PageDirectory<Trait>::PageIds>
PageDirectory<Trait>::getByIDsImpl(const typename PageDirectory<Trait>::PageIds & page_ids, const PageDirectorySnapshotPtr & snap, bool throw_on_not_exist) const
{
GET_METRIC(tiflash_storage_page_command_count, type_read).Increment();
PageEntryV3 entry_got;
PageIds page_not_found = {};

Expand Down Expand Up @@ -1257,6 +1261,7 @@ UInt64 PageDirectory<Trait>::getMaxIdAfterRestart() const
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIds()
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
std::set<PageId> page_ids;

std::shared_lock read_lock(table_rw_mutex);
Expand All @@ -1273,6 +1278,7 @@ typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIds()
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsWithPrefix(const String & prefix, const DB::PageStorageSnapshotPtr & snap_)
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
PageIdSet page_ids;
Expand All @@ -1299,6 +1305,7 @@ typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsWith
template <typename Trait>
typename PageDirectory<Trait>::PageIdSet PageDirectory<Trait>::getAllPageIdsInRange(const PageId & start, const PageId & end, const DB::PageStorageSnapshotPtr & snap_)
{
GET_METRIC(tiflash_storage_page_command_count, type_scan).Increment();
if constexpr (std::is_same_v<Trait, universal::PageDirectoryTrait>)
{
PageIdSet page_ids;
Expand Down Expand Up @@ -1473,7 +1480,8 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
// We need to make sure there is only one apply thread to write wal and then increase `sequence`.
// Note that, as read threads use current `sequence` as read_seq, we cannot increase `sequence`
// before applying edit to `mvcc_table_directory`.

GET_METRIC(tiflash_storage_page_command_count, type_write).Increment();
CurrentMetrics::Increment pending_writer_size{CurrentMetrics::PSPendingWriterNum};
Writer w;
w.edit = &edit;

Expand All @@ -1484,6 +1492,7 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
watch.restart();

writers.push_back(&w);
SYNC_FOR("after_PageDirectory::enter_write_group");
w.cv.wait(apply_lock, [&] { return w.done || &w == writers.front(); });
GET_METRIC(tiflash_storage_page_write_duration_seconds, type_wait_in_group).Observe(watch.elapsedSeconds());
watch.restart();
Expand All @@ -1504,9 +1513,9 @@ std::unordered_set<String> PageDirectory<Trait>::apply(PageEntriesEdit && edit,
// group owner, others just return an empty set.
return {};
}

auto * last_writer = buildWriteGroup(&w, apply_lock);
apply_lock.unlock();
SYNC_FOR("before_PageDirectory::leader_apply");

// `true` means the write process has completed without exception
bool success = false;
Expand Down
10 changes: 9 additions & 1 deletion dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,13 @@ class PageDirectory
return u;
}

// Test-only helper: returns the current number of entries in `writers`.
// `writers` should be used under the protection of apply_mutex, and this
// function reads it without taking any lock itself.
// So don't use this function in production code
size_t getWritersQueueSizeForTest()
{
return writers.size();
}

// No copying and no moving
DISALLOW_COPY_AND_MOVE(PageDirectory);

Expand Down Expand Up @@ -455,7 +462,8 @@ class PageDirectory
std::condition_variable cv;
};

// return the last writer in the group
// Return the last writer in the group
// All the edits in the write group will be merged into `first->edit`.
Writer * buildWriteGroup(Writer * first, std::unique_lock<std::mutex> & /*lock*/);

private:
Expand Down
Loading

0 comments on commit 5b90c7e

Please sign in to comment.