Skip to content

Commit

Permalink
[yugabyte#11017] Halt XCluster on ALTER by Default
Browse files Browse the repository at this point in the history
Summary: D17570 added DDL Halt support, disabled by default.  To enable this feature, we add additional unit tests to verify that Index, TableGroup, and CoLocation work.  Also, added the ability to resume any halted streams if this feature needs to be disabled due to a bug.

Test Plan:
    TwoDCYsqlTest.ReplicationWithBasicDDL
    TwoDCYsqlTest.ReplicationWithCreateIndexDDL
    TwoDCYsqlTest.ColocatedDatabaseReplication
    TwoDCYsqlTest.TablegroupReplication

Reviewers: rahuldesirazu, jhe, jmaley, hsunder

Reviewed By: hsunder

Subscribers: hsunder, slingam, bogdan

Differential Revision: https://phabricator.dev.yugabyte.com/D19093
  • Loading branch information
nspiegelberg committed Sep 29, 2022
1 parent 485d32a commit 9d956d0
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 116 deletions.
4 changes: 2 additions & 2 deletions ent/src/yb/integration-tests/twodc-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2174,7 +2174,7 @@ TEST_P(TwoDCTest, TestProducerUniverseExpansion) {
}

TEST_P(TwoDCTest, TestAlterDDLBasic) {
FLAGS_xcluster_wait_on_ddl_alter = true;
SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter);

uint32_t replication_factor = 1;
// Use just one tablet here to more easily catch lower-level write issues with this test.
Expand Down Expand Up @@ -2265,7 +2265,7 @@ TEST_P(TwoDCTest, TestAlterDDLBasic) {
}

TEST_P(TwoDCTest, TestAlterDDLWithRestarts) {
FLAGS_xcluster_wait_on_ddl_alter = true;
SetAtomicFlag(true, &FLAGS_xcluster_wait_on_ddl_alter);

uint32_t replication_factor = 3;
auto tables = ASSERT_RESULT(SetUpWithParams({1}, {1}, replication_factor, 2, 3));
Expand Down
53 changes: 47 additions & 6 deletions ent/src/yb/integration-tests/twodc_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "yb/master/master_ddl.proxy.h"
#include "yb/master/master_replication.proxy.h"
#include "yb/master/mini_master.h"
#include "yb/master/sys_catalog_initialization.h"
#include "yb/rpc/rpc_controller.h"
#include "yb/tserver/cdc_consumer.h"
#include "yb/tserver/mini_tablet_server.h"
Expand All @@ -43,7 +44,12 @@
#include "yb/yql/pgwrapper/pg_wrapper.h"

DECLARE_bool(enable_tablet_split_of_xcluster_replicated_tables);
DECLARE_bool(enable_ysql);
DECLARE_bool(hide_pg_catalog_table_creation_logs);
DECLARE_bool(master_auto_run_initdb);
DECLARE_int32(replication_factor);
DECLARE_int32(pggate_rpc_timeout_secs);
DECLARE_string(pgsql_proxy_bind_address);
DECLARE_int32(pgsql_proxy_webserver_port);

namespace yb {
Expand All @@ -54,39 +60,74 @@ using tserver::enterprise::CDCConsumer;

namespace enterprise {

Status TwoDCTestBase::InitClusters(const MiniClusterOptions& opts) {
Status TwoDCTestBase::InitClusters(const MiniClusterOptions& opts, bool init_postgres) {
FLAGS_replication_factor = static_cast<int>(opts.num_tablet_servers);
// Disable tablet split for regular tests, see xcluster-tablet-split-itest for those tests.
FLAGS_enable_tablet_split_of_xcluster_replicated_tables = false;
if (init_postgres) {
master::SetDefaultInitialSysCatalogSnapshotFlags();
FLAGS_enable_ysql = true;
FLAGS_hide_pg_catalog_table_creation_logs = true;
FLAGS_master_auto_run_initdb = true;
FLAGS_pggate_rpc_timeout_secs = 120;
}

auto producer_opts = opts;
producer_opts.cluster_id = "producer";

producer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(producer_opts);

// Randomly select the tserver index that will serve the postgres proxy.
const size_t pg_ts_idx = RandomUniformInt<size_t>(0, opts.num_tablet_servers - 1);
const std::string pg_addr = server::TEST_RpcAddress(pg_ts_idx + 1, server::Private::kTrue);

// The 'pgsql_proxy_bind_address' flag must be set before starting the producer cluster. Each
// tserver will store this address when it starts.
const uint16_t producer_pg_port = producer_cluster_.mini_cluster_->AllocateFreePort();
FLAGS_pgsql_proxy_bind_address = Format("$0:$1", pg_addr, producer_pg_port);

RETURN_NOT_OK(producer_cluster()->StartSync());

auto consumer_opts = opts;
consumer_opts.cluster_id = "consumer";
consumer_cluster_.mini_cluster_ = std::make_unique<MiniCluster>(consumer_opts);

RETURN_NOT_OK(producer_cluster()->StartSync());
// Use a new pg proxy port for the consumer cluster.
const uint16_t consumer_pg_port = consumer_cluster_.mini_cluster_->AllocateFreePort();
FLAGS_pgsql_proxy_bind_address = Format("$0:$1", pg_addr, consumer_pg_port);

RETURN_NOT_OK(consumer_cluster()->StartSync());

RETURN_NOT_OK(RunOnBothClusters([&opts](MiniCluster* cluster) {
return cluster->WaitForTabletServerCount(opts.num_tablet_servers);
}));

// Verify the that the selected tablets have their rpc servers bound to the expected pg addr.
CHECK_EQ(producer_cluster_.mini_cluster_->mini_tablet_server(pg_ts_idx)->bound_rpc_addr().
address().to_string(), pg_addr);
CHECK_EQ(consumer_cluster_.mini_cluster_->mini_tablet_server(pg_ts_idx)->bound_rpc_addr().
address().to_string(), pg_addr);

producer_cluster_.client_ = VERIFY_RESULT(producer_cluster()->CreateClient());
consumer_cluster_.client_ = VERIFY_RESULT(consumer_cluster()->CreateClient());

if (init_postgres) {
RETURN_NOT_OK(InitPostgres(&producer_cluster_, pg_ts_idx, producer_pg_port));
RETURN_NOT_OK(InitPostgres(&consumer_cluster_, pg_ts_idx, consumer_pg_port));
}

return Status::OK();
}

Status TwoDCTestBase::InitPostgres(Cluster* cluster) {
Status TwoDCTestBase::InitPostgres(Cluster* cluster, const size_t pg_ts_idx, uint16_t pg_port) {
RETURN_NOT_OK(WaitForInitDb(cluster->mini_cluster_.get()));
auto pg_ts = RandomElement(cluster->mini_cluster_->mini_tablet_servers());
auto port = cluster->mini_cluster_->AllocateFreePort();

tserver::MiniTabletServer *const pg_ts = cluster->mini_cluster_->mini_tablet_server(pg_ts_idx);
CHECK(pg_ts);

yb::pgwrapper::PgProcessConf pg_process_conf =
VERIFY_RESULT(yb::pgwrapper::PgProcessConf::CreateValidateAndRunInitDb(
yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), port)),
yb::ToString(Endpoint(pg_ts->bound_rpc_addr().address(), pg_port)),
pg_ts->options()->fs_opts.data_paths.front() + "/pg_data",
pg_ts->server()->GetSharedMemoryFd()));
pg_process_conf.master_addresses = pg_ts->options()->master_addresses_flag;
Expand Down
11 changes: 6 additions & 5 deletions ent/src/yb/integration-tests/twodc_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,7 @@ class TwoDCTestBase : public YBTest {
FLAGS_flush_rocksdb_on_shutdown = false;
}

Status InitClusters(const MiniClusterOptions& opts);

// Not thread safe. FLAGS_pgsql_proxy_webserver_port is modified each time this is called so this
// is not safe to run in parallel.
Status InitPostgres(Cluster* cluster);
Status InitClusters(const MiniClusterOptions& opts, bool init_postgres = false);

void TearDown() override;

Expand Down Expand Up @@ -224,6 +220,11 @@ class TwoDCTestBase : public YBTest {
protected:
Cluster producer_cluster_;
Cluster consumer_cluster_;

private:
// Not thread safe. FLAGS_pgsql_proxy_webserver_port is modified each time this is called so this
// is not safe to run in parallel.
Status InitPostgres(Cluster* cluster, const size_t pg_ts_idx, uint16_t pg_port);
};

} // namespace enterprise
Expand Down
Loading

0 comments on commit 9d956d0

Please sign in to comment.