From 7bcc63fdeac11a40ddd8b9cae3a7d5a477150efd Mon Sep 17 00:00:00 2001 From: Jonathan Thiessen Date: Thu, 18 Jun 2015 11:57:59 -0700 Subject: [PATCH] Add initial `IMPORT FOREIGN SCHEMA` support --- mysql_fdw.c | 275 +++++++++++++++++++++++++++++++++++++++++++++++++++- option.c | 29 ++++-- 2 files changed, 294 insertions(+), 10 deletions(-) diff --git a/mysql_fdw.c b/mysql_fdw.c index e7e27d3..06fb2f0 100644 --- a/mysql_fdw.c +++ b/mysql_fdw.c @@ -132,6 +132,10 @@ static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, static void mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Cost *startup_cost, Cost *total_cost, Oid foreigntableid); +#if PG_VERSION_NUM >= 90500 +static List *mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid); +#endif + static bool mysql_is_column_unique(Oid foreigntableid); void* mysql_dll_handle = NULL; @@ -303,7 +307,11 @@ mysql_fdw_handler(PG_FUNCTION_ARGS) fdwroutine->ReScanForeignScan = mysqlReScanForeignScan; fdwroutine->EndForeignScan = mysqlEndForeignScan; - /* Callback functions for readable FDW */ +#if PG_VERSION_NUM >= 90500 + fdwroutine->ImportForeignSchema = mysqlImportForeignSchema; +#endif + + /* Callback functions for writeable FDW */ fdwroutine->ExecForeignInsert = mysqlExecForeignInsert; fdwroutine->BeginForeignModify = mysqlBeginForeignModify; fdwroutine->PlanForeignModify = mysqlPlanForeignModify; @@ -1578,3 +1586,268 @@ mysqlEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) } } +/* + * Import a foreign schema (9.5+) + */ +#if PG_VERSION_NUM >= 90500 +static List * +mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) +{ + List *commands = NIL; + bool import_default = false; + bool import_not_null = true; + ForeignServer *server; + UserMapping *user; + mysql_opt *options = NULL; + MYSQL *conn; + StringInfoData buf; + MYSQL_RES *volatile res = NULL; + MYSQL_ROW row; + ListCell *lc; + char *err = NULL; + + /* Parse statement options */ + foreach(lc, stmt->options) + { + DefElem *def = (DefElem *) lfirst(lc); + + if (strcmp(def->defname, "import_default") == 0) + import_default = defGetBoolean(def); + else if (strcmp(def->defname, "import_not_null") == 0) + import_not_null = defGetBoolean(def); + else + ereport(ERROR, + (errcode(ERRCODE_FDW_INVALID_OPTION_NAME), + errmsg("invalid option \"%s\"", def->defname))); + } + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + server = GetForeignServer(serverOid); + user = GetUserMapping(GetUserId(), server->serverid); + options = mysql_get_options(serverOid); + conn = mysql_get_connection(server, user, options); + + /* Create workspace for strings */ + initStringInfo(&buf); + + /* Check that the schema really exists */ + appendStringInfo(&buf, "SELECT 1 FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s'", stmt->remote_schema); + + if (_mysql_query(conn, buf.data) != 0) + { + switch(_mysql_errno(conn)) + { + case CR_NO_ERROR: + break; + + case CR_OUT_OF_MEMORY: + case CR_SERVER_GONE_ERROR: + case CR_SERVER_LOST: + case CR_UNKNOWN_ERROR: + err = pstrdup(_mysql_error(conn)); + mysql_rel_connection(conn); + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to execute the MySQL query: \n%s", err))); + break; + + case CR_COMMANDS_OUT_OF_SYNC: + default: + err = pstrdup(_mysql_error(conn)); + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to execute the MySQL query: \n%s", err))); + } + } + + res = _mysql_store_result(conn); + if (!res || mysql_num_rows(res) < 1) + { + ereport(ERROR, + (errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND), + errmsg("schema \"%s\" is not present on foreign server \"%s\"", + stmt->remote_schema, server->servername))); + } + _mysql_free_result(res); + res = NULL; + resetStringInfo(&buf); + + /* + * Fetch all table data from this schema, possibly restricted by + * EXCEPT or LIMIT TO. + */ + appendStringInfo(&buf, + " SELECT" + " t.TABLE_NAME," + " c.COLUMN_NAME," + " CASE" + " WHEN c.DATA_TYPE = 'enum' THEN LOWER(CONCAT(c.COLUMN_NAME, '_t'))" + " WHEN c.DATA_TYPE = 'tinyint' THEN 'smallint'" + " WHEN c.DATA_TYPE = 'mediumint' THEN 'integer'" + " WHEN c.DATA_TYPE = 'tinyint unsigned' THEN 'smallint'" + " WHEN c.DATA_TYPE = 'smallint unsigned' THEN 'integer'" + " WHEN c.DATA_TYPE = 'mediumint unsigned' THEN 'integer'" + " WHEN c.DATA_TYPE = 'int unsigned' THEN 'bigint'" + " WHEN c.DATA_TYPE = 'bigint unsigned' THEN 'numeric(20)'" + " WHEN c.DATA_TYPE = 'double' THEN 'double precision'" + " WHEN c.DATA_TYPE = 'float' THEN 'real'" + " WHEN c.DATA_TYPE = 'datetime' THEN 'timestamp'" + " WHEN c.DATA_TYPE = 'longtext' THEN 'text'" + " WHEN c.DATA_TYPE = 'mediumtext' THEN 'text'" + " WHEN c.DATA_TYPE = 'blob' THEN 'bytea'" + " ELSE c.DATA_TYPE" + " END," + " c.COLUMN_TYPE," + " IF(c.IS_NULLABLE = 'NO', 't', 'f')," + " c.COLUMN_DEFAULT" + " FROM" + " information_schema.TABLES AS t" + " JOIN" + " information_schema.COLUMNS AS c" + " ON" + " t.TABLE_CATALOG = c.TABLE_CATALOG AND t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME" + " WHERE" + " t.TABLE_SCHEMA = '%s'", + stmt->remote_schema); + + /* Apply restrictions for LIMIT TO and EXCEPT */ + if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO || + stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + { + bool first_item = true; + + appendStringInfoString(&buf, " AND t.TABLE_NAME "); + if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT) + appendStringInfoString(&buf, "NOT "); + appendStringInfoString(&buf, "IN ("); + + /* Append list of table names within IN clause */ + foreach(lc, stmt->table_list) + { + RangeVar *rv = (RangeVar *) lfirst(lc); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ", "); + + appendStringInfo(&buf, "%s", rv->relname); + } + appendStringInfoChar(&buf, ')'); + } + + /* Append ORDER BY at the end of query to ensure output ordering */ + appendStringInfo(&buf, " ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION"); + + /* Fetch the data */ + if (_mysql_query(conn, buf.data) != 0) + { + switch(_mysql_errno(conn)) + { + case CR_NO_ERROR: + break; + + case CR_OUT_OF_MEMORY: + case CR_SERVER_GONE_ERROR: + case CR_SERVER_LOST: + case CR_UNKNOWN_ERROR: + err = pstrdup(_mysql_error(conn)); + mysql_rel_connection(conn); + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to execute the MySQL query: \n%s", err))); + break; + + case CR_COMMANDS_OUT_OF_SYNC: + default: + err = pstrdup(_mysql_error(conn)); + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to execute the MySQL query: \n%s", err))); + } + } + + res = _mysql_store_result(conn); + row = _mysql_fetch_row(res); + while (row) + { + char *tablename = row[0]; + bool first_item = true; + + resetStringInfo(&buf); + appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n", + quote_identifier(tablename)); + + /* Scan all rows for this table */ + do + { + char *attname; + char *typename; + char *typedfn; + char *attnotnull; + char *attdefault; + + /* If table has no columns, we'll see nulls here */ + if (row[1] == NULL) + continue; + + attname = row[1]; + typename = row[2]; + typedfn = row[3]; + attnotnull = row[4]; + attdefault = row[5] == NULL ? (char *) NULL : row[5]; + + if (strncmp(typedfn, "enum(", 5) == 0) + ereport(NOTICE, (errmsg("If you encounter an error, you may need to execute the following first:\n" + "DO $$BEGIN IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_type WHERE typname = '%s') THEN CREATE TYPE %s AS %s; END IF; END$$;\n", + typename, + typename, + typedfn))); + + if (first_item) + first_item = false; + else + appendStringInfoString(&buf, ",\n"); + + /* Print column name and type */ + appendStringInfo(&buf, " %s %s", + quote_identifier(attname), + typename); + + /* Add DEFAULT if needed */ + if (import_default && attdefault != NULL) + appendStringInfo(&buf, " DEFAULT %s", attdefault); + + /* Add NOT NULL if needed */ + if (import_not_null && attnotnull[0] == 't') + appendStringInfoString(&buf, " NOT NULL"); + } + while ((row = _mysql_fetch_row(res)) && + (strcmp(row[0], tablename) == 0)); + + /* + * Add server name and table-level options. We specify remote + * database and table name as options (the latter to ensure that + * renaming the foreign table doesn't break the association). + */ + appendStringInfo(&buf, "\n) SERVER %s OPTIONS (dbname '%s', table_name '%s');\n", + quote_identifier(server->servername), + stmt->remote_schema, + tablename); + + commands = lappend(commands, pstrdup(buf.data)); + } + + /* Clean up */ + _mysql_free_result(res); + res = NULL; + resetStringInfo(&buf); + + mysql_rel_connection(conn); + + return commands; +} +#endif diff --git a/option.c b/option.c index a6eac4e..2afc1a9 100644 --- a/option.c +++ b/option.c @@ -149,10 +149,10 @@ mysql_is_valid_option(const char *option, Oid context) * Fetch the options for a mysql_fdw foreign table. */ mysql_opt* -mysql_get_options(Oid foreigntableid) +mysql_get_options(Oid foreignoid) { - ForeignTable *f_table; - ForeignServer *f_server; + ForeignTable *f_table = NULL; + ForeignServer *f_server = NULL; UserMapping *f_mapping; List *options; ListCell *lc; @@ -164,12 +164,23 @@ mysql_get_options(Oid foreigntableid) /* * Extract options from FDW objects. */ - f_table = GetForeignTable(foreigntableid); - f_server = GetForeignServer(f_table->serverid); - f_mapping = GetUserMapping(GetUserId(), f_table->serverid); + PG_TRY(); + { + f_table = GetForeignTable(foreignoid); + f_server = GetForeignServer(f_table->serverid); + } + PG_CATCH(); + { + f_table = NULL; + f_server = GetForeignServer(foreignoid); + } + PG_END_TRY(); + + f_mapping = GetUserMapping(GetUserId(), f_server->serverid); options = NIL; - options = list_concat(options, f_table->options); + if (f_table) + options = list_concat(options, f_table->options); options = list_concat(options, f_server->options); options = list_concat(options, f_mapping->options); @@ -212,8 +223,8 @@ mysql_get_options(Oid foreigntableid) if (!opt->svr_port) opt->svr_port = MYSQL_PORT; - if (!opt->svr_table) - opt->svr_table = get_rel_name(foreigntableid); + if (!opt->svr_table && f_table) + opt->svr_table = get_rel_name(foreignoid); return opt; }