Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,15 @@ This project follows [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
(`EVENT_CATEGORY_BFD`, `BGP_EVENT_TYPE_BFD_SESSION_{UP,DOWN,STATE_CHANGED}`,
`BfdSessionEvent`, the `BgpEvent.bfd` oneof) so it is stable, but BFD events
**do not yet stream** over `EventService.WatchEvents` — actor event emission
lands in a follow-up.
lands in a follow-up (the next entry).
- **ADR-0067 BFD event emission.** The BFD actor now publishes session
state-change events into the unified `EventService.WatchEvents` stream as
`BfdSessionEvent` payloads (`EVENT_CATEGORY_BFD`,
`BGP_EVENT_TYPE_BFD_SESSION_{UP,DOWN,STATE_CHANGED}`). Opt-in like the
dataplane / EVPN categories (not in the default route+session set);
filterable by category, event type, and peer address. The actor stays
decoupled from the gRPC proto — it broadcasts an internal event that a daemon
bridge converts (mirrors the FIB dataplane bridge).
- **ADR-0063 EVPN runtime convergence — `ip_vrf` relink.**
`EvpnService.ApplyEvpnRuntime` now commits an L2VNI re-homed to a different
IP-VRF (or its `ip_vrf` link added/removed) at runtime. A relink edits no
Expand Down
162 changes: 161 additions & 1 deletion crates/api/src/event_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ const DATAPLANE_EVENT_POLL_INTERVAL: std::time::Duration = std::time::Duration::

pub(crate) type DataplaneEventBroadcaster = Arc<Mutex<Option<broadcast::Sender<proto::BgpEvent>>>>;
pub(crate) type DataplaneRouteEventBroadcaster = Option<broadcast::Sender<proto::BgpEvent>>;
/// Live ADR-0067 BFD session-event source for `WatchEvents`. `None` disables
/// the BFD event stream.
pub(crate) type BfdEventBroadcaster = Option<broadcast::Sender<proto::BgpEvent>>;

#[must_use]
pub(crate) fn dataplane_event_broadcaster() -> DataplaneEventBroadcaster {
Expand All @@ -56,6 +59,7 @@ pub struct EventService {
fib_route_snapshot: FibRouteSnapshotFn,
dataplane_events: DataplaneEventBroadcaster,
dataplane_route_events: DataplaneRouteEventBroadcaster,
bfd_events: BfdEventBroadcaster,
metrics: BgpMetrics,
Comment on lines 59 to 63
}

Expand Down Expand Up @@ -105,18 +109,24 @@ impl EventService {
fib_route_snapshot,
dataplane_events,
None,
None,
BgpMetrics::new(),
)
}

#[must_use]
#[expect(
clippy::too_many_arguments,
reason = "EventService aggregates several independent event sources + snapshots"
)]
pub(crate) fn with_dataplane_snapshots_broadcaster_and_metrics(
rib_tx: tokio::sync::mpsc::Sender<RibUpdate>,
peer_mgr_tx: tokio::sync::mpsc::Sender<PeerManagerCommand>,
blackhole_discard_snapshot: BlackholeDiscardSnapshotFn,
fib_route_snapshot: FibRouteSnapshotFn,
dataplane_events: DataplaneEventBroadcaster,
dataplane_route_events: DataplaneRouteEventBroadcaster,
bfd_events: BfdEventBroadcaster,
metrics: BgpMetrics,
) -> Self {
Self {
Expand All @@ -126,6 +136,7 @@ impl EventService {
fib_route_snapshot,
dataplane_events,
dataplane_route_events,
bfd_events,
metrics,
}
}
Expand Down Expand Up @@ -421,12 +432,52 @@ impl proto::event_service_server::EventService for EventService {
Box::pin(tokio_stream::empty())
};

let bfd_stream: Pin<Box<dyn Stream<Item = Result<proto::BgpEvent, Status>> + Send>> =
if filter.wants_bfd_events() {
if let Some(bfd_rx) = self
.bfd_events
.as_ref()
.map(tokio::sync::broadcast::Sender::subscribe)
{
let bfd_filter = filter.clone();
let bfd_metrics = self.metrics.clone();
let bfd_subscriber_guard =
bfd_metrics.event_stream_subscriber_guard("watch_events", "bfd");
Box::pin(BroadcastStream::new(bfd_rx).filter_map(
move |result| match result {
Err(BroadcastStreamRecvError::Lagged(missed)) => {
let _subscriber_guard = &bfd_subscriber_guard;
bfd_metrics.record_event_stream_lagged("watch_events", "bfd", missed);
debug!(
missed,
"WatchEvents BFD subscriber lagged, emitting missed-event signal"
);
Some(Ok(stream_lag_bgp_event(proto::EventCategory::Bfd, missed)))
}
Ok(event) => {
let _subscriber_guard = &bfd_subscriber_guard;
if bfd_filter.matches_bfd_event(&event) {
Some(Ok(event))
} else {
None
}
}
},
))
} else {
Box::pin(tokio_stream::empty())
}
} else {
Box::pin(tokio_stream::empty())
};

Ok(Response::new(Box::pin(
route_stream
.merge(session_stream)
.merge(policy_stream)
.merge(dataplane_stream)
.merge(evpn_stream),
.merge(evpn_stream)
.merge(bfd_stream),
)))
}

Expand Down Expand Up @@ -1251,6 +1302,7 @@ mod tests {
Arc::new(Vec::new),
dataplane_event_broadcaster(),
Some(route_tx.clone()),
None,
BgpMetrics::new(),
);
let response = service
Expand Down Expand Up @@ -1309,6 +1361,112 @@ mod tests {
assert_eq!(payload.table_name, "blue");
}

fn bfd_bgp_event(event_type: proto::BgpEventType, peer: &str) -> proto::BgpEvent {
proto::BgpEvent {
timestamp: "0".to_string(),
category: proto::EventCategory::Bfd as i32,
event_type: event_type as i32,
severity: proto::EventSeverity::Info as i32,
peer_address: peer.to_string(),
previous_peer_address: String::new(),
prefix: String::new(),
prefix_length: 0,
afi_safi: proto::AddressFamily::Unspecified as i32,
summary: format!("bfd {peer}"),
target_peer_address: String::new(),
payload: Some(proto::bgp_event::Payload::Bfd(proto::BfdSessionEvent {
event_type: event_type as i32,
peer_address: peer.to_string(),
timestamp: "0".to_string(),
old_state: proto::BfdSessionState::Down as i32,
new_state: proto::BfdSessionState::Up as i32,
diagnostic: "none".to_string(),
reason: String::new(),
})),
}
}

#[tokio::test]
async fn bfd_events_stream_filtered_by_category_and_peer() {
let (rib_tx, _) = spawn_fake_rib();
let (peer_tx, _, _) = spawn_fake_peer_manager();
let (bfd_tx, _) = broadcast::channel(16);
let service = EventService::with_dataplane_snapshots_broadcaster_and_metrics(
rib_tx,
peer_tx,
Arc::new(Vec::new),
Arc::new(Vec::new),
dataplane_event_broadcaster(),
None,
Some(bfd_tx.clone()),
BgpMetrics::new(),
);
let response = service
.watch_events(Request::new(proto::WatchEventsRequest {
categories: vec![proto::EventCategory::Bfd as i32],
event_types: vec![],
neighbor_address: "10.0.0.1".to_string(),
afi_safi: proto::AddressFamily::Unspecified as i32,
prefix: String::new(),
prefix_length: 0,
}))
.await
.unwrap();
let mut stream = response.into_inner();

// Wrong peer — filtered out.
bfd_tx
.send(bfd_bgp_event(proto::BgpEventType::BfdSessionUp, "10.0.0.2"))
.unwrap();
// Matching peer — delivered.
bfd_tx
.send(bfd_bgp_event(proto::BgpEventType::BfdSessionUp, "10.0.0.1"))
.unwrap();

let event = tokio::time::timeout(std::time::Duration::from_secs(1), stream.next())
.await
.unwrap()
.unwrap()
.unwrap();
assert_eq!(event.category, proto::EventCategory::Bfd as i32);
assert_eq!(event.event_type, proto::BgpEventType::BfdSessionUp as i32);
assert_eq!(event.peer_address, "10.0.0.1");
let Some(proto::bgp_event::Payload::Bfd(payload)) = event.payload else {
panic!("expected bfd payload");
};
assert_eq!(payload.new_state, proto::BfdSessionState::Up as i32);
}

#[tokio::test]
async fn bfd_events_not_in_default_route_session_stream() {
let (rib_tx, _) = spawn_fake_rib();
let (peer_tx, _, _) = spawn_fake_peer_manager();
let (bfd_tx, _) = broadcast::channel(16);
let service = EventService::with_dataplane_snapshots_broadcaster_and_metrics(
rib_tx,
peer_tx,
Arc::new(Vec::new),
Arc::new(Vec::new),
dataplane_event_broadcaster(),
None,
Some(bfd_tx.clone()),
BgpMetrics::new(),
);
// Default request (no categories) = route + session only; BFD opt-in.
let response = service
.watch_events(Request::new(proto::WatchEventsRequest::default()))
.await
.unwrap();
let mut stream = response.into_inner();
// The default stream never subscribes to BFD, so `send` may report no
// receivers — that absence is exactly the property under test.
let _ = bfd_tx.send(bfd_bgp_event(proto::BgpEventType::BfdSessionUp, "10.0.0.1"));
// The BFD event must not leak into the default stream.
let result =
tokio::time::timeout(std::time::Duration::from_millis(300), stream.next()).await;
assert!(result.is_err(), "BFD event leaked into the default stream");
}

#[test]
fn dataplane_summaries_count_blackhole_and_fib_states() {
let summaries = dataplane_summaries(
Expand Down Expand Up @@ -1998,6 +2156,7 @@ mod tests {
Arc::new(Vec::new),
dataplane_event_broadcaster(),
None,
None,
metrics.clone(),
);
let response = service
Expand Down Expand Up @@ -2038,6 +2197,7 @@ mod tests {
Arc::new(Vec::new),
dataplane_event_broadcaster(),
None,
None,
metrics.clone(),
);
let response = service
Expand Down
74 changes: 56 additions & 18 deletions crates/api/src/event_service/filters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,40 @@ impl WatchEventsFilter {
evpn_selected && self.afi.is_none() && self.prefix.is_none() && evpn_type_allowed
}

pub(super) fn wants_bfd_events(&self) -> bool {
// BFD events are peer-scoped but carry no family/prefix, so a
// family/prefix-filtered request never selects them (like session /
// EVPN). Opt-in: not part of the default route+session set.
let bfd_category_requested = self
.categories
.contains(&(proto::EventCategory::Bfd as i32));
let bfd_type_allowed = self.event_types_match_bfd_category();
let bfd_selected = bfd_category_requested
|| (self.categories.is_empty() && !self.event_types.is_empty() && bfd_type_allowed);
bfd_selected && self.afi.is_none() && self.prefix.is_none() && bfd_type_allowed
}

pub(super) fn matches_bfd_event(&self, event: &proto::BgpEvent) -> bool {
if !self.wants_bfd_events() || event.category != proto::EventCategory::Bfd as i32 {
return false;
}
let Ok(event_type) = proto::BgpEventType::try_from(event.event_type) else {
return false;
};
if !bfd_bgp_event_type_allowed(event_type) {
return false;
}
if !self.event_types.is_empty() && !self.event_types.contains(&event.event_type) {
return false;
}
if let Some(peer) = self.peer
&& event.peer_address.parse::<IpAddr>().ok() != Some(peer)
{
return false;
}
true
}

pub(super) fn matches_session_event(&self, event: &SessionEvent) -> bool {
if !self.wants_session_events() {
return false;
Expand Down Expand Up @@ -284,6 +318,23 @@ impl WatchEventsFilter {
)
})
}

fn event_types_match_bfd_category(&self) -> bool {
self.event_types.is_empty()
|| self.event_types.iter().any(|event_type| {
proto::BgpEventType::try_from(*event_type).is_ok_and(bfd_bgp_event_type_allowed)
})
}
}

fn bfd_bgp_event_type_allowed(event_type: proto::BgpEventType) -> bool {
matches!(
event_type,
proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged
)
}

fn dataplane_bgp_event_type_allowed(event_type: proto::BgpEventType) -> bool {
Expand Down Expand Up @@ -388,17 +439,10 @@ fn parse_category_filter(categories: &[i32]) -> Result<BTreeSet<i32>, Status> {
| proto::EventCategory::Session
| proto::EventCategory::Policy
| proto::EventCategory::Dataplane
| proto::EventCategory::Evpn => {
| proto::EventCategory::Evpn
| proto::EventCategory::Bfd => {
parsed.insert(category as i32);
}
// The BFD event proto contract exists, but the actor does not yet
// emit into WatchEvents (ADR-0067 step 3b). Reject the filter rather
// than hand back an empty/immediately-closed stream.
proto::EventCategory::Bfd => {
return Err(Status::invalid_argument(
"BFD event streaming is not yet available",
));
}
proto::EventCategory::Unspecified => {
return Err(Status::invalid_argument(
"EVENT_CATEGORY_UNSPECIFIED is not a valid filter",
Expand Down Expand Up @@ -434,18 +478,12 @@ fn parse_event_type_filter(event_types: &[i32]) -> Result<BTreeSet<i32>, Status>
| proto::BgpEventType::EvpnRouteAdded
| proto::BgpEventType::EvpnRouteWithdrawn
| proto::BgpEventType::EvpnRouteBestChanged
| proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged
| proto::BgpEventType::StreamLagged => {
parsed.insert(event_type as i32);
}
// BFD event types are defined but not yet streamed (ADR-0067 step
// 3b); reject rather than silently match nothing.
proto::BgpEventType::BfdSessionUp
| proto::BgpEventType::BfdSessionDown
| proto::BgpEventType::BfdSessionStateChanged => {
return Err(Status::invalid_argument(
"BFD event streaming is not yet available",
));
}
proto::BgpEventType::Unspecified => {
return Err(Status::invalid_argument(
"BGP_EVENT_TYPE_UNSPECIFIED is not a valid filter",
Expand Down
Loading
Loading