Skip to content
This repository has been archived by the owner on Jan 3, 2024. It is now read-only.

rgw/sfs: dbconn: add connection pool #233

Merged
merged 10 commits into from
Nov 6, 2023
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,4 @@
url = https://github.com/open-telemetry/opentelemetry-cpp.git
[submodule "src/rgw/driver/sfs/sqlite/sqlite_orm"]
path = src/rgw/driver/sfs/sqlite/sqlite_orm
url = https://github.com/fnc12/sqlite_orm.git
url = https://github.com/aquarist-labs/sqlite_orm.git
62 changes: 44 additions & 18 deletions src/rgw/driver/sfs/sqlite/dbconn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -113,15 +113,20 @@ static int sqlite_profile_callback(
}

DBConn::DBConn(CephContext* _cct)
: storage(_make_storage(getDBPath(_cct))),
first_sqlite_conn(nullptr),
: main_thread(std::this_thread::get_id()),
storage_pool_mutex(),
cct(_cct),
profile_enabled(_cct->_conf.get_val<bool>("rgw_sfs_sqlite_profile")) {
sqlite3_config(SQLITE_CONFIG_LOG, &sqlite_error_callback, cct);
storage.on_open = [this](sqlite3* db) {
if (first_sqlite_conn == nullptr) {
first_sqlite_conn = db;
}
// get_storage() relies on there already being an entry in the pool
// for the main thread (i.e. the thread that created the DBConn).
storage_pool.emplace(main_thread, _make_storage(getDBPath(cct)));
StorageRef storage = get_storage();
storage->on_open = [this](sqlite3* db) {
// This is safe because we're either in the main thread, or inside
// storage->on_open() called from get_storage(), which has the exclusive
// mutex.
sqlite_conns.emplace_back(db);

sqlite3_extended_result_codes(db, 1);
sqlite3_busy_timeout(db, 10000);
Expand Down Expand Up @@ -149,11 +154,33 @@ DBConn::DBConn(CephContext* _cct)
);
}
};
storage.open_forever();
storage.busy_timeout(5000);
storage->open_forever();
storage->busy_timeout(5000);
maybe_upgrade_metadata();
check_metadata_is_compatible();
storage.sync_schema();
storage->sync_schema();
}

// Returns the Storage object belonging to the calling thread, creating it on
// first use.
//
// Fast path: the thread already has an entry in storage_pool, which is looked
// up under a shared (reader) lock.
//
// Slow path: the thread has no entry yet. The main thread's Storage is copied
// into the pool under the exclusive lock and a persistent connection is opened
// on the copy.
//
// The lookup uses find() rather than at() + catch(std::out_of_range) so that a
// cache miss is ordinary control flow instead of a throw/unwind.
//
// Throws std::out_of_range if the pool has no entry for main_thread (the
// constructor is expected to have inserted one).
StorageRef DBConn::get_storage() {
  const std::thread::id this_thread = std::this_thread::get_id();
  {
    std::shared_lock lock(storage_pool_mutex);
    if (auto found = storage_pool.find(this_thread);
        found != storage_pool.end()) {
      return &found->second;
    }
  }  // shared lock released here, before the exclusive lock is taken

  // Only the calling thread ever inserts an entry keyed by its own id, so no
  // other thread can have added this key between the two locks.
  std::unique_lock lock(storage_pool_mutex);
  auto it =
      storage_pool.emplace(this_thread, storage_pool.at(main_thread)).first;
  StorageRef storage = &it->second;
  // A copy of the main thread's Storage object won't have an open DB
  // connection yet, so open one now and keep it open; otherwise every
  // operation would go back to its own sqlite3_open()/sqlite3_close() pair.
  storage->open_forever();
  storage->busy_timeout(5000);
  lsubdout(cct, rgw, 10) << "[SQLITE CONNECTION NEW] Added Storage "
                         << storage << " to pool for thread " << std::hex
                         << this_thread << std::dec << dendl;
  return storage;
}

void DBConn::check_metadata_is_compatible() const {
Expand All @@ -165,7 +192,7 @@ void DBConn::check_metadata_is_compatible() const {
int rc = sqlite3_open(temporary_db_path.c_str(), &temporary_db);
if (rc == SQLITE_OK) {
sqlite3_backup* backup =
sqlite3_backup_init(temporary_db, "main", first_sqlite_conn, "main");
sqlite3_backup_init(temporary_db, "main", first_sqlite_conn(), "main");
if (backup) {
sqlite3_backup_step(backup, -1);
sqlite3_backup_finish(backup);
Expand Down Expand Up @@ -222,11 +249,9 @@ void DBConn::check_metadata_is_compatible() const {
}
}

static int get_version(
CephContext* cct, rgw::sal::sfs::sqlite::Storage& storage
) {
static int get_version(CephContext* cct, StorageRef storage) {
try {
return storage.pragma.user_version();
return storage->pragma.user_version();
} catch (const std::system_error& e) {
lsubdout(cct, rgw, -1) << "error opening db: " << e.code().message() << " ("
<< e.code().value() << "), " << e.what() << dendl;
Expand Down Expand Up @@ -324,7 +349,7 @@ static int upgrade_metadata_from_v2(sqlite3* db, std::string* errmsg) {
}

static void upgrade_metadata(
CephContext* cct, rgw::sal::sfs::sqlite::Storage& storage, sqlite3* db
CephContext* cct, StorageRef storage, sqlite3* db
) {
while (true) {
int cur_version = get_version(cct, storage);
Expand Down Expand Up @@ -356,20 +381,21 @@ static void upgrade_metadata(
cur_version + 1
)
<< dendl;
storage.pragma.user_version(cur_version + 1);
storage->pragma.user_version(cur_version + 1);
}
}

void DBConn::maybe_upgrade_metadata() {
StorageRef storage = get_storage();
int db_version = get_version(cct, storage);
lsubdout(cct, rgw, 10) << "db user version: " << db_version << dendl;

if (db_version == 0) {
// must have just been created, set version!
storage.pragma.user_version(SFS_METADATA_VERSION);
storage->pragma.user_version(SFS_METADATA_VERSION);
} else if (db_version < SFS_METADATA_VERSION && db_version >= SFS_METADATA_MIN_VERSION) {
// perform schema update
upgrade_metadata(cct, storage, first_sqlite_conn);
upgrade_metadata(cct, storage, first_sqlite_conn());
} else if (db_version < SFS_METADATA_MIN_VERSION) {
throw sqlite_sync_exception(
"Existing metadata too far behind! Unable to upgrade schema!"
Expand Down
32 changes: 21 additions & 11 deletions src/rgw/driver/sfs/sqlite/dbconn.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <filesystem>
#include <ios>
#include <memory>
#include <shared_mutex>

#include "buckets/bucket_definitions.h"
#include "buckets/multipart_definitions.h"
Expand Down Expand Up @@ -151,8 +152,8 @@ inline auto _make_storage(const std::string& path) {
sqlite_orm::make_table(
std::string(VERSIONED_OBJECTS_TABLE),
sqlite_orm::make_column(
"id", &DBVersionedObject::id, sqlite_orm::autoincrement(),
sqlite_orm::primary_key()
"id", &DBVersionedObject::id,
sqlite_orm::primary_key().autoincrement()
),
sqlite_orm::make_column("object_id", &DBVersionedObject::object_id),
sqlite_orm::make_column("checksum", &DBVersionedObject::checksum),
Expand Down Expand Up @@ -182,8 +183,7 @@ inline auto _make_storage(const std::string& path) {
sqlite_orm::make_table(
std::string(ACCESS_KEYS),
sqlite_orm::make_column(
"id", &DBAccessKey::id, sqlite_orm::autoincrement(),
sqlite_orm::primary_key()
"id", &DBAccessKey::id, sqlite_orm::primary_key().autoincrement()
),
sqlite_orm::make_column("access_key", &DBAccessKey::access_key),
sqlite_orm::make_column("user_id", &DBAccessKey::user_id),
Expand Down Expand Up @@ -211,8 +211,7 @@ inline auto _make_storage(const std::string& path) {
sqlite_orm::make_table(
std::string(MULTIPARTS_TABLE),
sqlite_orm::make_column(
"id", &DBMultipart::id, sqlite_orm::primary_key(),
sqlite_orm::autoincrement()
"id", &DBMultipart::id, sqlite_orm::primary_key().autoincrement()
),
sqlite_orm::make_column("bucket_id", &DBMultipart::bucket_id),
sqlite_orm::make_column("upload_id", &DBMultipart::upload_id),
Expand All @@ -236,8 +235,8 @@ inline auto _make_storage(const std::string& path) {
sqlite_orm::make_table(
std::string(MULTIPARTS_PARTS_TABLE),
sqlite_orm::make_column(
"id", &DBMultipartPart::id, sqlite_orm::primary_key(),
sqlite_orm::autoincrement()
"id", &DBMultipartPart::id,
sqlite_orm::primary_key().autoincrement()
),
sqlite_orm::make_column("upload_id", &DBMultipartPart::upload_id),
sqlite_orm::make_column("part_num", &DBMultipartPart::part_num),
Expand All @@ -254,13 +253,16 @@ inline auto _make_storage(const std::string& path) {
}

using Storage = decltype(_make_storage(""));
using StorageRef = Storage*;

class DBConn {
private:
Storage storage;
std::unordered_map<std::thread::id, Storage> storage_pool;
std::vector<sqlite3*> sqlite_conns;
const std::thread::id main_thread;
mutable std::shared_mutex storage_pool_mutex;

public:
sqlite3* first_sqlite_conn;
CephContext* const cct;
const bool profile_enabled;

Expand All @@ -270,7 +272,15 @@ class DBConn {
DBConn(const DBConn&) = delete;
DBConn& operator=(const DBConn&) = delete;

inline auto get_storage() const { return storage; }
StorageRef get_storage();
// Returns the first sqlite3 handle recorded in sqlite_conns, or nullptr if no
// connection has been recorded yet. Indexing an empty vector with [0] would
// be undefined behaviour, so the empty case is handled explicitly.
sqlite3* first_sqlite_conn() const {
  std::shared_lock lock(storage_pool_mutex);
  return sqlite_conns.empty() ? nullptr : sqlite_conns.front();
}
// Returns a copy of every sqlite3 handle recorded so far. The copy is taken
// while holding the shared lock on storage_pool_mutex, so the caller gets a
// snapshot it can iterate without holding the lock.
std::vector<sqlite3*> all_sqlite_conns() const {
  std::shared_lock guard(storage_pool_mutex);
  std::vector<sqlite3*> snapshot(sqlite_conns);
  return snapshot;
}

static std::string getDBPath(CephContext* cct) {
auto rgw_sfs_path = cct->_conf.get_val<std::string>("rgw_sfs_data_path");
Expand Down
36 changes: 18 additions & 18 deletions src/rgw/driver/sfs/sqlite/sqlite_buckets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ std::optional<DBOPBucketInfo> SQLiteBuckets::get_bucket(
const std::string& bucket_id
) const {
auto storage = conn->get_storage();
auto bucket = storage.get_pointer<DBBucket>(bucket_id);
auto bucket = storage->get_pointer<DBBucket>(bucket_id);
std::optional<DBOPBucketInfo> ret_value;
if (bucket) {
ret_value = get_rgw_bucket(*bucket);
Expand All @@ -53,7 +53,7 @@ std::optional<std::pair<std::string, std::string>> SQLiteBuckets::get_owner(
const std::string& bucket_id
) const {
auto storage = conn->get_storage();
const auto rows = storage.select(
const auto rows = storage->select(
columns(&DBUser::user_id, &DBUser::display_name),
inner_join<DBUser>(on(is_equal(&DBBucket::owner_id, &DBUser::user_id))),
where(is_equal(&DBBucket::bucket_id, bucket_id))
Expand All @@ -70,59 +70,59 @@ std::vector<DBOPBucketInfo> SQLiteBuckets::get_bucket_by_name(
) const {
auto storage = conn->get_storage();
return get_rgw_buckets(
storage.get_all<DBBucket>(where(c(&DBBucket::bucket_name) = bucket_name))
storage->get_all<DBBucket>(where(c(&DBBucket::bucket_name) = bucket_name))
);
}

void SQLiteBuckets::store_bucket(const DBOPBucketInfo& bucket) const {
auto storage = conn->get_storage();
auto db_bucket = get_db_bucket(bucket);
storage.replace(db_bucket);
storage->replace(db_bucket);
}

void SQLiteBuckets::remove_bucket(const std::string& bucket_name) const {
auto storage = conn->get_storage();
storage.remove<DBBucket>(bucket_name);
storage->remove<DBBucket>(bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids() const {
auto storage = conn->get_storage();
return storage.select(&DBBucket::bucket_name);
return storage->select(&DBBucket::bucket_name);
}

std::vector<std::string> SQLiteBuckets::get_bucket_ids(
const std::string& user_id
) const {
auto storage = conn->get_storage();
return storage.select(
return storage->select(
&DBBucket::bucket_name, where(c(&DBBucket::owner_id) = user_id)
);
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets() const {
auto storage = conn->get_storage();
return get_rgw_buckets(storage.get_all<DBBucket>());
return get_rgw_buckets(storage->get_all<DBBucket>());
}

std::vector<DBOPBucketInfo> SQLiteBuckets::get_buckets(
const std::string& user_id
) const {
auto storage = conn->get_storage();
return get_rgw_buckets(
storage.get_all<DBBucket>(where(c(&DBBucket::owner_id) = user_id))
storage->get_all<DBBucket>(where(c(&DBBucket::owner_id) = user_id))
);
}

std::vector<std::string> SQLiteBuckets::get_deleted_buckets_ids() const {
auto storage = conn->get_storage();
return storage.select(
return storage->select(
&DBBucket::bucket_id, where(c(&DBBucket::deleted) = true)
);
}

bool SQLiteBuckets::bucket_empty(const std::string& bucket_id) const {
auto storage = conn->get_storage();
auto num_ids = storage.count<DBVersionedObject>(
auto num_ids = storage->count<DBVersionedObject>(
inner_join<DBObject>(
on(is_equal(&DBObject::uuid, &DBVersionedObject::object_id))
),
Expand All @@ -142,9 +142,9 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
RetrySQLiteBusy<DBDeletedObjectItems> retry([&]() {
bucket_deleted = false;
DBDeletedObjectItems ret_values;
storage.begin_transaction();
auto transaction = storage->transaction_guard();
// first get all the objects and versions for that bucket
ret_values = storage.select(
ret_values = storage->select(
columns(&DBObject::uuid, &DBVersionedObject::id),
inner_join<DBObject>(
on(is_equal(&DBObject::uuid, &DBVersionedObject::object_id))
Expand All @@ -154,14 +154,14 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
);
for (auto const& uuid_version : ret_values) {
// remove the versions first
storage.remove<DBVersionedObject>(std::get<1>(uuid_version));
storage->remove<DBVersionedObject>(std::get<1>(uuid_version));
// try to delete the object (it will throw an exception if it's not
// empty yet)
// possible errors when object is not empty are:
// SQLITE_CONSTRAINT: legacy sqlite error
// SQLITE_CONSTRAINT_FOREIGNKEY: extended sqlite error
try {
storage.remove<DBObject>(std::get<0>(uuid_version));
storage->remove<DBObject>(std::get<0>(uuid_version));
} catch (const std::system_error& e) {
if (e.code().value() != SQLITE_CONSTRAINT_FOREIGNKEY &&
e.code().value() != SQLITE_CONSTRAINT) {
Expand All @@ -171,15 +171,15 @@ std::optional<DBDeletedObjectItems> SQLiteBuckets::delete_bucket_transact(
}
// try to delete the bucket
try {
storage.remove<DBBucket>(bucket_id);
storage->remove<DBBucket>(bucket_id);
bucket_deleted = true;
} catch (const std::system_error& e) {
if (e.code().value() != SQLITE_CONSTRAINT_FOREIGNKEY &&
e.code().value() != SQLITE_CONSTRAINT) {
throw(e);
}
}
storage.commit();
transaction.commit();
return ret_values;
});
return retry.run();
Expand All @@ -191,7 +191,7 @@ const std::optional<SQLiteBuckets::Stats> SQLiteBuckets::get_stats(
auto storage = conn->get_storage();
std::optional<SQLiteBuckets::Stats> stats;

auto res = storage.select(
auto res = storage->select(
columns(
count(&DBVersionedObject::object_id), sum(&DBVersionedObject::size)
),
Expand Down
Loading
Loading