Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GC data on S3 #6899

Merged
merged 29 commits into from
Mar 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5a92bbb
add new type
JaySon-Huang Feb 23, 2023
da8ba34
v3
JaySon-Huang Feb 24, 2023
44c2362
Add S3GCManager
JaySon-Huang Feb 24, 2023
b077dbb
Clean outdated manifest
JaySon-Huang Feb 24, 2023
30d648e
expired with same timepoint
JaySon-Huang Feb 24, 2023
c8526e1
One gc config
JaySon-Huang Feb 24, 2023
36009fd
Add ut test
JaySon-Huang Feb 24, 2023
9ec8e98
Register s3gc manager to TMTContext
JaySon-Huang Feb 24, 2023
4de4a03
init with mock lock client
JaySon-Huang Feb 25, 2023
bb5ae7c
Add todo for remove dmfile objects
JaySon-Huang Feb 25, 2023
b197f02
Clean files of tombstoned store
JaySon-Huang Feb 27, 2023
9bf0e1c
Add shutdown
JaySon-Huang Feb 27, 2023
3858440
Check with owner manager
JaySon-Huang Feb 27, 2023
2ff13c3
Resolve conflict
JaySon-Huang Feb 28, 2023
430afa3
update by PS::V3::CheckpointLocation
JaySon-Huang Feb 28, 2023
94e31a2
Parse manifest for locks
JaySon-Huang Feb 28, 2023
59a05ec
save
JaySon-Huang Mar 2, 2023
b6c1f1a
Remove logic of putting remote page
JaySon-Huang Mar 2, 2023
6fe1e86
preserve manifest logic
JaySon-Huang Mar 2, 2023
6a9eee2
Fix ut
JaySon-Huang Mar 3, 2023
d23c21c
Merge remote-tracking branch 'upstream/master' into s3gc
JaySon-Huang Mar 3, 2023
dfaaee5
Address comment
JaySon-Huang Mar 3, 2023
9fbd561
Merge branch 'master' into s3gc
ti-chi-bot Mar 3, 2023
9856036
Merge branch 'master' into s3gc
ti-chi-bot Mar 3, 2023
9a3f415
Fix bug
JaySon-Huang Mar 3, 2023
6c7a875
Add ut
JaySon-Huang Mar 3, 2023
df165f2
Merge branch 'master' into s3gc
ti-chi-bot Mar 3, 2023
fbd3dc3
Format files
JaySon-Huang Mar 3, 2023
60166fe
Merge branch 'master' into s3gc
ti-chi-bot Mar 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions dbms/src/Flash/Disaggregated/S3LockClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

namespace DB
{
class Context;
class Logger;
using LoggerPtr = std::shared_ptr<Logger>;
} // namespace DB
Expand All @@ -43,7 +42,7 @@ class IS3LockClient
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
// `timeout_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) = 0;

Expand All @@ -52,7 +51,7 @@ class IS3LockClient
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
// `timeout_s`, it will throw exception.
virtual std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) = 0;
};
Expand All @@ -69,7 +68,7 @@ class S3LockClient : public IS3LockClient
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
// `timeout_s`, it will throw exception.
std::pair<bool, String>
sendTryAddLockRequest(const String & data_file_key, UInt32 lock_store_id, UInt32 lock_seq, Int64 timeout_s) override;

Expand All @@ -78,7 +77,7 @@ class S3LockClient : public IS3LockClient
// Otherwise return <false, conflict_message>
// This method will update the owner info when owner changed.
// If deadline exceed or failed to get the owner info within
// `timeour_s`, it will throw exception.
// `timeout_s`, it will throw exception.
std::pair<bool, String>
sendTryMarkDeleteRequest(const String & data_file_key, Int64 timeout_s) override;

Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ class Context
}
bool isDisaggregatedStorageMode() const
{
return disaggregated_mode == DisaggregatedMode::Storage;
// there is no difference
return disaggregated_mode == DisaggregatedMode::Storage || disaggregated_mode == DisaggregatedMode::None;
}

const std::shared_ptr<DB::DM::SharedBlockSchemas> & getSharedBlockSchemas() const;
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Storages/Page/V2/PageFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include <boost/algorithm/string/classification.hpp>
#include <ext/scope_guard.h>
#include <magic_enum.hpp>

#ifndef __APPLE__
#include <fcntl.h>
Expand Down Expand Up @@ -120,7 +121,7 @@ std::pair<ByteBuffer, ByteBuffer> genWriteData( //
meta_write_bytes += (sizeof(PageId) + sizeof(PageId));
break;
case WriteBatchWriteType::PUT_EXTERNAL:
throw Exception("Should not serialize with `PUT_EXTERNAL`");
throw Exception(ErrorCodes::LOGICAL_ERROR, "Should not serialize with {}", magic_enum::enum_name(write.type));
break;
}
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Storages/Page/V2/VersionSet/PageEntriesBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include <Storages/Page/V2/VersionSet/PageEntriesBuilder.h>

#include <magic_enum.hpp>

namespace DB::PS::V2
{
void PageEntriesBuilder::apply(const PageEntriesEdit & edit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <common/logger_useful.h>
#include <common/types.h>

#include <magic_enum.hpp>
#include <stack>

#ifdef FIU_ENABLE
Expand Down Expand Up @@ -454,7 +455,7 @@ void DeltaVersionEditAcceptor::apply(PageEntriesEdit & edit)
this->applyRef(rec);
break;
case WriteBatchWriteType::UPSERT:
throw Exception("WriteType::UPSERT should only write by gcApply!", ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "DeltaVersionEditAcceptor::apply with invalid type {}", magic_enum::enum_name(rec.type));
break;
}
}
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Storages/Page/V3/BlobStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ BlobStore<Trait>::handleLargeWrite(typename Trait::WriteBatch & wb, const WriteL
edit.putExternal(wb.getFullPageId(write.page_id));
break;
case WriteBatchWriteType::UPSERT:
throw Exception(fmt::format("Unknown write type: {}", magic_enum::enum_name(write.type)));
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown write type: {}", magic_enum::enum_name(write.type));
}
}

Expand Down Expand Up @@ -280,8 +280,7 @@ BlobStore<Trait>::write(typename Trait::WriteBatch & wb, const WriteLimiterPtr &
}
case WriteBatchWriteType::PUT:
case WriteBatchWriteType::UPSERT:
throw Exception(fmt::format("write batch have a invalid total size == 0 while this kind of entry exist, write_type={}", static_cast<Int32>(write.type)),
ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "write batch have a invalid total size == 0 while this kind of entry exist, write_type={}", magic_enum::enum_name(write.type));
}
}
return edit;
Expand Down
3 changes: 3 additions & 0 deletions dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ class UniversalPageStorage final
void registerUniversalExternalPagesCallbacks(const UniversalExternalPageCallbacks & callbacks);
void unregisterUniversalExternalPagesCallbacks(const String & prefix);

friend class PageReaderImplUniversal;

// private: // TODO: make these private
String storage_name; // Identify between different Storage
PSDiskDelegatorPtr delegator; // Get paths for storing data
PageStorageConfig config;
Expand Down
19 changes: 10 additions & 9 deletions dbms/src/Storages/Page/V3/Universal/UniversalWriteBatchImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#pragma once

#include <IO/ReadBufferFromString.h>
#include <Storages/Page/PageDefinesBase.h>
#include <Storages/Page/V3/Universal/UniversalPageId.h>
#include <Storages/Page/V3/Universal/UniversalPageIdFormatImpl.h>
#include <Storages/Page/WriteBatchImpl.h>
Expand Down Expand Up @@ -152,13 +153,6 @@ class UniversalWriteBatch : private boost::noncopyable
total_data_size += rhs.total_data_size;
}

void clear()
{
Writes tmp;
writes.swap(tmp);
total_data_size = 0;
}

size_t getTotalDataSize() const
{
return total_data_size;
Expand Down Expand Up @@ -196,7 +190,14 @@ class UniversalWriteBatch : private boost::noncopyable
return fmt_buffer.toString();
}

UniversalWriteBatch(UniversalWriteBatch && rhs)
void clear()
{
Writes tmp;
writes.swap(tmp);
total_data_size = 0;
}

UniversalWriteBatch(UniversalWriteBatch && rhs) noexcept
: prefix(std::move(rhs.prefix))
, writes(std::move(rhs.writes))
, total_data_size(rhs.total_data_size)
Expand All @@ -206,7 +207,7 @@ class UniversalWriteBatch : private boost::noncopyable
{
prefix.swap(o.prefix);
writes.swap(o.writes);
std::swap(o.total_data_size, total_data_size);
std::swap(total_data_size, o.total_data_size);
}

private:
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Storages/Page/tools/PageCtl/PageStorageCtlV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Encryption/MockKeyManager.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/FormattingChannel.h>
Expand All @@ -27,6 +28,8 @@
#include <Storages/PathPool.h>
#include <TestUtils/MockDiskDelegator.h>

#include <magic_enum.hpp>

using namespace DB::PS::V2;
DB::WriteBatch::SequenceID debugging_recover_stop_sequence = 0;

Expand Down Expand Up @@ -272,6 +275,8 @@ void dump_all_entries(PageFileSet & page_files, int32_t mode)
page_file.getFileId(),
page_file.getLevel());
break;
default:
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "illegal type: {}", magic_enum::enum_name(record.type));
}
}
}
Expand Down
109 changes: 109 additions & 0 deletions dbms/src/Storages/S3/CheckpointManifestS3Set.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/Exception.h>
#include <Storages/S3/CheckpointManifestS3Set.h>
#include <Storages/S3/S3Filename.h>

#include <unordered_set>

namespace DB::S3
{
// List all objects under the manifest prefix of `store_id` in the client's
// bucket and build a CheckpointManifestS3Set out of them.
CheckpointManifestS3Set
CheckpointManifestS3Set::getFromS3(const S3::TiFlashS3Client & client, StoreID store_id)
{
    const auto manifest_prefix = S3::S3Filename::fromStoreId(store_id).toManifestPrefix();

    std::vector<CheckpointManifestS3Object> listed;

    // Invoked with each ListObjectsV2 result handed over by `listPrefix`.
    auto collect_page = [&](const Aws::S3::Model::ListObjectsV2Result & page) {
        const auto & contents = page.GetContents();
        listed.reserve(listed.size() + contents.size());
        for (const auto & s3_object : contents)
        {
            // Keep the last-modified time next to the key; it is needed
            // later for deciding which manifest objects are outdated.
            listed.push_back(CheckpointManifestS3Object{s3_object.GetKey(), s3_object.GetLastModified()});
        }
        // `more` is always true: never ask `listPrefix` to stop early.
        return DB::S3::PageResult{.num_keys = contents.size(), .more = true};
    };
    listPrefix(client, client.bucket(), manifest_prefix, collect_page);

    return CheckpointManifestS3Set::create(listed);
}

// Build a set from already-listed manifest objects.
//
// Every key must parse as a checkpoint manifest filename, and no two keys
// may carry the same upload sequence; both conditions are enforced with
// runtime checks.
CheckpointManifestS3Set
CheckpointManifestS3Set::create(const std::vector<CheckpointManifestS3Object> & manifest_keys)
{
    CheckpointManifestS3Set result;
    for (const auto & mf_obj : manifest_keys)
    {
        const auto filename_view = S3::S3FilenameView::fromKey(mf_obj.key);
        RUNTIME_CHECK(filename_view.type == S3::S3FilenameType::CheckpointManifest, mf_obj.key);

        // The map is keyed by upload sequence; `ok` is false when an entry
        // with the same sequence has been inserted before.
        auto [iter, ok] = result.manifests.emplace(filename_view.getUploadSequence(), mf_obj);
        RUNTIME_CHECK_MSG(ok, "duplicated upload seq, prev_mf_key={} duplicated_mf_key={}", iter->second.key, mf_obj.key);
    }
    return result;
}

// Select the keys of the manifests that must be kept, ordered from the
// newest upload sequence to the oldest.
//
// - The latest manifest is always preserved, regardless of `max_preserved`
//   and of its modification time.
// - Older manifests are then visited from newest to oldest and preserved
//   until `max_preserved` keys have been collected, or until a manifest is
//   found whose modification time is more than `expired_hour` hours before
//   `timepoint`.
//
// So the result holds at most `max(1, max_preserved)` keys.
//
// Calling this on an empty set is a caller bug (it trips the assert in debug
// builds); when asserts are compiled out an empty list is returned.
Strings CheckpointManifestS3Set::preservedManifests(size_t max_preserved, Int64 expired_hour, const Aws::Utils::DateTime & timepoint) const
{
    assert(!manifests.empty());
    // Do not dereference `rbegin()` of an empty map when asserts are disabled.
    if (manifests.empty())
        return {};

    Strings preserved_mf;
    // the latest manifest
    auto iter = manifests.rbegin();
    preserved_mf.emplace_back(iter->second.key);
    ++iter; // move to next

    const auto expired_bound_sec = expired_hour * 3600;
    // Check the size bound *before* appending, so that `max_preserved <= 1`
    // keeps only the latest manifest instead of letting a second one in.
    for (; iter != manifests.rend() && preserved_mf.size() < max_preserved; ++iter)
    {
        // `Diff` yields milliseconds; convert to seconds for the comparison.
        const auto diff_sec = Aws::Utils::DateTime::Diff(timepoint, iter->second.last_modification).count() / 1000.0;
        if (diff_sec > expired_bound_sec)
        {
            // Stop at the first expired manifest; anything older is not
            // examined.
            break;
        }

        preserved_mf.emplace_back(iter->second.key);
    }
    return preserved_mf;
}

// Return every manifest object of this set that `preservedManifests` does
// not select for the same arguments, in the iteration order of `manifests`.
std::vector<CheckpointManifestS3Object>
CheckpointManifestS3Set::outdatedObjects(
    size_t max_preserved,
    Int64 expired_hour,
    const Aws::Utils::DateTime & timepoint) const
{
    // Keys that must stay, put into a hash set for lookups.
    const auto kept_keys = preservedManifests(max_preserved, expired_hour, timepoint);
    const std::unordered_set<String> kept(kept_keys.begin(), kept_keys.end());

    // Whatever is not in the preserved set is outdated.
    std::vector<CheckpointManifestS3Object> outdated;
    for (const auto & entry : manifests)
    {
        const auto & manifest = entry.second;
        if (kept.find(manifest.key) == kept.end())
            outdated.emplace_back(manifest);
    }
    return outdated;
}

} // namespace DB::S3
72 changes: 72 additions & 0 deletions dbms/src/Storages/S3/CheckpointManifestS3Set.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/Types.h>
#include <Storages/S3/S3Common.h>
#include <Storages/Transaction/Types.h>
#include <aws/core/utils/DateTime.h>
#include <common/defines.h>
#include <common/types.h>

#include <map>
#include <vector>

namespace DB::S3
{
// A manifest object as listed from S3: its object key together with the
// last-modified time of that object.
struct CheckpointManifestS3Object
{
// The S3 object key of the manifest.
String key;
// Last modification time of the object; compared against the expiry bound
// when choosing which manifests to preserve.
Aws::Utils::DateTime last_modification;
};

// A set of checkpoint manifest objects, kept sorted by upload sequence.
class CheckpointManifestS3Set
{
public:
    // Build the set by listing the manifest objects of `store_id` on S3.
    static CheckpointManifestS3Set getFromS3(const S3::TiFlashS3Client & client, StoreID store_id);

    // Build the set from manifest objects that have already been listed.
    static CheckpointManifestS3Set create(const std::vector<CheckpointManifestS3Object> & manifest_keys);

    ALWAYS_INLINE bool empty() const { return manifests.empty(); }

    // The largest upload sequence in the set. Must not be called when the
    // set is empty.
    UInt64 latestUploadSequence() const
    {
        assert(!manifests.empty());
        const auto newest = manifests.rbegin();
        return newest->first;
    }

    // The key of the manifest with the largest upload sequence. Must not be
    // called when the set is empty.
    const String & latestManifestKey() const
    {
        assert(!manifests.empty());
        const auto newest = manifests.rbegin();
        return newest->second.key;
    }

    // The keys of the manifests to keep, newest first.
    // The latest manifest is always preserved. Older manifests are then
    // considered from newest to oldest and kept until `max_preserved` is
    // reached or one is found whose modification time is older than
    // `timepoint - expired_hour`.
    Strings preservedManifests(size_t max_preserved, Int64 expired_hour, const Aws::Utils::DateTime & timepoint) const;

    // The manifest objects that should be removed from S3, i.e. the ones
    // that are not preserved.
    std::vector<CheckpointManifestS3Object> outdatedObjects(size_t max_preserved, Int64 expired_hour, const Aws::Utils::DateTime & timepoint) const;

    // Read-only access to all manifests, sorted by upload sequence.
    const std::map<UInt64, CheckpointManifestS3Object> & objects() const { return manifests; }

private:
    // An ordered map, so that the values are sorted by upload_seq:
    // upload_seq -> {manifest_key, mtime}
    std::map<UInt64, CheckpointManifestS3Object> manifests;
};

} // namespace DB::S3
1 change: 1 addition & 0 deletions dbms/src/Storages/S3/S3Common.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class ClientFactory
bool isEnabled() const;

void init(const StorageS3Config & config_, bool mock_s3_ = false);

void shutdown();

const String & bucket() const;
Expand Down
Loading