From 0d160a9d66dfe85401054a03e10e9ae5358a5c8c Mon Sep 17 00:00:00 2001 From: Quanzheng Long Date: Fri, 15 Nov 2019 11:36:03 -0800 Subject: [PATCH] Support Postgres in SQL CLI and make SQL pluggable for any database (#2828) --- Makefile | 8 + cmd/tools/sql/main.go | 2 + go.mod | 2 +- schema/mysql/v57/cadence/schema.sql | 6 +- .../mysql/v57/cadence/versioned/v0.1/base.sql | 6 +- schema/postgres/cadence/database.sql | 1 + schema/postgres/cadence/schema.sql | 255 ++++++++++++++++++ .../postgres/cadence/versioned/v0.1/base.sql | 232 ++++++++++++++++ .../cadence/versioned/v0.1/manifest.json | 8 + .../cadence/versioned/v0.2/manifest.json | 8 + .../postgres/cadence/versioned/v0.2/queue.sql | 12 + .../cadence/versioned/v0.3/manifest.json | 8 + .../versioned/v0.3/replication_tasks_dlq.sql | 9 + .../conn.go => schema/postgres/version.go | 36 +-- schema/postgres/visibility/database.sql | 1 + schema/postgres/visibility/schema.sql | 19 ++ .../visibility/versioned/v0.1/base.sql | 19 ++ .../visibility/versioned/v0.1/manifest.json | 8 + tools/cassandra/cqlclient.go | 4 +- tools/common/schema/types.go | 2 +- .../mysql}/conn_test.go | 36 +-- tools/sql-extensions/mysql/driver.go | 128 +++++++++ .../mysql}/handler_test.go | 27 +- .../mysql}/setuptask_test.go | 9 +- .../mysql}/updatetask_test.go | 13 +- .../mysql}/version_test.go | 26 +- tools/sql-extensions/postgres/driver.go | 134 +++++++++ tools/sql/README.md | 5 +- tools/sql/conn.go | 132 ++++----- tools/sql/handler.go | 91 ++++--- tools/sql/main.go | 8 +- 31 files changed, 1065 insertions(+), 190 deletions(-) create mode 100644 schema/postgres/cadence/database.sql create mode 100644 schema/postgres/cadence/schema.sql create mode 100644 schema/postgres/cadence/versioned/v0.1/base.sql create mode 100644 schema/postgres/cadence/versioned/v0.1/manifest.json create mode 100644 schema/postgres/cadence/versioned/v0.2/manifest.json create mode 100644 schema/postgres/cadence/versioned/v0.2/queue.sql create mode 100644 schema/postgres/cadence/versioned/v0.3/manifest.json create mode 100644 schema/postgres/cadence/versioned/v0.3/replication_tasks_dlq.sql rename tools/sql/mysql/conn.go => schema/postgres/version.go (55%) create mode 100644 schema/postgres/visibility/database.sql create mode 100644 schema/postgres/visibility/schema.sql create mode 100644 schema/postgres/visibility/versioned/v0.1/base.sql create mode 100644 schema/postgres/visibility/versioned/v0.1/manifest.json rename tools/{sql => sql-extensions/mysql}/conn_test.go (84%) create mode 100644 tools/sql-extensions/mysql/driver.go rename tools/{sql => sql-extensions/mysql}/handler_test.go (80%) rename tools/{sql => sql-extensions/mysql}/setuptask_test.go (87%) rename tools/{sql => sql-extensions/mysql}/updatetask_test.go (83%) rename tools/{sql => sql-extensions/mysql}/version_test.go (93%) create mode 100644 tools/sql-extensions/postgres/driver.go diff --git a/Makefile b/Makefile index 6f718d67f17..daad131560b 100644 --- a/Makefile +++ b/Makefile @@ -259,6 +259,14 @@ install-schema-mysql: bins ./cadence-sql-tool --ep 127.0.0.1 --db cadence_visibility setup-schema -v 0.0 ./cadence-sql-tool --ep 127.0.0.1 --db cadence_visibility update-schema -d ./schema/mysql/v57/visibility/versioned +install-schema-postgres: bins + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres create --db cadence + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence setup -v 0.0 + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence update-schema -d ./schema/postgres/cadence/versioned + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres create --db cadence_visibility + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence_visibility setup-schema -v 0.0 + ./cadence-sql-tool --ep 127.0.0.1 -p 5432 -u postgres -pw cadence --dr postgres --db cadence_visibility update-schema -d ./schema/postgres/visibility/versioned + start: bins ./cadence-server start diff --git a/cmd/tools/sql/main.go b/cmd/tools/sql/main.go index e32604e5fa5..c2f2daf9281 100644 --- a/cmd/tools/sql/main.go +++ b/cmd/tools/sql/main.go @@ -24,6 +24,8 @@ import ( "os" "github.com/uber/cadence/tools/sql" + _ "github.com/uber/cadence/tools/sql-extensions/mysql" // needed to load mysql extensions + _ "github.com/uber/cadence/tools/sql-extensions/postgres" // needed to load postgres extensions ) func main() { diff --git a/go.mod b/go.mod index 7ff028cc88a..e821a5ccc74 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/jmoiron/sqlx v1.2.0 github.com/jonboulle/clockwork v0.1.0 github.com/konsorten/go-windows-terminal-sequences v1.0.2 // indirect - github.com/lib/pq v1.2.0 // indirect + github.com/lib/pq v1.2.0 github.com/m3db/prometheus_client_golang v0.8.1 github.com/m3db/prometheus_client_model v0.1.0 // indirect github.com/m3db/prometheus_common v0.1.0 // indirect diff --git a/schema/mysql/v57/cadence/schema.sql b/schema/mysql/v57/cadence/schema.sql index 431e21e6f3c..d49bd62525c 100644 --- a/schema/mysql/v57/cadence/schema.sql +++ b/schema/mysql/v57/cadence/schema.sql @@ -3,9 +3,9 @@ CREATE TABLE domains( id BINARY(16) NOT NULL, name VARCHAR(255) UNIQUE NOT NULL, -- - data BLOB NOT NULL, - data_encoding VARCHAR(16) NOT NULL, - is_global TINYINT(1) NOT NULL, + data BLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + is_global TINYINT(1) NOT NULL, PRIMARY KEY(shard_id, id) ); diff --git a/schema/mysql/v57/cadence/versioned/v0.1/base.sql b/schema/mysql/v57/cadence/versioned/v0.1/base.sql index f5fed9c0d7f..e215753cad4 100644 --- a/schema/mysql/v57/cadence/versioned/v0.1/base.sql +++ b/schema/mysql/v57/cadence/versioned/v0.1/base.sql @@ -3,9 +3,9 @@ CREATE TABLE domains( id BINARY(16) NOT NULL, name VARCHAR(255) UNIQUE NOT NULL, -- - data BLOB NOT NULL, - data_encoding VARCHAR(16) NOT NULL, - is_global TINYINT(1) NOT NULL, + data BLOB NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + is_global TINYINT(1) NOT NULL, PRIMARY KEY(shard_id, id) ); diff --git a/schema/postgres/cadence/database.sql b/schema/postgres/cadence/database.sql new file mode 100644 index 00000000000..7af10d98f1e --- /dev/null +++ b/schema/postgres/cadence/database.sql @@ -0,0 +1 @@ +CREATE DATABASE cadence; \ No newline at end of file diff --git a/schema/postgres/cadence/schema.sql b/schema/postgres/cadence/schema.sql new file mode 100644 index 00000000000..eebaeed4bc3 --- /dev/null +++ b/schema/postgres/cadence/schema.sql @@ -0,0 +1,255 @@ +CREATE TABLE domains( + shard_id INTEGER NOT NULL DEFAULT 54321, + id BYTEA NOT NULL, + name VARCHAR(255) UNIQUE NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + is_global SMALLINT NOT NULL, + PRIMARY KEY(shard_id, id) +); + +CREATE TABLE domain_metadata ( + notification_version BIGINT NOT NULL +); + +INSERT INTO domain_metadata (notification_version) VALUES (1); + +CREATE TABLE shards ( + shard_id INTEGER NOT NULL, + -- + range_id BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id) +); + +CREATE TABLE transfer_tasks( + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, task_id) +); + +CREATE TABLE executions( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + -- + next_event_id BIGINT NOT NULL, + last_write_version BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id) +); + +CREATE TABLE current_executions( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + -- + run_id BYTEA NOT NULL, + create_request_id VARCHAR(64) NOT NULL, + state INTEGER NOT NULL, + close_status INTEGER NOT NULL, + start_version BIGINT NOT NULL, + last_write_version BIGINT NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id) +); + +CREATE TABLE buffered_events ( + id BIGSERIAL NOT NULL, + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX buffered_events_by_events_ids ON buffered_events(shard_id, domain_id, workflow_id, run_id); + +CREATE TABLE tasks ( + domain_id BYTEA NOT NULL, + task_list_name VARCHAR(255) NOT NULL, + task_type SMALLINT NOT NULL, -- {Activity, Decision} + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (domain_id, task_list_name, task_type, task_id) +); + +CREATE TABLE task_lists ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + name VARCHAR(255) NOT NULL, + task_type SMALLINT NOT NULL, -- {Activity, Decision} + -- + range_id BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, domain_id, name, task_type) +); + +CREATE TABLE replication_tasks ( + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, task_id) +); + +CREATE TABLE replication_tasks_dlq ( + source_cluster_name VARCHAR(255) NOT NULL, + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (source_cluster_name, shard_id, task_id) +); + +CREATE TABLE timer_tasks ( + shard_id INTEGER NOT NULL, + visibility_timestamp TIMESTAMP NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, visibility_timestamp, task_id) +); + +CREATE TABLE activity_info_maps ( +-- each row corresponds to one key of one map + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + schedule_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + last_heartbeat_details BYTEA, + last_heartbeat_updated_time TIMESTAMP NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, schedule_id) +); + +CREATE TABLE timer_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + timer_id VARCHAR(255) NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, timer_id) +); + +CREATE TABLE child_execution_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE request_cancel_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE signal_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE buffered_replication_task_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + first_event_id BIGINT NOT NULL, +-- + version BIGINT NOT NULL, + next_event_id BIGINT NOT NULL, + history BYTEA, + history_encoding VARCHAR(16) NOT NULL, + new_run_history BYTEA, + new_run_history_encoding VARCHAR(16) NOT NULL DEFAULT 'json', + event_store_version INTEGER NOT NULL, -- indiciates which version of event store to query + new_run_event_store_version INTEGER NOT NULL, -- indiciates which version of event store to query for new run(continueAsNew) + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, first_event_id) +); + +CREATE TABLE signals_requested_sets ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + signal_id VARCHAR(64) NOT NULL, + -- + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, signal_id) +); + +-- history eventsV2: history_node stores history event data +CREATE TABLE history_node ( + shard_id INTEGER NOT NULL, + tree_id BYTEA NOT NULL, + branch_id BYTEA NOT NULL, + node_id BIGINT NOT NULL, + txn_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, tree_id, branch_id, node_id, txn_id) +); + +-- history eventsV2: history_tree stores branch metadata +CREATE TABLE history_tree ( + shard_id INTEGER NOT NULL, + tree_id BYTEA NOT NULL, + branch_id BYTEA NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, tree_id, branch_id) +); + +CREATE TABLE queue ( + queue_type INTEGER NOT NULL, + message_id BIGINT NOT NULL, + message_payload BYTEA NOT NULL, + PRIMARY KEY(queue_type, message_id) +); + +CREATE TABLE queue_metadata ( + queue_type INTEGER NOT NULL, + data BYTEA NOT NULL, + PRIMARY KEY(queue_type) +); diff --git a/schema/postgres/cadence/versioned/v0.1/base.sql b/schema/postgres/cadence/versioned/v0.1/base.sql new file mode 100644 index 00000000000..e5dd990fb4a --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.1/base.sql @@ -0,0 +1,232 @@ +CREATE TABLE domains( + shard_id INTEGER NOT NULL DEFAULT 54321, + id BYTEA NOT NULL, + name VARCHAR(255) UNIQUE NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + is_global SMALLINT NOT NULL, + PRIMARY KEY(shard_id, id) +); + +CREATE TABLE domain_metadata ( + notification_version BIGINT NOT NULL +); + +INSERT INTO domain_metadata (notification_version) VALUES (1); + +CREATE TABLE shards ( + shard_id INTEGER NOT NULL, + -- + range_id BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id) +); + +CREATE TABLE transfer_tasks( + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, task_id) +); + +CREATE TABLE executions( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + -- + next_event_id BIGINT NOT NULL, + last_write_version BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id) +); + +CREATE TABLE current_executions( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + -- + run_id BYTEA NOT NULL, + create_request_id VARCHAR(64) NOT NULL, + state INTEGER NOT NULL, + close_status INTEGER NOT NULL, + start_version BIGINT NOT NULL, + last_write_version BIGINT NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id) +); + +CREATE TABLE buffered_events ( + id BIGSERIAL NOT NULL, + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (id) +); + +CREATE INDEX buffered_events_by_events_ids ON buffered_events(shard_id, domain_id, workflow_id, run_id); + +CREATE TABLE tasks ( + domain_id BYTEA NOT NULL, + task_list_name VARCHAR(255) NOT NULL, + task_type SMALLINT NOT NULL, -- {Activity, Decision} + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (domain_id, task_list_name, task_type, task_id) +); + +CREATE TABLE task_lists ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + name VARCHAR(255) NOT NULL, + task_type SMALLINT NOT NULL, -- {Activity, Decision} + -- + range_id BIGINT NOT NULL, + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, domain_id, name, task_type) +); + +CREATE TABLE replication_tasks ( + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, task_id) +); + +CREATE TABLE timer_tasks ( + shard_id INTEGER NOT NULL, + visibility_timestamp TIMESTAMP NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, visibility_timestamp, task_id) +); + +CREATE TABLE activity_info_maps ( +-- each row corresponds to one key of one map + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + schedule_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + last_heartbeat_details BYTEA, + last_heartbeat_updated_time TIMESTAMP NOT NULL, + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, schedule_id) +); + +CREATE TABLE timer_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + timer_id VARCHAR(255) NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, timer_id) +); + +CREATE TABLE child_execution_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE request_cancel_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE signal_info_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + initiated_id BIGINT NOT NULL, +-- + data BYTEA NOT NULL, + data_encoding VARCHAR(16), + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, initiated_id) +); + +CREATE TABLE buffered_replication_task_maps ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + first_event_id BIGINT NOT NULL, +-- + version BIGINT NOT NULL, + next_event_id BIGINT NOT NULL, + history BYTEA, + history_encoding VARCHAR(16) NOT NULL, + new_run_history BYTEA, + new_run_history_encoding VARCHAR(16) NOT NULL DEFAULT 'json', + event_store_version INTEGER NOT NULL, -- indiciates which version of event store to query + new_run_event_store_version INTEGER NOT NULL, -- indiciates which version of event store to query for new run(continueAsNew) + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, first_event_id) +); + +CREATE TABLE signals_requested_sets ( + shard_id INTEGER NOT NULL, + domain_id BYTEA NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + run_id BYTEA NOT NULL, + signal_id VARCHAR(64) NOT NULL, + -- + PRIMARY KEY (shard_id, domain_id, workflow_id, run_id, signal_id) +); + +-- history eventsV2: history_node stores history event data +CREATE TABLE history_node ( + shard_id INTEGER NOT NULL, + tree_id BYTEA NOT NULL, + branch_id BYTEA NOT NULL, + node_id BIGINT NOT NULL, + txn_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, tree_id, branch_id, node_id, txn_id) +); + +-- history eventsV2: history_tree stores branch metadata +CREATE TABLE history_tree ( + shard_id INTEGER NOT NULL, + tree_id BYTEA NOT NULL, + branch_id BYTEA NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (shard_id, tree_id, branch_id) +); diff --git a/schema/postgres/cadence/versioned/v0.1/manifest.json b/schema/postgres/cadence/versioned/v0.1/manifest.json new file mode 100644 index 00000000000..789002c0eef --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.1/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.1", + "MinCompatibleVersion": "0.1", + "Description": "base version of schema", + "SchemaUpdateCqlFiles": [ + "base.sql" + ] +} diff --git a/schema/postgres/cadence/versioned/v0.2/manifest.json b/schema/postgres/cadence/versioned/v0.2/manifest.json new file mode 100644 index 00000000000..941a776dd39 --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.2/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.2", + "MinCompatibleVersion": "0.2", + "Description": "add queue and queue_metadata table", + "SchemaUpdateCqlFiles": [ + "queue.sql" + ] +} diff --git a/schema/postgres/cadence/versioned/v0.2/queue.sql b/schema/postgres/cadence/versioned/v0.2/queue.sql new file mode 100644 index 00000000000..694ac7e10e9 --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.2/queue.sql @@ -0,0 +1,12 @@ +CREATE TABLE queue ( + queue_type INTEGER NOT NULL, + message_id BIGINT NOT NULL, + message_payload BYTEA NOT NULL, + PRIMARY KEY(queue_type, message_id) +); + +CREATE TABLE queue_metadata ( + queue_type INTEGER NOT NULL, + data BYTEA NOT NULL, + PRIMARY KEY(queue_type) +); diff --git a/schema/postgres/cadence/versioned/v0.3/manifest.json b/schema/postgres/cadence/versioned/v0.3/manifest.json new file mode 100644 index 00000000000..ff6fc2d483c --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.3/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.3", + "MinCompatibleVersion": "0.3", + "Description": "add replication_tasks_dlq table", + "SchemaUpdateCqlFiles": [ + "replication_tasks_dlq.sql" + ] +} diff --git a/schema/postgres/cadence/versioned/v0.3/replication_tasks_dlq.sql b/schema/postgres/cadence/versioned/v0.3/replication_tasks_dlq.sql new file mode 100644 index 00000000000..914debcffe2 --- /dev/null +++ b/schema/postgres/cadence/versioned/v0.3/replication_tasks_dlq.sql @@ -0,0 +1,9 @@ +CREATE TABLE replication_tasks_dlq ( + source_cluster_name VARCHAR(255) NOT NULL, + shard_id INTEGER NOT NULL, + task_id BIGINT NOT NULL, + -- + data BYTEA NOT NULL, + data_encoding VARCHAR(16) NOT NULL, + PRIMARY KEY (source_cluster_name, shard_id, task_id) +); \ No newline at end of file diff --git a/tools/sql/mysql/conn.go b/schema/postgres/version.go similarity index 55% rename from tools/sql/mysql/conn.go rename to schema/postgres/version.go index c543a8290ab..1124819d0b0 100644 --- a/tools/sql/mysql/conn.go +++ b/schema/postgres/version.go @@ -1,4 +1,4 @@ -// Copyright (c) 2017 Uber Technologies, Inc. +// Copyright (c) 2019 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -20,32 +20,14 @@ package mysql -import ( - "fmt" - "strings" +import "github.com/uber/cadence/schema/mysql" - "github.com/iancoleman/strcase" - "github.com/jmoiron/sqlx" -) +// NOTE: whenever there is a new data base schema update, plz update the following versions -const ( - dataSourceName = "%s:%s@%v(%v)/%s?multiStatements=true&parseTime=true&clientFoundRows=true" -) +// Version is the Postgres database release version +// Cadence supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres +const Version = mysql.Version -// DriverName refers to the name of the mysql driver -const DriverName = "mysql" - -// NewConnection returns a new connection to mysql database -func NewConnection(host string, port int, user string, passwd string, database string) (*sqlx.DB, error) { - addr := fmt.Sprintf("%v:%v", host, port) - if strings.Contains(host, ":") && port == 0 { - addr = host - } - db, err := sqlx.Connect(DriverName, fmt.Sprintf(dataSourceName, user, passwd, "tcp", addr, database)) - if err != nil { - return nil, err - } - // Maps struct names in CamelCase to snake without need for db struct tags. - db.MapperFunc(strcase.ToSnake) - return db, nil -} +// VisibilityVersion is the Postgres visibility database release version +// Cadence supports both MySQL and Postgres officially, so upgrade should be perform for both MySQL and Postgres +const VisibilityVersion = mysql.VisibilityVersion diff --git a/schema/postgres/visibility/database.sql b/schema/postgres/visibility/database.sql new file mode 100644 index 00000000000..dc53ab9e8ef --- /dev/null +++ b/schema/postgres/visibility/database.sql @@ -0,0 +1 @@ +CREATE DATABASE cadence_visibility; \ No newline at end of file diff --git a/schema/postgres/visibility/schema.sql b/schema/postgres/visibility/schema.sql new file mode 100644 index 00000000000..8e3105140f3 --- /dev/null +++ b/schema/postgres/visibility/schema.sql @@ -0,0 +1,19 @@ +CREATE TABLE executions_visibility ( + domain_id CHAR(64) NOT NULL, + run_id CHAR(64) NOT NULL, + start_time TIMESTAMP NOT NULL, + execution_time TIMESTAMP NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + workflow_type_name VARCHAR(255) NOT NULL, + close_status INTEGER, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + close_time TIMESTAMP NULL, + history_length BIGINT, + memo BYTEA, + encoding VARCHAR(64) NOT NULL, + + PRIMARY KEY (domain_id, run_id) +); + +CREATE INDEX by_type_start_time ON executions_visibility (domain_id, workflow_type_name, close_status, start_time DESC, run_id); +CREATE INDEX by_workflow_id_start_time ON executions_visibility (domain_id, workflow_id, close_status, start_time DESC, run_id); +CREATE INDEX by_status_by_close_time ON executions_visibility (domain_id, close_status, start_time DESC, run_id); diff --git a/schema/postgres/visibility/versioned/v0.1/base.sql b/schema/postgres/visibility/versioned/v0.1/base.sql new file mode 100644 index 00000000000..8e3105140f3 --- /dev/null +++ b/schema/postgres/visibility/versioned/v0.1/base.sql @@ -0,0 +1,19 @@ +CREATE TABLE executions_visibility ( + domain_id CHAR(64) NOT NULL, + run_id CHAR(64) NOT NULL, + start_time TIMESTAMP NOT NULL, + execution_time TIMESTAMP NOT NULL, + workflow_id VARCHAR(255) NOT NULL, + workflow_type_name VARCHAR(255) NOT NULL, + close_status INTEGER, -- enum WorkflowExecutionCloseStatus {COMPLETED, FAILED, CANCELED, TERMINATED, CONTINUED_AS_NEW, TIMED_OUT} + close_time TIMESTAMP NULL, + history_length BIGINT, + memo BYTEA, + encoding VARCHAR(64) NOT NULL, + + PRIMARY KEY (domain_id, run_id) +); + +CREATE INDEX by_type_start_time ON executions_visibility (domain_id, workflow_type_name, close_status, start_time DESC, run_id); +CREATE INDEX by_workflow_id_start_time ON executions_visibility (domain_id, workflow_id, close_status, start_time DESC, run_id); +CREATE INDEX by_status_by_close_time ON executions_visibility (domain_id, close_status, start_time DESC, run_id); diff --git a/schema/postgres/visibility/versioned/v0.1/manifest.json b/schema/postgres/visibility/versioned/v0.1/manifest.json new file mode 100644 index 00000000000..442e3af5ce4 --- /dev/null +++ b/schema/postgres/visibility/versioned/v0.1/manifest.json @@ -0,0 +1,8 @@ +{ + "CurrVersion": "0.1", + "MinCompatibleVersion": "0.1", + "Description": "base version of visibility schema", + "SchemaUpdateCqlFiles": [ + "base.sql" + ] +} \ No newline at end of file diff --git a/tools/cassandra/cqlclient.go b/tools/cassandra/cqlclient.go index d4be3a05b48..cefd8f20281 100644 --- a/tools/cassandra/cqlclient.go +++ b/tools/cassandra/cqlclient.go @@ -201,8 +201,8 @@ func (client *cqlClient) WriteSchemaUpdateLog(oldVersion string, newVersion stri } // Exec executes a cql statement -func (client *cqlClient) Exec(stmt string) error { - return client.session.Query(stmt).Exec() +func (client *cqlClient) Exec(stmt string, args ...interface{}) error { + return client.session.Query(stmt, args...).Exec() } // Close closes the cql client diff --git a/tools/common/schema/types.go b/tools/common/schema/types.go index 73e6e174fff..589776bf7ca 100644 --- a/tools/common/schema/types.go +++ b/tools/common/schema/types.go @@ -51,7 +51,7 @@ type ( // for the schema-tool to work DB interface { // Exec executes a cql statement - Exec(stmt string) error + Exec(stmt string, args ...interface{}) error // DropAllTables drops all tables DropAllTables() error // CreateSchemaVersionTables sets up the schema version tables diff --git a/tools/sql/conn_test.go b/tools/sql-extensions/mysql/conn_test.go similarity index 84% rename from tools/sql/conn_test.go rename to tools/sql-extensions/mysql/conn_test.go index 9fc8877fa2f..ba8f5f15b5c 100644 --- a/tools/sql/conn_test.go +++ b/tools/sql-extensions/mysql/conn_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package sql +package mysql import ( "testing" @@ -29,7 +29,7 @@ import ( "github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/environment" "github.com/uber/cadence/tools/common/schema/test" - "github.com/uber/cadence/tools/sql/mysql" + "github.com/uber/cadence/tools/sql" ) type ( @@ -38,7 +38,7 @@ type ( } ) -var _ test.DB = (*sqlConn)(nil) +var _ test.DB = (*sql.Connection)(nil) const ( testUser = "uber" @@ -70,13 +70,13 @@ func (s *SQLConnTestSuite) TestParseCQLFile() { } func (s *SQLConnTestSuite) TestSQLConn() { - conn, err := newConn(&sqlConnectParams{ - host: environment.GetMySQLAddress(), - port: environment.GetMySQLPort(), - user: testUser, - password: testPassword, - driverName: mysql.DriverName, - database: s.DBName, + conn, err := sql.NewConnection(&sql.ConnectParams{ + Host: environment.GetMySQLAddress(), + Port: environment.GetMySQLPort(), + User: testUser, + Password: testPassword, + DriverName: driverName, + Database: s.DBName, }) s.Nil(err) s.RunCreateTest(conn) @@ -85,14 +85,14 @@ func (s *SQLConnTestSuite) TestSQLConn() { conn.Close() } -func newTestConn(database string) (*sqlConn, error) { - return newConn(&sqlConnectParams{ - host: environment.GetMySQLAddress(), - port: environment.GetMySQLPort(), - user: testUser, - password: testPassword, - driverName: mysql.DriverName, - database: database, +func newTestConn(database string) (*sql.Connection, error) { + return sql.NewConnection(&sql.ConnectParams{ + Host: environment.GetMySQLAddress(), + Port: environment.GetMySQLPort(), + User: testUser, + Password: testPassword, + DriverName: driverName, + Database: database, }) } diff --git a/tools/sql-extensions/mysql/driver.go b/tools/sql-extensions/mysql/driver.go new file mode 100644 index 00000000000..3fa0cac03e0 --- /dev/null +++ b/tools/sql-extensions/mysql/driver.go @@ -0,0 +1,128 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package mysql + +import ( + "fmt" + + _ "github.com/go-sql-driver/mysql" // needed to load the mysql driver + + "github.com/iancoleman/strcase" + "github.com/jmoiron/sqlx" + + "github.com/uber/cadence/tools/sql" +) + +const ( + // DriverName refers to the name of the mysql driver + driverName = "mysql" + + dataSourceNameMySQL = "%s:%s@%v(%v:%v)/%s?multiStatements=true&parseTime=true&clientFoundRows=true" + + readSchemaVersionMySQL = `SELECT curr_version from schema_version where db_name=?` + + writeSchemaVersionMySQL = `REPLACE into schema_version(db_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)` + + writeSchemaUpdateHistoryMySQL = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)` + + createSchemaVersionTableMySQL = `CREATE TABLE schema_version(db_name VARCHAR(255) not null PRIMARY KEY, ` + + `creation_time DATETIME(6), ` + + `curr_version VARCHAR(64), ` + + `min_compatible_version VARCHAR(64));` + + createSchemaUpdateHistoryTableMySQL = `CREATE TABLE schema_update_history(` + + `year int not null, ` + + `month int not null, ` + + `update_time DATETIME(6) not null, ` + + `description VARCHAR(255), ` + + `manifest_md5 VARCHAR(64), ` + + `new_version VARCHAR(64), ` + + `old_version VARCHAR(64), ` + + `PRIMARY KEY (year, month, update_time));` + + //NOTE we have to use %v because somehow mysql doesn't work with ? here + createDatabaseMySQL = "CREATE database %v CHARACTER SET UTF8" + + dropDatabaseMySQL = "Drop database %v" + + listTablesMySQL = "SHOW TABLES FROM %v" + + dropTableMySQL = "DROP TABLE %v" +) + +type driver struct{} + +var _ sql.Driver = (*driver)(nil) + +func init() { + sql.RegisterDriver(driverName, &driver{}) +} + +func (d *driver) GetDriverName() string { + return driverName +} + +func (d *driver) CreateDBConnection(driverName, host string, port int, user string, passwd string, database string) (*sqlx.DB, error) { + db, err := sqlx.Connect(driverName, fmt.Sprintf(dataSourceNameMySQL, user, passwd, "tcp", host, port, database)) + + if err != nil { + return nil, err + } + // Maps struct names in CamelCase to snake without need for db struct tags. + db.MapperFunc(strcase.ToSnake) + return db, nil +} + +func (d *driver) GetReadSchemaVersionSQL() string { + return readSchemaVersionMySQL +} + +func (d *driver) GetWriteSchemaVersionSQL() string { + return writeSchemaVersionMySQL +} + +func (d *driver) GetWriteSchemaUpdateHistorySQL() string { + return writeSchemaUpdateHistoryMySQL +} + +func (d *driver) GetCreateSchemaVersionTableSQL() string { + return createSchemaVersionTableMySQL +} + +func (d *driver) GetCreateSchemaUpdateHistoryTableSQL() string { + return createSchemaUpdateHistoryTableMySQL +} + +func (d *driver) GetCreateDatabaseSQL() string { + return createDatabaseMySQL +} + +func (d *driver) GetDropDatabaseSQL() string { + return dropDatabaseMySQL +} + +func (d *driver) GetListTablesSQL() string { + return listTablesMySQL +} + +func (d *driver) GetDropTableSQL() string { + return dropTableMySQL +} diff --git a/tools/sql/handler_test.go b/tools/sql-extensions/mysql/handler_test.go similarity index 80% rename from tools/sql/handler_test.go rename to tools/sql-extensions/mysql/handler_test.go index da96027973b..2fc40e6d721 100644 --- a/tools/sql/handler_test.go +++ b/tools/sql-extensions/mysql/handler_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package sql +package mysql import ( "testing" @@ -28,6 +28,7 @@ import ( "github.com/uber/cadence/environment" "github.com/uber/cadence/tools/common/schema" + "github.com/uber/cadence/tools/sql" ) type ( @@ -46,16 +47,16 @@ func (s *HandlerTestSuite) SetupTest() { } func (s *HandlerTestSuite) TestValidateConnectParams() { - p := new(sqlConnectParams) - s.NotNil(validateConnectParams(p, false)) - s.NotNil(validateConnectParams(p, true)) - - p.host = environment.GetMySQLAddress() - s.NotNil(validateConnectParams(p, false)) - s.Nil(validateConnectParams(p, true)) - s.Equal(schema.DryrunDBName, p.database) - - p.database = "foobar" - s.Nil(validateConnectParams(p, false)) - s.Nil(validateConnectParams(p, true)) + p := new(sql.ConnectParams) + s.NotNil(sql.ValidateConnectParams(p, false)) + s.NotNil(sql.ValidateConnectParams(p, true)) + + p.Host = environment.GetMySQLAddress() + s.NotNil(sql.ValidateConnectParams(p, false)) + s.Nil(sql.ValidateConnectParams(p, true)) + s.Equal(schema.DryrunDBName, p.Database) + + p.Database = "foobar" + s.Nil(sql.ValidateConnectParams(p, false)) + s.Nil(sql.ValidateConnectParams(p, true)) } diff --git a/tools/sql/setuptask_test.go b/tools/sql-extensions/mysql/setuptask_test.go similarity index 87% rename from tools/sql/setuptask_test.go rename to tools/sql-extensions/mysql/setuptask_test.go index 27ce7d7a8d1..08fafe42fe9 100644 --- a/tools/sql/setuptask_test.go +++ b/tools/sql-extensions/mysql/setuptask_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package sql +package mysql import ( "os" @@ -29,12 +29,13 @@ import ( "github.com/uber/cadence/environment" "github.com/uber/cadence/tools/common/schema/test" + "github.com/uber/cadence/tools/sql" ) type ( SetupSchemaTestSuite struct { test.SetupSchemaTestBase - conn *sqlConn + conn *sql.Connection } ) @@ -59,7 +60,7 @@ func (s *SetupSchemaTestSuite) TearDownSuite() { } func (s *SetupSchemaTestSuite) TestCreateDatabase() { - RunTool([]string{"./tool", "-u", testUser, "--pw", testPassword, "create", "--db", "foobar123"}) + sql.RunTool([]string{"./tool", "-u", testUser, "--pw", testPassword, "create", "--db", "foobar123"}) err := s.conn.DropDatabase("foobar123") s.Nil(err) } @@ -67,5 +68,5 @@ func (s *SetupSchemaTestSuite) TestCreateDatabase() { func (s *SetupSchemaTestSuite) TestSetupSchema() { conn, err := newTestConn(s.DBName) s.Nil(err) - s.RunSetupTest(buildCLIOptions(), conn, "--db", createTestSQLFileContent(), []string{"task_maps", "tasks"}) + s.RunSetupTest(sql.BuildCLIOptions(), conn, "--db", createTestSQLFileContent(), []string{"task_maps", "tasks"}) } diff --git a/tools/sql/updatetask_test.go b/tools/sql-extensions/mysql/updatetask_test.go similarity index 83% rename from tools/sql/updatetask_test.go rename to tools/sql-extensions/mysql/updatetask_test.go index 7daf7635801..8ee6cd49ace 100644 --- a/tools/sql/updatetask_test.go +++ b/tools/sql-extensions/mysql/updatetask_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package sql +package mysql import ( "log" @@ -30,6 +30,7 @@ import ( "github.com/uber/cadence/environment" "github.com/uber/cadence/schema/mysql" "github.com/uber/cadence/tools/common/schema/test" + "github.com/uber/cadence/tools/sql" ) type UpdateSchemaTestSuite struct { @@ -59,21 +60,21 @@ func (s *UpdateSchemaTestSuite) TestUpdateSchema() { conn, err := newTestConn(s.DBName) s.Nil(err) defer conn.Close() - s.RunUpdateSchemaTest(buildCLIOptions(), conn, "--db", createTestSQLFileContent(), []string{"task_maps", "tasks"}) + s.RunUpdateSchemaTest(sql.BuildCLIOptions(), conn, "--db", createTestSQLFileContent(), []string{"task_maps", "tasks"}) } func (s *UpdateSchemaTestSuite) TestDryrun() { conn, err := newTestConn(s.DBName) s.Nil(err) defer conn.Close() - dir := "../../schema/mysql/v57/cadence/versioned" - s.RunDryrunTest(buildCLIOptions(), conn, "--db", dir, mysql.Version) + dir := "../../../schema/mysql/v57/cadence/versioned" + s.RunDryrunTest(sql.BuildCLIOptions(), conn, "--db", dir, mysql.Version) } func (s *UpdateSchemaTestSuite) TestVisibilityDryrun() { conn, err := newTestConn(s.DBName) s.Nil(err) defer conn.Close() - dir := "../../schema/mysql/v57/visibility/versioned" - s.RunDryrunTest(buildCLIOptions(), conn, "--db", dir, mysql.VisibilityVersion) + dir := "../../../schema/mysql/v57/visibility/versioned" + s.RunDryrunTest(sql.BuildCLIOptions(), conn, "--db", dir, mysql.VisibilityVersion) } diff --git a/tools/sql/version_test.go b/tools/sql-extensions/mysql/version_test.go similarity index 93% rename from tools/sql/version_test.go rename to tools/sql-extensions/mysql/version_test.go index 2cde8333dd1..3a26dafb7ea 100644 --- a/tools/sql/version_test.go +++ b/tools/sql-extensions/mysql/version_test.go @@ -18,7 +18,7 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -package sql +package mysql import ( "fmt" @@ -38,7 +38,7 @@ import ( "github.com/uber/cadence/common/service/config" "github.com/uber/cadence/common/service/dynamicconfig" "github.com/uber/cadence/environment" - "github.com/uber/cadence/tools/sql/mysql" + "github.com/uber/cadence/tools/sql" ) type ( @@ -61,20 +61,20 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() { visDatabase := "cadence_visibility_test" _, filename, _, ok := runtime.Caller(0) s.True(ok) - root := path.Dir(path.Dir(path.Dir(filename))) + root := path.Dir(path.Dir(path.Dir(path.Dir(filename)))) sqlFile := path.Join(root, "schema/mysql/v57/cadence/schema.sql") visSQLFile := path.Join(root, "schema/mysql/v57/visibility/schema.sql") defer s.createDatabase(database)() defer s.createDatabase(visDatabase)() - err := RunTool([]string{ + err := sql.RunTool([]string{ "./tool", "-ep", environment.GetMySQLAddress(), "-p", strconv.Itoa(environment.GetMySQLPort()), "-u", testUser, "-pw", testPassword, "-db", database, - "-dr", mysql.DriverName, + "-dr", driverName, "-q", "setup-schema", "-f", sqlFile, @@ -82,14 +82,14 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() { "-o", }) s.NoError(err) - err = RunTool([]string{ + err = sql.RunTool([]string{ "./tool", "-ep", environment.GetMySQLAddress(), "-p", strconv.Itoa(environment.GetMySQLPort()), "-u", testUser, "-pw", testPassword, "-db", visDatabase, - "-dr", mysql.DriverName, + "-dr", driverName, "-q", "setup-schema", "-f", visSQLFile, @@ -102,7 +102,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() { ConnectAddr: fmt.Sprintf("%v:%v", environment.GetMySQLAddress(), environment.GetMySQLPort()), User: testUser, Password: testPassword, - DriverName: mysql.DriverName, + DriverName: driverName, DatabaseName: database, } visibilityCfg := defaultCfg @@ -116,7 +116,7 @@ func (s *VersionTestSuite) TestVerifyCompatibleVersion() { }, TransactionSizeLimit: dynamicconfig.GetIntPropertyFn(common.DefaultTransactionSizeLimit), } - s.NoError(VerifyCompatibleVersion(cfg)) + s.NoError(sql.VerifyCompatibleVersion(cfg)) } func (s *VersionTestSuite) TestCheckCompatibleVersion() { @@ -168,14 +168,14 @@ func (s *VersionTestSuite) runCheckCompatibleVersion( } sqlFile := subdir + "/v" + actual + "/tmp.sql" - RunTool([]string{ + sql.RunTool([]string{ "./tool", "-ep", environment.GetMySQLAddress(), "-p", strconv.Itoa(environment.GetMySQLPort()), "-u", testUser, "-pw", testPassword, "-db", database, - "-dr", mysql.DriverName, + "-dr", driverName, "-q", "setup-schema", "-f", sqlFile, @@ -190,10 +190,10 @@ func (s *VersionTestSuite) runCheckCompatibleVersion( ConnectAddr: fmt.Sprintf("%v:%v", environment.GetMySQLAddress(), environment.GetMySQLPort()), User: testUser, Password: testPassword, - DriverName: mysql.DriverName, + DriverName: driverName, DatabaseName: database, } - err = checkCompatibleVersion(cfg, expected) + err = sql.CheckCompatibleVersion(cfg, expected) if len(errStr) > 0 { s.Error(err) s.Contains(err.Error(), errStr) diff --git a/tools/sql-extensions/postgres/driver.go b/tools/sql-extensions/postgres/driver.go new file mode 100644 index 00000000000..d1c916749f2 --- /dev/null +++ b/tools/sql-extensions/postgres/driver.go @@ -0,0 +1,134 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package postgres + +import ( + "fmt" + + _ "github.com/lib/pq" // needed to load the postgres driver + + "github.com/iancoleman/strcase" + "github.com/jmoiron/sqlx" + + "github.com/uber/cadence/tools/sql" +) + +const ( + driverName = "postgres" + + dataSourceNamePostgres = "user=%v password=%v host=%v port=%v dbname=%v sslmode=disable " + + readSchemaVersionPostgres = `SELECT curr_version from schema_version where db_name=$1` + + writeSchemaVersionPostgres = `INSERT into schema_version(db_name, creation_time, curr_version, min_compatible_version) VALUES ($1,$2,$3,$4) + ON CONFLICT (db_name) DO UPDATE + SET creation_time = excluded.creation_time, + curr_version = excluded.curr_version, + min_compatible_version = excluded.min_compatible_version;` + + writeSchemaUpdateHistoryPostgres = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES($1,$2,$3,$4,$5,$6,$7)` + + createSchemaVersionTablePostgres = `CREATE TABLE schema_version(db_name VARCHAR(255) not null PRIMARY KEY, ` + + `creation_time TIMESTAMP, ` + + `curr_version VARCHAR(64), ` + + `min_compatible_version VARCHAR(64));` + + createSchemaUpdateHistoryTablePostgres = `CREATE TABLE schema_update_history(` + + `year int not null, ` + + `month int not null, ` + + `update_time TIMESTAMP not null, ` + + `description VARCHAR(255), ` + + `manifest_md5 VARCHAR(64), ` + + `new_version VARCHAR(64), ` + + `old_version VARCHAR(64), ` + + `PRIMARY KEY (year, month, update_time));` + + //NOTE we have to use %v because somehow mysql doesn't work with ? here + createDatabasePostgres = "CREATE database %v" + + dropDatabasePostgres = "Drop database %v" + + listTablesPostgres = "select table_name from information_schema.tables where table_schema='public'" + + dropTablePostgres = "DROP TABLE %v" +) + +type driver struct{} + +var _ sql.Driver = (*driver)(nil) + +func init() { + sql.RegisterDriver(driverName, &driver{}) +} + +func (d *driver) GetDriverName() string { + return driverName +} + +func (d *driver) CreateDBConnection(driverName, host string, port int, user string, passwd string, database string) (*sqlx.DB, error) { + if database == "" { + database = "postgres" + } + db, err := sqlx.Connect(driverName, fmt.Sprintf(dataSourceNamePostgres, user, passwd, host, port, database)) + + if err != nil { + return nil, err + } + // Maps struct names in CamelCase to snake without need for db struct tags. + db.MapperFunc(strcase.ToSnake) + return db, nil +} + +func (d *driver) GetReadSchemaVersionSQL() string { + return readSchemaVersionPostgres +} + +func (d *driver) GetWriteSchemaVersionSQL() string { + return writeSchemaVersionPostgres +} + +func (d *driver) GetWriteSchemaUpdateHistorySQL() string { + return writeSchemaUpdateHistoryPostgres +} + +func (d *driver) GetCreateSchemaVersionTableSQL() string { + return createSchemaVersionTablePostgres +} + +func (d *driver) GetCreateSchemaUpdateHistoryTableSQL() string { + return createSchemaUpdateHistoryTablePostgres +} + +func (d *driver) GetCreateDatabaseSQL() string { + return createDatabasePostgres +} + +func (d *driver) GetDropDatabaseSQL() string { + return dropDatabasePostgres +} + +func (d *driver) GetListTablesSQL() string { + return listTablesPostgres +} + +func (d *driver) GetDropTableSQL() string { + return dropTablePostgres +} diff --git a/tools/sql/README.md b/tools/sql/README.md index 50242a89e5e..81075126d00 100644 --- a/tools/sql/README.md +++ b/tools/sql/README.md @@ -13,8 +13,11 @@ SQL_USER=$USERNAME SQL_PASSWORD=$PASSWD make install-schema-mysql ### Create the binaries - Run `make bins` - You should see an executable `cadence-sql-tool` +- Cadence officially support MySQL and Postgres for SQL. +- For other SQL database, you can add it easily as we do for MySQL/Postgres following our code in sql-extensions ### Do one time database creation and schema setup for a new cluster +- All command below are taking MySQL as example. For postgres, simply use with "--driver postgres" ``` cadence-sql-tool --ep $SQL_HOST_ADDR -p $port create --driver mysql --db cadence @@ -34,7 +37,7 @@ You can only upgrade to a new version after the initial setup done above. ``` ./cadence-sql-tool --ep $SQL_HOST_ADDR -p $port --driver mysql --db cadence update-schema -d ./schema/mysql/v57/cadence/versioned -v x.x -y -- executes a dryrun of upgrade to version x.x -./cadence-cassandra-tool --ep $SQL_HOST_ADDR -p $port --driver mysql --db cadence update-schema -d ./schema/mysql/v57/cadence/versioned -v x.x -- actually executes the upgrade to version x.x +./cadence-sql-tool --ep $SQL_HOST_ADDR -p $port --driver mysql --db cadence update-schema -d ./schema/mysql/v57/cadence/versioned -v x.x -- actually executes the upgrade to version x.x ./cadence-sql-tool --ep $SQL_HOST_ADDR -p $port --driver mysql --db cadence_visibility update-schema -d ./schema/mysql/v57/cadence/versioned -v x.x -y -- executes a dryrun of upgrade to version x.x ./cadence-sql-tool --ep $SQL_HOST_ADDR -p $port --driver mysql --db cadence_visibility update-schema -d ./schema/mysql/v57/cadence/versioned -v x.x -- actually executes the upgrade to version x.x diff --git a/tools/sql/conn.go b/tools/sql/conn.go index ce043257ef1..88ab802ae4d 100644 --- a/tools/sql/conn.go +++ b/tools/sql/conn.go @@ -27,107 +27,119 @@ import ( "github.com/jmoiron/sqlx" "github.com/uber/cadence/tools/common/schema" - "github.com/uber/cadence/tools/sql/mysql" ) type ( - sqlConnectParams struct { - host string - port int - user string - password string - database string - driverName string + // ConnectParams is the connection param + ConnectParams struct { + Host string + Port int + User string + Password string + Database string + DriverName string } - sqlConn struct { + + // Connection is the connection to database + Connection struct { + driver Driver database string db *sqlx.DB } -) -const ( - defaultSQLPort = 3306 - readSchemaVersionSQL = `SELECT curr_version from schema_version where db_name=?` - writeSchemaVersionSQL = `REPLACE into schema_version(db_name, creation_time, curr_version, min_compatible_version) VALUES (?,?,?,?)` - writeSchemaUpdateHistorySQL = `INSERT into schema_update_history(year, month, update_time, old_version, new_version, manifest_md5, description) VALUES(?,?,?,?,?,?,?)` - - createSchemaVersionTableSQL = `CREATE TABLE schema_version(db_name VARCHAR(255) not null PRIMARY KEY, ` + - `creation_time DATETIME(6), ` + - `curr_version VARCHAR(64), ` + - `min_compatible_version VARCHAR(64));` - - createSchemaUpdateHistoryTableSQL = `CREATE TABLE schema_update_history(` + - `year int not null, ` + - `month int not null, ` + - `update_time DATETIME(6) not null, ` + - `description VARCHAR(255), ` + - `manifest_md5 VARCHAR(64), ` + - `new_version VARCHAR(64), ` + - `old_version VARCHAR(64), ` + - `PRIMARY KEY (year, month, update_time));` + // Driver is the driver interface that each SQL database needs to implement + Driver interface { + GetDriverName() string + CreateDBConnection(driverName, host string, port int, user string, passwd string, database string) (*sqlx.DB, error) + GetReadSchemaVersionSQL() string + GetWriteSchemaVersionSQL() string + GetWriteSchemaUpdateHistorySQL() string + GetCreateSchemaVersionTableSQL() string + GetCreateSchemaUpdateHistoryTableSQL() string + GetCreateDatabaseSQL() string + GetDropDatabaseSQL() string + GetListTablesSQL() string + GetDropTableSQL() string + } ) -var _ schema.DB = (*sqlConn)(nil) +var supportedSQLDrivers = map[string]Driver{} -func newConn(params *sqlConnectParams) (*sqlConn, error) { - if params.driverName != mysql.DriverName { - return nil, fmt.Errorf("unsupported database driver: %v", params.driverName) +var _ schema.DB = (*Connection)(nil) + +// RegisterDriver will register a SQL driver for SQL client CLI +func RegisterDriver(driverName string, driver Driver) { + if _, ok := supportedSQLDrivers[driverName]; ok { + panic("driver " + driverName + " already registered") } - db, err := mysql.NewConnection(params.host, params.port, params.user, params.password, params.database) + supportedSQLDrivers[driverName] = driver +} + +// NewConnection creates a new connection to database +func NewConnection(params *ConnectParams) (*Connection, error) { + driver, ok := supportedSQLDrivers[params.DriverName] + + if !ok { + return nil, fmt.Errorf("not supported driver %v, only supported: %v", params.DriverName, supportedSQLDrivers) + } + + db, err := driver.CreateDBConnection(params.DriverName, params.Host, params.Port, params.User, params.Password, params.Database) if err != nil { return nil, err } - return &sqlConn{db: db, database: params.database}, nil + return &Connection{ + db: db, + database: params.Database, + driver: driver, + }, nil } // CreateSchemaVersionTables sets up the schema version tables -func (c *sqlConn) CreateSchemaVersionTables() error { - if err := c.Exec(createSchemaVersionTableSQL); err != nil { +func (c *Connection) CreateSchemaVersionTables() error { + if err := c.Exec(c.driver.GetCreateSchemaVersionTableSQL()); err != nil { return err } - return c.Exec(createSchemaUpdateHistoryTableSQL) + return c.Exec(c.driver.GetCreateSchemaUpdateHistoryTableSQL()) } // ReadSchemaVersion returns the current schema version for the keyspace -func (c *sqlConn) ReadSchemaVersion() (string, error) { +func (c *Connection) ReadSchemaVersion() (string, error) { var version string - err := c.db.Get(&version, readSchemaVersionSQL, c.database) + err := c.db.Get(&version, c.driver.GetReadSchemaVersionSQL(), c.database) return version, err } -// UpdateShemaVersion updates the schema version for the keyspace -func (c *sqlConn) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error { - _, err := c.db.Exec(writeSchemaVersionSQL, c.database, time.Now(), newVersion, minCompatibleVersion) - return err +// UpdateSchemaVersion updates the schema version for the keyspace +func (c *Connection) UpdateSchemaVersion(newVersion string, minCompatibleVersion string) error { + return c.Exec(c.driver.GetWriteSchemaVersionSQL(), c.database, time.Now(), newVersion, minCompatibleVersion) } // WriteSchemaUpdateLog adds an entry to the schema update history table -func (c *sqlConn) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { +func (c *Connection) WriteSchemaUpdateLog(oldVersion string, newVersion string, manifestMD5 string, desc string) error { now := time.Now().UTC() - _, err := c.db.Exec(writeSchemaUpdateHistorySQL, now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc) - return err + return c.Exec(c.driver.GetWriteSchemaUpdateHistorySQL(), now.Year(), int(now.Month()), now, oldVersion, newVersion, manifestMD5, desc) } // Exec executes a sql statement -func (c *sqlConn) Exec(stmt string) error { - _, err := c.db.Exec(stmt) +func (c *Connection) Exec(stmt string, args ...interface{}) error { + _, err := c.db.Exec(stmt, args...) return err } // ListTables returns a list of tables in this database -func (c *sqlConn) ListTables() ([]string, error) { +func (c *Connection) ListTables() ([]string, error) { var tables []string - err := c.db.Select(&tables, fmt.Sprintf("SHOW TABLES FROM %v", c.database)) + err := c.db.Select(&tables, fmt.Sprintf(c.driver.GetListTablesSQL(), c.database)) return tables, err } // DropTable drops a given table from the database -func (c *sqlConn) DropTable(name string) error { - return c.Exec(fmt.Sprintf("DROP TABLE %v", name)) +func (c *Connection) DropTable(name string) error { + return c.Exec(fmt.Sprintf(c.driver.GetDropTableSQL(), name)) } // DropAllTables drops all tables from this database -func (c *sqlConn) DropAllTables() error { +func (c *Connection) DropAllTables() error { tables, err := c.ListTables() if err != nil { return err @@ -141,17 +153,17 @@ func (c *sqlConn) DropAllTables() error { } // CreateDatabase creates a database if it doesn't exist -func (c *sqlConn) CreateDatabase(name string) error { - return c.Exec(fmt.Sprintf("CREATE database %v CHARACTER SET UTF8", name)) +func (c *Connection) CreateDatabase(name string) error { + return c.Exec(fmt.Sprintf(c.driver.GetCreateDatabaseSQL(), name)) } // DropDatabase drops a database -func (c *sqlConn) DropDatabase(name string) error { - return c.Exec(fmt.Sprintf("DROP database %v", name)) +func (c *Connection) DropDatabase(name string) error { + return c.Exec(fmt.Sprintf(c.driver.GetDropDatabaseSQL(), name)) } // Close closes the sql client -func (c *sqlConn) Close() { +func (c *Connection) Close() { if c.db != nil { c.db.Close() } diff --git a/tools/sql/handler.go b/tools/sql/handler.go index 5f1deab9f5f..a5a74944d71 100644 --- a/tools/sql/handler.go +++ b/tools/sql/handler.go @@ -23,6 +23,8 @@ package sql import ( "fmt" "log" + "strconv" + "strings" "github.com/urfave/cli" @@ -39,14 +41,14 @@ func VerifyCompatibleVersion( ds, ok := cfg.DataStores[cfg.DefaultStore] if ok && ds.SQL != nil { - err := checkCompatibleVersion(*ds.SQL, mysql.Version) + err := CheckCompatibleVersion(*ds.SQL, mysql.Version) if err != nil { return err } } ds, ok = cfg.DataStores[cfg.VisibilityStore] if ok && ds.SQL != nil { - err := checkCompatibleVersion(*ds.SQL, mysql.VisibilityVersion) + err := CheckCompatibleVersion(*ds.SQL, mysql.VisibilityVersion) if err != nil { return err } @@ -54,18 +56,36 @@ func VerifyCompatibleVersion( return nil } -// checkCompatibleVersion check the version compatibility -func checkCompatibleVersion( +// CheckCompatibleVersion check the version compatibility +func CheckCompatibleVersion( cfg config.SQL, expectedVersion string, ) error { - - connection, err := newConn(&sqlConnectParams{ - host: cfg.ConnectAddr, - user: cfg.User, - password: cfg.Password, - driverName: cfg.DriverName, - database: cfg.DatabaseName, + var host string + var port int + if strings.Contains(cfg.ConnectAddr, ":") { + ss := strings.Split(cfg.ConnectAddr, ":") + if len(ss) != 2 { + panic("invalid connect address, it must be in host:port format") + } + var err error + host = ss[0] + port, err = strconv.Atoi(ss[1]) + if err != nil { + panic("invalid port number:" + ss[1]) + } + } else { + host = cfg.ConnectAddr + port = defaultSQLPort + } + + connection, err := NewConnection(&ConnectParams{ + Host: host, + Port: port, + User: cfg.User, + Password: cfg.Password, + DriverName: cfg.DriverName, + Database: cfg.DatabaseName, }) if err != nil { return fmt.Errorf("unable to create SQL connection: %v", err.Error()) @@ -83,7 +103,7 @@ func setupSchema(cli *cli.Context) error { if err != nil { return handleErr(schema.NewConfigError(err.Error())) } - conn, err := newConn(params) + conn, err := NewConnection(params) if err != nil { return handleErr(err) } @@ -101,14 +121,14 @@ func updateSchema(cli *cli.Context) error { if err != nil { return handleErr(schema.NewConfigError(err.Error())) } - if params.database == schema.DryrunDBName { + if params.Database == schema.DryrunDBName { p := *params - if err := doCreateDatabase(p, p.database); err != nil { + if err := doCreateDatabase(p, p.Database); err != nil { return handleErr(fmt.Errorf("error creating dryrun database: %v", err)) } - defer doDropDatabase(p, p.database) + defer doDropDatabase(p, p.Database) } - conn, err := newConn(params) + conn, err := NewConnection(params) if err != nil { return handleErr(err) } @@ -136,9 +156,9 @@ func createDatabase(cli *cli.Context) error { return nil } -func doCreateDatabase(p sqlConnectParams, name string) error { - p.database = "" - conn, err := newConn(&p) +func doCreateDatabase(p ConnectParams, name string) error { + p.Database = "" + conn, err := NewConnection(&p) if err != nil { return err } @@ -146,9 +166,9 @@ func doCreateDatabase(p sqlConnectParams, name string) error { return conn.CreateDatabase(name) } -func doDropDatabase(p sqlConnectParams, name string) { - p.database = "" - conn, err := newConn(&p) +func doDropDatabase(p ConnectParams, name string) { + p.Database = "" + conn, err := NewConnection(&p) if err != nil { handleErr(err) return @@ -157,30 +177,31 @@ func doDropDatabase(p sqlConnectParams, name string) { conn.Close() } -func parseConnectParams(cli *cli.Context) (*sqlConnectParams, error) { - params := new(sqlConnectParams) - params.host = cli.GlobalString(schema.CLIOptEndpoint) - params.port = cli.GlobalInt(schema.CLIOptPort) - params.user = cli.GlobalString(schema.CLIOptUser) - params.password = cli.GlobalString(schema.CLIOptPassword) - params.database = cli.GlobalString(schema.CLIOptDatabase) - params.driverName = cli.GlobalString(schema.CLIOptDriverName) +func parseConnectParams(cli *cli.Context) (*ConnectParams, error) { + params := new(ConnectParams) + params.Host = cli.GlobalString(schema.CLIOptEndpoint) + params.Port = cli.GlobalInt(schema.CLIOptPort) + params.User = cli.GlobalString(schema.CLIOptUser) + params.Password = cli.GlobalString(schema.CLIOptPassword) + params.Database = cli.GlobalString(schema.CLIOptDatabase) + params.DriverName = cli.GlobalString(schema.CLIOptDriverName) isDryRun := cli.Bool(schema.CLIOptDryrun) - if err := validateConnectParams(params, isDryRun); err != nil { + if err := ValidateConnectParams(params, isDryRun); err != nil { return nil, err } return params, nil } -func validateConnectParams(params *sqlConnectParams, isDryRun bool) error { - if len(params.host) == 0 { +// ValidateConnectParams validates params +func ValidateConnectParams(params *ConnectParams, isDryRun bool) error { + if len(params.Host) == 0 { return schema.NewConfigError("missing sql endpoint argument " + flag(schema.CLIOptEndpoint)) } - if params.database == "" { + if params.Database == "" { if !isDryRun { return schema.NewConfigError("missing " + flag(schema.CLIOptDatabase) + " argument ") } - params.database = schema.DryrunDBName + params.Database = schema.DryrunDBName } return nil } diff --git a/tools/sql/main.go b/tools/sql/main.go index 86dd55d7178..7044fa288fe 100644 --- a/tools/sql/main.go +++ b/tools/sql/main.go @@ -23,15 +23,16 @@ package sql import ( "os" - _ "github.com/go-sql-driver/mysql" // needed to load the mysql driver "github.com/urfave/cli" "github.com/uber/cadence/tools/common/schema" ) +const defaultSQLPort = 3306 + // RunTool runs the cadence-cassandra-tool command line tool func RunTool(args []string) error { - app := buildCLIOptions() + app := BuildCLIOptions() return app.Run(args) } @@ -44,7 +45,8 @@ func cliHandler(c *cli.Context, handler func(c *cli.Context) error) { } } -func buildCLIOptions() *cli.App { +// BuildCLIOptions builds the options for cli +func BuildCLIOptions() *cli.App { app := cli.NewApp() app.Name = "cadence-sql-tool"