Skip to content

Commit 7e2b1e9

Browse files
authored
Add include_primary_if_replica_banned rw_split strategy (#583)
1 parent b35bf88 commit 7e2b1e9

File tree

3 files changed

+86
-1
lines changed

3 files changed

+86
-1
lines changed

pgdog/src/backend/pool/replicas/mod.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,12 @@ impl Replicas {
178178

179179
let mut candidates: Vec<&ReadTarget> = self.replicas.iter().collect();
180180

181-
let primary_reads = self.rw_split == IncludePrimary;
181+
let primary_reads = match self.rw_split {
182+
IncludePrimary => true,
183+
IncludePrimaryIfReplicaBanned => candidates.iter().any(|target| target.ban.banned()),
184+
ExcludePrimary => false,
185+
};
186+
182187
if primary_reads {
183188
if let Some(ref primary) = self.primary {
184189
candidates.push(primary);

pgdog/src/backend/pool/replicas/test.rs

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -769,3 +769,81 @@ async fn test_monitor_health_state_race() {
769769

770770
replicas.shutdown();
771771
}
772+
773+
#[tokio::test]
774+
async fn test_include_primary_if_replica_banned_no_bans() {
775+
let primary_config = create_test_pool_config("127.0.0.1", 5432);
776+
let primary_pool = Pool::new(&primary_config);
777+
primary_pool.launch();
778+
779+
let replica_configs = [create_test_pool_config("localhost", 5432)];
780+
781+
let replicas = Replicas::new(
782+
&Some(primary_pool),
783+
&replica_configs,
784+
LoadBalancingStrategy::Random,
785+
ReadWriteSplit::IncludePrimaryIfReplicaBanned,
786+
);
787+
replicas.launch();
788+
789+
let request = Request::default();
790+
791+
// When no replicas are banned, primary should NOT be used
792+
let mut used_pool_ids = HashSet::new();
793+
for _ in 0..20 {
794+
let conn = replicas.get(&request).await.unwrap();
795+
used_pool_ids.insert(conn.pool.id());
796+
}
797+
798+
// Should only use replica pool
799+
assert_eq!(used_pool_ids.len(), 1);
800+
801+
// Verify primary pool ID is not in the set of used pools
802+
let primary_id = replicas.primary.as_ref().unwrap().pool.id();
803+
assert!(!used_pool_ids.contains(&primary_id));
804+
805+
// Shutdown both primary and replicas
806+
replicas.primary.as_ref().unwrap().pool.shutdown();
807+
replicas.shutdown();
808+
}
809+
810+
#[tokio::test]
811+
async fn test_include_primary_if_replica_banned_with_ban() {
812+
let primary_config = create_test_pool_config("127.0.0.1", 5432);
813+
let primary_pool = Pool::new(&primary_config);
814+
primary_pool.launch();
815+
816+
let replica_configs = [create_test_pool_config("localhost", 5432)];
817+
818+
let replicas = Replicas::new(
819+
&Some(primary_pool),
820+
&replica_configs,
821+
LoadBalancingStrategy::Random,
822+
ReadWriteSplit::IncludePrimaryIfReplicaBanned,
823+
);
824+
replicas.launch();
825+
826+
// Ban the replica
827+
let replica_ban = &replicas.replicas[0].ban;
828+
replica_ban.ban(Error::ServerError, Duration::from_millis(1000));
829+
830+
let request = Request::default();
831+
832+
// When replica is banned, primary SHOULD be used
833+
let mut used_pool_ids = HashSet::new();
834+
for _ in 0..20 {
835+
let conn = replicas.get(&request).await.unwrap();
836+
used_pool_ids.insert(conn.pool.id());
837+
}
838+
839+
// Should only use primary pool since replica is banned
840+
assert_eq!(used_pool_ids.len(), 1);
841+
842+
// Verify primary pool ID is in the set of used pools
843+
let primary_id = replicas.primary.as_ref().unwrap().pool.id();
844+
assert!(used_pool_ids.contains(&primary_id));
845+
846+
// Shutdown both primary and replicas
847+
replicas.primary.as_ref().unwrap().pool.shutdown();
848+
replicas.shutdown();
849+
}

pgdog/src/config/database.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub enum ReadWriteSplit {
5151
#[default]
5252
IncludePrimary,
5353
ExcludePrimary,
54+
IncludePrimaryIfReplicaBanned,
5455
}
5556

5657
impl FromStr for ReadWriteSplit {
@@ -60,6 +61,7 @@ impl FromStr for ReadWriteSplit {
6061
match s.to_lowercase().replace(['_', '-'], "").as_str() {
6162
"includeprimary" => Ok(Self::IncludePrimary),
6263
"excludeprimary" => Ok(Self::ExcludePrimary),
64+
"includeprimaryifreplicabanned" => Ok(Self::IncludePrimaryIfReplicaBanned),
6365
_ => Err(format!("Invalid read-write split: {}", s)),
6466
}
6567
}

0 commit comments

Comments
 (0)