Skip to content

Misc cleanup in libpagestore.c. #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 5, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 58 additions & 52 deletions contrib/neon/libpagestore.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* libpqpagestore.c
* libpagestore.c
* Handles network communications with the remote pagestore.
*
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
Expand Down Expand Up @@ -32,25 +32,25 @@ PG_MODULE_MAGIC;

void _PG_init(void);

#define PqPageStoreTrace DEBUG5
#define PageStoreTrace DEBUG5

#define ZENITH_TAG "[ZENITH_SMGR] "
#define zenith_log(tag, fmt, ...) ereport(tag, \
(errmsg(ZENITH_TAG fmt, ## __VA_ARGS__), \
#define NEON_TAG "[NEON_SMGR] "
#define neon_log(tag, fmt, ...) ereport(tag, \
(errmsg(NEON_TAG fmt, ## __VA_ARGS__), \
errhidestmt(true), errhidecontext(true)))

bool connected = false;
PGconn *pageserver_conn = NULL;

char *page_server_connstring_raw;

static ZenithResponse *zenith_call(ZenithRequest *request);
static ZenithResponse *pageserver_call(ZenithRequest *request);
page_server_api api = {
.request = zenith_call
.request = pageserver_call
};

static void
zenith_connect()
pageserver_connect()
{
char *query;
int ret;
Expand All @@ -67,7 +67,7 @@ zenith_connect()
pageserver_conn = NULL;
ereport(ERROR,
(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
errmsg("[ZENITH_SMGR] could not establish connection"),
errmsg(NEON_TAG "could not establish connection to pageserver"),
errdetail_internal("%s", msg)));
}

Expand All @@ -77,8 +77,7 @@ zenith_connect()
{
PQfinish(pageserver_conn);
pageserver_conn = NULL;
zenith_log(ERROR,
"[ZENITH_SMGR] failed to start dispatcher_loop on pageserver");
neon_log(ERROR, "could not send pagestream command to pageserver");
}

while (PQisBusy(pageserver_conn))
Expand All @@ -105,14 +104,13 @@ zenith_connect()
PQfinish(pageserver_conn);
pageserver_conn = NULL;

zenith_log(ERROR, "[ZENITH_SMGR] failed to get handshake from pageserver: %s",
msg);
neon_log(ERROR, "could not complete handshake with pageserver: %s",
msg);
}
}
}

// FIXME: when auth is enabled this ptints JWT to logs
zenith_log(LOG, "libpqpagestore: connected to '%s'", page_server_connstring);
neon_log(LOG, "libpagestore: connected to '%s'", page_server_connstring_raw);

connected = true;
}
Expand All @@ -126,7 +124,7 @@ call_PQgetCopyData(PGconn *conn, char **buffer)
int ret;

retry:
ret = PQgetCopyData(conn, buffer, 1 /* async */);
ret = PQgetCopyData(conn, buffer, 1 /* async */ );

if (ret == 0)
{
Expand All @@ -146,8 +144,8 @@ call_PQgetCopyData(PGconn *conn, char **buffer)
if (wc & WL_SOCKET_READABLE)
{
if (!PQconsumeInput(conn))
zenith_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
neon_log(ERROR, "could not get response from pageserver: %s",
PQerrorMessage(conn));
}

goto retry;
Expand All @@ -158,7 +156,7 @@ call_PQgetCopyData(PGconn *conn, char **buffer)


static ZenithResponse *
zenith_call(ZenithRequest *request)
pageserver_call(ZenithRequest *request)
{
StringInfoData req_buff;
StringInfoData resp_buff;
Expand All @@ -175,7 +173,7 @@ zenith_call(ZenithRequest *request)
}

if (!connected)
zenith_connect();
pageserver_connect();

req_buff = zm_pack_request(request);

Expand All @@ -184,21 +182,21 @@ zenith_call(ZenithRequest *request)
*
* In principle, this could block if the output buffer is full, and we
* should use async mode and check for interrupts while waiting. In
* practice, our requests are small enough to always fit in the output and
* TCP buffer.
* practice, our requests are small enough to always fit in the output
* and TCP buffer.
*/
if (PQputCopyData(pageserver_conn, req_buff.data, req_buff.len) <= 0 || PQflush(pageserver_conn))
{
zenith_log(ERROR, "failed to send page request: %s",
PQerrorMessage(pageserver_conn));
neon_log(ERROR, "failed to send page request: %s",
PQerrorMessage(pageserver_conn));
}
pfree(req_buff.data);

if (message_level_is_interesting(PqPageStoreTrace))
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) request);

zenith_log(PqPageStoreTrace, "Sent request: %s", msg);
neon_log(PageStoreTrace, "sent request: %s", msg);
pfree(msg);
}

Expand All @@ -207,25 +205,20 @@ zenith_call(ZenithRequest *request)
resp_buff.cursor = 0;

if (resp_buff.len == -1)
zenith_log(ERROR, "end of COPY");
neon_log(ERROR, "end of COPY");
else if (resp_buff.len == -2)
zenith_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));
neon_log(ERROR, "could not read COPY data: %s", PQerrorMessage(pageserver_conn));

resp = zm_unpack_response(&resp_buff);
PQfreemem(resp_buff.data);

if (message_level_is_interesting(PqPageStoreTrace))
if (message_level_is_interesting(PageStoreTrace))
{
char *msg = zm_to_string((ZenithMessage *) resp);

zenith_log(PqPageStoreTrace, "Got response: %s", msg);
neon_log(PageStoreTrace, "got response: %s", msg);
pfree(msg);
}

/*
* XXX: zm_to_string leak strings. Check with what memory contex all this
* methods are called.
*/
}
PG_CATCH();
{
Expand All @@ -238,7 +231,7 @@ zenith_call(ZenithRequest *request)
*/
if (connected)
{
zenith_log(LOG, "dropping connection to page server due to error");
neon_log(LOG, "dropping connection to page server due to error");
PQfinish(pageserver_conn);
pageserver_conn = NULL;
connected = false;
Expand Down Expand Up @@ -271,11 +264,13 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
PQconninfoOption *conn_options;
PQconninfoOption *conn_option;
MemoryContext oldcontext;

/*
* Here we substitute password in connection string with an environment variable.
* To simplify things we construct a connection string back with only known options.
* In particular: host port user and password. We do not currently use other options and
* constructing full connstring in an URI shape is quite messy.
* Here we substitute password in connection string with an environment
* variable. To simplify things we construct a connection string back with
* only known options. In particular: host port user and password. We do
* not currently use other options and constructing full connstring in an
* URI shape is quite messy.
*/

if (page_server_connstring_raw == NULL || page_server_connstring_raw[0] == '\0')
Expand All @@ -302,15 +297,18 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
*/
for (conn_option = conn_options; conn_option->keyword != NULL; conn_option++)
{
if (strcmp(conn_option->keyword, "host") == 0) {
if (strcmp(conn_option->keyword, "host") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
host = conn_option->val;
}
else if (strcmp(conn_option->keyword, "port") == 0) {
else if (strcmp(conn_option->keyword, "port") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
port = conn_option->val;
}
else if (strcmp(conn_option->keyword, "user") == 0) {
else if (strcmp(conn_option->keyword, "user") == 0)
{
if (conn_option->val != NULL && conn_option->val[0] != '\0')
user = conn_option->val;
}
Expand All @@ -324,7 +322,7 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
(errcode(ERRCODE_CONNECTION_EXCEPTION),
errmsg("expected placeholder value in pageserver password starting from $ but found: %s", &conn_option->val[1])));

zenith_log(LOG, "found auth token placeholder in pageserver conn string %s", &conn_option->val[1]);
neon_log(LOG, "found auth token placeholder in pageserver conn string '%s'", &conn_option->val[1]);
auth_token = getenv(&conn_option->val[1]);
if (!auth_token)
{
Expand All @@ -334,12 +332,16 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
}
else
{
zenith_log(LOG, "using auth token from environment passed via env");
neon_log(LOG, "using auth token from environment passed via env");
}
}
}
}
// allocate connection string in a TopMemoryContext to make sure it is not freed

/*
* allocate connection string in TopMemoryContext to make sure it is not
* freed
*/
oldcontext = CurrentMemoryContext;
MemoryContextSwitchTo(TopMemoryContext);
page_server_connstring = psprintf("postgresql://%s:%s@%s:%s", user, auth_token ? auth_token : "", host, port);
Expand Down Expand Up @@ -398,15 +400,15 @@ _PG_init(void)
-1, -1, INT_MAX,
PGC_SIGHUP,
GUC_UNIT_MB,
NULL, NULL, NULL);
NULL, NULL, NULL);

relsize_hash_init();
EmitWarningsOnPlaceholders("neon");

if (page_server != NULL)
zenith_log(ERROR, "libpqpagestore already loaded");
neon_log(ERROR, "libpagestore already loaded");

zenith_log(PqPageStoreTrace, "libpqpagestore already loaded");
neon_log(PageStoreTrace, "libpagestore already loaded");
page_server = &api;

/* substitute password in pageserver_connstring */
Expand All @@ -415,18 +417,22 @@ _PG_init(void)
/* Is there more correct way to pass CustomGUC to postgres code? */
zenith_timeline_walproposer = zenith_timeline;
zenith_tenant_walproposer = zenith_tenant;
/* Walproposer instructcs safekeeper which pageserver to use for replication */

/*
* Walproposer instructs safekeeper which pageserver to use for
* replication
*/
zenith_pageserver_connstring_walproposer = page_server_connstring;

if (wal_redo)
{
zenith_log(PqPageStoreTrace, "set inmem_smgr hook");
neon_log(PageStoreTrace, "set inmem_smgr hook");
smgr_hook = smgr_inmem;
smgr_init_hook = smgr_init_inmem;
}
else if (page_server_connstring && page_server_connstring[0])
{
zenith_log(PqPageStoreTrace, "set zenith_smgr hook");
neon_log(PageStoreTrace, "set neon_smgr hook");
smgr_hook = smgr_zenith;
smgr_init_hook = smgr_init_zenith;
dbsize_hook = zenith_dbsize;
Expand Down