Skip to content

Commit

Permalink
Add ordering to nat changeset (#5036)
Browse files Browse the repository at this point in the history
When querying the DB for nat entries, it is not sufficient to
filter for entries with a version_added and version_removed greater
than a specified value, then order. The sorted columns need to be
interleaved as well during the query, or we may accidentally skip
entries. This commit adds a view to the db that enables such a query.
  • Loading branch information
internet-diglett authored Feb 9, 2024
1 parent dcbc9cb commit 887a91e
Show file tree
Hide file tree
Showing 7 changed files with 369 additions and 18 deletions.
36 changes: 24 additions & 12 deletions nexus/db-model/src/ipv4_nat_entry.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::net::{Ipv4Addr, Ipv6Addr};

use super::MacAddr;
use crate::{schema::ipv4_nat_entry, Ipv4Net, Ipv6Net, SqlU16, Vni};
use crate::{
schema::ipv4_nat_changes, schema::ipv4_nat_entry, Ipv4Net, Ipv6Net, SqlU16,
Vni,
};
use chrono::{DateTime, Utc};
use omicron_common::api::external;
use schemars::JsonSchema;
Expand Down Expand Up @@ -48,6 +51,20 @@ impl Ipv4NatEntry {
}
}

/// Summary of changes to ipv4 nat entries.
#[derive(Queryable, Debug, Clone, Selectable, Serialize, Deserialize)]
#[diesel(table_name = ipv4_nat_changes)]
pub struct Ipv4NatChange {
pub external_address: Ipv4Net,
pub first_port: SqlU16,
pub last_port: SqlU16,
pub sled_address: Ipv6Net,
pub vni: Vni,
pub mac: MacAddr,
pub version: i64,
pub deleted: bool,
}

/// NAT Record
#[derive(Clone, Debug, Serialize, JsonSchema)]
pub struct Ipv4NatEntryView {
Expand All @@ -61,22 +78,17 @@ pub struct Ipv4NatEntryView {
pub deleted: bool,
}

impl From<Ipv4NatEntry> for Ipv4NatEntryView {
fn from(value: Ipv4NatEntry) -> Self {
let (gen, deleted) = match value.version_removed {
Some(gen) => (gen, true),
None => (value.version_added, false),
};

impl From<Ipv4NatChange> for Ipv4NatEntryView {
fn from(value: Ipv4NatChange) -> Self {
Self {
external_address: value.external_address.ip(),
first_port: value.first_port(),
last_port: value.last_port(),
first_port: value.first_port.into(),
last_port: value.last_port.into(),
sled_address: value.sled_address.ip(),
vni: value.vni.0,
mac: *value.mac,
gen,
deleted,
gen: value.version,
deleted: value.deleted,
}
}
}
16 changes: 15 additions & 1 deletion nexus/db-model/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use omicron_common::api::external::SemverVersion;
///
/// This should be updated whenever the schema is changed. For more details,
/// refer to: schema/crdb/README.adoc
pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(32, 0, 0);
pub const SCHEMA_VERSION: SemverVersion = SemverVersion::new(33, 0, 1);

table! {
disk (id) {
Expand Down Expand Up @@ -546,6 +546,20 @@ table! {
}
}

// View used for summarizing changes to ipv4_nat_entry
table! {
ipv4_nat_changes (version) {
external_address -> Inet,
first_port -> Int4,
last_port -> Int4,
sled_address -> Inet,
vni -> Int4,
mac -> Int8,
version -> Int8,
deleted -> Bool,
}
}

// This is the sequence used for the version number
// in ipv4_nat_entry.
table! {
Expand Down
169 changes: 165 additions & 4 deletions nexus/db-queries/src/db/datastore/ipv4_nat_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use chrono::{DateTime, Utc};
use diesel::prelude::*;
use diesel::sql_types::BigInt;
use nexus_db_model::ExternalIp;
use nexus_db_model::Ipv4NatChange;
use nexus_db_model::Ipv4NatEntryView;
use omicron_common::api::external::CreateResult;
use omicron_common::api::external::DeleteResult;
Expand Down Expand Up @@ -317,10 +318,19 @@ impl DataStore {
version: i64,
limit: u32,
) -> ListResultVec<Ipv4NatEntryView> {
let nat_entries =
self.ipv4_nat_list_since_version(opctx, version, limit).await?;
use db::schema::ipv4_nat_changes::dsl;

let nat_changes = dsl::ipv4_nat_changes
.filter(dsl::version.gt(version))
.limit(limit as i64)
.order_by(dsl::version)
.select(Ipv4NatChange::as_select())
.load_async(&*self.pool_connection_authorized(opctx).await?)
.await
.map_err(|e| public_error_from_diesel(e, ErrorHandler::Server))?;

let nat_entries: Vec<Ipv4NatEntryView> =
nat_entries.iter().map(|e| e.clone().into()).collect();
nat_changes.iter().map(|e| e.clone().into()).collect();
Ok(nat_entries)
}

Expand Down Expand Up @@ -367,14 +377,15 @@ fn ipv4_nat_next_version() -> diesel::expression::SqlLiteral<BigInt> {

#[cfg(test)]
mod test {
use std::str::FromStr;
use std::{net::Ipv4Addr, str::FromStr};

use crate::db::datastore::datastore_test;
use chrono::Utc;
use nexus_db_model::{Ipv4NatEntry, Ipv4NatValues, MacAddr, Vni};
use nexus_test_utils::db::test_setup_database;
use omicron_common::api::external;
use omicron_test_utils::dev;
use rand::seq::IteratorRandom;

// Test our ability to track additions and deletions since a given version number
#[tokio::test]
Expand Down Expand Up @@ -802,4 +813,154 @@ mod test {
db.cleanup().await.unwrap();
logctx.cleanup_successful();
}

// Test our ability to return all changes interleaved in the correct order
#[tokio::test]
async fn ipv4_nat_changeset() {
let logctx = dev::test_setup_log("test_nat_version_tracking");
let mut db = test_setup_database(&logctx.log).await;
let (opctx, datastore) = datastore_test(&logctx, &db).await;

// We should not have any NAT entries at this moment
let initial_state =
datastore.ipv4_nat_list_since_version(&opctx, 0, 10).await.unwrap();

assert!(initial_state.is_empty());
assert_eq!(
datastore.ipv4_nat_current_version(&opctx).await.unwrap(),
0
);

let addresses = (0..=255).map(|i| {
let addr = Ipv4Addr::new(10, 0, 0, i);
let net = ipnetwork::Ipv4Network::new(addr, 32).unwrap();
external::Ipv4Net(net)
});

let sled_address = external::Ipv6Net(
ipnetwork::Ipv6Network::try_from("fd00:1122:3344:104::1").unwrap(),
);

let nat_entries = addresses.map(|external_address| {
// build a bunch of nat entries
Ipv4NatValues {
external_address: external_address.into(),
first_port: u16::MIN.into(),
last_port: u16::MAX.into(),
sled_address: sled_address.into(),
vni: Vni(external::Vni::random()),
mac: MacAddr(external::MacAddr::random_guest()),
}
});

let mut db_records = vec![];

// create the nat entries
for entry in nat_entries {
let result = datastore
.ensure_ipv4_nat_entry(&opctx, entry.clone())
.await
.unwrap();

db_records.push(result);
}

// delete a subset of the entries
for entry in
db_records.iter().choose_multiple(&mut rand::thread_rng(), 50)
{
datastore.ipv4_nat_delete(&opctx, entry).await.unwrap();
}

// get the new state of all nat entries
// note that this is not the method under test
let db_records = datastore
.ipv4_nat_list_since_version(&opctx, 0, 300)
.await
.unwrap();

// Count the actual number of changes seen.
// This check is required because we _were_ getting changes in ascending order,
// but some entries were being skipped. We want to ensure we are getting
// *all* of the changes in ascending order.
let mut total_changes = 0;

// ensure that the changeset is ordered, displaying the correct
// version numbers, and displaying the correct `deleted` status
let mut version = 0;
let limit = 100;
let mut changes =
datastore.ipv4_nat_changeset(&opctx, version, limit).await.unwrap();

while !changes.is_empty() {
// check ordering
assert!(changes
.windows(2)
.all(|entries| entries[0].gen < entries[1].gen));

// check deleted status and version numbers
changes.iter().for_each(|change| match change.deleted {
true => {
// version should match a deleted entry
let deleted_nat = db_records
.iter()
.find(|entry| entry.version_removed == Some(change.gen))
.expect("did not find a deleted nat entry with a matching version number");

assert_eq!(
deleted_nat.external_address.ip(),
change.external_address
);
assert_eq!(
deleted_nat.first_port,
change.first_port.into()
);
assert_eq!(deleted_nat.last_port, change.last_port.into());
assert_eq!(
deleted_nat.sled_address.ip(),
change.sled_address
);
assert_eq!(*deleted_nat.mac, change.mac);
assert_eq!(deleted_nat.vni.0, change.vni);
}
false => {
// version should match an active nat entry
let added_nat = db_records
.iter()
.find(|entry| entry.version_added == change.gen)
.expect("did not find an active nat entry with a matching version number");

assert!(added_nat.version_removed.is_none());

assert_eq!(
added_nat.external_address.ip(),
change.external_address
);
assert_eq!(added_nat.first_port, change.first_port.into());
assert_eq!(added_nat.last_port, change.last_port.into());
assert_eq!(
added_nat.sled_address.ip(),
change.sled_address
);
assert_eq!(*added_nat.mac, change.mac);
assert_eq!(added_nat.vni.0, change.vni);
}
});

// bump the count of changes seen
total_changes += changes.len();

version = changes.last().unwrap().gen;
changes = datastore
.ipv4_nat_changeset(&opctx, version, limit)
.await
.unwrap();
}

// did we see everything?
assert_eq!(total_changes, db_records.len());

db.cleanup().await.unwrap();
logctx.cleanup_successful();
}
}
42 changes: 42 additions & 0 deletions schema/crdb/33.0.0/up01.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/**
* A view of the ipv4 nat change history
* used to summarize changes for external viewing
*/
CREATE VIEW IF NOT EXISTS omicron.public.ipv4_nat_changes
AS
WITH interleaved_versions AS (
SELECT
external_address,
first_port,
last_port,
sled_address,
vni,
mac,
version_added AS version,
(version_removed IS NOT NULL) as deleted
FROM ipv4_nat_entry
WHERE version_removed IS NULL

UNION

SELECT
external_address,
first_port,
last_port,
sled_address,
vni,
mac,
version_added AS version,
(version_removed IS NOT NULL) as deleted
FROM ipv4_nat_entry WHERE version_removed IS NOT NULL
)
SELECT
external_address,
first_port,
last_port,
sled_address,
vni,
mac,
version,
deleted
FROM interleaved_versions;
1 change: 1 addition & 0 deletions schema/crdb/33.0.1/up01.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP VIEW IF EXISTS omicron.public.ipv4_nat_changes;
Loading

0 comments on commit 887a91e

Please sign in to comment.