From 11cf16e3f363ce027e53b1834a77858d50daee0d Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Mon, 19 Aug 2024 14:42:07 +0300
Subject: [PATCH] safekeeper: add term_bump endpoint.

When walproposer observes a higher term it now restarts instead of
crashing the whole compute with PANIC; this avoids a compute crash
after a term_bump call. After a successful election we still check the
last_log_term of the highest vote given to ensure the basebackup is
good, and PANIC otherwise.

The endpoint will be used for migration per
035-safekeeper-dynamic-membership-change.md and
https://github.com/neondatabase/docs/pull/21

ref https://github.com/neondatabase/neon/issues/8700
---
 libs/safekeeper_api/src/models.rs        | 13 +++++++++
 pgxn/neon/walproposer.c                  | 24 ++++++++++-----
 safekeeper/src/auth.rs                   |  3 ++
 safekeeper/src/http/routes.rs            | 28 +++++++++++++++++-
 safekeeper/src/state.rs                  | 26 +++++++++++++++--
 safekeeper/src/timeline.rs               | 10 +++++++
 test_runner/fixtures/safekeeper/http.py  | 29 +++++++++++++++++++
 test_runner/regress/test_wal_acceptor.py | 37 ++++++++++++++++++++++++
 8 files changed, 159 insertions(+), 11 deletions(-)

diff --git a/libs/safekeeper_api/src/models.rs b/libs/safekeeper_api/src/models.rs
index 2fbc333075b2..28666d197afd 100644
--- a/libs/safekeeper_api/src/models.rs
+++ b/libs/safekeeper_api/src/models.rs
@@ -60,3 +60,16 @@ pub struct TimelineCopyRequest {
     pub target_timeline_id: TimelineId,
     pub until_lsn: Lsn,
 }
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimelineTermBumpRequest {
+    /// Bump to at least this term; if None, increment the current term by one.
+    pub term: Option<u64>,
+}
+
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct TimelineTermBumpResponse {
+    // term before the request was applied
+    pub previous_term: u64,
+    pub current_term: u64,
+}
diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c
index c53257923a3f..c1914421ecf1 100644
--- a/pgxn/neon/walproposer.c
+++ b/pgxn/neon/walproposer.c
@@ -1038,9 +1038,12 @@ DetermineEpochStartLsn(WalProposer *wp)
 	if (SkipXLogPageHeader(wp, wp->propEpochStartLsn) != wp->api.get_redo_start_lsn(wp))
 	{
 		/*
-		 * However, allow to proceed if previously elected leader was me;
-		 * plain restart of walproposer not intervened by concurrent
-		 * compute (who could generate WAL) is ok.
+		 * However, allow to proceed if the last_log_term on the node which
+		 * gave the highest vote (i.e. the point where we are going to start
+		 * writing) was actually won by me; a plain restart of walproposer,
+		 * not interleaved with a concurrent compute writing WAL, is ok.
+		 *
+		 * This avoids a compute crash after a manual term_bump.
 		 */
 		if (!((dth->n_entries >= 1) && (dth->entries[dth->n_entries - 1].term ==
 										pg_atomic_read_u64(&walprop_shared->mineLastElectedTerm))))
@@ -1442,12 +1445,17 @@ RecvAppendResponses(Safekeeper *sk)
 		if (sk->appendResponse.term > wp->propTerm)
 		{
 			/*
-			 * Another compute with higher term is running. Panic to restart
-			 * PG as we likely need to retake basebackup. However, don't dump
-			 * core as this is kinda expected scenario.
+			 *
+			 * The term has changed to a higher one, probably because another
+			 * compute is running. If that is the case we could PANIC as
+			 * well, since it likely inserted some data and our basebackup is
+			 * not usable anymore. However, the term is also bumped manually
+			 * (via the term_bump endpoint) on safekeepers for migration
+			 * purposes, and in that case we do want the compute to stay
+			 * alive. So restart walproposer with FATAL instead of panicking;
+			 * if the basebackup is spoiled, the next election will notice it.
 			 */
-		disable_core_dump();
-		wp_log(PANIC, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
+		wp_log(FATAL, "WAL acceptor %s:%s with term " INT64_FORMAT " rejected our request, our term " INT64_FORMAT ", meaning another compute is running at the same time, and it conflicts with us",
 			   sk->host, sk->port, sk->appendResponse.term, wp->propTerm);
 		}
diff --git a/safekeeper/src/auth.rs b/safekeeper/src/auth.rs
index b8bc3f3e0689..c5c9393c0084 100644
--- a/safekeeper/src/auth.rs
+++ b/safekeeper/src/auth.rs
@@ -1,6 +1,9 @@
 use utils::auth::{AuthError, Claims, Scope};
 use utils::id::TenantId;
 
+/// If tenant_id is provided, allow if the token (claims) is for this tenant or
+/// has the whole-safekeeper scope (SafekeeperData). Otherwise, allow only if
+/// the token is SafekeeperData.
 pub fn check_permission(claims: &Claims, tenant_id: Option<TenantId>) -> Result<(), AuthError> {
     match (&claims.scope, tenant_id) {
         (Scope::Tenant, None) => Err(AuthError(
diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs
index 9b7424a8189c..e482edea55a2 100644
--- a/safekeeper/src/http/routes.rs
+++ b/safekeeper/src/http/routes.rs
@@ -18,8 +18,8 @@ use utils::http::endpoint::{prometheus_metrics_handler, request_span, ChannelWri
 use utils::http::request::parse_query_param;
 
 use postgres_ffi::WAL_SEGMENT_SIZE;
-use safekeeper_api::models::TimelineCreateRequest;
 use safekeeper_api::models::{SkTimelineInfo, TimelineCopyRequest};
+use safekeeper_api::models::{TimelineCreateRequest, TimelineTermBumpRequest};
 use utils::{
     auth::SwappableJwtAuth,
     http::{
@@ -408,6 +408,28 @@ async fn timeline_backup_partial_reset(request: Request<Body>) -> Result<Respon
+async fn timeline_term_bump_handler(
+    mut request: Request<Body>,
+) -> Result<Response<Body>, ApiError> {
+    let ttid = TenantTimelineId::new(
+        parse_request_param(&request, "tenant_id")?,
+        parse_request_param(&request, "timeline_id")?,
+    );
+    check_permission(&request, Some(ttid.tenant_id))?;
+
+    let request_data: TimelineTermBumpRequest = json_request(&mut request).await?;
+
+    let tli = GlobalTimelines::get(ttid).map_err(ApiError::from)?;
+    let response = tli
+        .term_bump(request_data.term)
+        .await
+        .map_err(ApiError::InternalServerError)?;
+
+    json_response(StatusCode::OK, response)
+}
+
 /// Used only in tests to hand craft required data.
 async fn record_safekeeper_info(mut request: Request<Body>) -> Result<Response<Body>, ApiError> {
     let ttid = TenantTimelineId::new(
@@ -630,6 +652,10 @@ pub fn make_router(conf: SafeKeeperConf) -> RouterBuilder<hyper::Body, ApiError
             "/v1/tenant/:tenant_id/timeline/:timeline_id/backup_partial_reset",
             |r| request_span(r, timeline_backup_partial_reset),
         )
+        .post(
+            "/v1/tenant/:tenant_id/timeline/:timeline_id/term_bump",
+            |r| request_span(r, timeline_term_bump_handler),
+        )
         .post("/v1/record_safekeeper_info/:tenant_id/:timeline_id", |r| {
             request_span(r, record_safekeeper_info)
         })
diff --git a/safekeeper/src/state.rs b/safekeeper/src/state.rs
index 97eeae363899..8ae749ded5f5 100644
--- a/safekeeper/src/state.rs
+++ b/safekeeper/src/state.rs
@@ -1,9 +1,10 @@
 //! Defines per timeline data stored persistently (SafeKeeperPersistentState)
 //! and its wrapper with in memory layer (SafekeeperState).
-use std::ops::Deref;
+use std::{cmp::max, ops::Deref};
 
 use anyhow::Result;
+use safekeeper_api::models::TimelineTermBumpResponse;
 use serde::{Deserialize, Serialize};
 use utils::{
     id::{NodeId, TenantId, TenantTimelineId, TimelineId},
@@ -12,7 +13,7 @@ use utils::{
 
 use crate::{
     control_file,
-    safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, TermHistory},
+    safekeeper::{AcceptorState, PersistedPeerInfo, PgUuid, ServerInfo, Term, TermHistory},
     wal_backup_partial::{self},
 };
 
@@ -211,6 +212,27 @@ where
         let s = self.start_change();
         self.finish_change(&s).await
     }
+
+    /// Make the term at least as high as `to`. If `to` is None, increment the
+    /// current term. This is not in safekeeper.rs because we want to be able
+    /// to do it even if the timeline is offloaded.
+    pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
+        let before = self.acceptor_state.term;
+        let mut state = self.start_change();
+        let new = match to {
+            Some(to) => max(state.acceptor_state.term, to),
+            None => state.acceptor_state.term + 1,
+        };
+        if new > state.acceptor_state.term {
+            state.acceptor_state.term = new;
+            self.finish_change(&state).await?;
+        }
+        let after = self.acceptor_state.term;
+        Ok(TimelineTermBumpResponse {
+            previous_term: before,
+            current_term: after,
+        })
+    }
 }
 
 impl<CTRL> Deref for TimelineState<CTRL>
diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs
index 6fd5de0ad680..fb98534768ff 100644
--- a/safekeeper/src/timeline.rs
+++ b/safekeeper/src/timeline.rs
@@ -4,6 +4,7 @@
 use anyhow::{anyhow, bail, Result};
 use camino::Utf8PathBuf;
 use remote_storage::RemotePath;
+use safekeeper_api::models::TimelineTermBumpResponse;
 use serde::{Deserialize, Serialize};
 use tokio::fs::{self};
 use tokio_util::sync::CancellationToken;
@@ -215,6 +216,10 @@ impl StateSK
             .get_last_log_term(self.flush_lsn())
     }
 
+    pub async fn term_bump(&mut self, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
+        self.state_mut().term_bump(to).await
+    }
+
     /// Close open WAL files to release FDs.
     fn close_wal_store(&mut self) {
         if let StateSK::Loaded(sk) = self {
@@ -854,6 +859,11 @@ impl Timeline
         Ok(res)
     }
 
+    pub async fn term_bump(self: &Arc<Self>, to: Option<Term>) -> Result<TimelineTermBumpResponse> {
+        let mut state = self.write_shared_state().await;
+        state.sk.term_bump(to).await
+    }
+
     /// Get the timeline guard for reading/writing WAL files.
     /// If WAL files are not present on disk (evicted), they will be automatically
     /// downloaded from remote storage. This is done in the manager task, which is
diff --git a/test_runner/fixtures/safekeeper/http.py b/test_runner/fixtures/safekeeper/http.py
index 9bf03554e751..96c84d161673 100644
--- a/test_runner/fixtures/safekeeper/http.py
+++ b/test_runner/fixtures/safekeeper/http.py
@@ -50,6 +50,19 @@ def commit_lsn_inexact(self, tenant_id: TenantId, timeline_id: TimelineId):
         ).value
 
 
+@dataclass
+class TermBumpResponse:
+    previous_term: int
+    current_term: int
+
+    @classmethod
+    def from_json(cls, d: Dict[str, Any]) -> "TermBumpResponse":
+        return TermBumpResponse(
+            previous_term=d["previous_term"],
+            current_term=d["current_term"],
+        )
+
+
 class SafekeeperHttpClient(requests.Session, MetricsGetter):
     HTTPError = requests.HTTPError
 
@@ -252,6 +265,22 @@ def backup_partial_reset(self, tenant_id: TenantId, timeline_id: TimelineId):
         res.raise_for_status()
         return res.json()
 
+    def term_bump(
+        self,
+        tenant_id: TenantId,
+        timeline_id: TimelineId,
+        term: Optional[int],
+    ) -> TermBumpResponse:
+        body = {}
+        if term is not None:
+            body["term"] = term
+        res = self.post(
+            f"http://localhost:{self.port}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
+            json=body,
+        )
+        res.raise_for_status()
+        return TermBumpResponse.from_json(res.json())
+
     def record_safekeeper_info(self, tenant_id: TenantId, timeline_id: TimelineId, body):
         res = self.post(
             f"http://localhost:{self.port}/v1/record_safekeeper_info/{tenant_id}/{timeline_id}",
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 5672e836eea1..50fac441c05c 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -2194,6 +2194,43 @@ def test_patch_control_file(neon_env_builder: NeonEnvBuilder):
     assert res["timelines"][0]["control_file"]["timeline_start_lsn"] == "0/1"
 
 
+def test_term_bump(neon_env_builder: NeonEnvBuilder):
+    neon_env_builder.num_safekeepers = 1
+    env = neon_env_builder.init_start()
+
+    tenant_id = env.initial_tenant
+    timeline_id = env.initial_timeline
+
+    endpoint = env.endpoints.create_start("main")
+    # initialize the safekeeper
+    endpoint.safe_psql("create table t(key int, value text)")
+
+    http_cli = env.safekeepers[0].http_client()
+
+    # check that a bump up to a specific term works
+    curr_term = http_cli.timeline_status(tenant_id, timeline_id).term
+    bump_to = curr_term + 3
+    res = http_cli.term_bump(tenant_id, timeline_id, bump_to)
+    log.info(f"bump to {bump_to} res: {res}")
+    assert res.current_term >= bump_to
+
+    # check that a bump to None increments the current term
+    res = http_cli.term_bump(tenant_id, timeline_id, None)
+    log.info(f"bump to None res: {res}")
+    assert res.current_term > bump_to
+    assert res.current_term > res.previous_term
+
+    # check that bumping doesn't work downward
+    res = http_cli.term_bump(tenant_id, timeline_id, 2)
+    log.info(f"bump to 2 res: {res}")
+    assert res.current_term > bump_to
+    assert res.current_term == res.previous_term
+
+    # check that this doesn't kill the endpoint, because the last WAL flush was
+    # done by this compute and thus its basebackup is still good
+    endpoint.safe_psql("insert into t values (1, 'payload')")
+
+
 # Test disables periodic pushes from safekeeper to the broker and checks that
 # pageserver can still discover safekeepers with discovery requests.
 def test_broker_discovery(neon_env_builder: NeonEnvBuilder):
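
Usage sketch (not part of the patch; host, port, and ids below are placeholder assumptions, with 7676 assumed to be the safekeeper HTTP port in a local setup): the endpoint can be driven with any HTTP client, mirroring the SafekeeperHttpClient.term_bump helper added above. If safekeeper auth is enabled, a tenant-scoped or SafekeeperData token must also be supplied as a Bearer header, per the check_permission change in auth.rs.

from typing import Optional

import requests

SAFEKEEPER_HTTP = "http://localhost:7676"  # assumed local safekeeper HTTP address
TENANT_ID = "0" * 32  # hypothetical hex tenant id
TIMELINE_ID = "0" * 32  # hypothetical hex timeline id


def term_bump(tenant_id: str, timeline_id: str, term: Optional[int]) -> dict:
    # POST /v1/tenant/:tenant_id/timeline/:timeline_id/term_bump
    # A body of {"term": N} bumps the term to at least N; an empty body
    # increments the current term by one.
    body = {} if term is None else {"term": term}
    res = requests.post(
        f"{SAFEKEEPER_HTTP}/v1/tenant/{tenant_id}/timeline/{timeline_id}/term_bump",
        json=body,
    )
    res.raise_for_status()
    # The response carries {"previous_term": ..., "current_term": ...}.
    return res.json()


if __name__ == "__main__":
    print(term_bump(TENANT_ID, TIMELINE_ID, None))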