Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog.d/24027_file_source_multi_char_delimiter.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Fixed a bug in the `file` source that could silently corrupt data when a multi-character delimiter (e.g. `\r\n`) was split across internal read-buffer boundaries.

authors: lfrancke
191 changes: 184 additions & 7 deletions lib/file-source-common/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,46 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
let delim_len = delim.len();
let mut discarded_for_size_and_truncated = Vec::new();
let mut reader = Box::new(reader);

// Track partial delimiter matches across buffer boundaries
let mut partial_delim: BytesMut = BytesMut::with_capacity(delim_len);

loop {
let available: &[u8] = match reader.fill_buf().await {
Ok(n) => n,
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};

// First, check if we have a partial delimiter from the previous iteration
if !partial_delim.is_empty() {
let expected_suffix = &delim[partial_delim.len()..];
let expected_suffix_len = expected_suffix.len();

if available.len() >= expected_suffix_len
&& &available[..expected_suffix_len] == expected_suffix
{
// Complete delimiter found! Consume the suffix
reader.consume(expected_suffix_len);
*position += expected_suffix_len as u64;
total_read += expected_suffix_len;
partial_delim.clear();

// Found a complete delimiter, return the current buffer
return Ok(ReadResult {
successfully_read: Some(total_read),
discarded_for_size_and_truncated,
});
} else {
// Not a complete delimiter after all. Add partial_delim to output buffer
if !discarding {
buf.extend_from_slice(&partial_delim);
}
partial_delim.clear();
// Continue processing current available buffer
}
}

let (done, used) = {
match delim_finder.find(available) {
Some(i) => {
Expand All @@ -62,13 +95,47 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
(true, i + delim_len)
}
None => {
if !discarding {
buf.extend_from_slice(available);
// No delimiter found in current buffer. Check if buffer ends with a
// partial delimiter match. For multi-byte delimiters like \r\n, we need
// to handle the case where the delimiter is split across buffer boundaries.
let mut partial_match_len = 0;
if !available.is_empty() && delim_len > 1 {
// Check if the end matches a prefix of the delimiter.
// We iterate from longest to shortest prefix and break on first match.
// Performance: For typical 2-byte delimiters (CRLF), this is 1 iteration.
// For longer delimiters, this runs O(delim_len) times but only occurs
// at buffer boundaries (~every 8KB), making the impact negligible.
for prefix_len in (1..delim_len).rev() {
if available.len() >= prefix_len
&& available.ends_with(&delim[..prefix_len])
{
partial_match_len = prefix_len;
break;
}
}
}

let bytes_to_copy = available.len() - partial_match_len;

if !discarding && bytes_to_copy > 0 {
buf.extend_from_slice(&available[..bytes_to_copy]);
}

// If we found a potential partial delimiter, save it for the next iteration
if partial_match_len > 0 {
partial_delim.clear();
partial_delim.extend_from_slice(&available[bytes_to_copy..]);
}

(false, available.len())
}
}
};

// Check if we're at EOF before we start processing
// (for borrow checker, has to come before `consume`)
let at_eof = available.is_empty();

reader.consume(used);
*position += used as u64; // do this at exactly same time
total_read += used;
Expand All @@ -92,11 +159,12 @@ pub async fn read_until_with_max_size<'a, R: AsyncBufRead + ?Sized + Unpin>(
discarding = false;
buf.clear();
}
} else if used == 0 {
// We've hit EOF but not yet seen a newline. This can happen when unlucky timing causes
// us to observe an incomplete write. We return None here and let the loop continue
// next time the method is called. This is safe because the buffer is specific to this
// FileWatcher.
} else if used == 0 && at_eof {
// We've hit EOF but haven't seen a delimiter. This can happen when:
// 1. The file ends without a trailing delimiter
// 2. We're observing an incomplete write
//
// Return None to signal the caller to retry later.
return Ok(ReadResult {
successfully_read: None,
discarded_for_size_and_truncated,
Expand Down Expand Up @@ -262,4 +330,113 @@ mod test {
.await
.unwrap()
}

/// Generic test helper that tests delimiter splits across buffer boundaries
/// for any delimiter length.
///
/// It:
/// 1. Builds test data in which each delimiter is positioned so that it
///    straddles a `buffer_capacity`-sized read boundary
/// 2. Reads the data back one record at a time, exercising the
///    partial-delimiter state tracking in `read_until_with_max_size`
///    across multiple iterations
/// 3. Asserts every record comes back exactly as written — no merged,
///    truncated, or corrupted lines
async fn test_delimiter_boundary_split_helper(delimiter: &[u8], num_lines: usize) {
    let delimiter_len = delimiter.len();

    // Deliberately tiny BufReader capacity so `fill_buf` refills often and
    // reads are forced to stop mid-delimiter at predictable offsets.
    let buffer_capacity = 10;

    // Build test data where each delimiter is positioned to split across a
    // buffer boundary: (delimiter_len - 1) bytes land before the boundary and
    // the final delimiter byte lands after it.
    let mut data = Vec::new();
    let mut expected_lines = Vec::new();

    for i in 0..num_lines {
        // Distance from the current write position to the next multiple of
        // `buffer_capacity` — the point where the reader will refill.
        let current_pos = data.len();
        let bytes_until_boundary = buffer_capacity - (current_pos % buffer_capacity);

        // Choose line content so the delimiter starts (delimiter_len - 1)
        // bytes before the boundary.
        // NOTE(review): `format!("line{:0width$}", i, ...)` always produces at
        // least 5 bytes ("line" + one digit), so when `content_len` is smaller
        // than that the line overshoots and the delimiter may not straddle the
        // boundary exactly. The read-back assertions below remain valid either
        // way (expected content is captured from what was actually written),
        // but boundary coverage is best-effort — TODO confirm intended.
        let line_content = if bytes_until_boundary > delimiter_len {
            let content_len = bytes_until_boundary - (delimiter_len - 1);
            format!("line{:0width$}", i, width = content_len.saturating_sub(4)).into_bytes()
        } else {
            // Not enough room before this boundary: pad up to it with 'X'
            // bytes, then aim the delimiter at the following boundary.
            let padding = bytes_until_boundary;
            let extra_content = buffer_capacity - (delimiter_len - 1);
            let mut content = vec![b'X'; padding];
            content.extend_from_slice(
                format!("L{:0width$}", i, width = extra_content.saturating_sub(1)).as_bytes(),
            );
            content
        };

        expected_lines.push(line_content.clone());
        data.extend_from_slice(&line_content);
        data.extend_from_slice(delimiter);
    }

    // Read the generated stream back through the code under test, with the
    // reader capacity matching the boundaries targeted above.
    let cursor = Cursor::new(data);
    let mut reader = BufReader::with_capacity(buffer_capacity, cursor);
    let mut position = 0;
    let max_size = 1024;

    // Each call should yield exactly one record with its delimiter consumed.
    for (i, expected_line) in expected_lines.iter().enumerate() {
        let mut buffer = BytesMut::new();
        let result = read_until_with_max_size(
            Box::pin(&mut reader),
            &mut position,
            delimiter,
            &mut buffer,
            max_size,
        )
        .await
        .unwrap();

        assert_eq!(
            buffer.as_ref(),
            expected_line.as_slice(),
            "Line {} should match expected content. Got: {:?}, Expected: {:?}",
            i,
            String::from_utf8_lossy(&buffer),
            String::from_utf8_lossy(expected_line)
        );

        assert!(
            result.successfully_read.is_some(),
            "Should find delimiter for line {}",
            i
        );
    }
}

#[tokio::test]
async fn test_single_byte_delimiter_boundary() {
    // Single-byte delimiter ('\n') can never be split across a buffer
    // boundary, so this is the baseline case requiring no partial-match
    // handling.
    test_delimiter_boundary_split_helper(b"\n", 5).await;
}

#[tokio::test]
async fn test_two_byte_delimiter_boundary() {
    // Two-byte delimiter — the common CRLF ('\r\n') case, which can be split
    // with '\r' at the end of one buffer fill and '\n' at the start of the
    // next.
    test_delimiter_boundary_split_helper(b"\r\n", 5).await;
}

#[tokio::test]
async fn test_three_byte_delimiter_boundary() {
    // Three-byte delimiter: a split can leave either 1 or 2 delimiter bytes
    // at the end of a buffer, exercising multiple prefix lengths.
    test_delimiter_boundary_split_helper(b"|||", 5).await;
}

#[tokio::test]
async fn test_four_byte_delimiter_boundary() {
    // Four-byte delimiter with repeated bytes ('<|>|'): the '|' appears at
    // two positions, so prefix matching must not be fooled by a repeated
    // byte.
    test_delimiter_boundary_split_helper(b"<|>|", 5).await;
}

#[tokio::test]
async fn test_five_byte_delimiter_boundary() {
    // Five-byte delimiter ('<<>>>') with runs of identical bytes — the
    // longest-prefix-first scan must still pick the correct partial match.
    test_delimiter_boundary_split_helper(b"<<>>>", 5).await;
}
}
Loading