Skip to content

Commit

Permalink
PageStorage: Make deserialize out of WALReader (#6070)
Browse files Browse the repository at this point in the history
ref #6071
  • Loading branch information
JaySon-Huang authored Oct 4, 2022
1 parent f6528f2 commit 2811c02
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 62 deletions.
38 changes: 38 additions & 0 deletions dbms/src/Storages/Page/ExternalPageCallbacks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/Page/PageDefines.h>

#include <vector>

namespace DB
{

struct ExternalPageCallbacks
{
// `scanner` for scanning available external page ids on disks.
// `remover` will be called with living normal page ids after gc run a round, user should remove those
// external pages(files) in `pending_external_pages` but not in `valid_normal_pages`
using PathAndIdsVec = std::vector<std::pair<String, std::set<PageId>>>;
using ExternalPagesScanner = std::function<PathAndIdsVec()>;
using ExternalPagesRemover
= std::function<void(const PathAndIdsVec & pending_external_pages, const std::set<PageId> & valid_normal_pages)>;
ExternalPagesScanner scanner = nullptr;
ExternalPagesRemover remover = nullptr;
NamespaceId ns_id = MAX_NAMESPACE_ID;
};

} // namespace DB
15 changes: 1 addition & 14 deletions dbms/src/Storages/Page/PageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <Interpreters/SettingsCommon.h>
#include <Storages/FormatVersion.h>
#include <Storages/Page/Config.h>
#include <Storages/Page/ExternalPageCallbacks.h>
#include <Storages/Page/FileUsage.h>
#include <Storages/Page/Page.h>
#include <Storages/Page/PageDefines.h>
Expand Down Expand Up @@ -66,20 +67,6 @@ enum class PageStorageRunMode : UInt8
MIX_MODE = 3,
};

struct ExternalPageCallbacks
{
// `scanner` for scanning available external page ids on disks.
// `remover` will be called with living normal page ids after gc run a round, user should remove those
// external pages(files) in `pending_external_pages` but not in `valid_normal_pages`
using PathAndIdsVec = std::vector<std::pair<String, std::set<PageId>>>;
using ExternalPagesScanner = std::function<PathAndIdsVec()>;
using ExternalPagesRemover
= std::function<void(const PathAndIdsVec & pending_external_pages, const std::set<PageId> & valid_normal_pages)>;
ExternalPagesScanner scanner = nullptr;
ExternalPagesRemover remover = nullptr;
NamespaceId ns_id = MAX_NAMESPACE_ID;
};

/**
* A storage system stored pages. Pages are serialized objects referenced by PageID. Store Page with the same PageID
* will cover the old ones.
Expand Down
36 changes: 18 additions & 18 deletions dbms/src/Storages/Page/V3/PageDirectory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ std::shared_ptr<PageIdV3Internal> VersionedPageEntries::fromRestored(const PageE
}
}

std::tuple<VersionedPageEntries::ResolveResult, PageIdV3Internal, PageVersion>
std::tuple<ResolveResult, PageIdV3Internal, PageVersion>
VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry)
{
auto page_lock = acquireLock();
Expand All @@ -306,7 +306,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV
if (!ignore_delete && iter->second.isDelete())
{
// the page is not visible
return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)};
return {ResolveResult::FAIL, buildV3Id(0, 0), PageVersion(0)};
}

// If `ignore_delete` is true, we need the page entry even if it is logical deleted.
Expand All @@ -323,7 +323,7 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV
// copy and return the entry
if (entry != nullptr)
*entry = iter->second.entry;
return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)};
return {ResolveResult::TO_NORMAL, buildV3Id(0, 0), PageVersion(0)};
}
// else fallthrough to FAIL
} // else fallthrough to FAIL
Expand All @@ -335,23 +335,23 @@ VersionedPageEntries::resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV
bool ok = ignore_delete || (!is_deleted || seq < delete_ver.sequence);
if (create_ver.sequence <= seq && ok)
{
return {RESOLVE_TO_NORMAL, buildV3Id(0, 0), PageVersion(0)};
return {ResolveResult::TO_NORMAL, buildV3Id(0, 0), PageVersion(0)};
}
}
else if (type == EditRecordType::VAR_REF)
{
// Return the origin page id if this ref is visible by `seq`.
if (create_ver.sequence <= seq && (!is_deleted || seq < delete_ver.sequence))
{
return {RESOLVE_TO_REF, ori_page_id, create_ver};
return {ResolveResult::TO_REF, ori_page_id, create_ver};
}
}
else
{
LOG_WARNING(&Poco::Logger::get("VersionedPageEntries"), "Can't resolve the EditRecordType {}", type);
}

return {RESOLVE_FAIL, buildV3Id(0, 0), PageVersion(0)};
return {ResolveResult::FAIL, buildV3Id(0, 0), PageVersion(0)};
}

std::optional<PageEntryV3> VersionedPageEntries::getEntry(UInt64 seq) const
Expand Down Expand Up @@ -852,12 +852,12 @@ PageIDAndEntryV3 PageDirectory::getByIDImpl(PageIdV3Internal page_id, const Page
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, &entry_got);
switch (resolve_state)
{
case VersionedPageEntries::RESOLVE_TO_NORMAL:
case ResolveResult::TO_NORMAL:
return PageIDAndEntryV3(page_id, entry_got);
case VersionedPageEntries::RESOLVE_FAIL:
case ResolveResult::FAIL:
ok = false;
break;
case VersionedPageEntries::RESOLVE_TO_REF:
case ResolveResult::TO_REF:
if (id_to_resolve == next_id_to_resolve)
{
ok = false;
Expand Down Expand Up @@ -912,12 +912,12 @@ std::pair<PageIDAndEntriesV3, PageIds> PageDirectory::getByIDsImpl(const PageIdV
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, &entry_got);
switch (resolve_state)
{
case VersionedPageEntries::RESOLVE_TO_NORMAL:
case ResolveResult::TO_NORMAL:
return true;
case VersionedPageEntries::RESOLVE_FAIL:
case ResolveResult::FAIL:
ok = false;
break;
case VersionedPageEntries::RESOLVE_TO_REF:
case ResolveResult::TO_REF:
if (id_to_resolve == next_id_to_resolve)
{
ok = false;
Expand Down Expand Up @@ -982,13 +982,13 @@ PageIdV3Internal PageDirectory::getNormalPageId(PageIdV3Internal page_id, const
auto [resolve_state, next_id_to_resolve, next_ver_to_resolve] = iter->second->resolveToPageId(ver_to_resolve.sequence, /*ignore_delete=*/id_to_resolve != page_id, nullptr);
switch (resolve_state)
{
case VersionedPageEntries::RESOLVE_TO_NORMAL:
case ResolveResult::TO_NORMAL:
return id_to_resolve;
case VersionedPageEntries::RESOLVE_FAIL:
case ResolveResult::FAIL:
// resolve failed
keep_resolve = false;
break;
case VersionedPageEntries::RESOLVE_TO_REF:
case ResolveResult::TO_REF:
if (id_to_resolve == next_id_to_resolve)
{
// dead-loop, so break the `while(keep_resolve)`
Expand Down Expand Up @@ -1087,11 +1087,11 @@ void PageDirectory::applyRefEditRecord(
nullptr);
switch (resolve_state)
{
case VersionedPageEntries::RESOLVE_FAIL:
case ResolveResult::FAIL:
return {false, id_to_resolve, ver_to_resolve};
case VersionedPageEntries::RESOLVE_TO_NORMAL:
case ResolveResult::TO_NORMAL:
return {true, id_to_resolve, ver_to_resolve};
case VersionedPageEntries::RESOLVE_TO_REF:
case ResolveResult::TO_REF:
if (id_to_resolve == next_id_to_resolve)
{
return {false, next_id_to_resolve, next_ver_to_resolve};
Expand Down
14 changes: 8 additions & 6 deletions dbms/src/Storages/Page/V3/PageDirectory.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ struct EntryOrDelete
class VersionedPageEntries;
using VersionedPageEntriesPtr = std::shared_ptr<VersionedPageEntries>;
using PageLock = std::lock_guard<std::mutex>;

enum class ResolveResult
{
FAIL,
TO_REF,
TO_NORMAL,
};

class VersionedPageEntries
{
public:
Expand Down Expand Up @@ -161,12 +169,6 @@ class VersionedPageEntries

std::shared_ptr<PageIdV3Internal> fromRestored(const PageEntriesEdit::EditRecord & rec);

enum ResolveResult
{
RESOLVE_FAIL,
RESOLVE_TO_REF,
RESOLVE_TO_NORMAL,
};
std::tuple<ResolveResult, PageIdV3Internal, PageVersion>
resolveToPageId(UInt64 seq, bool ignore_delete, PageEntryV3 * entry);

Expand Down
6 changes: 4 additions & 2 deletions dbms/src/Storages/Page/V3/PageDirectoryFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Storages/Page/V3/PageDirectoryFactory.h>
#include <Storages/Page/V3/PageEntriesEdit.h>
#include <Storages/Page/V3/WAL/WALReader.h>
#include <Storages/Page/V3/WAL/serialize.h>
#include <Storages/Page/V3/WALStore.h>

#include <memory>
Expand Down Expand Up @@ -196,8 +197,8 @@ void PageDirectoryFactory::loadFromDisk(const PageDirectoryPtr & dir, WALStoreRe
{
while (reader->remained())
{
auto [ok, edit] = reader->next();
if (!ok)
auto record = reader->next();
if (!record)
{
// TODO: Handle error, some error could be ignored.
// If the file happened to some error,
Expand All @@ -208,6 +209,7 @@ void PageDirectoryFactory::loadFromDisk(const PageDirectoryPtr & dir, WALStoreRe
}

// apply the edit read
auto edit = ser::deserializeFrom(record.value());
loadEdit(dir, edit);
}
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Storages/Page/V3/WAL/WALReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ LogFilenameSet WALStoreReader::listAllFiles(
std::tuple<std::optional<LogFilename>, LogFilenameSet>
WALStoreReader::findCheckpoint(LogFilenameSet && all_files)
{
LogFilenameSet::const_iterator latest_checkpoint_iter = all_files.cend();
auto latest_checkpoint_iter = all_files.cend();
for (auto iter = all_files.cbegin(); iter != all_files.cend(); ++iter)
{
if (iter->level_num > 0)
Expand Down Expand Up @@ -170,7 +170,7 @@ bool WALStoreReader::remained() const
return false;
}

std::tuple<bool, PageEntriesEdit> WALStoreReader::next()
std::optional<String> WALStoreReader::next()
{
bool ok = false;
String record;
Expand All @@ -179,14 +179,14 @@ std::tuple<bool, PageEntriesEdit> WALStoreReader::next()
std::tie(ok, record) = reader->readRecord();
if (ok)
{
return {true, ser::deserializeFrom(record)};
return record;
}

// Roll to read the next file
if (bool next_file = openNextFile(); !next_file)
{
// No more file to be read.
return {false, PageEntriesEdit{}};
return std::nullopt;
}
} while (true);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Page/V3/WAL/WALReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class WALStoreReader

bool remained() const;

std::tuple<bool, PageEntriesEdit> next();
std::optional<String> next();

void throwIfError() const
{
Expand Down
Loading

0 comments on commit 2811c02

Please sign in to comment.