Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ This project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
transmit source port is in range, detection drives Down when the peer goes
silent). See ADR-0067 for the staged plan and deferral list (multihop,
echo/demand, auth, dynamic-peer BFD, IPv6 link-local → v1.1).
- **ADR-0067 BFD operator inspection surface.** New `BfdService.GetBfdSessions`
gRPC RPC (`BfdSession` { peer, state, diagnostic, strict }, optional
peer-address filter) reading the actor's status snapshot, and
`rustbgpctl bfd [list | show <peer>]` (JSON + table). Read-only, ADR-0064
tier `sensitive_read`. This slice also lands the **event proto contract**
(`EVENT_CATEGORY_BFD`, `BGP_EVENT_TYPE_BFD_SESSION_{UP,DOWN,STATE_CHANGED}`,
`BfdSessionEvent`, the `BgpEvent.bfd` oneof) so it is stable, but BFD events
**do not yet stream** over `EventService.WatchEvents` — actor event emission
lands in a follow-up.
- **ADR-0063 EVPN runtime convergence — `ip_vrf` relink.**
`EvpnService.ApplyEvpnRuntime` now commits an L2VNI re-homed to a different
IP-VRF (or its `ip_vrf` link added/removed) at runtime. A relink edits no
Expand Down
10 changes: 8 additions & 2 deletions crates/api/src/authz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ pub const METHODS: &[GrpcMethodAuthz] = &[
"/rustbgpd.v1.RibService/ListEvpnRoutes",
AuthTier::SensitiveRead,
),
method(
"rustbgpd.v1.BfdService",
"GetBfdSessions",
"/rustbgpd.v1.BfdService/GetBfdSessions",
AuthTier::SensitiveRead,
),
method(
"rustbgpd.v1.EventService",
"WatchEvents",
Expand Down Expand Up @@ -663,7 +669,7 @@ mod tests {
.collect::<BTreeSet<_>>();

assert_eq!(matrix_methods, proto_methods);
assert_eq!(METHODS.len(), 71);
assert_eq!(METHODS.len(), 72);
}

#[test]
Expand Down Expand Up @@ -704,7 +710,7 @@ mod tests {
#[test]
fn method_matrix_tier_counts_match_inventory() {
assert_eq!(method_count_by_tier(AuthTier::Read), 0);
assert_eq!(method_count_by_tier(AuthTier::SensitiveRead), 36);
assert_eq!(method_count_by_tier(AuthTier::SensitiveRead), 37);
assert_eq!(method_count_by_tier(AuthTier::Mutating), 17);
assert_eq!(method_count_by_tier(AuthTier::OperatorOnly), 18);
}
Expand Down
147 changes: 147 additions & 0 deletions crates/api/src/bfd_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//! gRPC BFD service — single-hop BFD session inspection (ADR-0067).
//!
//! Read-only operator surface over the BFD actor's published session status.
//! The actor owns the sessions; this service just snapshots their state.

use tonic::{Request, Response, Status};

use crate::proto;

/// Live snapshot provider for BFD session status. The daemon wires this to the
/// BFD actor's status `watch` channel; off Linux / when BFD is unconfigured it
/// is an empty-vec closure.
pub type BfdSessionSnapshotFn =
std::sync::Arc<dyn Fn() -> Vec<proto::BfdSession> + Send + Sync + 'static>;

/// gRPC service exposing BFD session state (read-only).
pub struct BfdService {
snapshot: BfdSessionSnapshotFn,
}

impl BfdService {
/// Create a BFD service backed by a live session snapshot provider.
pub fn with_snapshot(snapshot: BfdSessionSnapshotFn) -> Self {
Self { snapshot }
}
}

impl Default for BfdService {
/// A service with no sessions — used off Linux / when BFD is unconfigured.
fn default() -> Self {
Self {
snapshot: std::sync::Arc::new(Vec::new),
}
}
}

#[tonic::async_trait]
impl proto::bfd_service_server::BfdService for BfdService {
async fn get_bfd_sessions(
&self,
request: Request<proto::GetBfdSessionsRequest>,
) -> Result<Response<proto::GetBfdSessionsResponse>, Status> {
let filter = request.into_inner().peer_address;
let mut sessions = (self.snapshot)();
if !filter.is_empty() {
// Parse to IpAddr and compare canonicalized forms so equivalent
// textual representations (notably IPv6) match — mirrors the
// address-filter handling in NeighborService / RibService. Snapshot
// peer addresses are already `IpAddr::to_string()` (canonical).
let wanted = filter
.parse::<std::net::IpAddr>()
.map_err(|e| Status::invalid_argument(format!("invalid peer_address: {e}")))?
.to_string();
sessions.retain(|s| s.peer_address == wanted);
}
Comment on lines +43 to +55
Ok(Response::new(proto::GetBfdSessionsResponse { sessions }))
}
}

#[cfg(test)]
mod tests {
use super::*;
use proto::bfd_service_server::BfdService as _;

fn session(peer: &str, state: proto::BfdSessionState) -> proto::BfdSession {
proto::BfdSession {
peer_address: peer.to_string(),
state: state as i32,
diagnostic: "none".to_string(),
strict: false,
}
}

#[tokio::test]
async fn returns_all_sessions_when_unfiltered() {
let svc = BfdService::with_snapshot(std::sync::Arc::new(|| {
vec![
session("10.0.0.1", proto::BfdSessionState::Up),
session("10.0.0.2", proto::BfdSessionState::Down),
]
}));
let resp = svc
.get_bfd_sessions(Request::new(proto::GetBfdSessionsRequest::default()))
.await
.unwrap()
.into_inner();
assert_eq!(resp.sessions.len(), 2);
}

#[tokio::test]
async fn filters_by_peer_address() {
let svc = BfdService::with_snapshot(std::sync::Arc::new(|| {
vec![
session("10.0.0.1", proto::BfdSessionState::Up),
session("10.0.0.2", proto::BfdSessionState::Down),
]
}));
let resp = svc
.get_bfd_sessions(Request::new(proto::GetBfdSessionsRequest {
peer_address: "10.0.0.2".to_string(),
}))
.await
.unwrap()
.into_inner();
assert_eq!(resp.sessions.len(), 1);
assert_eq!(resp.sessions[0].peer_address, "10.0.0.2");
}

#[tokio::test]
async fn rejects_invalid_peer_address() {
let svc = BfdService::default();
let err = svc
.get_bfd_sessions(Request::new(proto::GetBfdSessionsRequest {
peer_address: "not-an-ip".to_string(),
}))
.await
.unwrap_err();
assert_eq!(err.code(), tonic::Code::InvalidArgument);
}

#[tokio::test]
async fn ipv6_filter_matches_canonical_form() {
// Snapshot stores the canonical form; a non-canonical request still matches.
let svc = BfdService::with_snapshot(std::sync::Arc::new(|| {
vec![session("2001:db8::1", proto::BfdSessionState::Up)]
}));
let resp = svc
.get_bfd_sessions(Request::new(proto::GetBfdSessionsRequest {
peer_address: "2001:DB8:0:0:0:0:0:1".to_string(),
}))
.await
.unwrap()
.into_inner();
assert_eq!(resp.sessions.len(), 1);
}

#[tokio::test]
async fn default_service_has_no_sessions() {
let svc = BfdService::default();
let resp = svc
.get_bfd_sessions(Request::new(proto::GetBfdSessionsRequest::default()))
.await
.unwrap()
.into_inner();
assert!(resp.sessions.is_empty());
}
}
4 changes: 4 additions & 0 deletions crates/api/src/event_service/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ pub(crate) fn route_event_to_bgp_event(event: rustbgpd_rib::RouteEvent) -> proto
| proto::BgpEventType::EvpnRouteAdded
| proto::BgpEventType::EvpnRouteWithdrawn
| proto::BgpEventType::EvpnRouteBestChanged
| proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged => "changed",
},
route.prefix,
Expand Down Expand Up @@ -288,6 +291,7 @@ pub(crate) fn stream_lag_bgp_event(
proto::EventCategory::Policy => "policy",
proto::EventCategory::Dataplane => "dataplane",
proto::EventCategory::Evpn => "evpn",
proto::EventCategory::Bfd => "bfd",
proto::EventCategory::Unspecified => "unknown",
};
let summary = format!("{source} event stream lagged; missed {missed_count} event(s)");
Expand Down
26 changes: 26 additions & 0 deletions crates/api/src/event_service/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ fn bgp_event_type_to_session_event_type(
| proto::BgpEventType::DataplaneRouteInstalled
| proto::BgpEventType::DataplaneRouteWithdrawn
| proto::BgpEventType::DataplaneRouteFailed
| proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged => None,
}
}
Expand All @@ -388,6 +391,14 @@ fn parse_category_filter(categories: &[i32]) -> Result<BTreeSet<i32>, Status> {
| proto::EventCategory::Evpn => {
parsed.insert(category as i32);
}
// The BFD event proto contract exists, but the actor does not yet
// emit into WatchEvents (ADR-0067 step 3b). Reject the filter rather
// than hand back an empty/immediately-closed stream.
proto::EventCategory::Bfd => {
return Err(Status::invalid_argument(
"BFD event streaming is not yet available",
));
}
Comment on lines +394 to +401
proto::EventCategory::Unspecified => {
return Err(Status::invalid_argument(
"EVENT_CATEGORY_UNSPECIFIED is not a valid filter",
Expand Down Expand Up @@ -426,6 +437,15 @@ fn parse_event_type_filter(event_types: &[i32]) -> Result<BTreeSet<i32>, Status>
| proto::BgpEventType::StreamLagged => {
parsed.insert(event_type as i32);
}
// BFD event types are defined but not yet streamed (ADR-0067 step
// 3b); reject rather than silently match nothing.
proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged => {
return Err(Status::invalid_argument(
"BFD event streaming is not yet available",
));
}
proto::BgpEventType::Unspecified => {
return Err(Status::invalid_argument(
"BGP_EVENT_TYPE_UNSPECIFIED is not a valid filter",
Expand Down Expand Up @@ -487,6 +507,9 @@ fn parse_policy_event_type_filter(event_types: &[i32]) -> Result<(), Status> {
| proto::BgpEventType::EvpnRouteAdded
| proto::BgpEventType::EvpnRouteWithdrawn
| proto::BgpEventType::EvpnRouteBestChanged
| proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged => {
return Err(Status::invalid_argument(
"ListPolicyEvents only supports policy event types",
Expand Down Expand Up @@ -533,6 +556,9 @@ fn parse_evpn_event_type_filter(event_types: &[i32]) -> Result<BTreeSet<RouteEve
| proto::BgpEventType::DataplaneRouteInstalled
| proto::BgpEventType::DataplaneRouteWithdrawn
| proto::BgpEventType::DataplaneRouteFailed
| proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged => {
return Err(Status::invalid_argument(
"ListEvpnEvents only supports EVPN event types",
Expand Down
1 change: 1 addition & 0 deletions crates/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod audit;
pub mod authz;
pub mod authz_principal;
pub mod authz_runtime;
pub mod bfd_service;
mod config_service;
mod connect_info;
mod control_service;
Expand Down
18 changes: 18 additions & 0 deletions crates/api/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing::{error, info, warn};

use crate::authz::{AuthEnforcement, AuthTier, PrincipalRole};
use crate::authz_runtime::{BearerAuthSecret, GrpcAuthAuditContext, GrpcAuthnKind, GrpcAuthzLayer};
use crate::bfd_service::BfdService;
use crate::config_service::ConfigService;
use crate::connect_info::RustbgpdTcpStream;
use crate::control_service::{ControlService, MrtTriggerTx};
Expand All @@ -34,6 +35,7 @@ use crate::neighbor_service::NeighborService;
use crate::peer_group_service::PeerGroupService;
use crate::peer_types::{ConfigEvent, PeerManagerCommand};
use crate::policy_service::PolicyService;
use crate::proto::bfd_service_server::BfdServiceServer;
use crate::proto::config_service_server::ConfigServiceServer;
use crate::proto::control_service_server::ControlServiceServer;
use crate::proto::event_service_server::EventServiceServer;
Expand Down Expand Up @@ -108,6 +110,9 @@ pub struct ServeConfig {
/// separate from the aggregate dataplane poller so route events
/// are not delayed by snapshot polling.
pub dataplane_route_events: Option<tokio::sync::broadcast::Sender<crate::proto::BgpEvent>>,
/// Live snapshot reader for ADR-0067 single-hop BFD session state.
/// Returns an empty list when no BFD sessions are configured or off Linux.
pub bfd_session_snapshot: crate::bfd_service::BfdSessionSnapshotFn,
}

/// Resolved gRPC listener configuration.
Expand Down Expand Up @@ -377,6 +382,7 @@ async fn run_listener(
let blackhole_discard_snapshot = config.blackhole_discard_snapshot;
let fib_route_snapshot = config.fib_route_snapshot;
let dataplane_route_events = config.dataplane_route_events;
let bfd_session_snapshot = config.bfd_session_snapshot;
let ListenerConfig {
endpoint,
access_mode,
Expand Down Expand Up @@ -420,6 +426,7 @@ async fn run_listener(
blackhole_discard_snapshot,
fib_route_snapshot,
dataplane_route_events,
bfd_session_snapshot,
dataplane_events,
shutdown_rx,
rpc_shutdown_tx,
Expand Down Expand Up @@ -458,6 +465,7 @@ async fn run_listener(
blackhole_discard_snapshot,
fib_route_snapshot,
dataplane_route_events,
bfd_session_snapshot,
dataplane_events,
shutdown_rx,
rpc_shutdown_tx,
Expand Down Expand Up @@ -503,6 +511,7 @@ async fn run_tcp_listener(
blackhole_discard_snapshot: crate::rib_service::BlackholeDiscardSnapshotFn,
fib_route_snapshot: crate::rib_service::FibRouteSnapshotFn,
dataplane_route_events: Option<tokio::sync::broadcast::Sender<crate::proto::BgpEvent>>,
bfd_session_snapshot: crate::bfd_service::BfdSessionSnapshotFn,
dataplane_events: DataplaneEventBroadcaster,
shutdown_rx: watch::Receiver<bool>,
rpc_shutdown_tx: watch::Sender<bool>,
Expand Down Expand Up @@ -566,6 +575,10 @@ async fn run_tcp_listener(
),
interceptor.clone(),
))
.add_service(BfdServiceServer::with_interceptor(
BfdService::with_snapshot(bfd_session_snapshot),
interceptor.clone(),
))
.add_service(InjectionServiceServer::with_interceptor(
InjectionService::new(rib_tx, access_mode),
interceptor.clone(),
Expand Down Expand Up @@ -663,6 +676,7 @@ async fn run_uds_listener(
blackhole_discard_snapshot: crate::rib_service::BlackholeDiscardSnapshotFn,
fib_route_snapshot: crate::rib_service::FibRouteSnapshotFn,
dataplane_route_events: Option<tokio::sync::broadcast::Sender<crate::proto::BgpEvent>>,
bfd_session_snapshot: crate::bfd_service::BfdSessionSnapshotFn,
dataplane_events: DataplaneEventBroadcaster,
shutdown_rx: watch::Receiver<bool>,
rpc_shutdown_tx: watch::Sender<bool>,
Expand Down Expand Up @@ -708,6 +722,10 @@ async fn run_uds_listener(
),
interceptor.clone(),
))
.add_service(BfdServiceServer::with_interceptor(
BfdService::with_snapshot(bfd_session_snapshot),
interceptor.clone(),
))
.add_service(InjectionServiceServer::with_interceptor(
InjectionService::new(rib_tx, access_mode),
interceptor.clone(),
Expand Down
Loading
Loading