Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ let obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
&url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
sender.add_object(0, obj);

// Always call publish after adding objects
// Always call publish after adding objects when FDT publish mode is FullFDT
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -279,7 +279,7 @@ obj.target_acquisition = Some(TargetAcquisition::WithinDuration(std::time::Durat
// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
// Always call publish after adding objects when FDT publish mode is FullFDT when FDT publish mode is FullFDT
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -328,7 +328,7 @@ obj.target_acquisition = Some(TargetAcquisition::WithinTime(target_end_time));
// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
// Always call publish after adding objects when FDT publish mode is FullFDT
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -387,7 +387,7 @@ Some(carousel_mode), None, None, None, Cenc::Null, true, None, true).unwrap();
// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
// Always call publish after adding objects when FDT publish mode is FullFDT
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -443,7 +443,7 @@ Some(carousel_mode), None, None, None, Cenc::Null, true, None, true).unwrap();
// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
// Always call publish after adding objects when FDT publish mode is FullFDT
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
Expand Down
10 changes: 5 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
//! &url::Url::parse("file:///hello.txt").unwrap(), 1, None, None, None, None, Cenc::Null, true, None, true).unwrap();
//! sender.add_object(0, obj);
//!
//! // Always call publish after adding objects
//! // Always call publish after adding objects when FDT publish mode is FullFDT
//! sender.publish(SystemTime::now());
//!
//! // Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -279,7 +279,7 @@
//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
//! sender.add_object(0, obj);
//!
//! // Always call publish after adding objects
//! // Always call publish after adding objects when FDT publish mode is FullFDT when FDT publish mode is FullFDT
//! sender.publish(SystemTime::now());
//!
//! // Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -328,7 +328,7 @@
//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
//! sender.add_object(0, obj);
//!
//! // Always call publish after adding objects
//! // Always call publish after adding objects when FDT publish mode is FullFDT
//! sender.publish(SystemTime::now());
//!
//! // Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -387,7 +387,7 @@
//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
//! sender.add_object(0, obj);
//!
//! // Always call publish after adding objects
//! // Always call publish after adding objects when FDT publish mode is FullFDT
//! sender.publish(SystemTime::now());
//!
//! // Send FLUTE packets over UDP/IP
Expand Down Expand Up @@ -443,7 +443,7 @@
//! // Add object(s) (files) to the FLUTE sender (priority queue 0)
//! sender.add_object(0, obj);
//!
//! // Always call publish after adding objects
//! // Always call publish after adding objects when FDT publish mode is FullFDT
//! sender.publish(SystemTime::now());
//!
//! // Send FLUTE packets over UDP/IP
Expand Down
2 changes: 1 addition & 1 deletion src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub struct ObjectReceiver {
total_allocated_blocks_size: usize,
#[cfg(feature = "opentelemetry")]
logger: Option<ObjectReceiverLogger>,
pub content_length: Option<usize>,
content_length: Option<usize>,
content_type: Option<String>,
pub cache_control: Option<ObjectCacheControl>,
groups: Vec<String>,
Expand Down
73 changes: 0 additions & 73 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,46 +435,6 @@ impl Receiver {
let files = fdt_instance.file.as_ref()?;
let fdt_expiration_date = fdt_instance.get_expiration_date();

if let Some(true) = fdt_instance.full_fdt {
let files_toi: std::collections::HashMap<u128, Option<&String>> = files
.iter()
.map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref()))
.collect();
let mut remove_candidates: std::collections::HashMap<u128, ObjectCompletedMeta> = self
.objects_completed
.iter()
.filter_map(
|(toi, object_completed)| match files_toi.contains_key(toi) {
true => None,
false => Some((*toi, object_completed.clone())),
},
)
.collect();

if !remove_candidates.is_empty() {
let content_locations: std::collections::HashSet<&str> =
files.iter().map(|f| f.content_location.as_str()).collect();
let cache_control =
ObjectCacheControl::ExpiresAt(now + std::time::Duration::from_secs(4));
for (toi, object_completed) in &mut remove_candidates {
if !content_locations
.contains(object_completed.metadata.content_location.as_str())
{
object_completed.metadata.cache_control = cache_control;
self.writer.update_cache_control(
&self.endpoint,
&self.tsi,
toi,
&object_completed.metadata,
now,
);
}
}
self.objects_completed
.retain(|f, _| !remove_candidates.contains_key(f));
}
}

for file in files {
let toi: u128 = file.toi.parse().unwrap_or_default();
let cache_control = file.get_object_cache_control(fdt_expiration_date);
Expand Down Expand Up @@ -610,10 +570,6 @@ impl Receiver {
None => return,
};

if let Some(true) = instance.full_fdt {
return;
}

let before = self.objects_completed.len();
if let Some(files) = instance.file.as_ref() {
let current_tois: std::collections::HashSet<u128> = files
Expand Down Expand Up @@ -685,32 +641,3 @@ impl Receiver {
self.objects.insert(*toi, obj);
}
}

impl Drop for Receiver {
fn drop(&mut self) {
log::info!("Drop Flute Receiver");

if let Some(fdt) = self.fdt_current.front_mut() {
if let Some(instance) = fdt.fdt_instance() {
if instance.full_fdt == Some(true) {
let now = self.last_timestamp.unwrap_or_else(|| SystemTime::now());
for obj in &mut self.objects_completed {
log::info!(
"Remove from cache {}",
&obj.1.metadata.content_location.to_string()
);

obj.1.metadata.cache_control = ObjectCacheControl::ExpiresAt(now);
self.writer.update_cache_control(
&self.endpoint,
&self.tsi,
obj.0,
&obj.1.metadata,
self.last_timestamp.unwrap_or_else(|| SystemTime::now()),
);
}
}
}
}
}
}
15 changes: 9 additions & 6 deletions src/sender/fdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ impl Fdt {
};

let files = match self.publish_mode {
FDTPublishMode::Automatic => self.get_files_being_transferred(),
FDTPublishMode::Manual => self.files.values().map(|desc| desc.as_ref()).collect(),
FDTPublishMode::ObjectsBeingTransferred => self.get_files_being_transferred(),
FDTPublishMode::FullFDT => self.files.values().map(|desc| desc.as_ref()).collect(),
};

FdtInstance {
Expand Down Expand Up @@ -125,7 +125,10 @@ impl Fdt {
xmlns_mbms_2012: None,
xmlns_mbms_2015: None,
xmlns_sv: None,
full_fdt: None,
full_fdt: match self.publish_mode {
FDTPublishMode::FullFDT => Some(true),
FDTPublishMode::ObjectsBeingTransferred => None,
},
base_url_1: None,
base_url_2: None,
group: self.groups.clone(),
Expand Down Expand Up @@ -327,10 +330,10 @@ impl Fdt {
file.transfer_started(now);

match self.publish_mode {
FDTPublishMode::Automatic => {
FDTPublishMode::ObjectsBeingTransferred => {
self.publish(now).ok();
}
FDTPublishMode::Manual => {}
FDTPublishMode::FullFDT => {}
}

Some(file.clone())
Expand Down Expand Up @@ -448,7 +451,7 @@ mod tests {
crate::sender::TOIMaxLength::ToiMax112,
Some(1),
Some(vec!["Group1".to_owned()]),
crate::sender::FDTPublishMode::Manual,
crate::sender::FDTPublishMode::FullFDT,
);
let obj1 = objectdesc::ObjectDesc::create_from_buffer(
Vec::new(),
Expand Down
2 changes: 1 addition & 1 deletion src/sender/filedesc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ impl FileDesc {
return false;
}

if fdt_publish_mode == FDTPublishMode::Manual && !self.is_published() {
if fdt_publish_mode == FDTPublishMode::FullFDT && !self.is_published() {
log::warn!("File with TOI {} is not published", self.toi);
return false;
}
Expand Down
12 changes: 6 additions & 6 deletions src/sender/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,20 @@ impl PriorityQueue {
// This enum defines when and how the FDT is updated and sent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FDTPublishMode {
/// Manual publishing mode.
/// FullFDT publishing mode.
///
/// - The FDT is published only when `publish()` is explicitly called.
/// - It includes all objects that have been inserted up to the time of publication.
/// - Provides full control over when the FDT is updated and sent.
Manual,
FullFDT,

/// Automatic publishing mode.
///
/// - The FDT is automatically published before the transmission of each object.
/// - It contains only the objects that are currently being transferred.
/// - Ensures that each transmission is accompanied by an up-to-date FDT,
/// May result in smaller but more frequent FDT updates.
Automatic,
ObjectsBeingTransferred,
}

///
Expand Down Expand Up @@ -165,7 +165,7 @@ impl Default for Config {
toi_max_length: TOIMaxLength::ToiMax112,
toi_initial_value: Some(1),
groups: None,
fdt_publish_mode: FDTPublishMode::Manual,
fdt_publish_mode: FDTPublishMode::FullFDT,
}
}
}
Expand Down Expand Up @@ -282,7 +282,7 @@ impl Sender {

/// Add an object to the FDT
///
/// After calling this function, a call to `publish()` to publish your modifications
/// If FDT is configured in FullFDT mode, after calling this function, a call to `publish()` to publish your modifications
///
/// If a TOI as been set to the ObjectDesc, there is no need to release it
///
Expand Down Expand Up @@ -369,7 +369,7 @@ impl Sender {
/// An updated version of the FDT will be generated and transferred
/// Multiple modifications can be made (ex: several call to 'add_object()`) before publishing a new FDT version
///
/// Required only if fdt_publish_mode is set to manual
/// Required only if fdt_publish_mode is set to FullFDT
pub fn publish(&mut self, now: SystemTime) -> Result<()> {
self.fdt.publish(now)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/flute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ mod tests {
assert!(sender.is_added(toi));
}

if config.fdt_publish_mode == sender::FDTPublishMode::Manual {
if config.fdt_publish_mode == sender::FDTPublishMode::FullFDT {
sender.publish(std::time::SystemTime::now()).unwrap();
}

Expand Down Expand Up @@ -348,7 +348,7 @@ mod tests {
init();

let config = sender::Config {
fdt_publish_mode: sender::FDTPublishMode::Automatic,
fdt_publish_mode: sender::FDTPublishMode::ObjectsBeingTransferred,
..Default::default()
};

Expand Down
Loading