Skip to content

Commit

Permalink
Implement negative cache API endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
yanorei32 committed Feb 4, 2024
1 parent 6b5ea0c commit f42744b
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 67 deletions.
14 changes: 8 additions & 6 deletions src/duplicator/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::sync::Arc;

use tokio::sync::Semaphore;

use super::duplicator::{Enqueuer, InnerRunner, Runner};
use super::processor::{Enqueuer, InnerRunner, Runner};
use super::load_balancer::LoadBalancer;
use super::negative_cache::NegativeCache;
use super::model::Task;
use super::queue::two_level_queue;
use super::queue::priority_queue;

pub struct Builder {
clients: Vec<reqwest::Client>,
Expand Down Expand Up @@ -45,23 +45,25 @@ impl Builder {
}

#[must_use]
pub fn build(self) -> (Enqueuer, Runner) {
let (task_tx, task_rx) = two_level_queue::<Task>();
pub fn build(self) -> (Enqueuer, NegativeCache, Runner) {
let (task_tx, task_rx) = priority_queue::<Task>();

let enqueuer = Enqueuer::new(task_tx, self.ttl);
let global_limit = Arc::new(Semaphore::new(self.global_limit));

let negative_cache = NegativeCache::new();

let runner = Runner::new(
Arc::new(InnerRunner::new(
LoadBalancer::new(self.clients),
NegativeCache::new(),
negative_cache.clone(),
enqueuer.clone(),
self.retry_after,
)),
global_limit,
task_rx,
);

(enqueuer, runner)
(enqueuer, negative_cache, runner)
}
}
8 changes: 4 additions & 4 deletions src/duplicator/load_balancer.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::sync::atomic::AtomicUsize;

pub struct LoadBalancer<T> {
pub(super) struct LoadBalancer<T> {
targets: Vec<T>,
counter: AtomicUsize,
}

impl<T> LoadBalancer<T> {
#[must_use]
pub fn new(targets: Vec<T>) -> Self {
if targets.len() == 0 {
pub(super) fn new(targets: Vec<T>) -> Self {
if targets.is_empty() {
panic!("Cannot load balance 0 targets");
}

Expand All @@ -19,7 +19,7 @@ impl<T> LoadBalancer<T> {
}

#[must_use]
pub fn fetch_next_ref(&self) -> &T {
pub(super) fn fetch_next_ref(&self) -> &T {
let n = self.counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
&self.targets[n % self.targets.len()]
}
Expand Down
5 changes: 3 additions & 2 deletions src/duplicator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
mod builder;
mod duplicator;
mod processor;
mod load_balancer;
mod model;
mod negative_cache;
mod queue;

pub use builder::Builder;
pub use duplicator::Enqueuer;
pub use processor::Enqueuer;
pub use model::Payload;
pub use queue::Priority;
pub use negative_cache::NegativeCache;
26 changes: 22 additions & 4 deletions src/duplicator/negative_cache.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
use std::collections::HashSet;
use std::sync::Arc;

use tokio::sync::RwLock;

pub(super) struct NegativeCache {
struct Inner {
cache: RwLock<HashSet<String>>,
}

#[derive(Clone)]
pub struct NegativeCache {
inner: Arc<Inner>,
}

impl NegativeCache {
pub(super) fn new() -> Self {
NegativeCache { cache: RwLock::new(HashSet::new()) }
NegativeCache {
inner: Arc::new(Inner {
cache: RwLock::new(HashSet::new()),
}),
}
}

pub(super) async fn ban(&self, url: &str) {
self.cache.write().await.insert(url.to_owned());
self.inner.cache.write().await.insert(url.to_owned());
}

pub(super) async fn is_banned(&self, url: &str) -> bool {
self.cache.read().await.contains(url)
self.inner.cache.read().await.contains(url)
}

pub async fn list(&self) -> Vec<String> {
Vec::from_iter(self.inner.cache.read().await.iter().map(|s| s.to_owned()))
}

pub async fn delete(&self, url: &str) {
self.inner.cache.write().await.remove(url);
}
}
10 changes: 5 additions & 5 deletions src/duplicator/duplicator.rs → src/duplicator/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ use tokio::sync::Semaphore;
use super::load_balancer::LoadBalancer;
use super::model::{Context, Payload, Task};
use super::negative_cache::NegativeCache;
use super::queue::{Priority, TwoLevelQueueSender, TwoLevelQueueReceiver};
use super::queue::{Priority, PriorizedSender, PriorizedReceiver};

#[derive(Clone)]
pub struct Enqueuer {
task_tx: TwoLevelQueueSender<Task>,
task_tx: PriorizedSender<Task>,
ttl: usize,
}

impl Enqueuer {
pub(super) fn new(task_tx: TwoLevelQueueSender<Task>, ttl: usize) -> Self {
pub(super) fn new(task_tx: PriorizedSender<Task>, ttl: usize) -> Self {
Self { task_tx, ttl }
}

Expand Down Expand Up @@ -77,14 +77,14 @@ impl InnerRunner {
pub struct Runner {
inner: Arc<InnerRunner>,
global_limit: Arc<Semaphore>,
task_rx: TwoLevelQueueReceiver<Task>,
task_rx: PriorizedReceiver<Task>
}

impl Runner {
pub(super) fn new(
inner: Arc<InnerRunner>,
global_limit: Arc<Semaphore>,
task_rx: TwoLevelQueueReceiver<Task>,
task_rx: PriorizedReceiver<Task>
) -> Self {
Self {
inner,
Expand Down
8 changes: 3 additions & 5 deletions src/duplicator/queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
mod queue;
mod two_level_queue;
mod priority;
mod simple;

pub use two_level_queue::{
two_level_queue, Priority, Priorized, TwoLevelQueueReceiver, TwoLevelQueueSender,
};
pub use priority::{priority_queue, Priority, Priorized, PriorizedReceiver, PriorizedSender};
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::queue::{queue, QueueReceiver, QueueSender};
use super::simple::{queue, Receiver, Sender};

#[derive(Clone, Copy, Debug)]
pub enum Priority {
Expand All @@ -11,17 +11,17 @@ pub trait Priorized {
}

#[derive(Clone)]
pub struct TwoLevelQueueSender<T> {
high_priority: QueueSender<T>,
low_priority: QueueSender<T>,
pub struct PriorizedSender<T> {
high_priority: Sender<T>,
low_priority: Sender<T>,
}

pub struct TwoLevelQueueReceiver<T> {
high_priority: QueueReceiver<T>,
low_priority: QueueReceiver<T>,
pub struct PriorizedReceiver<T> {
high_priority: Receiver<T>,
low_priority: Receiver<T>,
}

impl<T> TwoLevelQueueReceiver<T> {
impl<T> PriorizedReceiver<T> {
pub async fn fetch(&mut self) -> T {
let idle = self.high_priority.is_empty();

Expand All @@ -38,7 +38,7 @@ impl<T> TwoLevelQueueReceiver<T> {
}
}

impl<T: Priorized> TwoLevelQueueSender<T> {
impl<T: Priorized> PriorizedSender<T> {
pub fn enqueue(&self, t: T) {
match t.priority() {
Priority::High => self.high_priority.enqueue(t),
Expand All @@ -47,16 +47,16 @@ impl<T: Priorized> TwoLevelQueueSender<T> {
}
}

pub fn two_level_queue<T>() -> (TwoLevelQueueSender<T>, TwoLevelQueueReceiver<T>) {
pub fn priority_queue<T>() -> (PriorizedSender<T>, PriorizedReceiver<T>) {
let (high_tx, high_rx) = queue();
let (low_tx, low_rx) = queue();

(
TwoLevelQueueSender {
PriorizedSender {
high_priority: high_tx,
low_priority: low_tx,
},
TwoLevelQueueReceiver {
PriorizedReceiver {
high_priority: high_rx,
low_priority: low_rx,
},
Expand Down
14 changes: 7 additions & 7 deletions src/duplicator/queue/queue.rs → src/duplicator/queue/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ pub(super) struct Shared {
}

#[derive(Clone)]
pub(super) struct QueueSender<T> {
pub(super) struct Sender<T> {
tx: UnboundedSender<T>,
shared: Arc<Shared>,
}

pub(super) struct QueueReceiver<T> {
pub(super) struct Receiver<T> {
rx: UnboundedReceiver<T>,
shared: Arc<Shared>,
}

impl<T> QueueReceiver<T> {
impl<T> Receiver<T> {
pub(super) async fn dequeue(&mut self) -> Option<T> {
self.rx.recv().await
}
Expand All @@ -34,19 +34,19 @@ impl<T> QueueReceiver<T> {
}
}

impl<T> QueueSender<T> {
impl<T> Sender<T> {
pub(super) fn enqueue(&self, t: T) {
self.shared.len.fetch_add(1, Relaxed);
self.tx.send(t).unwrap();
}
}

pub(super) fn queue<T>() -> (QueueSender<T>, QueueReceiver<T>) {
pub(super) fn queue<T>() -> (Sender<T>, Receiver<T>) {
let (tx, rx) = mpsc::unbounded_channel();
let shared = Arc::new(Shared { len: AtomicUsize::new(0) });

let sender_view = QueueSender { tx: tx.clone(), shared: shared.clone() };
let receiver_view = QueueReceiver { rx, shared: shared.clone() };
let sender_view = Sender { tx: tx.clone(), shared: shared.clone() };
let receiver_view = Receiver { rx, shared: shared.clone() };

(sender_view, receiver_view)
}
Loading

0 comments on commit f42744b

Please sign in to comment.