diff --git a/ent/src/yb/integration-tests/twodc-test.cc b/ent/src/yb/integration-tests/twodc-test.cc index b9906de7311d..306499383e33 100644 --- a/ent/src/yb/integration-tests/twodc-test.cc +++ b/ent/src/yb/integration-tests/twodc-test.cc @@ -2174,7 +2174,7 @@ TEST_P(TwoDCTest, TestProducerUniverseExpansion) { } TEST_P(TwoDCTest, TestAlterDDLBasic) { - FLAGS_xcluster_wait_on_ddl_alter = true; + SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter); uint32_t replication_factor = 1; // Use just one tablet here to more easily catch lower-level write issues with this test. @@ -2265,7 +2265,7 @@ TEST_P(TwoDCTest, TestAlterDDLBasic) { } TEST_P(TwoDCTest, TestAlterDDLWithRestarts) { - FLAGS_xcluster_wait_on_ddl_alter = true; + SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter); uint32_t replication_factor = 3; auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor, 2, 3)); diff --git a/ent/src/yb/integration-tests/twodc_test_base.cc b/ent/src/yb/integration-tests/twodc_test_base.cc index 97bfadc29c2f..dd26bbc3a511 100644 --- a/ent/src/yb/integration-tests/twodc_test_base.cc +++ b/ent/src/yb/integration-tests/twodc_test_base.cc @@ -33,6 +33,7 @@ #include "yb/master/master_ddl.proxy.h" #include "yb/master/master_replication.proxy.h" #include "yb/master/mini_master.h" +#include "yb/master/sys_catalog_initialization.h" #include "yb/rpc/rpc_controller.h" #include "yb/tserver/cdc_consumer.h" #include "yb/tserver/mini_tablet_server.h" @@ -43,7 +44,12 @@ #include "yb/yql/pgwrapper/pg_wrapper.h" DECLARE_bool(enable_tablet_split_of_xcluster_replicated_tables); +DECLARE_bool(enable_ysql); +DECLARE_bool(hide_pg_catalog_table_creation_logs); +DECLARE_bool(master_auto_run_initdb); DECLARE_int32(replication_factor); +DECLARE_int32(pggate_rpc_timeout_secs); +DECLARE_string(pgsql_proxy_bind_address); DECLARE_int32(pgsql_proxy_webserver_port); namespace yb { @@ -54,39 +60,74 @@ using tserver::enterprise::CDCConsumer; namespace enterprise { -Status 
TwoDCTestBase::InitClusters(const MiniClusterOptions& opts) { +Status TwoDCTestBase::InitClusters(const MiniClusterOptions& opts, bool init_postgres) { FLAGS_replication_factor = static_cast(opts.num_tablet_servers); // Disable tablet split for regular tests, see xcluster-tablet-split-itest for those tests. FLAGS_enable_tablet_split_of_xcluster_replicated_tables = false; + if (init_postgres) { + master::SetDefaultInitialSysCatalogSnapshotFlags(); + FLAGS_enable_ysql = true; + FLAGS_hide_pg_catalog_table_creation_logs = true; + FLAGS_master_auto_run_initdb = true; + FLAGS_pggate_rpc_timeout_secs = 120; + } + auto producer_opts = opts; producer_opts.cluster_id = "producer"; producer_cluster_.mini_cluster_ = std::make_unique(producer_opts); + // Randomly select the tserver index that will serve the postgres proxy. + const size_t pg_ts_idx = RandomUniformInt(0, opts.num_tablet_servers - 1); + const std::string pg_addr = server::TEST_RpcAddress(pg_ts_idx + 1, server::Private::kTrue); + + // The 'pgsql_proxy_bind_address' flag must be set before starting the producer cluster. Each + // tserver will store this address when it starts. + const uint16_t producer_pg_port = producer_cluster_.mini_cluster_->AllocateFreePort(); + FLAGS_pgsql_proxy_bind_address = Format("$0:$1", pg_addr, producer_pg_port); + + RETURN_NOT_OK(producer_cluster()->StartSync()); + auto consumer_opts = opts; consumer_opts.cluster_id = "consumer"; consumer_cluster_.mini_cluster_ = std::make_unique(consumer_opts); - RETURN_NOT_OK(producer_cluster()->StartSync()); + // Use a new pg proxy port for the consumer cluster. 
+ const uint16_t consumer_pg_port = consumer_cluster_.mini_cluster_->AllocateFreePort(); + FLAGS_pgsql_proxy_bind_address = Format("$0:$1", pg_addr, consumer_pg_port); + RETURN_NOT_OK(consumer_cluster()->StartSync()); RETURN_NOT_OK(RunOnBothClusters([&opts](MiniCluster* cluster) { return cluster->WaitForTabletServerCount(opts.num_tablet_servers); })); + // Verify that the selected tablet servers have their rpc servers bound to the expected pg addr. + CHECK_EQ(producer_cluster_.mini_cluster_->mini_tablet_server(pg_ts_idx)->bound_rpc_addr(). + address().to_string(), pg_addr); + CHECK_EQ(consumer_cluster_.mini_cluster_->mini_tablet_server(pg_ts_idx)->bound_rpc_addr(). + address().to_string(), pg_addr); + producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient()); consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient()); + if (init_postgres) { + RETURN_NOT_OK(InitPostgres(&producer_cluster_, pg_ts_idx, producer_pg_port)); + RETURN_NOT_OK(InitPostgres(&consumer_cluster_, pg_ts_idx, consumer_pg_port)); + } + return Status::OK(); } -Status TwoDCTestBase::InitPostgres(Cluster* cluster) { +Status TwoDCTestBase::InitPostgres(Cluster* cluster, const size_t pg_ts_idx, uint16_t pg_port) { RETURN_NOT_OK(WaitForInitDb(cluster->mini_cluster_.get())); - auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers()); - auto port = cluster->mini_cluster_->AllocateFreePort(); + + tserver::MiniTabletServer *const pg_ts = cluster->mini_cluster_->mini_tablet_server(pg_ts_idx); + CHECK(pg_ts); + yb::pgwrapper::PgProcessConf pg_process_conf = VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb( - yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), port)), + yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), pg_port)), pg_ts->options()->fs_opts.data_paths.front() + "/pg_data", pg_ts->server()->GetSharedMemoryFd())); pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag; diff --git 
a/ent/src/yb/integration-tests/twodc_test_base.h b/ent/src/yb/integration-tests/twodc_test_base.h index e47fd541ff34..66128820b786 100644 --- a/ent/src/yb/integration-tests/twodc_test_base.h +++ b/ent/src/yb/integration-tests/twodc_test_base.h @@ -90,11 +90,7 @@ class TwoDCTestBase : public YBTest { FLAGS_flush_rocksdb_on_shutdown = false; } - Status InitClusters(const MiniClusterOptions& opts); - - // Not thread safe. FLAGS_pgsql_proxy_webserver_port is modified each time this is called so this - // is not safe to run in parallel. - Status InitPostgres(Cluster* cluster); + Status InitClusters(const MiniClusterOptions& opts, bool init_postgres = false); void TearDown() override; @@ -224,6 +220,11 @@ class TwoDCTestBase : public YBTest { protected: Cluster producer_cluster_; Cluster consumer_cluster_; + + private: + // Not thread safe. FLAGS_pgsql_proxy_webserver_port is modified each time this is called so this + // is not safe to run in parallel. + Status InitPostgres(Cluster* cluster, const size_t pg_ts_idx, uint16_t pg_port); }; } // namespace enterprise diff --git a/ent/src/yb/integration-tests/twodc_ysql-test.cc b/ent/src/yb/integration-tests/twodc_ysql-test.cc index 21ccd4dddd12..2d42d84d3e65 100644 --- a/ent/src/yb/integration-tests/twodc_ysql-test.cc +++ b/ent/src/yb/integration-tests/twodc_ysql-test.cc @@ -62,7 +62,6 @@ #include "yb/master/master_util.h" #include "yb/master/master_replication.proxy.h" #include "yb/master/master-test-util.h" -#include "yb/master/sys_catalog_initialization.h" #include "yb/rpc/rpc_controller.h" #include "yb/tablet/tablet.h" @@ -85,12 +84,9 @@ #include "yb/yql/pgwrapper/libpq_utils.h" #include "yb/yql/pgwrapper/pg_wrapper.h" +DECLARE_int32(replication_factor); DECLARE_int32(cdc_max_apply_batch_num_records); DECLARE_int32(client_read_write_timeout_ms); -DECLARE_bool(enable_ysql); -DECLARE_bool(hide_pg_catalog_table_creation_logs); -DECLARE_bool(master_auto_run_initdb); -DECLARE_int32(pggate_rpc_timeout_secs); 
DECLARE_bool(enable_delete_truncate_xcluster_replicated_table); DECLARE_int32(update_min_cdc_indices_interval_secs); DECLARE_int32(log_cache_size_limit_mb); @@ -101,6 +97,9 @@ DECLARE_int32(log_max_seconds_to_retain); DECLARE_int32(log_min_segments_to_retain); DECLARE_bool(check_bootstrap_required); DECLARE_bool(enable_load_balancing); +DECLARE_string(pgsql_proxy_bind_address); +DECLARE_uint64(TEST_pg_auth_key); +DECLARE_bool(ysql_disable_index_backfill); DECLARE_bool(ysql_enable_packed_row); DECLARE_uint64(ysql_packed_row_size_limit); DECLARE_bool(xcluster_wait_on_ddl_alter); @@ -139,24 +138,21 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterface(); + MiniClusterOptions opts; opts.num_tablet_servers = replication_factor; opts.num_masters = num_masters; - RETURN_NOT_OK(InitClusters(opts)); - - RETURN_NOT_OK(InitPostgres(&producer_cluster_)); - RETURN_NOT_OK(InitPostgres(&consumer_cluster_)); + RETURN_NOT_OK(InitClusters(opts, true /* init_postgres */)); return Status::OK(); } @@ -204,6 +200,15 @@ class TwoDCYsqlTest : public TwoDCTestBase, public testing::WithParamInterfaceConnect()); + EXPECT_OK(conn.ExecuteFormat("CREATE DATABASE $0$1", + namespace_name, colocated ? 
" colocated = true" : "")); + return Status::OK(); + } + Result GetUniverseId(Cluster* cluster) { master::GetMasterClusterConfigRequestPB req; master::GetMasterClusterConfigResponsePB resp; @@ -594,7 +599,7 @@ TEST_P(TwoDCYsqlTest, SimpleReplication) { TEST_P(TwoDCYsqlTest, ReplicationWithBasicDDL) { YB_SKIP_TEST_IN_TSAN(); - FLAGS_xcluster_wait_on_ddl_alter = true; + SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter); string new_column = "contact_name"; constexpr auto kRecordBatch = 5; @@ -843,6 +848,108 @@ TEST_P(TwoDCYsqlTest, ReplicationWithBasicDDL) { count += kRecordBatch; ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(count); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + /***************************/ + /****** FORCE RESUME *******/ + /***************************/ + auto missing_column = "missing"; + // 1. ALTER Table to Add a Column on Producer. + { + auto tbl = producer_table->name(); + auto conn = EXPECT_RESULT(producer_cluster_.ConnectToDB(tbl.namespace_name())); + ASSERT_OK(conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN $1 VARCHAR", + tbl.table_name(), missing_column)); + } + + // 2. Write more data so we have some entries with the new schema. + WriteWorkload(count, count + kRecordBatch, &producer_cluster_, producer_table->name()); + // 3. Verify ALTER was parsed by Consumer, which stopped replication and hasn't read the new data. + ASSERT_OK(is_consumer_halted_on_ddl()); + LOG(INFO) << "Consumer count after Producer ALTER halted polling = " + << EXPECT_RESULT(data_replicated_correctly(count)); + // 4. Force Resume on the Consumer. + SetAtomicFlag(false, &FLAGS_xcluster_wait_on_ddl_alter); + // 5. Verify Replication continued and new schema Producer entries are added to Consumer. 
+ count += kRecordBatch; + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(count); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); +} + +TEST_P(TwoDCYsqlTest, ReplicationWithCreateIndexDDL) { + YB_SKIP_TEST_IN_TSAN(); + SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter); + FLAGS_ysql_disable_index_backfill = false; + string new_column = "alt"; + constexpr auto kIndexName = "TestIndex"; + + constexpr auto kRecordBatch = 5; + auto count = 0; + constexpr int kNTabletsPerTable = 4; + std::vector tables_vector = {kNTabletsPerTable}; + auto tables = ASSERT_RESULT(SetUpWithParams(tables_vector, tables_vector, 1)); + const string kUniverseId = ASSERT_RESULT(GetUniverseId(&producer_cluster_)); + + // Tables contains both producer and consumer universe tables (alternately). + ASSERT_EQ(tables.size(), 2); + std::shared_ptr producer_table(tables[0]), consumer_table(tables[1]); + + ASSERT_OK(SetupUniverseReplication(producer_cluster(), consumer_cluster(), consumer_client(), + kUniverseId, {producer_table})); + master::GetUniverseReplicationResponsePB get_universe_replication_resp; + ASSERT_OK(VerifyUniverseReplication(consumer_cluster(), consumer_client(), kUniverseId, + &get_universe_replication_resp)); + + auto producer_conn = EXPECT_RESULT(producer_cluster_.ConnectToDB( + producer_table->name().namespace_name())); + auto consumer_conn = EXPECT_RESULT(consumer_cluster_.ConnectToDB( + consumer_table->name().namespace_name())); + + // Add a second column & populate with data. 
+ ASSERT_OK(producer_conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN $1 int", + producer_table->name().table_name(), new_column)); + ASSERT_OK(consumer_conn.ExecuteFormat("ALTER TABLE $0 ADD COLUMN $1 int", + consumer_table->name().table_name(), new_column)); + ASSERT_OK(producer_conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series($1, $1 + $2), " + "generate_series($1 + 11, $1 + $2 + 11))", + producer_table->name().table_name(), count, kRecordBatch - 1)); + ASSERT_OK(VerifyWrittenRecords(producer_table->name(), consumer_table->name())); + count += kRecordBatch; + + // Create an Index on the second column. + ASSERT_OK(producer_conn.ExecuteFormat("CREATE INDEX $0 ON $1 ($2 ASC)", + kIndexName, producer_table->name().table_name(), new_column)); + + const std::string query = Format("SELECT * FROM $0 ORDER BY $1", + producer_table->name().table_name(), new_column); + ASSERT_TRUE(ASSERT_RESULT(producer_conn.HasIndexScan(query))); + PGResultPtr res = ASSERT_RESULT(producer_conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), count); + ASSERT_EQ(PQnfields(res.get()), 2); + + // Verify that the Consumer is still getting new traffic after the index is created. + ASSERT_OK(producer_conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series($1, $1 + $2), " + "generate_series($1 + 11, $1 + $2 + 11))", + producer_table->name().table_name(), count, kRecordBatch - 1)); + ASSERT_OK(VerifyWrittenRecords(producer_table->name(), consumer_table->name())); + count += kRecordBatch; + + // Drop the Index. + ASSERT_OK(producer_conn.ExecuteFormat("DROP INDEX $0", kIndexName)); + + // The main Table should no longer list having an index. + ASSERT_FALSE(ASSERT_RESULT(producer_conn.HasIndexScan(query))); + res = ASSERT_RESULT(producer_conn.Fetch(query)); + ASSERT_EQ(PQntuples(res.get()), count); + ASSERT_EQ(PQnfields(res.get()), 2); + + // Verify that we're still getting traffic to the Consumer after the index drop. 
+ ASSERT_OK(producer_conn.ExecuteFormat( + "INSERT INTO $0 VALUES (generate_series($1, $1 + $2), " + "generate_series($1 + 11, $1 + $2 + 11))", + producer_table->name().table_name(), count, kRecordBatch - 1)); + ASSERT_OK(VerifyWrittenRecords(producer_table->name(), consumer_table->name())); } @@ -1009,6 +1116,9 @@ TEST_P(TwoDCYsqlTest, SetupUniverseReplicationWithProducerBootstrapId) { TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { YB_SKIP_TEST_IN_TSAN(); + SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter); + constexpr auto kRecordBatch = 5; + auto count = 0; constexpr int kNTabletsPerColocatedTable = 1; constexpr int kNTabletsPerTable = 3; std::vector tables_vector = {kNTabletsPerColocatedTable, kNTabletsPerColocatedTable}; @@ -1062,13 +1172,15 @@ TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { // 1. Write some data to all tables. for (const auto& producer_table : producer_tables) { LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); - WriteWorkload(0, 100, &producer_cluster_, producer_table->name()); + WriteWorkload(count, count + kRecordBatch, &producer_cluster_, producer_table->name()); } + count += kRecordBatch; // 2. Setup replication for only the colocated tables. // Get the producer namespace id, so we can construct the colocated parent table id. GetNamespaceInfoResponsePB ns_resp; ASSERT_OK(producer_client()->GetNamespaceInfo("", kNamespaceName, YQL_DATABASE_PGSQL, &ns_resp)); + auto colocated_parent_table_id = master::GetColocatedDbParentTableId(ns_resp.namespace_().id()); rpc::RpcController rpc; master::SetupUniverseReplicationRequestPB setup_universe_req; @@ -1079,8 +1191,7 @@ TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { HostPortsToPBs(hp_vec, setup_universe_req.mutable_producer_master_addresses()); // Only need to add the colocated parent table id. 
setup_universe_req.mutable_producer_table_ids()->Reserve(1); - setup_universe_req.add_producer_table_ids( - master::GetColocatedDbParentTableId(ns_resp.namespace_().id())); + setup_universe_req.add_producer_table_ids(colocated_parent_table_id); auto* consumer_leader_mini_master = ASSERT_RESULT(consumer_cluster()->GetLeaderMiniMaster()); auto master_proxy = std::make_shared( &consumer_client()->proxy_cache(), @@ -1116,7 +1227,7 @@ TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { } return true; }; - ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100, true); }, + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(count, true); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); // Ensure that the non colocated table is not replicated. auto non_coloc_results = ScanToStrings(non_colocated_consumer_table->name(), &consumer_cluster_); @@ -1143,18 +1254,64 @@ TEST_P(TwoDCYsqlTest, ColocatedDatabaseReplication) { ASSERT_OK(CorrectlyPollingAllTablets( consumer_cluster(), kNTabletsPerColocatedTable + kNTabletsPerTable)); // Check that all data is replicated for the new table as well. - ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100, false); }, + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(count, false); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); // 6. Add additional data to all tables for (const auto& producer_table : producer_tables) { LOG(INFO) << "Writing records for table " << producer_table->name().ToString(); - WriteWorkload(100, 150, &producer_cluster_, producer_table->name()); + WriteWorkload(count, count + kRecordBatch, &producer_cluster_, producer_table->name()); } + count += kRecordBatch; // 7. Verify all tables are properly replicated. 
- ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(150, false); }, + ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(count, false); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + // Test Add Colocated Table, which is an ALTER operation. + std::shared_ptr new_colocated_producer_table, new_colocated_consumer_table; + + // Add a Colocated Table on the Producer for an existing Replication stream. + { + std::vector tables; + uint32_t idx = static_cast(tables_vector.size()) + 1; + const int co_id = (idx) * 111111; + auto table = ASSERT_RESULT(CreateYsqlTable(&producer_cluster_, kNamespaceName, "", + Format("test_table_$0", idx), boost::none, kNTabletsPerColocatedTable, true, co_id)); + ASSERT_OK(producer_client()->OpenTable(table, &new_colocated_producer_table)); + } + + // 2. Write data so we have some entries on the new colocated table. + WriteWorkload(0, kRecordBatch, &producer_cluster_, new_colocated_producer_table->name()); + + { + // Matching schema to consumer should succeed. + std::vector tables; + uint32_t idx = static_cast(tables_vector.size()) + 1; + const int co_id = (idx) * 111111; + auto table = ASSERT_RESULT(CreateYsqlTable(&consumer_cluster_, kNamespaceName, "", + Format("test_table_$0", idx), boost::none, kNTabletsPerColocatedTable, true, co_id)); + ASSERT_OK(consumer_client()->OpenTable(table, &new_colocated_consumer_table)); + } + + // 5. Verify the new schema Producer entries are added to Consumer. 
+ count += kRecordBatch; + ASSERT_OK(WaitFor([&]() -> Result { + LOG(INFO) << "Checking records for table " << new_colocated_consumer_table->name().ToString(); + auto consumer_results = ScanToStrings(new_colocated_consumer_table->name(), &consumer_cluster_); + auto num_results = kRecordBatch; + if (num_results != PQntuples(consumer_results.get())) { + return false; + } + int result; + for (int i = 0; i < num_results; ++i) { + result = VERIFY_RESULT(GetInt32(consumer_results.get(), i, 0)); + if (i != result) { + return false; + } + } + return true; + }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); } TEST_P(TwoDCYsqlTest, ColocatedDatabaseDifferentColocationIds) { @@ -1248,8 +1405,9 @@ TEST_P(TwoDCYsqlTest, TablegroupReplication) { ASSERT_OK(CorrectlyPollingAllTablets(consumer_cluster(), 1)); // 4. Check that tables are being replicated. - auto data_replicated_correctly = [&](int num_results) -> Result { - for (const auto& consumer_table : consumer_tables) { + auto data_replicated_correctly = [&](std::vector> tables, + int num_results) -> Result { + for (const auto& consumer_table : tables) { LOG(INFO) << "Checking records for table " << consumer_table->name().ToString(); auto consumer_results = ScanToStrings(consumer_table->name(), &consumer_cluster_); @@ -1267,7 +1425,7 @@ TEST_P(TwoDCYsqlTest, TablegroupReplication) { return true; }; - ASSERT_OK(WaitFor([&]() -> Result { return data_replicated_correctly(100); }, + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(consumer_tables, 100); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); // 5. Write more data. 
@@ -1275,8 +1433,80 @@ TEST_P(TwoDCYsqlTest, TablegroupReplication) { WriteWorkload(100, 105, &producer_cluster_, producer_table->name()); } - ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(105); }, + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(consumer_tables, 105); }, MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + ASSERT_TRUE(FLAGS_xcluster_wait_on_ddl_alter); + // Add a new table to the existing Tablegroup. This is essentially an ALTER TABLE. + std::vector> producer_new_table; + { + std::vector tables; + std::shared_ptr new_tablegroup_producer_table; + uint32_t idx = static_cast(producer_tables.size()) + 1; + ASSERT_OK(CreateYsqlTable(idx, 1, &producer_cluster_, &tables, kTablegroupName)); + ASSERT_OK(producer_client()->OpenTable(tables[0], &new_tablegroup_producer_table)); + producer_new_table.push_back(new_tablegroup_producer_table); + } + + // TODO (#14234): Verify that Replication stops once we support tablegroups. +/* + // Verify that Replication stopped. 
+ ASSERT_OK(WaitFor([&]() -> Result { + master::SysClusterConfigEntryPB cluster_info; + auto& cm = VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager(); + RETURN_NOT_OK(cm.GetClusterConfig(&cluster_info)); + auto& producer_map = cluster_info.consumer_registry().producer_map(); + auto producer_entry = FindOrNull(producer_map, kUniverseId); + if (producer_entry) { + auto stream_map_iter = producer_entry->stream_map().begin(); + return stream_map_iter != producer_entry->stream_map().end() && + stream_map_iter->second.has_producer_schema() && + stream_map_iter->second.producer_schema().has_pending_schema(); + } + return false; + }, MonoDelta::FromSeconds(20), "IsConsumerHaltedOnDDL")); +*/ + + for (const auto& producer_table : producer_tables) { + WriteWorkload(105, 110, &producer_cluster_, producer_table->name()); + } + + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(consumer_tables, 110); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); + + // TODO (#14234): Try to add a bad table. Ensure we're stopped. + + // Add the compatible table. Ensure that writes to the table are now replicated. + std::vector> consumer_new_table; + { + std::vector tables; + std::shared_ptr new_tablegroup_consumer_table; + uint32_t idx = static_cast(consumer_tables.size()) + 1; + ASSERT_OK(CreateYsqlTable(idx, 1, &consumer_cluster_, &tables, kTablegroupName)); + ASSERT_OK(consumer_client()->OpenTable(tables[0], &new_tablegroup_consumer_table)); + consumer_new_table.push_back(new_tablegroup_consumer_table); + } + + // Verify that Replication is NOT halted. 
+ ASSERT_OK(WaitFor([&]() -> Result { + master::SysClusterConfigEntryPB cluster_info; + auto& cm = VERIFY_RESULT(consumer_cluster()->GetLeaderMiniMaster())->catalog_manager(); + RETURN_NOT_OK(cm.GetClusterConfig(&cluster_info)); + auto& producer_map = cluster_info.consumer_registry().producer_map(); + auto producer_entry = FindOrNull(producer_map, kUniverseId); + if (producer_entry) { + auto stream_map_iter = producer_entry->stream_map().begin(); + return stream_map_iter != producer_entry->stream_map().end() && + (!stream_map_iter->second.has_producer_schema() || + !stream_map_iter->second.producer_schema().has_pending_schema()); + } + return false; + }, MonoDelta::FromSeconds(20), "IsConsumerResumedAfterDDL")); + + // Replication should work on this new table. + WriteWorkload(0, 10, &producer_cluster_, producer_new_table.begin()->get()->name()); + ASSERT_OK(WaitFor([&]() { return data_replicated_correctly(consumer_new_table, 10); }, + MonoDelta::FromSeconds(20), "IsDataReplicatedCorrectly")); } TEST_P(TwoDCYsqlTest, TablegroupReplicationMismatch) { diff --git a/ent/src/yb/integration-tests/xcluster_safe_time-itest.cc b/ent/src/yb/integration-tests/xcluster_safe_time-itest.cc index b3e7b2bd952b..a8d01444f291 100644 --- a/ent/src/yb/integration-tests/xcluster_safe_time-itest.cc +++ b/ent/src/yb/integration-tests/xcluster_safe_time-itest.cc @@ -24,7 +24,6 @@ #include "yb/master/master_ddl.pb.h" #include "yb/master/master_defaults.h" #include "yb/master/master_replication.pb.h" -#include "yb/master/sys_catalog_initialization.h" #include "yb/tablet/tablet_peer.h" #include "yb/tserver/mini_tablet_server.h" #include "yb/tserver/tablet_server.h" @@ -40,11 +39,7 @@ using namespace std::chrono_literals; DECLARE_int32(xcluster_safe_time_update_interval_secs); DECLARE_bool(TEST_xcluster_simulate_have_more_records); DECLARE_bool(enable_load_balancing); -DECLARE_bool(enable_ysql); DECLARE_bool(xcluster_consistent_reads); -DECLARE_bool(master_auto_run_initdb); 
-DECLARE_bool(hide_pg_catalog_table_creation_logs); -DECLARE_int32(pggate_rpc_timeout_secs); DECLARE_int32(TEST_xcluster_simulated_lag_ms); DECLARE_string(TEST_xcluster_simulated_lag_tablet_filter); DECLARE_int32(cdc_max_apply_batch_num_records); @@ -259,11 +254,6 @@ string GetCompleteTableName(const YBTableName& table) { class XClusterSafeTimeYsqlTest : public TwoDCTestBase { public: void SetUp() override { - FLAGS_enable_ysql = true; - FLAGS_master_auto_run_initdb = true; - FLAGS_hide_pg_catalog_table_creation_logs = true; - FLAGS_pggate_rpc_timeout_secs = 120; - // Disable LB as we dont want tablets moving during the test FLAGS_enable_load_balancing = false; FLAGS_xcluster_safe_time_update_interval_secs = 1; @@ -271,15 +261,11 @@ class XClusterSafeTimeYsqlTest : public TwoDCTestBase { FLAGS_xcluster_safe_time_update_interval_secs * 5s * kTimeMultiplier; FLAGS_xcluster_consistent_reads = true; - master::SetDefaultInitialSysCatalogSnapshotFlags(); TwoDCTestBase::SetUp(); MiniClusterOptions opts; opts.num_masters = kMasterCount; opts.num_tablet_servers = kTServerCount; - ASSERT_OK(InitClusters(opts)); - - ASSERT_OK(InitPostgres(&producer_cluster_)); - ASSERT_OK(InitPostgres(&consumer_cluster_)); + ASSERT_OK(InitClusters(opts, true /* init_postgres */)); auto producer_cluster_future = std::async(std::launch::async, [&] { auto table_name = ASSERT_RESULT(CreateYsqlTable( diff --git a/ent/src/yb/master/catalog_manager.h b/ent/src/yb/master/catalog_manager.h index f577213aa6e7..2b351e153a63 100644 --- a/ent/src/yb/master/catalog_manager.h +++ b/ent/src/yb/master/catalog_manager.h @@ -280,6 +280,8 @@ class CatalogManager : public yb::master::CatalogManager, SnapshotCoordinatorCon void EnableTabletSplitting(const std::string& feature) override; + Status RunXClusterBgTasks(); + void StartXClusterParentTabletDeletionTaskIfStopped(); void ScheduleXClusterParentTabletDeletionTask(); diff --git a/ent/src/yb/master/catalog_manager_ent.cc 
b/ent/src/yb/master/catalog_manager_ent.cc index 1e34f9595a76..b4ba97e3ff5b 100644 --- a/ent/src/yb/master/catalog_manager_ent.cc +++ b/ent/src/yb/master/catalog_manager_ent.cc @@ -6398,13 +6398,11 @@ Status CatalogManager::UpdateConsumerOnProducerMetadata( rpc::RpcContext* rpc) { LOG_WITH_FUNC(INFO) << " from " << RequestorString(rpc) << ": " << req->DebugString(); - if (!FLAGS_xcluster_wait_on_ddl_alter) { + if (!GetAtomicFlag(&FLAGS_xcluster_wait_on_ddl_alter)) { resp->set_should_wait(false); return Status::OK(); } - auto& producer_meta_pb = req->producer_change_metadata_request(); - auto& producer_schema_pb = producer_meta_pb.schema(); auto u_id = req->producer_id(); auto stream_id = req->stream_id(); @@ -6443,9 +6441,7 @@ Status CatalogManager::UpdateConsumerOnProducerMetadata( auto schema_cached = stream_entry->mutable_producer_schema(); auto version_validated = schema_cached->validated_schema_version(); - // Grab the local Consumer schema and compare it to the Producer's schema. - Schema consumer_schema, producer_schema; - RETURN_NOT_OK(SchemaFromPB(producer_schema_pb, &producer_schema)); + auto& producer_meta_pb = req->producer_change_metadata_request(); auto version_received = producer_meta_pb.schema_version(); if (version_validated > 0 && version_received <= version_validated) { @@ -6453,38 +6449,52 @@ Status CatalogManager::UpdateConsumerOnProducerMetadata( resp->set_should_wait(false); return Status::OK(); } - RETURN_NOT_OK(table->GetSchema(&consumer_schema)); - if (consumer_schema.EquivalentForDataCopy(producer_schema)) { - resp->set_should_wait(false); - LOG(INFO) << "Received Compatible Producer schema version: " << version_received; - // Update the schema version if we're functionally equivalent. - if (version_received > version_validated) { - DCHECK(!schema_cached->has_pending_schema()); - schema_cached->set_validated_schema_version(version_received); + // If we have a full schema, then we can do a schema comparison. 
+ if (producer_meta_pb.has_schema()) { + // Grab the local Consumer schema and compare it to the Producer's schema. + auto& producer_schema_pb = producer_meta_pb.schema(); + Schema consumer_schema, producer_schema; + RETURN_NOT_OK(SchemaFromPB(producer_schema_pb, &producer_schema)); + RETURN_NOT_OK(table->GetSchema(&consumer_schema)); + + if (consumer_schema.EquivalentForDataCopy(producer_schema)) { + resp->set_should_wait(false); + LOG(INFO) << "Received Compatible Producer schema version: " << version_received; + // Update the schema version if we're functionally equivalent. + if (version_received > version_validated) { + DCHECK(!schema_cached->has_pending_schema()); + schema_cached->set_validated_schema_version(version_received); + } else { + // Nothing to modify. Don't write to sys catalog. + return Status::OK(); + } } else { - // Nothing to modify. Don't write to sys catalog. - return Status::OK(); + resp->set_should_wait(true); + LOG(WARNING) << Format("XCluster Schema mismatch $0 \n Consumer={$1} \n Producer={$2}", + consumer_table_id, consumer_schema.ToString(), producer_schema.ToString()); + + // Incompatible schema: store, wait for all tablet reports, then make the DDL change. + auto producer_schema = stream_entry->mutable_producer_schema(); + if (!producer_schema->has_pending_schema()) { + // Copy the schema. + producer_schema->mutable_pending_schema()->CopyFrom(producer_schema_pb); + producer_schema->set_pending_schema_version(version_received); + } else { + // Why would we be getting different schema versions across tablets? Partial apply? 
+ DCHECK_EQ(version_received, producer_schema->pending_schema_version()); + } } + + RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), + "Updating cluster config in sys-catalog")); + l.Commit(); } else { - LOG(WARNING) << Format("XCluster Schema mismatch $0 \n Consumer={$1} \n Producer={$2}", - consumer_table_id, consumer_schema.ToString(), producer_schema.ToString()); - - // Incompatible schema: store, wait for all tablet reports, then make the DDL change. - auto producer_schema = stream_entry->mutable_producer_schema(); - if (!producer_schema->has_pending_schema()) { - // Copy the schema. - producer_schema->mutable_pending_schema()->CopyFrom(producer_schema_pb); - producer_schema->set_pending_schema_version(version_received); - } else { - // Why would we be getting different schema versions across tablets? Partial apply? - DCHECK_EQ(version_received, producer_schema->pending_schema_version()); - } + resp->set_should_wait(false); + // TODO (#14234): Support colocated tables / tablegroups. + // Need producer_meta_pb.has_add_table(), add_multiple_tables(), + // remove_table_id(), alter_table_id(). } - - RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), - "Updating cluster config in sys-catalog")); - l.Commit(); } return Status::OK(); @@ -6898,7 +6908,7 @@ Status CatalogManager::ValidateNewSchemaWithCdc(const TableInfo& table_info, } Status CatalogManager::ResumeCdcAfterNewSchema(const TableInfo& table_info) { - if (PREDICT_FALSE(!FLAGS_xcluster_wait_on_ddl_alter)) { + if (PREDICT_FALSE(!GetAtomicFlag(&FLAGS_xcluster_wait_on_ddl_alter))) { return Status::OK(); } @@ -6998,6 +7008,69 @@ Result CatalogManager::GetNumLiveTServersForActiveCluster() { return ts_descs.size(); } +Status CatalogManager::RunXClusterBgTasks() { + // Clean up Deleted CDC Streams on the Producer. 
+ std::vector> streams; + WARN_NOT_OK(FindCDCStreamsMarkedAsDeleting(&streams), "Failed Finding Deleting CDC Streams"); + if (!streams.empty()) { + WARN_NOT_OK(CleanUpDeletedCDCStreams(streams), "Failed Cleaning Deleted CDC Streams"); + } + + // Clean up Failed Universes on the Consumer. + WARN_NOT_OK(ClearFailedUniverse(), "Failed Clearing Failed Universe"); + + // DELETING_METADATA special state is used by CDC, to do CDC streams metadata cleanup from + // cache as well as from the system catalog for the drop table scenario. + std::vector> cdcsdk_streams; + WARN_NOT_OK(FindCDCStreamsMarkedForMetadataDeletion(&cdcsdk_streams, + SysCDCStreamEntryPB::DELETING_METADATA), "Failed CDC Stream Metadata Deletion"); + if (!cdcsdk_streams.empty()) { + WARN_NOT_OK(CleanUpCDCStreamsMetadata(cdcsdk_streams), "Failed Cleanup CDC Streams Metadata"); + } + + // Restart xCluster parent tablet deletion bg task. + StartXClusterParentTabletDeletionTaskIfStopped(); + + // Run periodic task for namespace-level replications. + ScheduleXClusterNSReplicationAddTableTask(); + + if (PREDICT_FALSE(!GetAtomicFlag(&FLAGS_xcluster_wait_on_ddl_alter))) { + // See if any Streams are waiting on a pending_schema. + bool found_pending_schema = false; + auto cluster_config = ClusterConfig(); + auto cl = cluster_config->LockForWrite(); + auto producer_map = cl.mutable_data()->pb.mutable_consumer_registry()->mutable_producer_map(); + // For each user entry. + for (auto& producer_id_and_entry : *producer_map) { + // For each CDC stream in that Universe. + for (auto& stream_id_and_entry : *producer_id_and_entry.second.mutable_stream_map()) { + auto& stream_entry = stream_id_and_entry.second; + if (stream_entry.has_producer_schema() && + stream_entry.producer_schema().has_pending_schema()) { + // Force resume this stream. 
+ auto schema = stream_entry.mutable_producer_schema(); + schema->set_validated_schema_version( + std::max(schema->validated_schema_version(), schema->pending_schema_version())); + schema->clear_pending_schema(); + + found_pending_schema = true; + LOG(INFO) << "Force Resume Consumer schema: " << stream_id_and_entry.first + << " @ schema version " << schema->pending_schema_version(); + } + } + } + + if (found_pending_schema) { + // Bump the ClusterConfig version so we'll broadcast new schema version & resume operation. + cl.mutable_data()->pb.set_version(cl.mutable_data()->pb.version() + 1); + RETURN_NOT_OK(CheckStatus(sys_catalog_->Upsert(leader_ready_term(), cluster_config.get()), + "updating cluster config after Schema for CDC")); + cl.Commit(); + } + } + return Status::OK(); +} + Status CatalogManager::ClearFailedUniverse() { // Delete a single failed universe from universes_to_clear_. if (PREDICT_FALSE(FLAGS_disable_universe_gc)) { diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 3139b153cfcf..b4e74045211b 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -488,7 +488,7 @@ DEFINE_bool(enable_delete_truncate_xcluster_replicated_table, false, "When set, enables deleting/truncating tables currently in xCluster replication"); TAG_FLAG(enable_delete_truncate_xcluster_replicated_table, runtime); -DEFINE_bool(xcluster_wait_on_ddl_alter, false, +DEFINE_bool(xcluster_wait_on_ddl_alter, true, "When xCluster replication sends a DDL change, wait for the user to enter a " "compatible/matching entry. 
Note: Can also set at runtime to resume after stall."); TAG_FLAG(xcluster_wait_on_ddl_alter, runtime); @@ -6255,7 +6255,7 @@ Status CatalogManager::AlterTable(const AlterTableRequestPB* req, } - if (FLAGS_xcluster_wait_on_ddl_alter && + if (GetAtomicFlag(&FLAGS_xcluster_wait_on_ddl_alter) && [&]() {SharedLock lock(mutex_); return IsTableCdcConsumer(*table);}()) { // If we're waiting for a Schema because we saw the a replication source with a change, // ensure this alter is compatible with what we're expecting. diff --git a/src/yb/master/catalog_manager_bg_tasks.cc b/src/yb/master/catalog_manager_bg_tasks.cc index eb7ea89d0b1a..17de875b315e 100644 --- a/src/yb/master/catalog_manager_bg_tasks.cc +++ b/src/yb/master/catalog_manager_bg_tasks.cc @@ -227,28 +227,10 @@ void CatalogManagerBgTasks::Run() { if (!to_delete.empty() || catalog_manager_->AreTablesDeleting()) { catalog_manager_->CleanUpDeletedTables(); } - std::vector> streams; - auto s = catalog_manager_->FindCDCStreamsMarkedAsDeleting(&streams); - if (s.ok() && !streams.empty()) { - s = catalog_manager_->CleanUpDeletedCDCStreams(streams); - } - - // Do a failed universe clean up - if (s.ok()) { - s = catalog_manager_->ClearFailedUniverse(); - } - // DELETING_METADATA special state is used by CDC, to do CDC streams metadata cleanup from - // cache as well as from the system catalog for the drop table scenario. - std::vector> cdcsdk_streams; - auto status_delete_metadata = catalog_manager_->FindCDCStreamsMarkedForMetadataDeletion( - &cdcsdk_streams, SysCDCStreamEntryPB::DELETING_METADATA); - if (status_delete_metadata.ok() && !cdcsdk_streams.empty()) { - status_delete_metadata = catalog_manager_->CleanUpCDCStreamsMetadata(cdcsdk_streams); - } // Ensure the master sys catalog tablet follows the cluster's affinity specification. 
if (FLAGS_sys_catalog_respect_affinity_task) { - s = catalog_manager_->SysCatalogRespectLeaderAffinity(); + Status s = catalog_manager_->SysCatalogRespectLeaderAffinity(); if (!s.ok()) { YB_LOG_EVERY_N(INFO, 10) << s.message().ToBuffer(); } @@ -259,11 +241,8 @@ void CatalogManagerBgTasks::Run() { catalog_manager_->StartTablespaceBgTaskIfStopped(); } - // Restart xCluster parent tablet deletion bg task. - catalog_manager_->StartXClusterParentTabletDeletionTaskIfStopped(); - - // Run periodic task for namespace-level replications. - catalog_manager_->ScheduleXClusterNSReplicationAddTableTask(); + // Run background tasks related to XCluster & CDC Schema. + WARN_NOT_OK(catalog_manager_->RunXClusterBgTasks(), "Failed XCluster Background Task"); was_leader_ = true; } else { diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index 43f548e6780b..95d359e18208 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -152,6 +152,9 @@ DECLARE_int32(pgsql_proxy_webserver_port); DEFINE_int64(inbound_rpc_memory_limit, 0, "Inbound RPC memory limit"); DEFINE_bool(tserver_enable_metrics_snapshotter, false, "Should metrics snapshotter be enabled"); + +DEFINE_test_flag(uint64, pg_auth_key, 0, "Forces an auth key for the postgres user when non-zero") + DECLARE_int32(num_concurrent_backfills_allowed); DECLARE_int32(svc_queue_length_default); @@ -335,7 +338,11 @@ Status TabletServer::Init() { // 5433 is kDefaultPort in src/yb/yql/pgwrapper/pg_wrapper.h. RETURN_NOT_OK(pgsql_proxy_bind_address_.ParseString(FLAGS_pgsql_proxy_bind_address, 5433)); - shared_object().SetPostgresAuthKey(RandomUniformInt()); + if (PREDICT_FALSE(FLAGS_TEST_pg_auth_key != 0)) { + shared_object().SetPostgresAuthKey(FLAGS_TEST_pg_auth_key); + } else { + shared_object().SetPostgresAuthKey(RandomUniformInt()); + } return Status::OK(); }