Skip to content

Rename ZenithFeedback #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/neon/neon.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ backpressure_lsns(PG_FUNCTION_ARGS)
bool nulls[3];
TupleDesc tupdesc;

zenith_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);

tupdesc = CreateTemplateTupleDesc(3);
TupleDescInitEntry(tupdesc, (AttrNumber) 1, "received_lsn", PG_LSNOID, -1, 0);
Expand Down
90 changes: 45 additions & 45 deletions src/backend/replication/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1783,9 +1783,9 @@ RecvAppendResponses(Safekeeper *sk)
return sk->state == SS_ACTIVE;
}

/* Parse a ZenithFeedback message, or the ZenithFeedback part of an AppendResponse */
/* Parse a ReplicationFeedback message, or the ReplicationFeedback part of an AppendResponse */
void
ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *rf)
{
uint8 nkeys;
int i;
Expand All @@ -1800,42 +1800,42 @@ ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
if (strcmp(key, "current_timeline_size") == 0)
{
pq_getmsgint(reply_message, sizeof(int32)); // read value length
zf->currentClusterSize = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseZenithFeedbackMessage: current_timeline_size %lu",
zf->currentClusterSize);
rf->currentClusterSize = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: current_timeline_size %lu",
rf->currentClusterSize);
}
else if (strcmp(key, "ps_writelsn") == 0)
{
pq_getmsgint(reply_message, sizeof(int32)); // read value length
zf->ps_writelsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_writelsn %X/%X",
LSN_FORMAT_ARGS(zf->ps_writelsn));
rf->ps_writelsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_writelsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_writelsn));
}
else if (strcmp(key, "ps_flushlsn") == 0)
{
pq_getmsgint(reply_message, sizeof(int32)); // read value length
zf->ps_flushlsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_flushlsn %X/%X",
LSN_FORMAT_ARGS(zf->ps_flushlsn));
rf->ps_flushlsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_flushlsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_flushlsn));
}
else if (strcmp(key, "ps_applylsn") == 0)
{
pq_getmsgint(reply_message, sizeof(int32)); // read value length
zf->ps_applylsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_applylsn %X/%X",
LSN_FORMAT_ARGS(zf->ps_applylsn));
rf->ps_applylsn = pq_getmsgint64(reply_message);
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_applylsn %X/%X",
LSN_FORMAT_ARGS(rf->ps_applylsn));
}
else if (strcmp(key, "ps_replytime") == 0)
{
pq_getmsgint(reply_message, sizeof(int32)); // read value length
zf->ps_replytime = pq_getmsgint64(reply_message);
rf->ps_replytime = pq_getmsgint64(reply_message);
{
char *replyTimeStr;

/* Copy because timestamptz_to_str returns a static buffer */
replyTimeStr = pstrdup(timestamptz_to_str(zf->ps_replytime));
elog(DEBUG2, "ParseZenithFeedbackMessage: ps_replytime %lu reply_time: %s",
zf->ps_replytime, replyTimeStr);
replyTimeStr = pstrdup(timestamptz_to_str(rf->ps_replytime));
elog(DEBUG2, "ParseReplicationFeedbackMessage: ps_replytime %lu reply_time: %s",
rf->ps_replytime, replyTimeStr);

pfree(replyTimeStr);
}
Expand All @@ -1844,7 +1844,7 @@ ParseZenithFeedbackMessage(StringInfo reply_message, ZenithFeedback *zf)
{
len = pq_getmsgint(reply_message, sizeof(int32)); // read value length
// Skip unknown keys to support backward compatibile protocol changes
elog(LOG, "ParseZenithFeedbackMessage: unknown key: %s len %d", key, len);
elog(LOG, "ParseReplicationFeedbackMessage: unknown key: %s len %d", key, len);
pq_getmsgbytes(reply_message, len);
};
}
Expand Down Expand Up @@ -1924,7 +1924,7 @@ GetAcknowledgedByQuorumWALPosition(void)
}

/*
* ZenithFeedbackShmemSize --- report amount of shared memory space needed
* ReplicationFeedbackShmemSize --- report amount of shared memory space needed
*/
Size
WalproposerShmemSize(void)
Expand Down Expand Up @@ -1953,16 +1953,16 @@ WalproposerShmemInit(void)
}

void
zenith_feedback_set(ZenithFeedback *zf)
replication_feedback_set(ReplicationFeedback *rf)
{
SpinLockAcquire(&walprop_shared->mutex);
memcpy(&walprop_shared->feedback, zf, sizeof(ZenithFeedback));
memcpy(&walprop_shared->feedback, rf, sizeof(ReplicationFeedback));
SpinLockRelease(&walprop_shared->mutex);
}


void
zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn)
{
SpinLockAcquire(&walprop_shared->mutex);
*writeLsn = walprop_shared->feedback.ps_writelsn;
Expand All @@ -1973,37 +1973,37 @@ zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr


/*
* Get ZenithFeedback fields from the most advanced safekeeper
* Get ReplicationFeedback fields from the most advanced safekeeper
*/
static void
GetLatestZentihFeedback(ZenithFeedback *zf)
GetLatestZentihFeedback(ReplicationFeedback *rf)
{
int latest_safekeeper = 0;
XLogRecPtr ps_writelsn = InvalidXLogRecPtr;
for (int i = 0; i < n_safekeepers; i++)
{
if (safekeeper[i].appendResponse.zf.ps_writelsn > ps_writelsn)
if (safekeeper[i].appendResponse.rf.ps_writelsn > ps_writelsn)
{
latest_safekeeper = i;
ps_writelsn = safekeeper[i].appendResponse.zf.ps_writelsn;
ps_writelsn = safekeeper[i].appendResponse.rf.ps_writelsn;
}
}

zf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.zf.currentClusterSize;
zf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_writelsn;
zf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_flushlsn;
zf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.zf.ps_applylsn;
zf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.zf.ps_replytime;
rf->currentClusterSize = safekeeper[latest_safekeeper].appendResponse.rf.currentClusterSize;
rf->ps_writelsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_writelsn;
rf->ps_flushlsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_flushlsn;
rf->ps_applylsn = safekeeper[latest_safekeeper].appendResponse.rf.ps_applylsn;
rf->ps_replytime = safekeeper[latest_safekeeper].appendResponse.rf.ps_replytime;

elog(DEBUG2, "GetLatestZentihFeedback: currentClusterSize %lu,"
" ps_writelsn %X/%X, ps_flushlsn %X/%X, ps_applylsn %X/%X, ps_replytime %lu",
zf->currentClusterSize,
LSN_FORMAT_ARGS(zf->ps_writelsn),
LSN_FORMAT_ARGS(zf->ps_flushlsn),
LSN_FORMAT_ARGS(zf->ps_applylsn),
zf->ps_replytime);
rf->currentClusterSize,
LSN_FORMAT_ARGS(rf->ps_writelsn),
LSN_FORMAT_ARGS(rf->ps_flushlsn),
LSN_FORMAT_ARGS(rf->ps_applylsn),
rf->ps_replytime);

zenith_feedback_set(zf);
replication_feedback_set(rf);
}

static void
Expand All @@ -2016,16 +2016,16 @@ HandleSafekeeperResponse(void)


minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
diskConsistentLsn = quorumFeedback.zf.ps_flushlsn;
diskConsistentLsn = quorumFeedback.rf.ps_flushlsn;

if (!syncSafekeepers)
{
// Get ZenithFeedback fields from the most advanced safekeeper
GetLatestZentihFeedback(&quorumFeedback.zf);
SetZenithCurrentClusterSize(quorumFeedback.zf.currentClusterSize);
// Get ReplicationFeedback fields from the most advanced safekeeper
GetLatestZentihFeedback(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);
}

if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.zf.ps_flushlsn)
if (minQuorumLsn > quorumFeedback.flushLsn || diskConsistentLsn != quorumFeedback.rf.ps_flushlsn)
{

if (minQuorumLsn > quorumFeedback.flushLsn)
Expand All @@ -2039,7 +2039,7 @@ HandleSafekeeperResponse(void)
//flush_lsn - This is what durably stored in WAL service.
quorumFeedback.flushLsn,
//apply_lsn - This is what processed and durably saved at pageserver.
quorumFeedback.zf.ps_flushlsn,
quorumFeedback.rf.ps_flushlsn,
GetCurrentTimestamp(), false);
}

Expand Down Expand Up @@ -2222,7 +2222,7 @@ AsyncReadMessage(Safekeeper *sk, AcceptorProposerMessage *anymsg)
msg->hs.xmin.value = pq_getmsgint64_le(&s);
msg->hs.catalog_xmin.value = pq_getmsgint64_le(&s);
if (buf_size > APPENDRESPONSE_FIXEDPART_SIZE)
ParseZenithFeedbackMessage(&s, &msg->zf);
ParseReplicationFeedbackMessage(&s, &msg->rf);
pq_getmsgend(&s);
return true;
}
Expand Down
30 changes: 15 additions & 15 deletions src/backend/replication/walsender.c
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ void StartReplication(StartReplicationCmd *cmd);
static void StartLogicalReplication(StartReplicationCmd *cmd);
static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessZenithFeedbackMessage(void);
static void ProcessReplicationFeedbackMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
static void WalSndKeepalive(bool requestReply);
Expand Down Expand Up @@ -1850,7 +1850,7 @@ ProcessStandbyMessage(void)
break;

case 'z':
ProcessZenithFeedbackMessage();
ProcessReplicationFeedbackMessage();
break;

default:
Expand Down Expand Up @@ -1925,25 +1925,25 @@ ProcessStandbyReplyMessage(void)
LSN_FORMAT_ARGS(applyPtr));
}

// This message is a zenith extension of postgres replication protocol
// This message is a neon extension of postgres replication protocol
static void
ProcessZenithFeedbackMessage(void)
ProcessReplicationFeedbackMessage(void)
{
ZenithFeedback zf;
ReplicationFeedback rf;

// consume message length
pq_getmsgint64(&reply_message);

ParseZenithFeedbackMessage(&reply_message, &zf);
ParseReplicationFeedbackMessage(&reply_message, &rf);

zenith_feedback_set(&zf);
replication_feedback_set(&rf);

SetZenithCurrentClusterSize(zf.currentClusterSize);
SetZenithCurrentClusterSize(rf.currentClusterSize);

ProcessStandbyReply(zf.ps_writelsn,
zf.ps_flushlsn,
zf.ps_applylsn,
zf.ps_replytime,
ProcessStandbyReply(rf.ps_writelsn,
rf.ps_flushlsn,
rf.ps_applylsn,
rf.ps_replytime,
false);
}

Expand Down Expand Up @@ -2030,7 +2030,7 @@ ProcessStandbyReply(XLogRecPtr writePtr,
if (!am_cascading_walsender)
SyncRepReleaseWaiters();

/*
/*
* walproposer use trunclateLsn instead of flushPtr for confirmed
* received location, so we shouldn't update restart_lsn here.
*/
Expand Down Expand Up @@ -3835,10 +3835,10 @@ backpressure_lag(void)
XLogRecPtr applyPtr;
XLogRecPtr myFlushLsn = GetFlushRecPtr();

zenith_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr);
#define MB ((XLogRecPtr)1024*1024)

elog(DEBUG2, "current flushLsn %X/%X ZenithFeedback: write %X/%X flush %X/%X apply %X/%X",
elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X",
LSN_FORMAT_ARGS(myFlushLsn),
LSN_FORMAT_ARGS(writePtr),
LSN_FORMAT_ARGS(flushPtr),
Expand Down
20 changes: 10 additions & 10 deletions src/include/replication/walproposer.h
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ typedef struct HotStandbyFeedback
} HotStandbyFeedback;


typedef struct ZenithFeedback
typedef struct ReplicationFeedback
{
// current size of the timeline on pageserver
uint64 currentClusterSize;
Expand All @@ -280,13 +280,13 @@ typedef struct ZenithFeedback
XLogRecPtr ps_flushlsn;
XLogRecPtr ps_applylsn;
TimestampTz ps_replytime;
} ZenithFeedback;
} ReplicationFeedback;


typedef struct WalproposerShmemState
{
slock_t mutex;
ZenithFeedback feedback;
ReplicationFeedback feedback;
term_t mineLastElectedTerm;
} WalproposerShmemState;

Expand All @@ -310,12 +310,12 @@ typedef struct AppendResponse
// Feedback recieved from pageserver includes standby_status_update fields
// and custom zenith feedback.
// This part of the message is extensible.
ZenithFeedback zf;
ReplicationFeedback rf;
} AppendResponse;

// ZenithFeedback is extensible part of the message that is parsed separately
// ReplicationFeedback is extensible part of the message that is parsed separately
// Other fields are fixed part
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, zf)
#define APPENDRESPONSE_FIXEDPART_SIZE offsetof(AppendResponse, rf)


/*
Expand Down Expand Up @@ -388,15 +388,15 @@ void ProcessStandbyHSFeedback(TimestampTz replyTime,
uint32 feedbackEpoch,
TransactionId feedbackCatalogXmin,
uint32 feedbackCatalogEpoch);
void ParseZenithFeedbackMessage(StringInfo reply_message,
ZenithFeedback *zf);
void ParseReplicationFeedbackMessage(StringInfo reply_message,
ReplicationFeedback *rf);
void StartReplication(StartReplicationCmd *cmd);
void WalProposerSync(int argc, char *argv[]);

Size WalproposerShmemSize(void);
bool WalproposerShmemInit(void);
void zenith_feedback_set(ZenithFeedback *zf);
void zenith_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);
void replication_feedback_set(ReplicationFeedback *rf);
void replication_feedback_get_lsns(XLogRecPtr *writeLsn, XLogRecPtr *flushLsn, XLogRecPtr *applyLsn);

/* libpqwalproposer hooks & helper type */

Expand Down