Skip to content

Commit

Permalink
Schema: allow loading empty schema diff when the version grows up. (#…
Browse files Browse the repository at this point in the history
…5245)

close #5244
  • Loading branch information
jiaqizho authored Jul 4, 2022
1 parent 09402e3 commit a89222a
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 28 deletions.
13 changes: 11 additions & 2 deletions dbms/src/Debug/MockSchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,25 @@
#include <Debug/MockTiDB.h>
#include <TiDB/Schema/SchemaGetter.h>

#include <optional>

namespace DB
{

struct MockSchemaGetter
{
/// Fetches the database info registered under `db_id` from the MockTiDB singleton.
TiDB::DBInfoPtr getDatabase(DatabaseID db_id)
{
    auto && mock_tidb = MockTiDB::instance();
    return mock_tidb.getDBInfoByID(db_id);
}

/// Reports the schema version currently held by the MockTiDB singleton.
Int64 getVersion()
{
    auto && mock_tidb = MockTiDB::instance();
    return mock_tidb.getVersion();
}

SchemaDiff getSchemaDiff(Int64 version) { return MockTiDB::instance().getSchemaDiff(version); }
/// Forwards to MockTiDB::getSchemaDiff for the requested `version` and
/// hands back whatever optional diff the mock returns.
std::optional<SchemaDiff> getSchemaDiff(Int64 version)
{
    auto && mock_tidb = MockTiDB::instance();
    return mock_tidb.getSchemaDiff(version);
}

/// Asks the MockTiDB singleton whether a schema diff is recorded for `version`.
bool checkSchemaDiffExists(Int64 version)
{
    auto && mock_tidb = MockTiDB::instance();
    const bool diff_exists = mock_tidb.checkSchemaDiffExists(version);
    return diff_exists;
}

/// Fetches the table info for `table_id` from the MockTiDB singleton.
/// The database id argument is accepted but not used by the lookup.
TiDB::TableInfoPtr getTableInfo(DatabaseID, TableID table_id)
{
    auto && mock_tidb = MockTiDB::instance();
    return mock_tidb.getTableInfoByID(table_id);
}

Expand Down
7 changes: 6 additions & 1 deletion dbms/src/Debug/MockTiDB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -668,9 +668,14 @@ std::pair<bool, DatabaseID> MockTiDB::getDBIDByName(const String & database_name
return std::make_pair(false, -1);
}

SchemaDiff MockTiDB::getSchemaDiff(Int64 version_)
// Returns the schema diff recorded for `version_`, or std::nullopt when no
// diff has been recorded for that version.
std::optional<SchemaDiff> MockTiDB::getSchemaDiff(Int64 version_)
{
    // Use find() instead of operator[]: assuming `version_diff` is a standard
    // map type, operator[] would insert a default-constructed entry for a
    // missing version, so the result would always hold a value and a later
    // checkSchemaDiffExists() call would report the version as present.
    const auto it = version_diff.find(version_);
    if (it == version_diff.end())
        return std::nullopt;
    return it->second;
}

// Tells whether `version_diff` currently holds an entry keyed by `version`.
bool MockTiDB::checkSchemaDiffExists(Int64 version)
{
    const auto entry = version_diff.find(version);
    const bool is_missing = (entry == version_diff.end());
    return !is_missing;
}

} // namespace DB
4 changes: 3 additions & 1 deletion dbms/src/Debug/MockTiDB.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ class MockTiDB : public ext::Singleton<MockTiDB>

std::pair<bool, DatabaseID> getDBIDByName(const String & database_name);

SchemaDiff getSchemaDiff(Int64 version);
bool checkSchemaDiffExists(Int64 version);

std::optional<SchemaDiff> getSchemaDiff(Int64 version);

std::unordered_map<String, DatabaseID> getDatabases() { return databases; }

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/Transaction/ReadIndexWorker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -880,7 +880,7 @@ BatchReadIndexRes ReadIndexWorkerManager::batchReadIndex(
}
}
{ // if meet timeout, which means part of regions can not get response from leader, try to poll rest tasks
TEST_LOG_FMT("rest {}, poll rest tasks onece", tasks.size());
TEST_LOG_FMT("rest {}, poll rest tasks once", tasks.size());

while (!tasks.empty())
{
Expand Down
13 changes: 10 additions & 3 deletions dbms/src/TiDB/Schema/SchemaGetter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

namespace DB
{

namespace ErrorCodes
{
extern const int SCHEMA_SYNC_ERROR;
Expand Down Expand Up @@ -188,18 +187,26 @@ Int64 SchemaGetter::getVersion()
return std::stoll(ver);
}

// Reads the value stored under the schema-diff key of `ver` from the snapshot
// and reports whether that value is non-empty.
bool SchemaGetter::checkSchemaDiffExists(Int64 ver)
{
    const String diff_key = getSchemaDiffKey(ver);
    const String raw_diff = TxnStructure::get(snap, diff_key);
    if (raw_diff.empty())
        return false;
    return true;
}

// Builds the storage key of a schema diff: "<schemaDiffPrefix>:<ver>".
String SchemaGetter::getSchemaDiffKey(Int64 ver)
{
    std::string diff_key(schemaDiffPrefix);
    diff_key += ":";
    diff_key += std::to_string(ver);
    return diff_key;
}

SchemaDiff SchemaGetter::getSchemaDiff(Int64 ver)
std::optional<SchemaDiff> SchemaGetter::getSchemaDiff(Int64 ver)
{
String key = getSchemaDiffKey(ver);
String data = TxnStructure::get(snap, key);
if (data.empty())
{
throw TiFlashException("cannot find schema diff for version: " + std::to_string(ver), Errors::Table::SyncError);
LOG_FMT_WARNING(log, "The schema diff for version {}, key {} is empty.", ver, key);
return std::nullopt;
}
SchemaDiff diff;
diff.deserialize(data);
Expand Down
6 changes: 5 additions & 1 deletion dbms/src/TiDB/Schema/SchemaGetter.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@

#include <common/logger_useful.h>

#include <optional>

namespace DB
{
// The enum results are completely the same as the DDL Action listed in the "parser/model/ddl.go" of TiDB codebase, which must be keeping in sync.
Expand Down Expand Up @@ -138,7 +140,9 @@ struct SchemaGetter

Int64 getVersion();

SchemaDiff getSchemaDiff(Int64 ver);
bool checkSchemaDiffExists(Int64 ver);

std::optional<SchemaDiff> getSchemaDiff(Int64 ver);

static String getSchemaDiffKey(Int64 ver);

Expand Down
79 changes: 60 additions & 19 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,21 +106,31 @@ struct TiDBSchemaSyncer : public SchemaSyncer
Stopwatch watch;
SCOPE_EXIT({ GET_METRIC(tiflash_schema_apply_duration_seconds).Observe(watch.elapsedSeconds()); });

LOG_FMT_INFO(log, "start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version);
LOG_FMT_INFO(log, "Start to sync schemas. current version is: {} and try to sync schema version to: {}", cur_version, version);

// Show whether the schema mutex is held for a long time or not.
GET_METRIC(tiflash_schema_applying).Set(1.0);
SCOPE_EXIT({ GET_METRIC(tiflash_schema_applying).Set(0.0); });

GET_METRIC(tiflash_schema_apply_count, type_diff).Increment();
if (!tryLoadSchemaDiffs(getter, version, context))
// After the feature concurrent DDL, TiDB does `update schema version` before `set schema diff`, and they are done in separate transactions.
// So TiFlash may see a schema version X but no schema diff X, meaning that the transaction of schema diff X has not been committed or has
// been aborted.
// However, TiDB makes sure that if we get a schema version X, then the schema diff X-1 must exist. Otherwise the transaction of schema diff
// X-1 is aborted and we can safely ignore it.
// Since TiDB can not make sure the schema diff of the latest schema version X is not empty, under this situation we should set the `cur_version`
// to X-1 and try to fetch the schema diff X next time.
Int64 version_after_load_diff = 0;
if (version_after_load_diff = tryLoadSchemaDiffs(getter, version, context); version_after_load_diff == -1)
{
GET_METRIC(tiflash_schema_apply_count, type_full).Increment();
loadAllSchema(getter, version, context);
// After loadAllSchema, set `version_after_load_diff` according to whether the schema diff of the latest version exists.
version_after_load_diff = getter.checkSchemaDiffExists(version) ? version : version - 1;
}
cur_version = version;
cur_version = version_after_load_diff;
GET_METRIC(tiflash_schema_version).Set(cur_version);
LOG_FMT_INFO(log, "end sync schema, version has been updated to {}", cur_version);
LOG_FMT_INFO(log, "End sync schema, version has been updated to {}{}", cur_version, cur_version == version ? "" : "(latest diff is empty)");
return true;
}

Expand All @@ -144,30 +154,60 @@ struct TiDBSchemaSyncer : public SchemaSyncer
return it->second;
}

bool tryLoadSchemaDiffs(Getter & getter, Int64 version, Context & context)
// Return Values
// - if latest schema diff is not empty, return the (latest_version)
// - if latest schema diff is empty, return the (latest_version - 1)
// - if an error happened, return (-1)
Int64 tryLoadSchemaDiffs(Getter & getter, Int64 latest_version, Context & context)
{
if (isTooOldSchema(cur_version, version))
if (isTooOldSchema(cur_version, latest_version))
{
return false;
return -1;
}

LOG_FMT_DEBUG(log, "try load schema diffs.");
LOG_FMT_DEBUG(log, "Try load schema diffs.");

SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, version);
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, latest_version);

Int64 used_version = cur_version;
std::vector<SchemaDiff> diffs;
while (used_version < version)
// First get all schema diff from `cur_version` to `latest_version`. Only apply the schema diff(s) if we fetch all
// schema diff without any exception.
std::vector<std::optional<SchemaDiff>> diffs;
while (used_version < latest_version)
{
used_version++;
diffs.push_back(getter.getSchemaDiff(used_version));
}
LOG_FMT_DEBUG(log, "end load schema diffs with total {} entries.", diffs.size());
LOG_FMT_DEBUG(log, "End load schema diffs with total {} entries.", diffs.size());

try
{
for (const auto & diff : diffs)
for (size_t diff_index = 0; diff_index < diffs.size(); ++diff_index)
{
builder.applyDiff(diff);
const auto & schema_diff = diffs[diff_index];

if (!schema_diff)
{
// If the `schema diff` of `latest_version` is empty,
// then we won't advance to `latest_version`; we will advance to `latest_version - 1` instead.
// If a `schema diff` within [`cur_version`, `latest_version - 1`] is empty,
// then we should just skip it.
//
// example:
// - `cur_version` is 1, `latest_version` is 10
// - The schema diff of schema version [2,4,6] is empty, Then we just skip it.
// - The schema diff of schema version 10 is empty, Then we should just apply version into 9
if (diff_index != diffs.size() - 1)
{
LOG_FMT_WARNING(log, "Skip the schema diff from version {}. ", cur_version + diff_index + 1);
continue;
}

// if diff_index == diffs.size() - 1, return used_version - 1;
return used_version - 1;
}

builder.applyDiff(*schema_diff);
}
}
catch (TiFlashException & e)
Expand All @@ -177,7 +217,7 @@ struct TiDBSchemaSyncer : public SchemaSyncer
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
}
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
return -1;
}
catch (Exception & e)
{
Expand All @@ -187,21 +227,22 @@ struct TiDBSchemaSyncer : public SchemaSyncer
}
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {} \n stack is {}", e.displayText(), e.getStackTrace().toString());
return false;
return -1;
}
catch (Poco::Exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.displayText());
return false;
return -1;
}
catch (std::exception & e)
{
GET_METRIC(tiflash_schema_apply_count, type_failed).Increment();
LOG_FMT_WARNING(log, "apply diff meets exception : {}", e.what());
return false;
return -1;
}
return true;

return used_version;
}

void loadAllSchema(Getter & getter, Int64 version, Context & context)
Expand Down

0 comments on commit a89222a

Please sign in to comment.