Skip to content

Commit

Permalink
[Traffic Control] Make it play nicely with proxy infra (#17854)
Browse files Browse the repository at this point in the history
## Description 

We received reports from a community validator of error logs from
failure to parse `x-forwarded-for` header value to a `SocketAddr`.
Investigation revealed that this is because the validator is running
HAProxy, which attempts to insert the header itself (and in this case
the value inserted was `-`).

We are not using this value anyway, and even if we were, we would not
want to be reading the value inserted by HAProxy as this would
correspond to the fullnode rather than the client.

Note that this also should obviate the fact that running
TrafficController or related on a validator that is running HAProxy can
have bad unintended side effects - namely that a spammy fullnode may
cause a validator running HAProxy to block its HAProxy instance rather
than the fullnode itself.

To fix all of these issues, this PR introduces configuration of
`client-id-source`, which by default will select the `socket-addr` (as
today) to be treated as the "connection". Alternatively, the node
operator can select `x-forwarded-for`, in which case we will search for
this header key and use its contents to determine the client connection.

Note that In the x-forwarded-for case, we have seen that this can be
written as a domain name rather than an IP by load balancers such as
HAProxy, however resolving this to IP address from our side would be
very expensive, so any node that configures `x-forwarded-for` source
type must configure their proxy to also fully resolve the domain name
and write this to the header, otherwise we will skip traffic control.

Note that in addition to infra proxies such as HAProxy, traffic
controller also has the concept of proxy, which from the perspective of
a validator would be a fullnode, as it is forwarding a client request to
the entire committee. This is a different issue entirely, and as of yet
is unsupported. Nevertheless, to reduce the chance for confusion, there
is some liberal renaming in this PR.

TODOS
* Handle `client-id-source: x-forwarded-for` on json rpc side. Currently
we say "unsupported" and skip traffic control.
* Add tests for when `client-id-source: x-forwarded-for` is set (via
both IP and domain name)
* Unit tests for domain name parsing  

## Test plan 

Existing tests. More unit tests to come for the x-forwarded-for header
case.

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
  • Loading branch information
williampsmith authored Jun 12, 2024
1 parent 4c9ffd7 commit 116e527
Show file tree
Hide file tree
Showing 6 changed files with 265 additions and 206 deletions.
135 changes: 81 additions & 54 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ use prometheus::{
register_int_counter_vec_with_registry, register_int_counter_with_registry, IntCounter,
IntCounterVec, Registry,
};
use std::{io, net::SocketAddr, sync::Arc, time::SystemTime};
use std::{
io,
net::{IpAddr, SocketAddr},
sync::Arc,
time::SystemTime,
};
use sui_network::{
api::{Validator, ValidatorServer},
tonic,
Expand All @@ -25,7 +30,7 @@ use sui_types::messages_grpc::{
};
use sui_types::multiaddr::Multiaddr;
use sui_types::sui_system_state::SuiSystemState;
use sui_types::traffic_control::{PolicyConfig, RemoteFirewallConfig, Weight};
use sui_types::traffic_control::{ClientIdSource, PolicyConfig, RemoteFirewallConfig, Weight};
use sui_types::{error::*, transaction::*};
use sui_types::{
fp_ensure,
Expand Down Expand Up @@ -135,6 +140,7 @@ impl AuthorityServer {
consensus_adapter: self.consensus_adapter,
metrics: self.metrics.clone(),
traffic_controller: None,
client_id_source: None,
}))
.bind(&address)
.await
Expand Down Expand Up @@ -167,6 +173,7 @@ pub struct ValidatorServiceMetrics {
connection_ip_not_found: IntCounter,
forwarded_header_parse_error: IntCounter,
forwarded_header_invalid: IntCounter,
forwarded_header_not_included: IntCounter,
}

impl ValidatorServiceMetrics {
Expand Down Expand Up @@ -257,6 +264,12 @@ impl ValidatorServiceMetrics {
registry,
)
.unwrap(),
forwarded_header_not_included: register_int_counter_with_registry!(
"validator_service_forwarded_header_not_included",
"Number of times x-forwarded-for header was (unexpectedly) not included in request",
registry,
)
.unwrap(),
}
}

Expand All @@ -272,6 +285,7 @@ pub struct ValidatorService {
consensus_adapter: Arc<ConsensusAdapter>,
metrics: Arc<ValidatorServiceMetrics>,
traffic_controller: Option<Arc<TrafficController>>,
client_id_source: Option<ClientIdSource>,
}

impl ValidatorService {
Expand All @@ -287,13 +301,14 @@ impl ValidatorService {
state,
consensus_adapter,
metrics: validator_metrics,
traffic_controller: policy_config.map(|policy| {
traffic_controller: policy_config.clone().map(|policy| {
Arc::new(TrafficController::spawn(
policy,
traffic_controller_metrics,
firewall_config,
))
}),
client_id_source: policy_config.map(|policy| policy.client_id_source),
}
}

Expand Down Expand Up @@ -326,6 +341,7 @@ impl ValidatorService {
consensus_adapter,
metrics,
traffic_controller: _,
client_id_source: _,
} = self.clone();
let transaction = request.into_inner();
let epoch_store = state.load_epoch_store_one_call_per_task();
Expand Down Expand Up @@ -707,15 +723,9 @@ impl ValidatorService {
Ok(tonic::Response::new(response))
}

async fn handle_traffic_req(
&self,
connection_ip: Option<SocketAddr>,
proxy_ip: Option<SocketAddr>,
) -> Result<(), tonic::Status> {
async fn handle_traffic_req(&self, client: Option<IpAddr>) -> Result<(), tonic::Status> {
if let Some(traffic_controller) = &self.traffic_controller {
let connection = connection_ip.map(|ip| ip.ip());
let proxy = proxy_ip.map(|ip| ip.ip());
if !traffic_controller.check(connection, proxy).await {
if !traffic_controller.check(&client, &None).await {
// Entity in blocklist
Err(tonic::Status::from_error(SuiError::TooManyRequests.into()))
} else {
Expand All @@ -728,8 +738,7 @@ impl ValidatorService {

fn handle_traffic_resp<T>(
&self,
connection_ip: Option<SocketAddr>,
proxy_ip: Option<SocketAddr>,
client: Option<IpAddr>,
response: &Result<tonic::Response<T>, tonic::Status>,
) {
let error: Option<SuiError> = if let Err(status) = response {
Expand All @@ -740,8 +749,8 @@ impl ValidatorService {

if let Some(traffic_controller) = self.traffic_controller.clone() {
traffic_controller.tally(TrafficTally {
connection_ip: connection_ip.map(|ip| ip.ip()),
proxy_ip: proxy_ip.map(|ip| ip.ip()),
direct: client,
through_fullnode: None,
error_weight: error.map(normalize).unwrap_or(Weight::zero()),
timestamp: SystemTime::now(),
})
Expand Down Expand Up @@ -781,57 +790,75 @@ fn normalize(err: SuiError) -> Weight {
#[macro_export]
macro_rules! handle_with_decoration {
($self:ident, $func_name:ident, $request:ident) => {{
// extract IP info. Note that in addition to extracting the client IP from
// the request header, we also get the remote address in case we need to
// throttle a fullnode, or an end user is running a local quorum driver.
let connection_ip: Option<SocketAddr> = $request.remote_addr();

// We will hit this case if the IO type used does not
// implement Connected or when using a unix domain socket.
// TODO: once we have confirmed that no legitimate traffic
// is hitting this case, we should reject such requests that
// hit this case.
if connection_ip.is_none() {
if cfg!(msim) {
// Ignore the error from simtests.
} else if cfg!(test) {
panic!("Failed to get remote address from request");
} else {
$self.metrics.connection_ip_not_found.inc();
error!("Failed to get remote address from request");
}
if $self.client_id_source.is_none() {
return $self.$func_name($request).await;
}

let proxy_ip: Option<SocketAddr> =
if let Some(op) = $request.metadata().get("x-forwarded-for") {
match op.to_str() {
Ok(ip) => match ip.parse() {
Ok(ret) => Some(ret),
let client = match $self.client_id_source.as_ref().unwrap() {
ClientIdSource::SocketAddr => {
let socket_addr: Option<SocketAddr> = $request.remote_addr();

// We will hit this case if the IO type used does not
// implement Connected or when using a unix domain socket.
// TODO: once we have confirmed that no legitimate traffic
// is hitting this case, we should reject such requests that
// hit this case.
if let Some(socket_addr) = socket_addr {
Some(socket_addr.ip())
} else {
if cfg!(msim) {
// Ignore the error from simtests.
} else if cfg!(test) {
panic!("Failed to get remote address from request");
} else {
$self.metrics.connection_ip_not_found.inc();
error!("Failed to get remote address from request");
}
None
}
}
ClientIdSource::XForwardedFor => {
if let Some(op) = $request.metadata().get("x-forwarded-for") {
match op.to_str() {
Ok(header_val) => {
match header_val.parse::<SocketAddr>() {
Ok(socket_addr) => Some(socket_addr.ip()),
Err(err) => {
$self.metrics.forwarded_header_parse_error.inc();
error!(
"Failed to parse x-forwarded-for header value of {:?} to ip address: {:?}. \
Please ensure that your proxy is configured to resolve client domains to an \
IP address before writing header",
header_val,
err,
);
None
}
}
}
Err(e) => {
$self.metrics.forwarded_header_parse_error.inc();
error!("Failed to parse x-forwarded-for header value to SocketAddr: {:?}", e);
// TODO: once we have confirmed that no legitimate traffic
// is hitting this case, we should reject such requests that
// hit this case.
$self.metrics.forwarded_header_invalid.inc();
error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
None
}
},
Err(e) => {
// TODO: once we have confirmed that no legitimate traffic
// is hitting this case, we should reject such requests that
// hit this case.
$self.metrics.forwarded_header_invalid.inc();
error!("Invalid UTF-8 in x-forwarded-for header: {:?}", e);
None
}
} else {
$self.metrics.forwarded_header_not_included.inc();
error!("x-forwarded-header not present for request despite node configuring XForwardedFor tracking type");
None
}
} else {
None
};
}
};

// check if either IP is blocked, in which case return early
$self.handle_traffic_req(connection_ip, proxy_ip).await?;
$self.handle_traffic_req(client.clone()).await?;
// handle request
let response = $self.$func_name($request).await;
// handle response tallying
$self.handle_traffic_resp(connection_ip, proxy_ip, &response);
$self.handle_traffic_resp(client, &response);
response
}};
}
Expand Down
Loading

0 comments on commit 116e527

Please sign in to comment.