Skip to content

Commit

Permalink
Share some parts between sync and async parsers for protocol v2.
Browse files Browse the repository at this point in the history
  • Loading branch information
TSnake41 committed Aug 18, 2023
1 parent 3d2f251 commit ad90226
Showing 1 changed file with 82 additions and 96 deletions.
178 changes: 82 additions & 96 deletions xcp-metrics-common/src/rrdd/protocol_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,66 +77,26 @@ fn compute_data_checksum(timestamp: SystemTime, values: &[[u8; 8]]) -> u32 {
impl RrddMessageHeader {
/// Parse a message header from a readable source.
pub fn parse_from<R: Read>(input: &mut R) -> Result<Self, RrddProtocolError> {
let mut header_buffer = [0u8; PROTOCOL_V2_HEADER.len()];
let mut data_checksum_buffer = [0u8; 4];
let mut metadata_checksum_buffer = [0u8; 4];
let mut values_count_buffer = [0u8; 4];
let mut timestamp_buffer = [0u8; 8];

// Read the first part (all fields until datasource values)
let mut first_part_buffer = [0u8; RRDD_HEADER_LENGTH_PART1];
input.read_exact(&mut first_part_buffer)?;

// note: slices implements Read.
let mut first_part_slice = first_part_buffer.as_slice();

first_part_slice.read_exact(&mut header_buffer)?;

if *PROTOCOL_V2_HEADER != header_buffer {
return Err(RrddProtocolError::InvalidConstantString);
}

first_part_slice.read_exact(&mut data_checksum_buffer)?;
let data_checksum = u32::from_be_bytes(data_checksum_buffer);

first_part_slice.read_exact(&mut metadata_checksum_buffer)?;
let metadata_checksum = u32::from_be_bytes(metadata_checksum_buffer);

first_part_slice.read_exact(&mut values_count_buffer)?;
let values_count = u32::from_be_bytes(values_count_buffer);

first_part_slice.read_exact(&mut timestamp_buffer)?;
let timestamp_epoch = u64::from_be_bytes(timestamp_buffer);

let timestamp = time::UNIX_EPOCH + Duration::from_secs(timestamp_epoch);
let (data_checksum, metadata_checksum, values_count, timestamp) =
Self::parse_first_part(first_part_buffer, &mut timestamp_buffer)?;

// Second part (values and metadata length)
let mut second_part_buffer =
vec![0u8; (8 * values_count + 4/* metadata length */) as usize];
input.read_exact(&mut second_part_buffer)?;

// Split values and metadata
let (values_buffer, metadata_length_buffer) =
second_part_buffer.split_at(8 * values_count as usize);

// Check data checksum
let mut data_checksum_hasher = crc32fast::Hasher::new();
data_checksum_hasher.update(&timestamp_buffer);
data_checksum_hasher.update(values_buffer);

if data_checksum != data_checksum_hasher.finalize() {
return Err(RrddProtocolError::InvalidChecksum);
}

// TODO: Consider using slice::array_chunks when stabilized.
// https://github.com/rust-lang/rust/issues/74985
let values: Box<[[u8; 8]]> = values_buffer
.chunks_exact(8)
.map(|slice| slice.try_into().unwrap())
.collect();

let metadata_length =
u32::from_be_bytes(TryInto::<[u8; 4]>::try_into(metadata_length_buffer).unwrap());
let (values, metadata_length) = Self::parse_second_part(
&second_part_buffer,
values_count,
timestamp_buffer,
data_checksum,
)?;

Ok(Self {
data_checksum,
Expand All @@ -152,66 +112,26 @@ impl RrddMessageHeader {
pub async fn parse_async<R: AsyncRead + Unpin>(
input: &mut R,
) -> Result<Self, RrddProtocolError> {
let mut header_buffer = [0u8; PROTOCOL_V2_HEADER.len()];
let mut data_checksum_buffer = [0u8; 4];
let mut metadata_checksum_buffer = [0u8; 4];
let mut values_count_buffer = [0u8; 4];
let mut timestamp_buffer = [0u8; 8];

// Read the first part (all fields until datasource values)
let mut first_part_buffer = [0u8; RRDD_HEADER_LENGTH_PART1];
tokio::io::AsyncReadExt::read_exact(input, &mut first_part_buffer).await?;

// note: slices implements Read.
let mut first_part_slice = first_part_buffer.as_slice();

first_part_slice.read_exact(&mut header_buffer)?;

if *PROTOCOL_V2_HEADER != header_buffer {
return Err(RrddProtocolError::InvalidConstantString);
}

first_part_slice.read_exact(&mut data_checksum_buffer)?;
let data_checksum = u32::from_be_bytes(data_checksum_buffer);

first_part_slice.read_exact(&mut metadata_checksum_buffer)?;
let metadata_checksum = u32::from_be_bytes(metadata_checksum_buffer);

first_part_slice.read_exact(&mut values_count_buffer)?;
let values_count = u32::from_be_bytes(values_count_buffer);

first_part_slice.read_exact(&mut timestamp_buffer)?;
let timestamp_epoch = u64::from_be_bytes(timestamp_buffer);

let timestamp = time::UNIX_EPOCH + Duration::from_secs(timestamp_epoch);
let (data_checksum, metadata_checksum, values_count, timestamp) =
Self::parse_first_part(first_part_buffer, &mut timestamp_buffer)?;

// Second part (values and metadata length)
let mut second_part_buffer =
vec![0u8; (8 * values_count + 4/* metadata length */) as usize];
tokio::io::AsyncReadExt::read_exact(input, &mut second_part_buffer).await?;

// Split values and metadata
let (values_buffer, metadata_length_buffer) =
second_part_buffer.split_at(8 * values_count as usize);

// Check data checksum
let mut data_checksum_hasher = crc32fast::Hasher::new();
data_checksum_hasher.update(&timestamp_buffer);
data_checksum_hasher.update(values_buffer);

if data_checksum != data_checksum_hasher.finalize() {
return Err(RrddProtocolError::InvalidChecksum);
}

// TODO: Consider using slice::array_chunks when stabilized.
// https://github.com/rust-lang/rust/issues/74985
let values: Box<[[u8; 8]]> = values_buffer
.chunks_exact(8)
.map(|slice| slice.try_into().unwrap())
.collect();

let metadata_length =
u32::from_be_bytes(TryInto::<[u8; 4]>::try_into(metadata_length_buffer).unwrap());
let (values, metadata_length) = Self::parse_second_part(
&second_part_buffer,
values_count,
timestamp_buffer,
data_checksum,
)?;

Ok(Self {
data_checksum,
Expand Down Expand Up @@ -289,6 +209,72 @@ impl RrddMessageHeader {

Ok(())
}

fn parse_first_part(
first_part_buffer: [u8; 31],
timestamp_buffer: &mut [u8; 8],
) -> Result<(u32, u32, u32, SystemTime), RrddProtocolError> {
let mut header_buffer = [0u8; PROTOCOL_V2_HEADER.len()];
let mut data_checksum_buffer = [0u8; 4];
let mut metadata_checksum_buffer = [0u8; 4];
let mut values_count_buffer = [0u8; 4];

// note: slices implements Read.
let mut first_part_slice = first_part_buffer.as_slice();
first_part_slice.read_exact(&mut header_buffer)?;

if *PROTOCOL_V2_HEADER != header_buffer {
return Err(RrddProtocolError::InvalidConstantString);
}

first_part_slice.read_exact(&mut data_checksum_buffer)?;
let data_checksum = u32::from_be_bytes(data_checksum_buffer);

first_part_slice.read_exact(&mut metadata_checksum_buffer)?;
let metadata_checksum = u32::from_be_bytes(metadata_checksum_buffer);

first_part_slice.read_exact(&mut values_count_buffer)?;
let values_count = u32::from_be_bytes(values_count_buffer);

first_part_slice.read_exact(timestamp_buffer)?;
let timestamp_epoch = u64::from_be_bytes(*timestamp_buffer);

let timestamp = time::UNIX_EPOCH + Duration::from_secs(timestamp_epoch);

Ok((data_checksum, metadata_checksum, values_count, timestamp))
}

fn parse_second_part(
second_part_buffer: &[u8],
values_count: u32,
timestamp_buffer: [u8; 8],
data_checksum: u32,
) -> Result<(Box<[[u8; 8]]>, u32), RrddProtocolError> {
// Split values and metadata
let (values_buffer, metadata_length_buffer) =
second_part_buffer.split_at(8 * values_count as usize);

// Check data checksum
let mut data_checksum_hasher = crc32fast::Hasher::new();
data_checksum_hasher.update(&timestamp_buffer);
data_checksum_hasher.update(values_buffer);

if data_checksum != data_checksum_hasher.finalize() {
return Err(RrddProtocolError::InvalidChecksum);
}

// TODO: Consider using slice::array_chunks when stabilized.
// https://github.com/rust-lang/rust/issues/74985
let values: Box<[[u8; 8]]> = values_buffer
.chunks_exact(8)
.map(|slice| slice.try_into().unwrap())
.collect();

let metadata_length =
u32::from_be_bytes(TryInto::<[u8; 4]>::try_into(metadata_length_buffer).unwrap());

Ok((values, metadata_length))
}
}

/// A non-parsed metadata (datasource list).
Expand Down

0 comments on commit ad90226

Please sign in to comment.