Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 27 additions & 21 deletions src/common/fdtinstance.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::time::SystemTime;

use crate::tools::{
self,
error::{FluteError, Result},
use crate::{
receiver::writer::ObjectCacheControl,
tools::{
self,
error::{FluteError, Result},
},
};

use quick_xml::de::from_reader;
Expand Down Expand Up @@ -530,32 +533,35 @@ impl FdtInstance {
}

impl File {
pub fn get_cache_duration(
pub fn get_object_cache_control(
&self,
fdt_expiration_time: Option<SystemTime>,
server_time: SystemTime,
) -> Option<std::time::Duration> {
) -> ObjectCacheControl {
if let Some(cc) = &self.cache_control {
return match cc.value {
CacheControlChoice::NoCache(_) => Some(std::time::Duration::ZERO),
CacheControlChoice::MaxStale(_) => {
Some(std::time::Duration::from_secs(10 * 3600 * 24 * 360))
}
let ret = match cc.value {
CacheControlChoice::NoCache(_) => Some(ObjectCacheControl::NoCache),
CacheControlChoice::MaxStale(_) => Some(ObjectCacheControl::MaxStale),
CacheControlChoice::Expires(time) => {
let expiration_time = match tools::ntp_to_system_time((time as u64) << 32) {
Ok(res) => res,
_ => return None,
};
Some(
expiration_time
.duration_since(server_time)
.unwrap_or_default(),
)
match tools::ntp_to_system_time((time as u64) << 32) {
Ok(res) => Some(ObjectCacheControl::ExpiresAt(res)),
Err(_) => {
log::warn!("Invalid NTP timestamp in Cache-Control Expires");
None
}
}
}
};

if let Some(ret) = ret {
return ret;
}
}

fdt_expiration_time.map(|v| v.duration_since(server_time).unwrap_or_default())
// If no Cache-Control is set, we use the FDT expiration time
let guess_cache_duration: Option<ObjectCacheControl> =
fdt_expiration_time.map(|v| ObjectCacheControl::ExpiresAtHint(v));

guess_cache_duration.unwrap_or(ObjectCacheControl::NoCache)
}

pub fn get_transfer_length(&self) -> u64 {
Expand Down
2 changes: 1 addition & 1 deletion src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder {
}))
}

fn set_cache_duration(
fn update_cache_control(
&self,
_endpoint: &UDPEndpoint,
_tsi: &u64,
Expand Down
20 changes: 7 additions & 13 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use super::blockwriter::BlockWriter;
use super::writer::ObjectWriterBuilder;
use crate::common::udpendpoint::UDPEndpoint;
use crate::common::{alc, fdtinstance::FdtInstance, lct, oti, partition};
use crate::receiver::writer::{ObjectMetadata, ObjectWriter, ObjectWriterBuilderResult};
use crate::receiver::writer::{
ObjectCacheControl, ObjectMetadata, ObjectWriter, ObjectWriterBuilderResult,
};
use crate::tools::error::{FluteError, Result};
use std::collections::VecDeque;
use std::rc::Rc;
Expand Down Expand Up @@ -62,15 +64,14 @@ pub struct ObjectReceiver {
block_writer: Option<BlockWriter>,
pub fdt_instance_id: Option<u32>,
last_activity: Instant,
pub cache_expiration_date: Option<SystemTime>,
pub content_location: Option<String>,
nb_allocated_blocks: usize,
total_allocated_blocks_size: usize,
#[cfg(feature = "opentelemetry")]
logger: Option<ObjectReceiverLogger>,
pub content_length: Option<usize>,
content_type: Option<String>,
cache_duration: Option<Duration>,
pub cache_control: Option<ObjectCacheControl>,
groups: Vec<String>,
last_timestamp: SystemTime,
pub e_tag: Option<String>,
Expand Down Expand Up @@ -111,7 +112,6 @@ impl ObjectReceiver {
toi: *toi,
endpoint: endpoint.clone(),
last_activity: Instant::now(),
cache_expiration_date: None,
content_location: match *toi == lct::TOI_FDT {
true => Some("flute://fdt".to_string()),
false => None,
Expand All @@ -122,7 +122,7 @@ impl ObjectReceiver {
logger: None,
content_length: None,
content_type: None,
cache_duration: None,
cache_control: None,
groups: Vec::new(),
last_timestamp: now,
e_tag: None,
Expand Down Expand Up @@ -294,7 +294,6 @@ impl ObjectReceiver {
fdt_instance_id: u32,
fdt: &FdtInstance,
now: std::time::SystemTime,
server_time: std::time::SystemTime,
) -> bool {
debug_assert!(self.toi != lct::TOI_FDT);
self.last_timestamp = now;
Expand Down Expand Up @@ -359,12 +358,7 @@ impl ObjectReceiver {
self.content_md5 = file.content_md5.clone();
self.fdt_instance_id = Some(fdt_instance_id);

self.cache_duration = file.get_cache_duration(fdt.get_expiration_date(), server_time);
self.cache_expiration_date = self.cache_duration.map(|v| {
now.checked_add(v)
.unwrap_or(now + std::time::Duration::from_secs(3600 * 24 * 360 * 10))
});

self.cache_control = Some(file.get_object_cache_control(fdt.get_expiration_date()));
self.content_length = file.content_length.map(|c| c as usize);
self.content_type = file.content_type.clone();
self.groups = groups;
Expand All @@ -387,7 +381,7 @@ impl ObjectReceiver {
.unwrap_or("file:///".to_string()),
content_length: self.content_length.clone(),
content_type: self.content_type.clone(),
cache_duration: self.cache_duration.clone(),
cache_control: self.cache_control.unwrap_or(ObjectCacheControl::NoCache),
groups: match self.groups.is_empty() {
true => None,
false => Some(self.groups.clone()),
Expand Down
77 changes: 29 additions & 48 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::objectreceiver::ObjectReceiver;
use super::writer::{ObjectMetadata, ObjectWriterBuilder};
use crate::common::udpendpoint::UDPEndpoint;
use crate::common::{alc, lct};
use crate::receiver::writer::ObjectCacheControl;
use crate::tools::error::FluteError;
use crate::tools::error::Result;
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
Expand Down Expand Up @@ -52,7 +53,6 @@ impl Default for Config {

#[derive(Debug, Clone)]
pub struct ObjectCompletedMeta {
expiration_date: SystemTime,
metadata: ObjectMetadata,
}

Expand Down Expand Up @@ -409,19 +409,18 @@ impl Receiver {
fn attach_latest_fdt_to_objects(&mut self, now: std::time::SystemTime) -> Option<()> {
let fdt = self.fdt_current.front_mut()?;
let fdt_id = fdt.fdt_id;
let server_time = fdt.get_server_time(now);
let fdt_instance = fdt.fdt_instance()?;
log::debug!("TSI={} Attach FDT id {}", self.tsi, fdt_id);
let mut check_state = Vec::new();
for obj in &mut self.objects {
let success = obj.1.attach_fdt(fdt_id, fdt_instance, now, server_time);
let success = obj.1.attach_fdt(fdt_id, fdt_instance, now);
if success {
check_state.push(*obj.0);
}
}

for toi in check_state {
self.check_object_state(toi, now);
self.check_object_state(toi);
}

Some(())
Expand All @@ -432,10 +431,9 @@ impl Receiver {
now: std::time::SystemTime,
) -> Option<()> {
let fdt = self.fdt_current.front_mut()?;
let server_time = fdt.get_server_time(now);
let fdt_instance = fdt.fdt_instance()?;
let files = fdt_instance.file.as_ref()?;
let expiration_date = fdt_instance.get_expiration_date();
let fdt_expiration_date = fdt_instance.get_expiration_date();

if let Some(true) = fdt_instance.full_fdt {
let files_toi: std::collections::HashMap<u128, Option<&String>> = files
Expand All @@ -456,14 +454,14 @@ impl Receiver {
if !remove_candidates.is_empty() {
let content_locations: std::collections::HashSet<&str> =
files.iter().map(|f| f.content_location.as_str()).collect();
let duration = std::time::Duration::from_secs(4);
let cache_control =
ObjectCacheControl::ExpiresAt(now + std::time::Duration::from_secs(4));
for (toi, object_completed) in &mut remove_candidates {
if !content_locations
.contains(object_completed.metadata.content_location.as_str())
&& object_completed.expiration_date > now + duration
{
object_completed.metadata.cache_duration = Some(duration);
self.writer.set_cache_duration(
object_completed.metadata.cache_control = cache_control;
self.writer.update_cache_control(
&self.endpoint,
&self.tsi,
toi,
Expand All @@ -479,30 +477,17 @@ impl Receiver {

for file in files {
let toi: u128 = file.toi.parse().unwrap_or_default();
let cache_duration = file.get_cache_duration(expiration_date, server_time);
let cache_control = file.get_object_cache_control(fdt_expiration_date);
if let Some(obj) = self.objects_completed.get_mut(&toi) {
if let Some(cache_duration) = cache_duration {
let new_expiration_date = now
.checked_add(cache_duration)
.unwrap_or(now + std::time::Duration::from_secs(3600 * 24 * 360 * 10));

let diff = match new_expiration_date < obj.expiration_date {
true => obj.expiration_date.duration_since(new_expiration_date),
false => new_expiration_date.duration_since(obj.expiration_date),
}
.unwrap();

if diff.as_secs() > 1 {
obj.expiration_date = new_expiration_date;
obj.metadata.cache_duration = Some(cache_duration);
self.writer.set_cache_duration(
&self.endpoint,
&self.tsi,
&toi,
&obj.metadata,
now,
);
}
if obj.metadata.cache_control.should_update(cache_control) {
obj.metadata.cache_control = cache_control;
self.writer.update_cache_control(
&self.endpoint,
&self.tsi,
&toi,
&obj.metadata,
now,
);
}
}
}
Expand Down Expand Up @@ -545,12 +530,12 @@ impl Receiver {
};

obj.push(pkt, now);
self.check_object_state(pkt.lct.toi, now);
self.check_object_state(pkt.lct.toi);

Ok(())
}

fn check_object_state(&mut self, toi: u128, now: SystemTime) {
fn check_object_state(&mut self, toi: u128) {
let obj = self.objects.get_mut(&toi);
if obj.is_none() {
return;
Expand All @@ -571,22 +556,17 @@ impl Receiver {
obj.toi
);

if obj.cache_expiration_date.is_some() {
debug_assert!(obj.content_location.is_some());
log::debug!(
"Insert {:?} for a duration of {:?}",
obj.content_location,
obj.cache_expiration_date.unwrap().duration_since(now)
);
if obj.cache_control != Some(ObjectCacheControl::NoCache) {
self.objects_completed.insert(
obj.toi,
ObjectCompletedMeta {
expiration_date: obj.cache_expiration_date.unwrap(),
metadata: obj.create_meta(),
},
);
} else {
log::error!("No cache expiration date for {:?}", obj.content_location);
if obj.cache_control.is_none() {
log::error!("No cache expiration date for {:?}", obj.content_location);
}
}
}
objectreceiver::State::Interrupted => {
Expand Down Expand Up @@ -672,11 +652,10 @@ impl Receiver {
let mut is_attached = false;
for (fdt_index, fdt) in (&mut self.fdt_current.iter_mut()).enumerate() {
let fdt_id = fdt.fdt_id;
let server_time = fdt.get_server_time(now);
fdt.update_expired_state(now);
if fdt.state() == fdtreceiver::FDTState::Complete {
if let Some(fdt_instance) = fdt.fdt_instance() {
let success = obj.attach_fdt(fdt_id, fdt_instance, now, server_time);
let success = obj.attach_fdt(fdt_id, fdt_instance, now);
if success {
is_attached = true;
if fdt_index != 0 {
Expand Down Expand Up @@ -714,13 +693,15 @@ impl Drop for Receiver {
if let Some(fdt) = self.fdt_current.front_mut() {
if let Some(instance) = fdt.fdt_instance() {
if instance.full_fdt == Some(true) {
let now = self.last_timestamp.unwrap_or_else(|| SystemTime::now());
for obj in &mut self.objects_completed {
log::info!(
"Remove from cache {}",
&obj.1.metadata.content_location.to_string()
);
obj.1.metadata.cache_duration = Some(Duration::from_secs(0));
self.writer.set_cache_duration(

obj.1.metadata.cache_control = ObjectCacheControl::ExpiresAt(now);
self.writer.update_cache_control(
&self.endpoint,
&self.tsi,
obj.0,
Expand Down
Loading
Loading