safekeeper: add term_bump endpoint.
When walproposer observes a higher term it now restarts instead of
crashing the whole compute with PANIC; this avoids a compute crash
after a term_bump call. After a successful election we still check the
last_log_term of the highest given vote to ensure the basebackup is
good, and PANIC otherwise.

It will be used for migration per
035-safekeeper-dynamic-membership-change.md
and
neondatabase/docs#21

ref neondatabase#8700
arssher committed Sep 6, 2024
1 parent af6f636 commit 11cf16e
Showing 8 changed files with 159 additions and 11 deletions.
13 changes: 13 additions & 0 deletions libs/safekeeper_api/src/models.rs
@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
pub target_timeline_id: TimelineId,
pub until_lsn: Lsn,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpRequest {
/// Bump the term to at least this value; if None, increment the current term.
pub term: Option<u64>,
}

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimelineTermBumpResponse {
/// Term before the request was applied.
pub previous_term: u64,
/// Term after the request was applied.
pub current_term: u64,
}
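For reference, a sketch of the JSON wire format these serde derives imply (an assumption read off the derives, not a documented contract; `term` may be omitted entirely, since serde treats a missing `Option` field as `None`):

import json

# Sketch of the wire format implied by the serde derives above.
# Omitting "term" asks the safekeeper to increment the current term
# rather than bump it to a specific value.
bump_to_specific = json.dumps({"term": 42})
bump_increment = json.dumps({})

# The response reports the term before and after the bump.
response = json.loads('{"previous_term": 41, "current_term": 42}')
assert response["current_term"] >= response["previous_term"]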
24 changes: 16 additions & 8 deletions pgxn/neon/walproposer.c
@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
{
/*
* However, allow to proceed if the previously elected leader was me;
* a plain restart of walproposer not intervened by a concurrent
* compute (which could generate WAL) is ok.
* However, allow to proceed if the last_log_term on the node which gave
* the highest vote (i.e. the point where we are going to start writing)
* had actually been won by me; a plain restart of walproposer not
* intervened by a concurrent compute which wrote WAL is ok.
*
* This avoids a compute crash after a manual term_bump.
*/
if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
if (sk->appendResponse.term > wp->propTerm)
{
/*
* Another compute with a higher term is running. Panic to restart
* PG as we likely need to retake the basebackup. However, don't dump
* core as this is a kinda expected scenario.
*
* The term has changed to a higher one, probably because another
* compute is running. If that is the case we could PANIC as well,
* since it has likely inserted some data and our basebackup is no
* longer suitable. However, we also bump the term manually (the
* term_bump endpoint) on safekeepers for migration purposes, and in
* that case we do want the compute to stay alive. So restart
* walproposer with FATAL instead of panicking; if the basebackup is
* spoiled, the next election will notice it.
*/
disable_core_dump();
wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
sk->host, sk->port,
sk->appendResponse.term, wp->propTerm);
}
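A simplified sketch of the two decision points changed above (illustrative Python with made-up names, not walproposer's actual C variables):

# Illustrative only: walproposer is C code; names here are assumptions.
def on_append_response(acceptor_term: int, prop_term: int) -> str:
    # A higher term now restarts just walproposer (FATAL) instead of
    # killing the whole compute (PANIC); a manual term_bump is an
    # expected cause of this situation.
    if acceptor_term > prop_term:
        return "restart_walproposer"
    return "continue"

def basebackup_still_good(last_log_term: int, mine_last_elected_term: int) -> bool:
    # After re-election the basebackup remains usable iff the last WAL
    # on the winning safekeeper was written under our own previous term.
    return last_log_term == mine_last_elected_term

assert on_append_response(acceptor_term=7, prop_term=5) == "restart_walproposer"
assert basebackup_still_good(last_log_term=5, mine_last_elected_term=5)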
3 changes: 3 additions & 0 deletions safekeeper/src/auth.rs
@@ -1,6 +1,9 @@
use utils::auth::{AuthError, Claims, Scope};
use utils::id::TenantId;

/// If tenant_id is provided, allow if the token (claims) is for this tenant
/// or for the whole-safekeeper scope (SafekeeperData). Otherwise, allow only
/// if the token is SafekeeperData.
pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
match (&claims.scope, tenant_id) {
(Scope::Tenant, None) => Err(AuthError(
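The documented rule, restated as a minimal sketch (scopes simplified to strings; the real check operates on Claims/Scope and returns Result<(), AuthError>):

from typing import Optional

def allowed(scope: str, claims_tenant: Optional[str], tenant_id: Optional[str]) -> bool:
    # Whole-safekeeper tokens are always allowed.
    if scope == "SafekeeperData":
        return True
    # Per-tenant tokens need a tenant_id and must match it.
    if scope == "Tenant":
        return tenant_id is not None and claims_tenant == tenant_id
    return False

assert allowed("SafekeeperData", None, None)
assert allowed("Tenant", "t1", "t1")
assert not allowed("Tenant", "t1", None)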
28 changes: 27 additions & 1 deletion safekeeper/src/http/routes.rs
@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
use utils::http::request::parse_query_param;

use postgres_ffi::WAL_SEGMENT_SIZE;
use safekeeper_api::models::TimelineCreateRequest;
use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
use utils::{
auth::SwappableJwtAuth,
http::{
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respons
json_response(StatusCode::OK, response)
}

/// Make the term at least as high as the one in the request. If it is None,
/// increment the current one.
async fn timeline_term_bump_handler(
mut request: Request<Body>,
) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
parse_request_param(&request, "tenant_id")?,
parse_request_param(&request, "timeline_id")?,
);
check_permission(&request, Some(ttid.tenant_id))?;

let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;

let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
let response = tli
.term_bump(request_data.term)
.await
.map_err(ApiError::InternalServerError)?;

json_response(StatusCode::OK, response)
}

/// Used only in tests to hand craft required data.
async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
let ttid = TenantTimelineId::new(
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError>
"/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
|r| request_span(r, timeline_backup_partial_reset),
)
.post(
"/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
|r| request_span(r, timeline_term_bump_handler),
)
.post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
request_span(r, record_safekeeper_info)
})
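A usage sketch for the new route (host, port, and IDs are placeholders; the Python test fixture further below wraps the same call):

import requests

# Placeholders: point this at a real safekeeper and substitute real IDs.
url = "http://localhost:7676/v1/tenant/<tenant_id>/timeline/<timeline_id>/term_bump"

# {"term": 100} bumps the term to at least 100; {} just increments it.
resp = requests.post(url, json={"term": 100})
resp.raise_for_status()
print(resp.json())  # e.g. {"previous_term": 5, "current_term": 100}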
26 changes: 24 additions & 2 deletions safekeeper/src/state.rs
@@ -1,9 +1,10 @@
//! Defines per-timeline data stored persistently (SafeKeeperPersistentState)
//! and its wrapper with an in-memory layer (SafekeeperState).

use std::ops::Deref;
use std::{cmp::max, ops::Deref};

use anyhow::Result;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use utils::{
id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{

use crate::{
control_file,
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
wal_backup_partial::{self},
};

@@ -211,6 +212,27 @@ where
let s = self.start_change();
self.finish_change(&s).await
}

/// Make the term at least as high as `to`. If `to` is None, increment the
/// current one. This is not in safekeeper.rs because we want to be able to do
/// it even if the timeline is offloaded.
pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let before = self.acceptor_state.term;
let mut state = self.start_change();
let new = match to {
Some(to) => max(state.acceptor_state.term, to),
None => state.acceptor_state.term + 1,
};
if new > state.acceptor_state.term {
state.acceptor_state.term = new;
self.finish_change(&state).await?;
}
let after = self.acceptor_state.term;
Ok(TimelineTermBumpResponse {
previous_term: before,
current_term: after,
})
}
}

impl<CTRL> Deref for TimelineState<CTRL>
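The semantics above, restated as a minimal pure-logic sketch (persistence via start_change/finish_change is elided):

from typing import Optional, Tuple

def term_bump(current_term: int, to: Optional[int]) -> Tuple[int, int]:
    # Candidate term: at least `to`, or current + 1 when `to` is None.
    new = max(current_term, to) if to is not None else current_term + 1
    # The term only moves forward; a downward "bump" is a no-op.
    after = new if new > current_term else current_term
    return (current_term, after)  # (previous_term, current_term)

assert term_bump(5, 9) == (5, 9)     # bump to a specific higher term
assert term_bump(5, 3) == (5, 5)     # downward bump is a no-op
assert term_bump(5, None) == (5, 6)  # None increments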
10 changes: 10 additions & 0 deletions safekeeper/src/timeline.rs
@@ -4,6 +4,7 @@
use anyhow::{anyhow, bail, Result};
use camino::Utf8PathBuf;
use remote_storage::RemotePath;
use safekeeper_api::models::TimelineTermBumpResponse;
use serde::{Deserialize, Serialize};
use tokio::fs::{self};
use tokio_util::sync::CancellationToken;
@@ -215,6 +216,10 @@ impl StateSK {
.get_last_log_term(self.flush_lsn())
}

pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
self.state_mut().term_bump(to).await
}

/// Close open WAL files to release FDs.
fn close_wal_store(&mut self) {
if let StateSK::Loaded(sk) = self {
@@ -854,6 +859,11 @@ impl Timeline {
Ok(res)
}

pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
let mut state = self.write_shared_state().await;
state.sk.term_bump(to).await
}

/// Get the timeline guard for reading/writing WAL files.
/// If WAL files are not present on disk (evicted), they will be automatically
/// downloaded from remote storage. This is done in the manager task, which is
29 changes: 29 additions & 0 deletions test_runner/fixtures/safekeeper/http.py
@@ -50,6 +50,19 @@ def commit_lsn_inexact(self, tenant_id: TenantId, timeline_id: TimelineId):
).value


@dataclass
class TermBumpResponse:
previous_term: int
current_term: int

@classmethod
def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
return TermBumpResponse(
previous_term=d["previous_term"],
current_term=d["current_term"],
)


class SafekeeperHttpClient(requests.Session, MetricsGetter):
HTTPError = requests.HTTPError

@@ -252,6 +265,22 @@ def backup_partial_reset(self, tenant_id: TenantId, timeline_id: TimelineId):
res.raise_for_status()
return res.json()

def term_bump(
self,
tenant_id: TenantId,
timeline_id: TimelineId,
term: Optional[int],
) -> TermBumpResponse:
body = {}
if term is not None:
body["term"] = term
res = self.post(
f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
json=body,
)
res.raise_for_status()
return TermBumpResponse.from_json(res.json())

def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
res = self.post(
f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
37 changes: 37 additions & 0 deletions test_runner/regress/test_wal_acceptor.py
@@ -2194,6 +2194,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"


def test_term_bump(neon_env_builder: NeonEnvBuilder):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()

tenant_id = env.initial_tenant
timeline_id = env.initial_timeline

endpoint = env.endpoints.create_start("main")
# initialize safekeeper
endpoint.safe_psql("create table t(key int, value text)")

http_cli = env.safekeepers[0].http_client()

# check that bumping up to a specific term works
curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
bump_to = curr_term + 3
res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
log.info(f"bump to {bump_to} res: {res}")
assert res.current_term >= bump_to

# check that bump to None increments the current term
res = http_cli.term_bump(tenant_id, timeline_id, None)
log.info(f"bump to None res: {res}")
assert res.current_term > bump_to
assert res.current_term > res.previous_term

# check that bumping doesn't work downward
res = http_cli.term_bump(tenant_id, timeline_id, 2)
log.info(f"bump to 2 res: {res}")
assert res.current_term > bump_to
assert res.current_term == res.previous_term

# check that this doesn't kill the endpoint: the last WAL flush was its
# own, so its basebackup is still good
endpoint.safe_psql("insert into t values (1, 'payload')")


# Test disables periodic pushes from safekeeper to the broker and checks that
# pageserver can still discover safekeepers with discovery requests.
def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
