Skip to content

Commit

Permalink
Add initial IMPORT FOREIGN SCHEMA support
Browse files Browse the repository at this point in the history
  • Loading branch information
jjthiessen committed Jul 20, 2015
1 parent 5f8e57c commit 7bcc63f
Show file tree
Hide file tree
Showing 2 changed files with 294 additions and 10 deletions.
275 changes: 274 additions & 1 deletion mysql_fdw.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ static ForeignScan *mysqlGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel,
static void mysqlEstimateCosts(PlannerInfo *root, RelOptInfo *baserel, Cost *startup_cost, Cost *total_cost,
Oid foreigntableid);

#if PG_VERSION_NUM >= 90500
static List *mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid);
#endif

static bool mysql_is_column_unique(Oid foreigntableid);

void* mysql_dll_handle = NULL;
Expand Down Expand Up @@ -303,7 +307,11 @@ mysql_fdw_handler(PG_FUNCTION_ARGS)
fdwroutine->ReScanForeignScan = mysqlReScanForeignScan;
fdwroutine->EndForeignScan = mysqlEndForeignScan;

/* Callback functions for readable FDW */
#if PG_VERSION_NUM >= 90500
fdwroutine->ImportForeignSchema = mysqlImportForeignSchema;
#endif

/* Callback functions for writeable FDW */
fdwroutine->ExecForeignInsert = mysqlExecForeignInsert;
fdwroutine->BeginForeignModify = mysqlBeginForeignModify;
fdwroutine->PlanForeignModify = mysqlPlanForeignModify;
Expand Down Expand Up @@ -1578,3 +1586,268 @@ mysqlEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo)
}
}

/*
* Import a foreign schema (9.5+)
*/
#if PG_VERSION_NUM >= 90500
static List *
mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
{
List *commands = NIL;
bool import_default = false;
bool import_not_null = true;
ForeignServer *server;
UserMapping *user;
mysql_opt *options = NULL;
MYSQL *conn;
StringInfoData buf;
MYSQL_RES *volatile res = NULL;
MYSQL_ROW row;
ListCell *lc;
char *err = NULL;

/* Parse statement options */
foreach(lc, stmt->options)
{
DefElem *def = (DefElem *) lfirst(lc);

if (strcmp(def->defname, "import_default") == 0)
import_default = defGetBoolean(def);
else if (strcmp(def->defname, "import_not_null") == 0)
import_not_null = defGetBoolean(def);
else
ereport(ERROR,
(errcode(ERRCODE_FDW_INVALID_OPTION_NAME),
errmsg("invalid option \"%s\"", def->defname)));
}

/*
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
server = GetForeignServer(serverOid);
user = GetUserMapping(GetUserId(), server->serverid);
options = mysql_get_options(serverOid);
conn = mysql_get_connection(server, user, options);

/* Create workspace for strings */
initStringInfo(&buf);

/* Check that the schema really exists */
appendStringInfo(&buf, "SELECT 1 FROM information_schema.TABLES WHERE TABLE_SCHEMA = '%s'", stmt->remote_schema);

if (_mysql_query(conn, buf.data) != 0)
{
switch(_mysql_errno(conn))
{
case CR_NO_ERROR:
break;

case CR_OUT_OF_MEMORY:
case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
case CR_UNKNOWN_ERROR:
err = pstrdup(_mysql_error(conn));
mysql_rel_connection(conn);
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to execute the MySQL query: \n%s", err)));
break;

case CR_COMMANDS_OUT_OF_SYNC:
default:
err = pstrdup(_mysql_error(conn));
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to execute the MySQL query: \n%s", err)));
}
}

res = _mysql_store_result(conn);
if (!res || mysql_num_rows(res) < 1)
{
ereport(ERROR,
(errcode(ERRCODE_FDW_SCHEMA_NOT_FOUND),
errmsg("schema \"%s\" is not present on foreign server \"%s\"",
stmt->remote_schema, server->servername)));
}
_mysql_free_result(res);
res = NULL;
resetStringInfo(&buf);

/*
* Fetch all table data from this schema, possibly restricted by
* EXCEPT or LIMIT TO.
*/
appendStringInfo(&buf,
" SELECT"
" t.TABLE_NAME,"
" c.COLUMN_NAME,"
" CASE"
" WHEN c.DATA_TYPE = 'enum' THEN LOWER(CONCAT(c.COLUMN_NAME, '_t'))"
" WHEN c.DATA_TYPE = 'tinyint' THEN 'smallint'"
" WHEN c.DATA_TYPE = 'mediumint' THEN 'integer'"
" WHEN c.DATA_TYPE = 'tinyint unsigned' THEN 'smallint'"
" WHEN c.DATA_TYPE = 'smallint unsigned' THEN 'integer'"
" WHEN c.DATA_TYPE = 'mediumint unsigned' THEN 'integer'"
" WHEN c.DATA_TYPE = 'int unsigned' THEN 'bigint'"
" WHEN c.DATA_TYPE = 'bigint unsigned' THEN 'numeric(20)'"
" WHEN c.DATA_TYPE = 'double' THEN 'double precision'"
" WHEN c.DATA_TYPE = 'float' THEN 'real'"
" WHEN c.DATA_TYPE = 'datetime' THEN 'timestamp'"
" WHEN c.DATA_TYPE = 'longtext' THEN 'text'"
" WHEN c.DATA_TYPE = 'mediumtext' THEN 'text'"
" WHEN c.DATA_TYPE = 'blob' THEN 'bytea'"
" ELSE c.DATA_TYPE"
" END,"
" c.COLUMN_TYPE,"
" IF(c.IS_NULLABLE = 'NO', 't', 'f'),"
" c.COLUMN_DEFAULT"
" FROM"
" information_schema.TABLES AS t"
" JOIN"
" information_schema.COLUMNS AS c"
" ON"
" t.TABLE_CATALOG = c.TABLE_CATALOG AND t.TABLE_SCHEMA = c.TABLE_SCHEMA AND t.TABLE_NAME = c.TABLE_NAME"
" WHERE"
" t.TABLE_SCHEMA = '%s'",
stmt->remote_schema);

/* Apply restrictions for LIMIT TO and EXCEPT */
if (stmt->list_type == FDW_IMPORT_SCHEMA_LIMIT_TO ||
stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
{
bool first_item = true;

appendStringInfoString(&buf, " AND t.TABLE_NAME ");
if (stmt->list_type == FDW_IMPORT_SCHEMA_EXCEPT)
appendStringInfoString(&buf, "NOT ");
appendStringInfoString(&buf, "IN (");

/* Append list of table names within IN clause */
foreach(lc, stmt->table_list)
{
RangeVar *rv = (RangeVar *) lfirst(lc);

if (first_item)
first_item = false;
else
appendStringInfoString(&buf, ", ");

appendStringInfo(&buf, "%s", rv->relname);
}
appendStringInfoChar(&buf, ')');
}

/* Append ORDER BY at the end of query to ensure output ordering */
appendStringInfo(&buf, " ORDER BY t.TABLE_NAME, c.ORDINAL_POSITION");

/* Fetch the data */
if (_mysql_query(conn, buf.data) != 0)
{
switch(_mysql_errno(conn))
{
case CR_NO_ERROR:
break;

case CR_OUT_OF_MEMORY:
case CR_SERVER_GONE_ERROR:
case CR_SERVER_LOST:
case CR_UNKNOWN_ERROR:
err = pstrdup(_mysql_error(conn));
mysql_rel_connection(conn);
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to execute the MySQL query: \n%s", err)));
break;

case CR_COMMANDS_OUT_OF_SYNC:
default:
err = pstrdup(_mysql_error(conn));
ereport(ERROR,
(errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION),
errmsg("failed to execute the MySQL query: \n%s", err)));
}
}

res = _mysql_store_result(conn);
row = _mysql_fetch_row(res);
while (row)
{
char *tablename = row[0];
bool first_item = true;

resetStringInfo(&buf);
appendStringInfo(&buf, "CREATE FOREIGN TABLE %s (\n",
quote_identifier(tablename));

/* Scan all rows for this table */
do
{
char *attname;
char *typename;
char *typedfn;
char *attnotnull;
char *attdefault;

/* If table has no columns, we'll see nulls here */
if (row[1] == NULL)
continue;

attname = row[1];
typename = row[2];
typedfn = row[3];
attnotnull = row[4];
attdefault = row[5] == NULL ? (char *) NULL : row[5];

if (strncmp(typedfn, "enum(", 5) == 0)
ereport(NOTICE, (errmsg("If you encounter an error, you may need to execute the following first:\n"
"DO $$BEGIN IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_type WHERE typname = '%s') THEN CREATE TYPE %s AS %s; END IF; END$$;\n",
typename,
typename,
typedfn)));

if (first_item)
first_item = false;
else
appendStringInfoString(&buf, ",\n");

/* Print column name and type */
appendStringInfo(&buf, " %s %s",
quote_identifier(attname),
typename);

/* Add DEFAULT if needed */
if (import_default && attdefault != NULL)
appendStringInfo(&buf, " DEFAULT %s", attdefault);

/* Add NOT NULL if needed */
if (import_not_null && attnotnull[0] == 't')
appendStringInfoString(&buf, " NOT NULL");
}
while ((row = _mysql_fetch_row(res)) &&
(strcmp(row[0], tablename) == 0));

/*
* Add server name and table-level options. We specify remote
* database and table name as options (the latter to ensure that
* renaming the foreign table doesn't break the association).
*/
appendStringInfo(&buf, "\n) SERVER %s OPTIONS (dbname '%s', table_name '%s');\n",
quote_identifier(server->servername),
stmt->remote_schema,
tablename);

commands = lappend(commands, pstrdup(buf.data));
}

/* Clean up */
_mysql_free_result(res);
res = NULL;
resetStringInfo(&buf);

mysql_rel_connection(conn);

return commands;
}
#endif
29 changes: 20 additions & 9 deletions option.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ mysql_is_valid_option(const char *option, Oid context)
* Fetch the options for a mysql_fdw foreign table.
*/
mysql_opt*
mysql_get_options(Oid foreigntableid)
mysql_get_options(Oid foreignoid)
{
ForeignTable *f_table;
ForeignServer *f_server;
ForeignTable *f_table = NULL;
ForeignServer *f_server = NULL;
UserMapping *f_mapping;
List *options;
ListCell *lc;
Expand All @@ -164,12 +164,23 @@ mysql_get_options(Oid foreigntableid)
/*
* Extract options from FDW objects.
*/
f_table = GetForeignTable(foreigntableid);
f_server = GetForeignServer(f_table->serverid);
f_mapping = GetUserMapping(GetUserId(), f_table->serverid);
PG_TRY();
{
f_table = GetForeignTable(foreignoid);
f_server = GetForeignServer(f_table->serverid);
}
PG_CATCH();
{
f_table = NULL;
f_server = GetForeignServer(foreignoid);
}
PG_END_TRY();

f_mapping = GetUserMapping(GetUserId(), f_server->serverid);

options = NIL;
options = list_concat(options, f_table->options);
if (f_table)
options = list_concat(options, f_table->options);
options = list_concat(options, f_server->options);
options = list_concat(options, f_mapping->options);

Expand Down Expand Up @@ -212,8 +223,8 @@ mysql_get_options(Oid foreigntableid)
if (!opt->svr_port)
opt->svr_port = MYSQL_PORT;

if (!opt->svr_table)
opt->svr_table = get_rel_name(foreigntableid);
if (!opt->svr_table && f_table)
opt->svr_table = get_rel_name(foreignoid);

return opt;
}
Expand Down

0 comments on commit 7bcc63f

Please sign in to comment.