Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coredns: work on tcp requests concurrently #503

Merged
merged 2 commits into from
Sep 2, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
coredns: work on tcp requests concurrently
Right now for a single network all requests where processed serial and
with tcp a caller is able to block us for a long time if it just opens
the connection but sends very little or no data. To avoid this always
spawn a new task if we accept a new tcp connection.

We could do the same for udp however my testing with contrib/perf/run.sh
has shown that it slows things down as the overhead of spawning a task
is greater than the few quick simple map lookups so we only spawn where
needed. We still have to spawn when forwarding external requests as this
can take a long time.

Fixes #500

Signed-off-by: Paul Holzinger <pholzing@redhat.com>
  • Loading branch information
Luap99 committed Sep 2, 2024
commit 39d0043c306c936fb5b6480b456cc1fdec869e25
129 changes: 86 additions & 43 deletions src/dns/coredns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@ use tokio::net::UdpSocket;
const CONTAINER_TTL: u32 = 60;

pub struct CoreDns {
rx: flume::Receiver<()>, // kill switch receiver
inner: CoreDnsData,
}

#[derive(Clone)]
struct CoreDnsData {
network_name: String, // raw network name
backend: &'static ArcSwap<DNSBackend>, // server's data store
rx: flume::Receiver<()>, // kill switch receiver
no_proxy: bool, // do not forward to external resolvers
nameservers: Arc<Mutex<Vec<ScopedIp>>>, // host nameservers from resolv.conf
}
Expand All @@ -57,11 +62,13 @@ impl CoreDns {
nameservers: Arc<Mutex<Vec<ScopedIp>>>,
) -> Self {
CoreDns {
network_name,
backend,
rx,
no_proxy,
nameservers,
inner: CoreDnsData {
network_name,
backend,
no_proxy,
nameservers,
},
}
}

Expand All @@ -87,12 +94,12 @@ impl CoreDns {
continue;
}
};
self.process_message(msg_received, &sender_original, Protocol::Udp);
Self::process_message(&self.inner, msg_received, &sender_original, Protocol::Udp).await;
},
res = tcp_listener.accept() => {
match res {
Ok((sock,addr)) => {
self.process_tcp_stream(sock, addr).await
tokio::spawn(Self::process_tcp_stream(self.inner.clone(), sock, addr));
}
Err(e) => {
error!("Failed to accept new tcp connection: {e}");
Expand All @@ -105,7 +112,11 @@ impl CoreDns {
Ok(())
}

async fn process_tcp_stream(&self, stream: tokio::net::TcpStream, peer: SocketAddr) {
async fn process_tcp_stream(
data: CoreDnsData,
stream: tokio::net::TcpStream,
peer: SocketAddr,
) {
let (mut hickory_stream, sender_original) =
TcpStream::from_stream(AsyncIoTokioAsStd(stream), peer);

Expand All @@ -114,7 +125,7 @@ impl CoreDns {
match tokio::time::timeout(Duration::from_secs(3), hickory_stream.next()).await {
Ok(message) => {
if let Some(msg) = message {
self.process_message(msg, &sender_original, Protocol::Tcp);
Self::process_message(&data, msg, &sender_original, Protocol::Tcp).await;
// The API is a bit strange, first time we call next we get the message,
// but we must call again to send our reply back
hickory_stream.next().await;
Expand All @@ -127,8 +138,8 @@ impl CoreDns {
}
}

fn process_message(
&self,
async fn process_message(
data: &CoreDnsData,
msg_received: Result<SerialMessage, Error>,
sender_original: &BufDnsStreamHandle,
proto: Protocol,
Expand All @@ -140,7 +151,7 @@ impl CoreDns {
return;
}
};
let backend = self.backend.load();
let backend = data.backend.load();
let src_address = msg.addr();
let mut sender = sender_original.with_remote_addr(src_address);
let (request_name, record_type, mut req) = match parse_dns_msg(msg) {
Expand All @@ -153,7 +164,7 @@ impl CoreDns {
let request_name_string = request_name.to_string();

// Create debug and trace info for key parameters.
trace!("server network name: {:?}", self.network_name);
trace!("server network name: {:?}", data.network_name);
debug!("request source address: {:?}", src_address);
trace!("requested record type: {:?}", record_type);
debug!(
Expand All @@ -175,7 +186,7 @@ impl CoreDns {
if let Some(msg) = reply_ip(
&request_name_string,
&request_name,
&self.network_name,
&data.network_name,
record_type,
&backend,
src_address,
Expand All @@ -195,7 +206,7 @@ impl CoreDns {
};

// are we allowed to forward?
if self.no_proxy
if data.no_proxy
|| backend.ctr_is_internal(&src_address.ip())
|| request_name_string.ends_with(&backend.search_domain)
|| request_name_string.matches('.').count() == 1
Expand Down Expand Up @@ -224,41 +235,73 @@ impl CoreDns {
}
// Use host resolvers if no custom resolvers are set for the container.
if nameservers.is_empty() {
nameservers.clone_from(&self.nameservers.lock().expect("lock nameservers"));
nameservers.clone_from(&data.nameservers.lock().expect("lock nameservers"));
}

match proto {
Protocol::Udp => {
tokio::spawn(Self::forward_to_servers(
nameservers,
sender,
src_address,
req,
proto,
));
}
Protocol::Tcp => {
// we already spawned a new future when we read the message so there is no need to spawn another one
Self::forward_to_servers(nameservers, sender, src_address, req, proto).await;
}
}
}
}

tokio::spawn(async move {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = AsyncClient::connect(stream).await?;
let handle = tokio::spawn(bg);
(cl, handle)
async fn forward_to_servers(
nameservers: Vec<ScopedIp>,
mut sender: BufDnsStreamHandle,
src_address: SocketAddr,
req: Message,
proto: Protocol,
) {
// forward dns request to hosts's /etc/resolv.conf
for nameserver in &nameservers {
let addr = SocketAddr::new(nameserver.into(), 53);
let (client, handle) = match proto {
Protocol::Udp => {
let stream = UdpClientStream::<UdpSocket>::new(addr);
let (cl, bg) = match AsyncClient::connect(stream).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
Protocol::Tcp => {
let (stream, sender) = TcpClientStream::<
AsyncIoTokioAsStd<tokio::net::TcpStream>,
>::new(addr);
let (cl, bg) = AsyncClient::new(stream, sender, None).await?;
let handle = tokio::spawn(bg);
(cl, handle)
};
let handle = tokio::spawn(bg);
(cl, handle)
}
Protocol::Tcp => {
let (stream, sender) =
TcpClientStream::<AsyncIoTokioAsStd<tokio::net::TcpStream>>::new(addr);
let (cl, bg) = match AsyncClient::new(stream, sender, None).await {
Ok(a) => a,
Err(e) => {
debug!("Failed to connect to {addr}: {e}");
continue;
}
};
let handle = tokio::spawn(bg);
(cl, handle)
}
};

if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
}
drop(handle);
if let Some(resp) = forward_dns_req(client, req.clone()).await {
if reply(&mut sender, src_address, &resp).is_some() {
// request resolved from following resolver so
// break and don't try other resolvers
break;
}
Ok::<(), std::io::Error>(())
});
}
handle.abort();
}
}
}
Expand Down