Skip to content

Commit ee5c705

Browse files
Rename ZenithFeedback
1 parent 50b6edf commit ee5c705

File tree

4 files changed

+71
-71
lines changed

4 files changed

+71
-71
lines changed

contrib/neon/neon.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ backpressure_lsns(PG_FUNCTION_ARGS)
4949
bool nulls[3];
5050
TupleDesc tupdesc;
5151

52-
zenith_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
52+
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
5353

5454
tupdesc = CreateTemplateTupleDesc(3);
5555
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "received_lsn", PG_LSNOID, -1, 0);

src/backend/replication/walproposer.c

Lines changed: 45 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1783,9 +1783,9 @@ RecvAppendResponses(Safekeeper *sk)
17831783
return sk->state == SS_ACTIVE;
17841784
}
17851785

1786-
/* Parse a ZenithFeedback message, or the ZenithFeedback part of an AppendResponse */
1786+
/* Parse a ReplicationFeedback message, or the ReplicationFeedback part of an AppendResponse */
17871787
void
1788-
ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
1788+
ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *rf)
17891789
{
17901790
uint8 nkeys;
17911791
int i;
@@ -1800,42 +1800,42 @@ ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
18001800
if (strcmp(key, "current_timeline_size") == 0)
18011801
{
18021802
pq_getmsgint(reply_message, sizeof(int32)); // read value length
1803-
zf->currentClusterSize = pq_getmsgint64(reply_message);
1804-
elog(DEBUG2, "ParseZenithFeedbackMessage: current_timeline_size %lu",
1805-
zf->currentClusterSize);
1803+
rf->currentClusterSize = pq_getmsgint64(reply_message);
1804+
elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu",
1805+
rf->currentClusterSize);
18061806
}
18071807
else if (strcmp(key, "ps_writelsn") == 0)
18081808
{
18091809
pq_getmsgint(reply_message, sizeof(int32)); // read value length
1810-
zf->ps_writelsn = pq_getmsgint64(reply_message);
1811-
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_writelsn %X/%X",
1812-
LSN_FORMAT_ARGS(zf->ps_writelsn));
1810+
rf->ps_writelsn = pq_getmsgint64(reply_message);
1811+
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X",
1812+
LSN_FORMAT_ARGS(rf->ps_writelsn));
18131813
}
18141814
else if (strcmp(key, "ps_flushlsn") == 0)
18151815
{
18161816
pq_getmsgint(reply_message, sizeof(int32)); // read value length
1817-
zf->ps_flushlsn = pq_getmsgint64(reply_message);
1818-
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_flushlsn %X/%X",
1819-
LSN_FORMAT_ARGS(zf->ps_flushlsn));
1817+
rf->ps_flushlsn = pq_getmsgint64(reply_message);
1818+
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X",
1819+
LSN_FORMAT_ARGS(rf->ps_flushlsn));
18201820
}
18211821
else if (strcmp(key, "ps_applylsn") == 0)
18221822
{
18231823
pq_getmsgint(reply_message, sizeof(int32)); // read value length
1824-
zf->ps_applylsn = pq_getmsgint64(reply_message);
1825-
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_applylsn %X/%X",
1826-
LSN_FORMAT_ARGS(zf->ps_applylsn));
1824+
rf->ps_applylsn = pq_getmsgint64(reply_message);
1825+
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X",
1826+
LSN_FORMAT_ARGS(rf->ps_applylsn));
18271827
}
18281828
else if (strcmp(key, "ps_replytime") == 0)
18291829
{
18301830
pq_getmsgint(reply_message, sizeof(int32)); // read value length
1831-
zf->ps_replytime = pq_getmsgint64(reply_message);
1831+
rf->ps_replytime = pq_getmsgint64(reply_message);
18321832
{
18331833
char *replyTimeStr;
18341834

18351835
/* Copy because timestamptz_to_str returns a static buffer */
1836-
replyTimeStr = pstrdup(timestamptz_to_str(zf->ps_replytime));
1837-
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_replytime %lu reply_time: %s",
1838-
zf->ps_replytime, replyTimeStr);
1836+
replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime));
1837+
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s",
1838+
rf->ps_replytime, replyTimeStr);
18391839

18401840
pfree(replyTimeStr);
18411841
}
@@ -1844,7 +1844,7 @@ ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
18441844
{
18451845
len = pq_getmsgint(reply_message, sizeof(int32)); // read value length
18461846
// Skip unknown keys to support backward compatibile protocol changes
1847-
elog(LOG, "ParseZenithFeedbackMessage: unknown key: %s len %d", key, len);
1847+
elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len);
18481848
pq_getmsgbytes(reply_message, len);
18491849
};
18501850
}
@@ -1924,7 +1924,7 @@ GetAcknowledgedByQuorumWALPosition(void)
19241924
}
19251925

19261926
/*
1927-
* ZenithFeedbackShmemSize --- report amount of shared memory space needed
1927+
* ReplicationFeedbackShmemSize --- report amount of shared memory space needed
19281928
*/
19291929
Size
19301930
WalproposerShmemSize(void)
@@ -1953,16 +1953,16 @@ WalproposerShmemInit(void)
19531953
}
19541954

19551955
void
1956-
zenith_feedback_set(ZenithFeedback *zf)
1956+
replication_feedback_set(ReplicationFeedback *rf)
19571957
{
19581958
SpinLockAcquire(&walprop_shared->mutex);
1959-
memcpy(&walprop_shared->feedback, zf, sizeof(ZenithFeedback));
1959+
memcpy(&walprop_shared->feedback, rf, sizeof(ReplicationFeedback));
19601960
SpinLockRelease(&walprop_shared->mutex);
19611961
}
19621962

19631963

19641964
void
1965-
zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
1965+
replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
19661966
{
19671967
SpinLockAcquire(&walprop_shared->mutex);
19681968
*writeLsn = walprop_shared->feedback.ps_writelsn;
@@ -1973,37 +1973,37 @@ zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr
19731973

19741974

19751975
/*
1976-
* Get ZenithFeedback fields from the most advanced safekeeper
1976+
* Get ReplicationFeedback fields from the most advanced safekeeper
19771977
*/
19781978
static void
1979-
GetLatestZentihFeedback(ZenithFeedback *zf)
1979+
GetLatestZentihFeedback(ReplicationFeedback *rf)
19801980
{
19811981
int latest_safekeeper = 0;
19821982
XLogRecPtr ps_writelsn = InvalidXLogRecPtr;
19831983
for (int i = 0; i < n_safekeepers; i++)
19841984
{
1985-
if (safekeeper[i].appendResponse.zf.ps_writelsn > ps_writelsn)
1985+
if (safekeeper[i].appendResponse.rf.ps_writelsn > ps_writelsn)
19861986
{
19871987
latest_safekeeper = i;
1988-
ps_writelsn = safekeeper[i].appendResponse.zf.ps_writelsn;
1988+
ps_writelsn = safekeeper[i].appendResponse.rf.ps_writelsn;
19891989
}
19901990
}
19911991

1992-
zf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.zf.currentClusterSize;
1993-
zf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_writelsn;
1994-
zf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_flushlsn;
1995-
zf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_applylsn;
1996-
zf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.zf.ps_replytime;
1992+
rf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
1993+
rf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_writelsn;
1994+
rf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_flushlsn;
1995+
rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn;
1996+
rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime;
19971997

19981998
elog(DEBUG2, "GetLatestZentihFeedback: currentClusterSize %lu,"
19991999
" ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu",
2000-
zf->currentClusterSize,
2001-
LSN_FORMAT_ARGS(zf->ps_writelsn),
2002-
LSN_FORMAT_ARGS(zf->ps_flushlsn),
2003-
LSN_FORMAT_ARGS(zf->ps_applylsn),
2004-
zf->ps_replytime);
2000+
rf->currentClusterSize,
2001+
LSN_FORMAT_ARGS(rf->ps_writelsn),
2002+
LSN_FORMAT_ARGS(rf->ps_flushlsn),
2003+
LSN_FORMAT_ARGS(rf->ps_applylsn),
2004+
rf->ps_replytime);
20052005

2006-
zenith_feedback_set(zf);
2006+
replication_feedback_set(rf);
20072007
}
20082008

20092009
static void
@@ -2016,16 +2016,16 @@ HandleSafekeeperResponse(void)
20162016

20172017

20182018
minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
2019-
diskConsistentLsn = quorumFeedback.zf.ps_flushlsn;
2019+
diskConsistentLsn = quorumFeedback.rf.ps_flushlsn;
20202020

20212021
if (!syncSafekeepers)
20222022
{
2023-
// Get ZenithFeedback fields from the most advanced safekeeper
2024-
GetLatestZentihFeedback(&quorumFeedback.zf);
2025-
SetZenithCurrentClusterSize(quorumFeedback.zf.currentClusterSize);
2023+
// Get ReplicationFeedback fields from the most advanced safekeeper
2024+
GetLatestZentihFeedback(&quorumFeedback.rf);
2025+
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
20262026
}
20272027

2028-
if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.zf.ps_flushlsn)
2028+
if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.ps_flushlsn)
20292029
{
20302030

20312031
if (minQuorumLsn > quorumFeedback.flushLsn)
@@ -2039,7 +2039,7 @@ HandleSafekeeperResponse(void)
20392039
//flush_lsn - This is what durably stored in WAL service.
20402040
quorumFeedback.flushLsn,
20412041
//apply_lsn - This is what processed and durably saved at pageserver.
2042-
quorumFeedback.zf.ps_flushlsn,
2042+
quorumFeedback.rf.ps_flushlsn,
20432043
GetCurrentTimestamp(), false);
20442044
}
20452045

@@ -2222,7 +2222,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
22222222
msg->hs.xmin.value = pq_getmsgint64_le(&s);
22232223
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
22242224
if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE)
2225-
ParseZenithFeedbackMessage(&s, &msg->zf);
2225+
ParseReplicationFeedbackMessage(&s, &msg->rf);
22262226
pq_getmsgend(&s);
22272227
return true;
22282228
}

src/backend/replication/walsender.c

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ void StartReplication(StartReplicationCmd *cmd);
239239
static void StartLogicalReplication(StartReplicationCmd *cmd);
240240
static void ProcessStandbyMessage(void);
241241
static void ProcessStandbyReplyMessage(void);
242-
static void ProcessZenithFeedbackMessage(void);
242+
static void ProcessReplicationFeedbackMessage(void);
243243
static void ProcessStandbyHSFeedbackMessage(void);
244244
static void ProcessRepliesIfAny(void);
245245
static void WalSndKeepalive(bool requestReply);
@@ -1850,7 +1850,7 @@ ProcessStandbyMessage(void)
18501850
break;
18511851

18521852
case 'z':
1853-
ProcessZenithFeedbackMessage();
1853+
ProcessReplicationFeedbackMessage();
18541854
break;
18551855

18561856
default:
@@ -1925,25 +1925,25 @@ ProcessStandbyReplyMessage(void)
19251925
LSN_FORMAT_ARGS(applyPtr));
19261926
}
19271927

1928-
// This message is a zenith extension of postgres replication protocol
1928+
// This message is a neon extension of postgres replication protocol
19291929
static void
1930-
ProcessZenithFeedbackMessage(void)
1930+
ProcessReplicationFeedbackMessage(void)
19311931
{
1932-
ZenithFeedback zf;
1932+
ReplicationFeedback rf;
19331933

19341934
// consume message length
19351935
pq_getmsgint64(&reply_message);
19361936

1937-
ParseZenithFeedbackMessage(&reply_message, &zf);
1937+
ParseReplicationFeedbackMessage(&reply_message, &rf);
19381938

1939-
zenith_feedback_set(&zf);
1939+
replication_feedback_set(&rf);
19401940

1941-
SetZenithCurrentClusterSize(zf.currentClusterSize);
1941+
SetZenithCurrentClusterSize(rf.currentClusterSize);
19421942

1943-
ProcessStandbyReply(zf.ps_writelsn,
1944-
zf.ps_flushlsn,
1945-
zf.ps_applylsn,
1946-
zf.ps_replytime,
1943+
ProcessStandbyReply(rf.ps_writelsn,
1944+
rf.ps_flushlsn,
1945+
rf.ps_applylsn,
1946+
rf.ps_replytime,
19471947
false);
19481948
}
19491949

@@ -2030,7 +2030,7 @@ ProcessStandbyReply(XLogRecPtr writePtr,
20302030
if (!am_cascading_walsender)
20312031
SyncRepReleaseWaiters();
20322032

2033-
/*
2033+
/*
20342034
* walproposer use trunclateLsn instead of flushPtr for confirmed
20352035
* received location, so we shouldn't update restart_lsn here.
20362036
*/
@@ -3835,10 +3835,10 @@ backpressure_lag(void)
38353835
XLogRecPtr applyPtr;
38363836
XLogRecPtr myFlushLsn = GetFlushRecPtr();
38373837

3838-
zenith_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
3838+
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
38393839
#define MB ((XLogRecPtr)1024*1024)
38403840

3841-
elog(DEBUG2, "current flushLsn %X/%X ZenithFeedback: write %X/%X flush %X/%X apply %X/%X",
3841+
elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X",
38423842
LSN_FORMAT_ARGS(myFlushLsn),
38433843
LSN_FORMAT_ARGS(writePtr),
38443844
LSN_FORMAT_ARGS(flushPtr),

src/include/replication/walproposer.h

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ typedef struct HotStandbyFeedback
271271
} HotStandbyFeedback;
272272

273273

274-
typedef struct ZenithFeedback
274+
typedef struct ReplicationFeedback
275275
{
276276
// current size of the timeline on pageserver
277277
uint64 currentClusterSize;
@@ -280,13 +280,13 @@ typedef struct ZenithFeedback
280280
XLogRecPtr ps_flushlsn;
281281
XLogRecPtr ps_applylsn;
282282
TimestampTz ps_replytime;
283-
} ZenithFeedback;
283+
} ReplicationFeedback;
284284

285285

286286
typedef struct WalproposerShmemState
287287
{
288288
slock_t mutex;
289-
ZenithFeedback feedback;
289+
ReplicationFeedback feedback;
290290
term_t mineLastElectedTerm;
291291
} WalproposerShmemState;
292292

@@ -310,12 +310,12 @@ typedef struct AppendResponse
310310
// Feedback recieved from pageserver includes standby_status_update fields
311311
// and custom zenith feedback.
312312
// This part of the message is extensible.
313-
ZenithFeedback zf;
313+
ReplicationFeedback rf;
314314
} AppendResponse;
315315

316-
// ZenithFeedback is extensible part of the message that is parsed separately
316+
// ReplicationFeedback is extensible part of the message that is parsed separately
317317
// Other fields are fixed part
318-
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, zf)
318+
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)
319319

320320

321321
/*
@@ -388,15 +388,15 @@ void ProcessStandbyHSFeedback(TimestampTz replyTime,
388388
uint32 feedbackEpoch,
389389
TransactionId feedbackCatalogXmin,
390390
uint32 feedbackCatalogEpoch);
391-
void ParseZenithFeedbackMessage(StringInfo reply_message,
392-
ZenithFeedback *zf);
391+
void ParseReplicationFeedbackMessage(StringInfo reply_message,
392+
ReplicationFeedback *rf);
393393
void StartReplication(StartReplicationCmd *cmd);
394394
void WalProposerSync(int argc, char *argv[]);
395395

396396
Size WalproposerShmemSize(void);
397397
bool WalproposerShmemInit(void);
398-
void zenith_feedback_set(ZenithFeedback *zf);
399-
void zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
398+
void replication_feedback_set(ReplicationFeedback *rf);
399+
void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
400400

401401
/* libpqwalproposer hooks & helper type */
402402

0 commit comments

Comments
 (0)