Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle HTTP traffic over opaque transport connections #1416

Merged
merged 6 commits into from
Dec 23, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 96 additions & 19 deletions linkerd/app/inbound/src/direct.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{policy, Inbound};
use linkerd_app_core::{
identity, io,
proxy::http,
svc::{self, ExtractParam, InsertParam, Param},
tls,
transport::{self, metrics::SensorIo, ClientAddr, OrigDstAddr, Remote, ServerAddr},
Expand All @@ -24,11 +25,12 @@ pub struct RefusedNoIdentity(());
#[error("a named target must be provided on gateway connections")]
struct RefusedNoTarget;

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone)]
pub(crate) struct Local {
addr: Remote<ServerAddr>,
client_id: tls::ClientId,
permit: policy::Permit,
policy: policy::AllowPolicy,
client: ClientInfo,
protocol: Option<SessionProtocol>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -67,10 +69,11 @@ impl<N> Inbound<N> {
/// 2. TLS is required;
/// 3. A transport header is expected. It's not strictly required, as
/// gateways may need to accept HTTP requests from older proxy versions
pub(crate) fn push_direct<T, I, NSvc, G, GSvc>(
pub(crate) fn push_direct<T, I, NSvc, G, GSvc, H, HSvc>(
self,
policies: impl policy::CheckPolicy + Clone + Send + Sync + 'static,
gateway: G,
http: H,
) -> Inbound<svc::ArcNewTcp<T, I>>
where
T: Param<Remote<ClientAddr>> + Param<OrigDstAddr>,
Expand All @@ -90,6 +93,10 @@ impl<N> Inbound<N> {
GSvc: svc::Service<GatewayIo<I>, Response = ()> + Send + 'static,
GSvc::Error: Into<Error>,
GSvc::Future: Send,
H: svc::NewService<Local, Service = HSvc> + Clone + Send + Sync + Unpin + 'static,
HSvc: svc::Service<io::PrefixedIo<TlsIo<I>>, Response = ()> + Send + 'static,
HSvc::Error: Into<Error>,
HSvc::Future: Send,
{
self.map_stack(|config, rt, inner| {
let detect_timeout = config.proxy.detect_protocol_timeout;
Expand All @@ -105,6 +112,19 @@ impl<N> Inbound<N> {
rt.metrics.proxy.transport.clone(),
))
.instrument(|_: &_| debug_span!("opaque"))
.check_new_service::<Local, _>()
.push_switch(
|t: Local| -> Result<_> {
if t.protocol.is_none() {
Ok(svc::Either::A(t))
} else {
Ok(svc::Either::B(t))
}
},
svc::stack(http)
.instrument(|_: &_| debug_span!("opaque.http"))
.into_inner(),
)
// When the transport header is present, it may be used for either local TCP
// forwarding, or we may be processing an HTTP gateway connection. HTTP gateway
// connections that have a transport header must provide a target name as a part of
Expand All @@ -125,18 +145,11 @@ impl<N> Inbound<N> {
// it.
let addr = (client.local_addr.ip(), port).into();
let allow = policies.check_policy(OrigDstAddr(addr))?;
let tls = tls::ConditionalServerTls::Some(
tls::ServerTls::Established {
client_id: Some(client.client_id.clone()),
negotiated_protocol: client.alpn,
},
);
let permit =
allow.check_authorized(client.client_addr, &tls)?;
Comment on lines -134 to -135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, we are no longer checking the policy on opaque connections?

It's probably better to introduce a new target type--LocalHttp or something (and rename Local to LocalTcp?). This switch could return an Either<Either<LocalTcp, LocalHttp>, GatewayTransportHeader> and then the inner switch predicate can simply return the target... I can probably put a suggestion up to this effect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok(svc::Either::A(Local {
addr: Remote(ServerAddr(addr)),
permit,
client_id: client.client_id,
policy: allow,
protocol: None,
client,
}))
}
TransportHeader {
Expand All @@ -157,10 +170,22 @@ impl<N> Inbound<N> {
}))
}
TransportHeader {
port,
name: None,
protocol: Some(_),
..
} => Err(RefusedNoTarget.into()),
protocol: Some(protocol),
} => {
// When TransportHeader includes the protocol, but does not
// include an alternate name we go through the Inbound HTTP
// stack.
let addr = (client.local_addr.ip(), port).into();
let allow = policies.check_policy(OrigDstAddr(addr))?;
Ok(svc::Either::A(Local {
addr: Remote(ServerAddr(addr)),
policy: allow,
protocol: Some(protocol),
client,
}))
}
}
}
},
Expand All @@ -176,8 +201,10 @@ impl<N> Inbound<N> {
)
.into_inner(),
)
.check_new_service::<(TransportHeader, ClientInfo), _>()
// Use ALPN to determine whether a transport header should be read.
.push(NewTransportHeaderServer::layer(detect_timeout))
.check_new_service::<ClientInfo, _>()
.push_request_filter(|client: ClientInfo| -> Result<_> {
if client.header_negotiated() {
Ok(client)
Expand All @@ -195,6 +222,7 @@ impl<N> Inbound<N> {
identity,
},
))
.check_new_service::<T, I>()
.push_on_service(svc::BoxService::layer())
.push(svc::ArcNewService::layer())
})
Expand Down Expand Up @@ -243,19 +271,68 @@ impl Param<Remote<ServerAddr>> for Local {
}
}

impl Param<OrigDstAddr> for Local {
fn param(&self) -> OrigDstAddr {
self.client.local_addr
}
}

impl Param<Remote<ClientAddr>> for Local {
fn param(&self) -> Remote<ClientAddr> {
self.client.client_addr
}
}

impl Param<transport::labels::Key> for Local {
fn param(&self) -> transport::labels::Key {
transport::labels::Key::inbound_server(
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some(self.client_id.clone()),
client_id: Some(self.client.client_id.clone()),
negotiated_protocol: None,
}),
self.addr.into(),
self.permit.labels.server.clone(),
self.policy.server_label(),
)
}
}

impl svc::Param<policy::AllowPolicy> for Local {
fn param(&self) -> policy::AllowPolicy {
self.policy.clone()
}
}

impl svc::Param<http::Version> for Local {
fn param(&self) -> http::Version {
match &self.protocol {
Some(SessionProtocol::Http1) => http::Version::Http1,
Some(SessionProtocol::Http2) => http::Version::H2,
None => http::Version::H2,
}
}
}

impl svc::Param<http::normalize_uri::DefaultAuthority> for Local {
fn param(&self) -> http::normalize_uri::DefaultAuthority {
http::normalize_uri::DefaultAuthority(None)
}
}

impl svc::Param<policy::ServerLabel> for Local {
fn param(&self) -> policy::ServerLabel {
self.policy.server_label()
}
}

impl svc::Param<tls::ConditionalServerTls> for Local {
fn param(&self) -> tls::ConditionalServerTls {
tls::ConditionalServerTls::Some(tls::ServerTls::Established {
client_id: Some(self.client.client_id.clone()),
negotiated_protocol: self.client.alpn.clone(),
})
}
}

// === impl GatewayTransportHeader ===

impl Param<transport::labels::Key> for GatewayTransportHeader {
Expand Down
27 changes: 18 additions & 9 deletions linkerd/app/inbound/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,15 +60,24 @@ impl Inbound<()> {
.into_inner();

// Handles connections that target the inbound proxy port.
let direct = self
.clone()
.into_tcp_connect(addr.port())
.push_tcp_forward()
.map_stack(|_, _, s| s.push_map_target(TcpEndpoint::from_param))
.push_direct(policies.clone(), gateway)
.into_stack()
.instrument(|_: &_| debug_span!("direct"))
.into_inner();
let direct = {
// Handles HTTP connections.
olix0r marked this conversation as resolved.
Show resolved Hide resolved
let http = self
.clone()
.into_tcp_connect(addr.port())
.push_http_router(profiles.clone())
.push_http_server()
.into_inner();

self.clone()
.into_tcp_connect(addr.port())
.push_tcp_forward()
.map_stack(|_, _, s| s.push_map_target(TcpEndpoint::from_param))
.push_direct(policies.clone(), gateway, http)
.into_stack()
.instrument(|_: &_| debug_span!("direct"))
.into_inner()
};

// Handles HTTP connections.
let http = self
Expand Down