Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion examples/flute-receiver/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ fn main() {

log::info!("Create FLUTE, write objects to {:?}", dest_dir);

let writer = Rc::new(writer::ObjectWriterFSBuilder::new(dest_dir).unwrap());
let enable_md5_check = true;
let writer = Rc::new(writer::ObjectWriterFSBuilder::new(dest_dir, enable_md5_check).unwrap());
let mut receiver = MultiReceiver::new(writer, None, false);

// Receive from 224.0.0.1:3400 on 127.0.0.1 (lo) interface
Expand Down
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
//! local.run_until(async move {
//! let nonsend_data = nonsend_data.clone();
//! task::spawn_local(async move {
//! let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir")).unwrap_or_else(|_| std::process::exit(0)));
//! let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), true).unwrap_or_else(|_| std::process::exit(0)));
//! let mut receiver = MultiReceiver::new(writer, None, false);
//! // ... run the receiver
//! }).await.unwrap();
Expand Down Expand Up @@ -111,7 +111,8 @@
//! let udp_socket = UdpSocket::bind(format!("{}:{}", endpoint.destination_group_address, endpoint.port)).expect("Fail to bind");
//!
//! // Create a writer able to write received files to the filesystem
//! let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"))
//! let enable_md5_check = true;
//! let writer = Rc::new(writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./flute_dir"), enable_md5_check)
//! .unwrap_or_else(|_| std::process::exit(0)));
//!
//! // Create a multi-receiver capable of de-multiplexing several FLUTE sessions
Expand Down
4 changes: 2 additions & 2 deletions src/py/receiver/objectwriterbuilder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ impl ObjectWriterBuilder {
#[new]
fn new(path: &str) -> PyResult<Self> {
let writer =
crate::receiver::writer::ObjectWriterFSBuilder::new(std::path::Path::new(path))
crate::receiver::writer::ObjectWriterFSBuilder::new(std::path::Path::new(path), true)
.map_err(|e| PyTypeError::new_err(e.0.to_string()))?;
Ok(Self {
inner: Rc::new(writer),
Expand All @@ -21,7 +21,7 @@ impl ObjectWriterBuilder {

#[staticmethod]
fn new_buffer() -> Self {
let writer = crate::receiver::writer::ObjectWriterBufferBuilder::new();
let writer = crate::receiver::writer::ObjectWriterBufferBuilder::new(true);
Self {
inner: Rc::new(writer),
}
Expand Down
9 changes: 7 additions & 2 deletions src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ impl FdtReceiver {
&lct::TOI_FDT,
Some(fdt_id),
fdt_builder,
true,
1024 * 1024,
now,
))),
Expand Down Expand Up @@ -121,7 +120,9 @@ impl FdtReceiver {
self.meta = Some(obj.create_meta());
self.obj = None
}
objectreceiver::State::Interrupted => self.inner.borrow_mut().state = FDTState::Error,
objectreceiver::State::Interrupted => {
self.inner.borrow_mut().state = FDTState::Error
}
objectreceiver::State::Error => self.inner.borrow_mut().state = FDTState::Error,
}
}
Expand Down Expand Up @@ -267,4 +268,8 @@ impl ObjectWriter for FdtWriter {
let mut inner = self.inner.borrow_mut();
inner.state = FDTState::Error;
}

fn enable_md5_check(&self) -> bool {
false
}
}
3 changes: 2 additions & 1 deletion src/receiver/multireceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ impl MultiReceiver {
///
/// let tsi: u64 = 1;
/// // Write object to a buffer
/// let writer = Rc::new(ObjectWriterBufferBuilder::new());
/// let enable_md5_check = true;
/// let writer = Rc::new(ObjectWriterBufferBuilder::new(enable_md5_check));
/// let mut receiver = MultiReceiver::new(writer.clone(), None, true);
/// let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_owned(), 3000);
/// receiver.add_listen_tsi(endpoint, tsi)
Expand Down
13 changes: 7 additions & 6 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ impl ObjectReceiver {
toi: &u128,
_fdt_instance_id: Option<u32>,
object_writer_builder: Rc<dyn ObjectWriterBuilder>,
enable_md5_check: bool,
max_size_allocated: usize,
now: SystemTime,
) -> ObjectReceiver {
Expand All @@ -99,7 +98,7 @@ impl ObjectReceiver {
transfer_length: None,
cenc: None,
content_md5: None,
enable_md5_check,
enable_md5_check: false,
blocks_variable_size: false,
a_large: 0,
a_small: 0,
Expand Down Expand Up @@ -357,9 +356,7 @@ impl ObjectReceiver {
#[cfg(feature = "opentelemetry")]
let _span = self.logger.as_mut().map(|l| l.fdt_attached());

if self.enable_md5_check {
self.content_md5 = file.content_md5.clone();
}
self.content_md5 = file.content_md5.clone();
self.fdt_instance_id = Some(fdt_instance_id);

self.cache_duration = file.get_cache_duration(fdt.get_expiration_date(), server_time);
Expand Down Expand Up @@ -435,6 +432,10 @@ impl ObjectReceiver {
now,
);

if self.content_md5.is_some() {
self.enable_md5_check = object_writer.enable_md5_check();
}

debug_assert!(self.block_writer.is_none());
self.object_writer = Some(ObjectWriterSession {
writer: object_writer,
Expand All @@ -454,7 +455,7 @@ impl ObjectReceiver {
transfer_length as usize,
self.content_length.clone(),
self.cenc.unwrap(),
self.content_md5.is_some(),
self.enable_md5_check,
));
}

Expand Down
4 changes: 0 additions & 4 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ pub struct Config {
pub object_timeout: Option<Duration>,
/// Maximum cache size that can be allocated to received an object. Default is 10MB.
pub object_max_cache_size: Option<usize>,
/// Enable MD5 check of the received objects. Default `true`
pub enable_md5_check: bool,
/// When set to `true`, the receiver will only reconstruct each object once.
/// If the same object is transferred again, it will be automatically discarded.
pub object_receive_once: bool,
Expand All @@ -46,7 +44,6 @@ impl Default for Config {
session_timeout: None,
object_timeout: Some(Duration::from_secs(10)),
object_max_cache_size: None,
enable_md5_check: true,
object_receive_once: true,
enable_fdt_expiration_check: true,
}
Expand Down Expand Up @@ -665,7 +662,6 @@ impl Receiver {
toi,
None,
self.writer.clone(),
self.config.enable_md5_check,
self.config
.object_max_cache_size
.unwrap_or(10 * 1024 * 1024),
Expand Down
10 changes: 9 additions & 1 deletion src/receiver/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
//! ```
//! use flute::receiver::writer;
//!
//! let writer = writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./destination_dir")).ok();
//! let enable_md5_check = true;
//! let writer = writer::ObjectWriterFSBuilder::new(&std::path::Path::new("./destination_dir"), enable_md5_check).ok();
//! ```
//!

Expand Down Expand Up @@ -101,6 +102,13 @@ pub trait ObjectWriter {
fn error(&self, now: SystemTime);
/// Called when the sender has interrupted the transmission of this object
fn interrupted(&self, now: SystemTime);
/// Indicates whether MD5 checksum verification is enabled for this object.
///
/// - `true`: The MD5 checksum will be verified. If the checksum is invalid,
/// the object will transition to an error state.
/// - `false`: The MD5 checksum will be skipped. Even if the checksum is invalid
/// or missing, the object will proceed to a complete state without error.
fn enable_md5_check(&self) -> bool;
}

impl std::fmt::Debug for dyn ObjectWriterBuilder {
Expand Down
17 changes: 14 additions & 3 deletions src/receiver/writer/objectwriterbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use std::{cell::RefCell, rc::Rc, time::SystemTime};
pub struct ObjectWriterBufferBuilder {
/// List of all objects received
pub objects: RefCell<Vec<Rc<RefCell<ObjectWriterBuffer>>>>,
/// True when MD5 check is enabled
pub enable_md5_check: bool,
}

///
Expand All @@ -17,6 +19,7 @@ pub struct ObjectWriterBufferBuilder {
#[derive(Debug)]
struct ObjectWriterBufferWrapper {
inner: Rc<RefCell<ObjectWriterBuffer>>,
enable_md5_check: bool,
}

#[derive(Debug)]
Expand All @@ -38,16 +41,17 @@ pub struct ObjectWriterBuffer {

impl ObjectWriterBufferBuilder {
/// Return a new `ObjectWriterBuffer`
pub fn new() -> ObjectWriterBufferBuilder {
pub fn new(enable_md5_check: bool) -> ObjectWriterBufferBuilder {
ObjectWriterBufferBuilder {
objects: RefCell::new(Vec::new()),
enable_md5_check,
}
}
}

impl Default for ObjectWriterBufferBuilder {
fn default() -> Self {
Self::new()
Self::new(true)
}
}

Expand All @@ -69,7 +73,10 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder {
end_time: None,
}));

let obj_wrapper = Box::new(ObjectWriterBufferWrapper { inner: obj.clone() });
let obj_wrapper = Box::new(ObjectWriterBufferWrapper {
inner: obj.clone(),
enable_md5_check: self.enable_md5_check,
});
self.objects.borrow_mut().push(obj);
obj_wrapper
}
Expand Down Expand Up @@ -129,4 +136,8 @@ impl ObjectWriter for ObjectWriterBufferWrapper {
inner.error = true;
inner.end_time = Some(now);
}

fn enable_md5_check(&self) -> bool {
self.enable_md5_check
}
}
10 changes: 9 additions & 1 deletion src/receiver/writer/objectwriterfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,19 @@ use std::{cell::RefCell, io::Write, time::SystemTime};
#[derive(Debug)]
pub struct ObjectWriterFSBuilder {
dest: std::path::PathBuf,
enable_md5_check: bool,
}

impl ObjectWriterFSBuilder {
/// Return a new `ObjectWriterBuffer`
pub fn new(dest: &std::path::Path) -> Result<ObjectWriterFSBuilder> {
pub fn new(dest: &std::path::Path, enable_md5_check: bool) -> Result<ObjectWriterFSBuilder> {
if !dest.is_dir() {
return Err(FluteError::new(format!("{:?} is not a directory", dest)));
}

Ok(ObjectWriterFSBuilder {
dest: dest.to_path_buf(),
enable_md5_check,
})
}
}
Expand All @@ -42,6 +44,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder {
writer: None,
}),
meta: meta.clone(),
enable_md5_check: self.enable_md5_check,
})
}

Expand Down Expand Up @@ -82,6 +85,7 @@ pub struct ObjectWriterFS {
dest: std::path::PathBuf,
inner: RefCell<ObjectWriterFSInner>,
meta: ObjectMetadata,
enable_md5_check: bool,
}

///
Expand Down Expand Up @@ -174,4 +178,8 @@ impl ObjectWriter for ObjectWriterFS {
fn interrupted(&self, now: SystemTime) {
self.error(now);
}

fn enable_md5_check(&self) -> bool {
self.enable_md5_check
}
}
8 changes: 4 additions & 4 deletions tests/flute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ mod tests {

let input_content_location = obj.content_location.clone();

let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new());
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new(true));
let mut receiver = receiver::MultiReceiver::new(output.clone(), None, false);
receiver.add_listener(TestMultiReceiverObserver::new());

Expand Down Expand Up @@ -657,7 +657,7 @@ mod tests {
None,
None,
);
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new());
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new(true));
let mut receiver = receiver::MultiReceiver::new(output.clone(), None, false);
let mut sender = create_sender(
vec![obj],
Expand Down Expand Up @@ -747,7 +747,7 @@ mod tests {
None,
);

let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new());
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new(true));
let mut receiver = receiver::MultiReceiver::new(output.clone(), None, false);

let mut sender_config: sender::Config = Default::default();
Expand Down Expand Up @@ -824,7 +824,7 @@ mod tests {
None,
);
obj.max_transfer_count = max_transfert_count as u32;
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new());
let output = Rc::new(receiver::writer::ObjectWriterBufferBuilder::new(true));
let mut receiver_config = receiver::Config::default();
receiver_config.object_receive_once = false;
let mut receiver =
Expand Down
Loading