Skip to content

Commit c73874e

Browse files
committed
Add hooks to support extensible interconnect
The interconnect module is traditionally single-thread. It's hard to refactor/replace the current implementation for thread-safe. This commit adds hooks to support extensible interconnect. The hooks contain 3 functions in struct ExtInterconnectFuncs: struct ExtInterconnectFuncs { void (*init)(); int (*port)(); void (*exit)(); }; init: create socket to receive interconnect packets. port: return the port number created in init. exit: disconnect all interconnections and cleanup related resources.
1 parent 9d86458 commit c73874e

File tree

10 files changed

+65
-23
lines changed

10 files changed

+65
-23
lines changed

src/backend/cdb/cdbutil.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979

8080
MemoryContext CdbComponentsContext = NULL;
8181
static CdbComponentDatabases *cdb_component_dbs = NULL;
82+
const struct ExtInterconnectFuncs *ext_interconnect = NULL;
8283

8384
#ifdef USE_INTERNAL_FTS
8485

@@ -1149,6 +1150,9 @@ cdb_setup(void)
11491150
ensureInterconnectAddress();
11501151
/* Initialize the Motion Layer IPC subsystem. */
11511152
CurrentMotionIPCLayer->InitMotionLayerIPC();
1153+
1154+
if (ext_interconnect && ext_interconnect->init)
1155+
ext_interconnect->init();
11521156
}
11531157

11541158
/*
@@ -1213,6 +1217,9 @@ cdb_cleanup(int code pg_attribute_unused(), Datum arg
12131217
{
12141218
/* shutdown our listener socket */
12151219
CurrentMotionIPCLayer->CleanUpMotionLayerIPC();
1220+
1221+
if (ext_interconnect && ext_interconnect->exit)
1222+
ext_interconnect->exit();
12161223
}
12171224
}
12181225

@@ -3542,6 +3549,9 @@ cdb_setup(void)
35423549
ensureInterconnectAddress();
35433550
/* Initialize the Motion Layer IPC subsystem. */
35443551
CurrentMotionIPCLayer->InitMotionLayerIPC();
3552+
3553+
if (ext_interconnect && ext_interconnect->init)
3554+
ext_interconnect->init();
35453555
}
35463556

35473557
/*
@@ -3595,6 +3605,9 @@ cdb_cleanup(int code pg_attribute_unused(), Datum arg
35953605
{
35963606
/* shutdown our listener socket */
35973607
CurrentMotionIPCLayer->CleanUpMotionLayerIPC();
3608+
3609+
if (ext_interconnect && ext_interconnect->exit)
3610+
ext_interconnect->exit();
35983611
}
35993612
}
36003613

src/backend/cdb/dispatcher/cdbconn.c

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
#include "cdb/cdbgang.h"
3232

3333

34-
static uint32 cdbconn_get_motion_listener_port(PGconn *conn);
34+
static int64 cdbconn_get_motion_listener_port(PGconn *conn);
3535
static void cdbconn_disconnect(SegmentDatabaseDescriptor *segdbDesc);
3636

3737
static void MPPnoticeReceiver(void *arg, const PGresult *res);
@@ -83,6 +83,7 @@ cdbconn_createSegmentDescriptor(struct CdbComponentDatabaseInfo *cdbinfo, int id
8383
/* Connection info, set in function cdbconn_doConnect */
8484
segdbDesc->conn = NULL;
8585
segdbDesc->motionListener = 0;
86+
segdbDesc->motionExtListener = 0;
8687
segdbDesc->backendPid = 0;
8788

8889
/* whoami */
@@ -294,16 +295,19 @@ cdbconn_doConnectComplete(SegmentDatabaseDescriptor *segdbDesc)
294295
* giving us the TCP port number where it listens for connections from the
295296
* gang below.
296297
*/
297-
segdbDesc->motionListener = cdbconn_get_motion_listener_port(segdbDesc->conn);
298+
int64 ports = cdbconn_get_motion_listener_port(segdbDesc->conn);
299+
segdbDesc->motionListener = (uint32) (ports & 0xFFFFFFFF);
300+
segdbDesc->motionExtListener = ports >> 32;
298301
segdbDesc->backendPid = PQbackendPID(segdbDesc->conn);
299302

300303
if (segdbDesc->motionListener != 0 &&
301304
gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
302305
{
303-
elog(LOG, "Connected to %s motionListenerPorts=%u/%u with options %s",
306+
elog(LOG, "Connected to %s motionListenerPorts=%u/%u/%u with options %s",
304307
segdbDesc->whoami,
305308
(segdbDesc->motionListener & 0x0ffff),
306309
((segdbDesc->motionListener >> 16) & 0x0ffff),
310+
segdbDesc->motionExtListener,
307311
PQoptions(segdbDesc->conn));
308312
}
309313
}
@@ -483,25 +487,32 @@ cdbconn_signalQE(SegmentDatabaseDescriptor *segdbDesc,
483487

484488

485489
/* GPDB function to retrieve QE-backend details (motion listener) */
486-
static uint32
490+
static int64
487491
cdbconn_get_motion_listener_port(PGconn *conn)
488492
{
489493
const char *val;
490494
char *endptr;
491-
uint32 result;
495+
int64 result;
496+
int64 ext_port;
492497

493498
val = PQparameterStatus(conn, "qe_listener_port");
494499
if (!val)
495500
return 0;
496501

497502
errno = 0;
498-
result = strtoul(val, &endptr, 10);
503+
result = strtol(val, &endptr, 10);
504+
if (endptr == val || *endptr != ':' || errno == ERANGE)
505+
return 0;
506+
507+
val = endptr + 1;
508+
ext_port = strtol(val, &endptr, 10);
499509
if (endptr == val || *endptr != '\0' || errno == ERANGE)
500510
return 0;
501511

502-
return result;
503-
}
512+
if (result < 0 || ext_port < 0) return 0;
504513

514+
return (result | (ext_port << 32));
515+
}
505516

506517
/*-------------------------------------------------------------------------
507518
* QE Notice receiver support

src/backend/cdb/dispatcher/cdbgang.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -627,6 +627,7 @@ makeCdbProcess(SegmentDatabaseDescriptor *segdbDesc)
627627
process->listenerAddr = pstrdup(qeinfo->config->hostip);
628628

629629
process->listenerPort = (segdbDesc->motionListener & 0x0ffff);
630+
process->listenerExtPort = segdbDesc->motionExtListener;
630631
process->pid = segdbDesc->backendPid;
631632
process->contentid = segdbDesc->segindex;
632633
process->dbid = qeinfo->config->dbid;
@@ -1101,6 +1102,7 @@ gp_backend_info(PG_FUNCTION_ARGS)
11011102
qddesc->segindex = -1;
11021103
qddesc->conn = NULL;
11031104
qddesc->motionListener = 0;
1105+
qddesc->motionExtListener = 0;
11041106
qddesc->backendPid = MyProcPid;
11051107
qddesc->whoami = NULL;
11061108
qddesc->isWriter = false;

src/backend/nodes/copyfuncs.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5747,6 +5747,7 @@ _copyCdbProcess(const CdbProcess *from)
57475747

57485748
COPY_STRING_FIELD(listenerAddr);
57495749
COPY_SCALAR_FIELD(listenerPort);
5750+
COPY_SCALAR_FIELD(listenerExtPort);
57505751
COPY_SCALAR_FIELD(pid);
57515752
COPY_SCALAR_FIELD(contentid);
57525753
COPY_SCALAR_FIELD(dbid);

src/backend/nodes/outfuncs_common.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,7 @@ _outCdbProcess(StringInfo str, const CdbProcess *node)
14601460
WRITE_NODE_TYPE("CDBPROCESS");
14611461
WRITE_STRING_FIELD(listenerAddr);
14621462
WRITE_INT_FIELD(listenerPort);
1463+
WRITE_INT_FIELD(listenerExtPort);
14631464
WRITE_INT_FIELD(pid);
14641465
WRITE_INT_FIELD(contentid);
14651466
WRITE_INT_FIELD(dbid);

src/backend/nodes/readfuncs_common.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -536,6 +536,7 @@ _readCdbProcess(void)
536536

537537
READ_STRING_FIELD(listenerAddr);
538538
READ_INT_FIELD(listenerPort);
539+
READ_INT_FIELD(listenerExtPort);
539540
READ_INT_FIELD(pid);
540541
READ_INT_FIELD(contentid);
541542
READ_INT_FIELD(dbid);

src/backend/tcop/dest.c

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -329,9 +329,13 @@ void
329329
sendQEDetails(void)
330330
{
331331
StringInfoData msgbuf;
332-
char port_str[11];
332+
char port_str[32];
333+
int ext_port = 0;
333334

334-
snprintf(port_str, sizeof(port_str), "%u", CurrentMotionIPCLayer->GetListenPort());
335+
if (ext_interconnect && ext_interconnect->port)
336+
ext_port = ext_interconnect->port();
337+
338+
snprintf(port_str, sizeof(port_str), "%u:%d", CurrentMotionIPCLayer->GetListenPort(), ext_port);
335339

336340
pq_beginmessage(&msgbuf, 'S');
337341
pq_sendstring(&msgbuf, "qe_listener_port");

src/include/cdb/cdbconn.h

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,24 +37,25 @@ typedef struct SegmentDatabaseDescriptor
3737
*/
3838
int32 segindex;
3939

40-
/*
40+
/*
4141
* A non-NULL value points to the PGconn block of a successfully
4242
* established connection to the segment database.
4343
*/
4444
PGconn *conn;
4545

46-
/*
47-
* Connection info saved at most recent PQconnectdb.
48-
*
49-
* NB: Use malloc/free, not palloc/pfree, for the items below.
50-
*/
51-
uint32 motionListener; /* interconnect listener port */
52-
int32 backendPid;
53-
char *whoami; /* QE identifier for msgs */
54-
bool isWriter;
55-
int identifier; /* unique identifier in the cdbcomponent segment pool */
56-
double establishConnTime; /* the time of establish connection to the segment,
57-
* -1 means this connection is cached */
46+
/*
47+
* Connection info saved at most recent PQconnectdb.
48+
*
49+
* NB: Use malloc/free, not palloc/pfree, for the items below.
50+
*/
51+
uint32 motionListener; /* interconnect listener port */
52+
int32 backendPid;
53+
char *whoami; /* QE identifier for msgs */
54+
bool isWriter;
55+
uint16 motionExtListener; /* extensible interconnect listener port */
56+
int identifier; /* unique identifier in the cdbcomponent segment pool */
57+
double establishConnTime; /* the time of establish connection to the segment,
58+
* -1 means this connection is cached */
5859
} SegmentDatabaseDescriptor;
5960

6061
SegmentDatabaseDescriptor *

src/include/cdb/cdbgang.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ typedef struct CdbProcess
126126
char *listenerAddr; /* Interconnect listener IPv4 address, a C-string */
127127

128128
int listenerPort; /* Interconnect listener port */
129+
int listenerExtPort; /* extensible interconnect listener port */
129130
int pid; /* Backend PID of the process. */
130131

131132
int contentid;

src/include/cdb/cdbutil.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ typedef enum SegmentType
5151
SEGMENTTYPE_ANY
5252
}SegmentType;
5353

54+
struct ExtInterconnectFuncs {
55+
void (*init)();
56+
int (*port)();
57+
void (*exit)();
58+
};
59+
extern const struct ExtInterconnectFuncs *ext_interconnect;
60+
5461
/*
5562
* performs all necessary setup required for initializing Apache Cloudberry components.
5663
*

0 commit comments

Comments
 (0)