Skip to content

Commit

Permalink
sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
iffyio committed Jan 28, 2021
1 parent 529f93e commit 83e983e
Showing 1 changed file with 81 additions and 1 deletion.
82 changes: 81 additions & 1 deletion src/extensions/filter_manager.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Google LLC All Rights Reserved.
* Copyright 2021 Google LLC All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -137,3 +137,83 @@ impl FilterManager {
base_logger.new(o!("source" => "FilterManager"))
}
}

#[cfg(test)]
mod tests {
use super::FilterManager;
use crate::extensions::{DownstreamContext, DownstreamResponse, Filter, FilterChain};
use crate::test_utils::logger;

use std::sync::Arc;

use crate::cluster::Endpoint;
use crate::config::{Endpoints, UpstreamEndpoints};
use tokio::sync::mpsc;
use tokio::sync::watch;

#[ignore]
#[tokio::test]
async fn spawn_updater() {
let filter_manager = FilterManager::fixed(logger(), Arc::new(FilterChain::new(vec![])));
let (filter_chain_updates_tx, filter_chain_updates_rx) = mpsc::channel(10);
let (_shutdown_tx, shutdown_rx) = watch::channel(());

FilterManager::spawn_updater(
logger(),
filter_manager.clone(),
filter_chain_updates_rx,
shutdown_rx,
);

let manager_guard = filter_manager.read();
let filter_chain = manager_guard.get_filter_chain().as_ref().unwrap();

let test_endpoints = Endpoints::new(vec![Endpoint::from_address(
"127.0.0.1:8080".parse().unwrap(),
)])
.unwrap();
let response = filter_chain.on_downstream_receive(DownstreamContext::new(
UpstreamEndpoints::from(test_endpoints.clone()),
"127.0.0.1:8081".parse().unwrap(),
vec![],
));
assert!(response.is_some());

struct Drop;
impl Filter for Drop {
fn on_downstream_receive(&self, _: DownstreamContext) -> Option<DownstreamResponse> {
None
}
}

let filter_chain = Arc::new(FilterChain::new(vec![Box::new(Drop)]));
assert!(filter_chain_updates_tx.send(filter_chain).await.is_ok());

let mut num_iterations = 0;
loop {
// Wait for the new filter chain to be applied.
// The new filter chain drops packets instead.
let manager_guard = filter_manager.read();
let filter_chain = manager_guard.get_filter_chain().as_ref().unwrap();
if filter_chain
.on_downstream_receive(DownstreamContext::new(
UpstreamEndpoints::from(test_endpoints.clone()),
"127.0.0.1:8081".parse().unwrap(),
vec![],
))
.is_none()
{
break;
}

num_iterations += 1;
if num_iterations > 1000 {
unreachable!("timed-out waiting for new filter chain to be applied");
}

println!("sleep start");
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
println!("sleep end");
}
}
}

0 comments on commit 83e983e

Please sign in to comment.