1
1
/*-------------------------------------------------------------------------
2
2
*
3
- * libpqpagestore .c
3
+ * libpagestore .c
4
4
* Handles network communications with the remote pagestore.
5
5
*
6
6
* Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
@@ -32,25 +32,25 @@ PG_MODULE_MAGIC;
32
32
33
33
void _PG_init (void );
34
34
35
- #define PqPageStoreTrace DEBUG5
35
+ #define PageStoreTrace DEBUG5
36
36
37
- #define ZENITH_TAG "[ZENITH_SMGR ] "
38
- #define zenith_log (tag , fmt , ...) ereport(tag, \
39
- (errmsg(ZENITH_TAG fmt, ## __VA_ARGS__), \
37
+ #define NEON_TAG "[NEON_SMGR ] "
38
+ #define neon_log (tag , fmt , ...) ereport(tag, \
39
+ (errmsg(NEON_TAG fmt, ## __VA_ARGS__), \
40
40
errhidestmt(true), errhidecontext(true)))
41
41
42
42
bool connected = false;
43
43
PGconn * pageserver_conn = NULL ;
44
44
45
45
char * page_server_connstring_raw ;
46
46
47
- static ZenithResponse * zenith_call (ZenithRequest * request );
47
+ static ZenithResponse * pageserver_call (ZenithRequest * request );
48
48
page_server_api api = {
49
- .request = zenith_call
49
+ .request = pageserver_call
50
50
};
51
51
52
52
static void
53
- zenith_connect ()
53
+ pageserver_connect ()
54
54
{
55
55
char * query ;
56
56
int ret ;
@@ -67,7 +67,7 @@ zenith_connect()
67
67
pageserver_conn = NULL ;
68
68
ereport (ERROR ,
69
69
(errcode (ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION ),
70
- errmsg ("[ZENITH_SMGR] could not establish connection" ),
70
+ errmsg (NEON_TAG " could not establish connection to pageserver " ),
71
71
errdetail_internal ("%s" , msg )));
72
72
}
73
73
@@ -77,8 +77,7 @@ zenith_connect()
77
77
{
78
78
PQfinish (pageserver_conn );
79
79
pageserver_conn = NULL ;
80
- zenith_log (ERROR ,
81
- "[ZENITH_SMGR] failed to start dispatcher_loop on pageserver" );
80
+ neon_log (ERROR , "could not send pagestream command to pageserver" );
82
81
}
83
82
84
83
while (PQisBusy (pageserver_conn ))
@@ -105,14 +104,13 @@ zenith_connect()
105
104
PQfinish (pageserver_conn );
106
105
pageserver_conn = NULL ;
107
106
108
- zenith_log (ERROR , "[ZENITH_SMGR] failed to get handshake from pageserver: %s" ,
109
- msg );
107
+ neon_log (ERROR , "could not complete handshake with pageserver: %s" ,
108
+ msg );
110
109
}
111
110
}
112
111
}
113
112
114
- // FIXME: when auth is enabled this ptints JWT to logs
115
- zenith_log (LOG , "libpqpagestore: connected to '%s'" , page_server_connstring );
113
+ neon_log (LOG , "libpagestore: connected to '%s'" , page_server_connstring_raw );
116
114
117
115
connected = true;
118
116
}
@@ -126,7 +124,7 @@ call_PQgetCopyData(PGconn *conn, char **buffer)
126
124
int ret ;
127
125
128
126
retry :
129
- ret = PQgetCopyData (conn , buffer , 1 /* async */ );
127
+ ret = PQgetCopyData (conn , buffer , 1 /* async */ );
130
128
131
129
if (ret == 0 )
132
130
{
@@ -146,8 +144,8 @@ call_PQgetCopyData(PGconn *conn, char **buffer)
146
144
if (wc & WL_SOCKET_READABLE )
147
145
{
148
146
if (!PQconsumeInput (conn ))
149
- zenith_log (ERROR , "could not get response from pageserver: %s" ,
150
- PQerrorMessage (conn ));
147
+ neon_log (ERROR , "could not get response from pageserver: %s" ,
148
+ PQerrorMessage (conn ));
151
149
}
152
150
153
151
goto retry ;
@@ -158,7 +156,7 @@ call_PQgetCopyData(PGconn *conn, char **buffer)
158
156
159
157
160
158
static ZenithResponse *
161
- zenith_call (ZenithRequest * request )
159
+ pageserver_call (ZenithRequest * request )
162
160
{
163
161
StringInfoData req_buff ;
164
162
StringInfoData resp_buff ;
@@ -175,7 +173,7 @@ zenith_call(ZenithRequest *request)
175
173
}
176
174
177
175
if (!connected )
178
- zenith_connect ();
176
+ pageserver_connect ();
179
177
180
178
req_buff = zm_pack_request (request );
181
179
@@ -184,21 +182,21 @@ zenith_call(ZenithRequest *request)
184
182
*
185
183
* In principle, this could block if the output buffer is full, and we
186
184
* should use async mode and check for interrupts while waiting. In
187
- * practice, our requests are small enough to always fit in the output and
188
- * TCP buffer.
185
+ * practice, our requests are small enough to always fit in the output
186
+ * and TCP buffer.
189
187
*/
190
188
if (PQputCopyData (pageserver_conn , req_buff .data , req_buff .len ) <= 0 || PQflush (pageserver_conn ))
191
189
{
192
- zenith_log (ERROR , "failed to send page request: %s" ,
193
- PQerrorMessage (pageserver_conn ));
190
+ neon_log (ERROR , "failed to send page request: %s" ,
191
+ PQerrorMessage (pageserver_conn ));
194
192
}
195
193
pfree (req_buff .data );
196
194
197
- if (message_level_is_interesting (PqPageStoreTrace ))
195
+ if (message_level_is_interesting (PageStoreTrace ))
198
196
{
199
197
char * msg = zm_to_string ((ZenithMessage * ) request );
200
198
201
- zenith_log ( PqPageStoreTrace , "Sent request: %s" , msg );
199
+ neon_log ( PageStoreTrace , "sent request: %s" , msg );
202
200
pfree (msg );
203
201
}
204
202
@@ -207,25 +205,20 @@ zenith_call(ZenithRequest *request)
207
205
resp_buff .cursor = 0 ;
208
206
209
207
if (resp_buff .len == -1 )
210
- zenith_log (ERROR , "end of COPY" );
208
+ neon_log (ERROR , "end of COPY" );
211
209
else if (resp_buff .len == -2 )
212
- zenith_log (ERROR , "could not read COPY data: %s" , PQerrorMessage (pageserver_conn ));
210
+ neon_log (ERROR , "could not read COPY data: %s" , PQerrorMessage (pageserver_conn ));
213
211
214
212
resp = zm_unpack_response (& resp_buff );
215
213
PQfreemem (resp_buff .data );
216
214
217
- if (message_level_is_interesting (PqPageStoreTrace ))
215
+ if (message_level_is_interesting (PageStoreTrace ))
218
216
{
219
217
char * msg = zm_to_string ((ZenithMessage * ) resp );
220
218
221
- zenith_log ( PqPageStoreTrace , "Got response: %s" , msg );
219
+ neon_log ( PageStoreTrace , "got response: %s" , msg );
222
220
pfree (msg );
223
221
}
224
-
225
- /*
226
- * XXX: zm_to_string leak strings. Check with what memory contex all this
227
- * methods are called.
228
- */
229
222
}
230
223
PG_CATCH ();
231
224
{
@@ -238,7 +231,7 @@ zenith_call(ZenithRequest *request)
238
231
*/
239
232
if (connected )
240
233
{
241
- zenith_log (LOG , "dropping connection to page server due to error" );
234
+ neon_log (LOG , "dropping connection to page server due to error" );
242
235
PQfinish (pageserver_conn );
243
236
pageserver_conn = NULL ;
244
237
connected = false;
@@ -271,11 +264,13 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
271
264
PQconninfoOption * conn_options ;
272
265
PQconninfoOption * conn_option ;
273
266
MemoryContext oldcontext ;
267
+
274
268
/*
275
- * Here we substitute password in connection string with an environment variable.
276
- * To simplify things we construct a connection string back with only known options.
277
- * In particular: host port user and password. We do not currently use other options and
278
- * constructing full connstring in an URI shape is quite messy.
269
+ * Here we substitute password in connection string with an environment
270
+ * variable. To simplify things we construct a connection string back with
271
+ * only known options. In particular: host port user and password. We do
272
+ * not currently use other options and constructing full connstring in an
273
+ * URI shape is quite messy.
279
274
*/
280
275
281
276
if (page_server_connstring_raw == NULL || page_server_connstring_raw [0 ] == '\0' )
@@ -302,15 +297,18 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
302
297
*/
303
298
for (conn_option = conn_options ; conn_option -> keyword != NULL ; conn_option ++ )
304
299
{
305
- if (strcmp (conn_option -> keyword , "host" ) == 0 ) {
300
+ if (strcmp (conn_option -> keyword , "host" ) == 0 )
301
+ {
306
302
if (conn_option -> val != NULL && conn_option -> val [0 ] != '\0' )
307
303
host = conn_option -> val ;
308
304
}
309
- else if (strcmp (conn_option -> keyword , "port" ) == 0 ) {
305
+ else if (strcmp (conn_option -> keyword , "port" ) == 0 )
306
+ {
310
307
if (conn_option -> val != NULL && conn_option -> val [0 ] != '\0' )
311
308
port = conn_option -> val ;
312
309
}
313
- else if (strcmp (conn_option -> keyword , "user" ) == 0 ) {
310
+ else if (strcmp (conn_option -> keyword , "user" ) == 0 )
311
+ {
314
312
if (conn_option -> val != NULL && conn_option -> val [0 ] != '\0' )
315
313
user = conn_option -> val ;
316
314
}
@@ -324,7 +322,7 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
324
322
(errcode (ERRCODE_CONNECTION_EXCEPTION ),
325
323
errmsg ("expected placeholder value in pageserver password starting from $ but found: %s" , & conn_option -> val [1 ])));
326
324
327
- zenith_log (LOG , "found auth token placeholder in pageserver conn string %s " , & conn_option -> val [1 ]);
325
+ neon_log (LOG , "found auth token placeholder in pageserver conn string '%s' " , & conn_option -> val [1 ]);
328
326
auth_token = getenv (& conn_option -> val [1 ]);
329
327
if (!auth_token )
330
328
{
@@ -334,12 +332,16 @@ substitute_pageserver_password(const char *page_server_connstring_raw)
334
332
}
335
333
else
336
334
{
337
- zenith_log (LOG , "using auth token from environment passed via env" );
335
+ neon_log (LOG , "using auth token from environment passed via env" );
338
336
}
339
337
}
340
338
}
341
339
}
342
- // allocate connection string in a TopMemoryContext to make sure it is not freed
340
+
341
+ /*
342
+ * allocate connection string in TopMemoryContext to make sure it is not
343
+ * freed
344
+ */
343
345
oldcontext = CurrentMemoryContext ;
344
346
MemoryContextSwitchTo (TopMemoryContext );
345
347
page_server_connstring = psprintf ("postgresql://%s:%s@%s:%s" , user , auth_token ? auth_token : "" , host , port );
@@ -398,15 +400,15 @@ _PG_init(void)
398
400
-1 , -1 , INT_MAX ,
399
401
PGC_SIGHUP ,
400
402
GUC_UNIT_MB ,
401
- NULL , NULL , NULL );
403
+ NULL , NULL , NULL );
402
404
403
405
relsize_hash_init ();
404
406
EmitWarningsOnPlaceholders ("neon" );
405
407
406
408
if (page_server != NULL )
407
- zenith_log (ERROR , "libpqpagestore already loaded" );
409
+ neon_log (ERROR , "libpagestore already loaded" );
408
410
409
- zenith_log ( PqPageStoreTrace , "libpqpagestore already loaded" );
411
+ neon_log ( PageStoreTrace , "libpagestore already loaded" );
410
412
page_server = & api ;
411
413
412
414
/* substitute password in pageserver_connstring */
@@ -415,18 +417,22 @@ _PG_init(void)
415
417
/* Is there more correct way to pass CustomGUC to postgres code? */
416
418
zenith_timeline_walproposer = zenith_timeline ;
417
419
zenith_tenant_walproposer = zenith_tenant ;
418
- /* Walproposer instructcs safekeeper which pageserver to use for replication */
420
+
421
+ /*
422
+ * Walproposer instructs safekeeper which pageserver to use for
423
+ * replication
424
+ */
419
425
zenith_pageserver_connstring_walproposer = page_server_connstring ;
420
426
421
427
if (wal_redo )
422
428
{
423
- zenith_log ( PqPageStoreTrace , "set inmem_smgr hook" );
429
+ neon_log ( PageStoreTrace , "set inmem_smgr hook" );
424
430
smgr_hook = smgr_inmem ;
425
431
smgr_init_hook = smgr_init_inmem ;
426
432
}
427
433
else if (page_server_connstring && page_server_connstring [0 ])
428
434
{
429
- zenith_log ( PqPageStoreTrace , "set zenith_smgr hook" );
435
+ neon_log ( PageStoreTrace , "set neon_smgr hook" );
430
436
smgr_hook = smgr_zenith ;
431
437
smgr_init_hook = smgr_init_zenith ;
432
438
dbsize_hook = zenith_dbsize ;
0 commit comments