Skip to content

Commit

Permalink
Only update tiflash replica info when applying replica SchemaDiff
Browse files Browse the repository at this point in the history
  • Loading branch information
JaySon-Huang committed Nov 26, 2023
1 parent aa5139b commit 8d72656
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 37 deletions.
120 changes: 90 additions & 30 deletions dbms/src/TiDB/Schema/SchemaBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include <TiDB/Schema/SchemaNameMapper.h>
#include <TiDB/Schema/TiDB.h>
#include <common/logger_useful.h>
#include <fmt/format.h>

#include <boost/algorithm/string/join.hpp>
#include <magic_enum.hpp>
Expand Down Expand Up @@ -340,7 +341,7 @@ void SchemaBuilder<Getter, NameMapper>::applyDiff(const SchemaDiff & diff)
{
// >= SchemaActionType::MaxRecognizedType
// log down the Int8 value directly
LOG_ERROR(log, "Unsupported change type: {}, diff_version={}", static_cast<Int8>(diff.type), diff.version);
LOG_ERROR(log, "Unsupported change type: {}, diff_version={}", fmt::underlying(diff.type), diff.version);
}

break;
Expand All @@ -358,10 +359,10 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
return;
}

auto & tmt_context = context.getTMTContext();
if (table_info->replica_info.count == 0)
{
// if set 0, drop table in TiFlash
auto & tmt_context = context.getTMTContext();
auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id);
if (unlikely(storage == nullptr))
{
Expand All @@ -377,8 +378,7 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
}

assert(table_info->replica_info.count != 0);
// if set not 0, we first check whether the storage exists, and then check the replica_count and available
auto & tmt_context = context.getTMTContext();
// Replica info is set to non-zero, create the storage if not exists.
auto storage = tmt_context.getStorages().get(keyspace_id, table_info->id);
if (storage == nullptr)
{
Expand All @@ -389,47 +389,85 @@ void SchemaBuilder<Getter, NameMapper>::applySetTiFlashReplica(DatabaseID databa
return;
}

if (storage->getTombstone() != 0)
// Recover the table if tombstoned
if (storage->isTombstone())
{
applyRecoverTable(db_info->id, table_id);
applyRecoverLogicalTable(db_info, table_info);
return;
}

auto managed_storage = std::dynamic_pointer_cast<IManageableStorage>(storage);
auto storage_replica_info = managed_storage->getTableInfo().replica_info;
if (storage_replica_info.count == table_info->replica_info.count
&& storage_replica_info.available == table_info->replica_info.available)
auto local_logical_storage_table_info = storage->getTableInfo(); // copy
// Check whether replica_count and available changed
if (const auto & local_logical_storage_replica_info = local_logical_storage_table_info.replica_info;
local_logical_storage_replica_info.count == table_info->replica_info.count
&& local_logical_storage_replica_info.available == table_info->replica_info.available)
{
return;
return; // nothing changed
}

size_t old_replica_count = 0;
size_t new_replica_count = 0;
if (table_info->isLogicalPartitionTable())
{
for (const auto & part_def : table_info->partition.definitions)
{
auto new_part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper);
auto part_storage = tmt_context.getStorages().get(keyspace_id, new_part_table_info->id);
if (part_storage != nullptr)
if (part_storage == nullptr)
{
table_id_map.emplacePartitionTableID(part_def.id, table_id);
continue;
}
{
auto alter_lock = part_storage->lockForAlter(getThreadNameAndID());
auto local_table_info = part_storage->getTableInfo(); // copy
old_replica_count = local_table_info.replica_info.count;
new_replica_count = new_part_table_info->replica_info.count;
// Only update the replica info, do not change other fields. Otherwise
// other DDL changes may be unexpectedly ignored.
local_table_info.replica_info = new_part_table_info->replica_info;
part_storage->alterSchemaChange(
alter_lock,
*new_part_table_info,
local_table_info,
name_mapper.mapDatabaseName(db_info->id, keyspace_id),
name_mapper.mapTableName(*new_part_table_info),
name_mapper.mapTableName(local_table_info),
context);
}
else
table_id_map.emplacePartitionTableID(part_def.id, table_id);
LOG_INFO(
log,
"Updating replica info, replica count old={} new={} available={}"
" physical_table_id={} logical_table_id={}",
old_replica_count,
new_replica_count,
table_info->replica_info.available,
part_def.id,
table_id);
}
}
auto alter_lock = storage->lockForAlter(getThreadNameAndID());
storage->alterSchemaChange(
alter_lock,
*table_info,
name_mapper.mapDatabaseName(db_info->id, keyspace_id),
name_mapper.mapTableName(*table_info),
context);

{
auto alter_lock = storage->lockForAlter(getThreadNameAndID());
old_replica_count = local_logical_storage_table_info.replica_info.count;
new_replica_count = table_info->replica_info.count;
// Only update the replica info, do not change other fields. Otherwise
// other DDL changes may be unexpectedly ignored.
local_logical_storage_table_info.replica_info = table_info->replica_info;
storage->alterSchemaChange(
alter_lock,
local_logical_storage_table_info,
name_mapper.mapDatabaseName(db_info->id, keyspace_id),
name_mapper.mapTableName(local_logical_storage_table_info),
context);
}
LOG_INFO(
log,
"Updating replica info, replica count old={} new={} available={}"
" physical_table_id={} logical_table_id={}",
old_replica_count,
new_replica_count,
table_info->replica_info.available,
table_id,
table_id);
}

template <typename Getter, typename NameMapper>
Expand Down Expand Up @@ -688,8 +726,8 @@ void SchemaBuilder<Getter, NameMapper>::applyRecoverTable(DatabaseID database_id
{
for (const auto & part_def : table_info->partition.definitions)
{
auto new_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper);
tryRecoverPhysicalTable(db_info, new_table_info);
auto part_table_info = table_info->producePartitionTableInfo(part_def.id, name_mapper);
tryRecoverPhysicalTable(db_info, part_table_info);
}
}

Expand Down Expand Up @@ -1223,16 +1261,38 @@ void SchemaBuilder<Getter, NameMapper>::syncAllSchema()
LOG_INFO(log, "Sync all schemas end");
}

/**
* Update the schema of given `physical_table_id`.
* This function ensure only the lock of `physical_table_id` is involved.
*
* Param `database_id` and `logical_table_id` are the keys used to fetch the latest table info. If
* something goes wrong when generating the table info of `physical_table_id`, it means the
* TableID mapping is not up-to-date. This function will return false and the caller
* should update the TableID mapping then retry.
* If the caller ensure the TableID mapping is up-to-date, then it should call with
* `force == true`
*/
template <typename Getter, typename NameMapper>
bool SchemaBuilder<Getter, NameMapper>::applyTable(
DatabaseID database_id,
TableID logical_table_id,
TableID physical_table_id)
TableID physical_table_id,
bool force)
{
// Here we get table info without mvcc. If the table has been renamed to another
// database, it will return false and the caller should update the table_id_map
// then retry.
auto table_info = getter.getTableInfo(database_id, logical_table_id, /*try_mvcc*/ false);
// When `force==false`, we get table info without mvcc. So we can detect that whether
// the table has been renamed to another database or dropped.
// If the table has been renamed to another database, it is dangerous to use the
// old table info from the old database because some new columns may have been
// added to the new table.
// For the reason above, if we can not get table info without mvcc, this function
// will return false and the caller should update the table_id_map then retry.
//
// When `force==true`, the caller ensure the TableID mapping is up-to-date, so we
// need to get table info with mvcc. It can return the table info even if a table is
// dropped but not physically removed by TiDB/TiKV gc_safepoint.
// It is needed for TiFlash to correctly decode the data and get ready for `RECOVER TABLE`
// and `RECOVER DATABASE`.
auto table_info = getter.getTableInfo(database_id, logical_table_id, /*try_mvcc*/ force);
if (table_info == nullptr)
{
LOG_WARNING(
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/TiDB/Schema/SchemaBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ struct SchemaBuilder

void dropAllSchema();

bool applyTable(DatabaseID database_id, TableID logical_table_id, TableID physical_table_id);
bool applyTable(DatabaseID database_id, TableID logical_table_id, TableID physical_table_id, bool force);

private:
void applyDropSchema(DatabaseID schema_id);
Expand All @@ -74,7 +74,7 @@ struct SchemaBuilder
void applyDropTable(DatabaseID database_id, TableID table_id);

void applyRecoverTable(DatabaseID database_id, TiDB::TableID table_id);

void applyRecoverLogicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);
bool tryRecoverPhysicalTable(const TiDB::DBInfoPtr & db_info, const TiDB::TableInfoPtr & table_info);

/// Parameter schema_name should be mapped.
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/TiDB/Schema/TiDBSchemaSyncer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ std::tuple<bool, String> TiDBSchemaSyncer<mock_getter, mock_mapper>::trySyncTabl
Context & context,
TableID physical_table_id,
Getter & getter,
bool force,
const char * next_action)
{
// Get logical_table_id and database_id by physical_table_id.
Expand All @@ -177,7 +178,7 @@ std::tuple<bool, String> TiDBSchemaSyncer<mock_getter, mock_mapper>::trySyncTabl
// If the table schema apply is failed, then we need to update the table-id-mapping
// and retry.
SchemaBuilder<Getter, NameMapper> builder(getter, context, databases, table_id_map);
if (!builder.applyTable(database_id, logical_table_id, physical_table_id))
if (!builder.applyTable(database_id, logical_table_id, physical_table_id, force))
{
String message = fmt::format(
"Can not apply table schema because the table_id_map is not up-to-date, {}."
Expand Down Expand Up @@ -207,7 +208,7 @@ bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncTableSchema(Context & conte
/// Note that we don't need a lock at the beginning of syncTableSchema.
/// The AlterLock for storage will be acquired in `SchemaBuilder::applyTable`.
auto [need_update_id_mapping, message]
= trySyncTableSchema(context, physical_table_id, getter, "try to syncSchemas");
= trySyncTableSchema(context, physical_table_id, getter, false, "try to syncSchemas");
if (!need_update_id_mapping)
{
LOG_INFO(log, "Sync table schema end, table_id={}", physical_table_id);
Expand All @@ -219,7 +220,7 @@ bool TiDBSchemaSyncer<mock_getter, mock_mapper>::syncTableSchema(Context & conte
// Notice: must use the same getter
syncSchemasByGetter(context, getter);
std::tie(need_update_id_mapping, message)
= trySyncTableSchema(context, physical_table_id, getter, "sync table schema fail");
= trySyncTableSchema(context, physical_table_id, getter, true, "sync table schema fail");
if (likely(!need_update_id_mapping))
{
LOG_INFO(log, "Sync table schema end after syncSchemas, table_id={}", physical_table_id);
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/TiDB/Schema/TiDBSchemaSyncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class TiDBSchemaSyncer : public SchemaSyncer

LoggerPtr log;

DatabaseInfoCache databases;
TableIDMap table_id_map;
DatabaseInfoCache databases;

Getter createSchemaGetter(KeyspaceID keyspace_id)
{
Expand Down Expand Up @@ -105,6 +105,7 @@ class TiDBSchemaSyncer : public SchemaSyncer
Context & context,
TableID physical_table_id,
Getter & getter,
bool force,
const char * next_action);

TiDB::DBInfoPtr getDBInfoByName(const String & database_name) override
Expand Down
2 changes: 1 addition & 1 deletion tests/fullstack-test2/ddl/alter_exchange_partition.test
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ mysql> set session tidb_isolation_read_engines='tiflash'; select * from test.e2
| 3 | a | b | 3 |
+----+-------+-------+------+

# case 12, create partition table, non-partition table and execute exchagne partition immediately
## case 12, create partition table, non-partition table and execute exchange partition immediately
mysql> drop table if exists test.e
mysql> drop table if exists test.e2
mysql> create table test.e(id INT NOT NULL,fname VARCHAR(30),lname VARCHAR(30)) PARTITION BY RANGE (id) ( PARTITION p0 VALUES LESS THAN (50),PARTITION p1 VALUES LESS THAN (100),PARTITION p2 VALUES LESS THAN (150), PARTITION p3 VALUES LESS THAN (MAXVALUE));
Expand Down

0 comments on commit 8d72656

Please sign in to comment.