Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/receiver/fdtreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,7 @@ impl ObjectWriterBuilder for FdtWriterBuilder {
_endpoint: &UDPEndpoint,
_tsi: &u64,
_toi: &u128,
_content_location: &str,
_duration: &std::time::Duration,
_meta: &ObjectMetadata,
_now: std::time::SystemTime,
) {
}
Expand Down
4 changes: 2 additions & 2 deletions src/receiver/objectreceiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ pub struct ObjectReceiver {
total_allocated_blocks_size: usize,
#[cfg(feature = "opentelemetry")]
logger: Option<ObjectReceiverLogger>,
content_length: Option<usize>,
pub content_length: Option<usize>,
content_type: Option<String>,
cache_duration: Option<Duration>,
groups: Vec<String>,
last_timestamp: SystemTime,
e_tag: Option<String>,
pub e_tag: Option<String>,
}

impl ObjectReceiver {
Expand Down
56 changes: 30 additions & 26 deletions src/receiver/receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use super::fdtreceiver::FdtReceiver;
use super::objectreceiver;
use super::objectreceiver::ObjectReceiver;
use super::writer::ObjectWriterBuilder;
use super::writer::{ObjectMetadata, ObjectWriterBuilder};
use crate::common::udpendpoint::UDPEndpoint;
use crate::common::{alc, lct};
use crate::tools::error::FluteError;
Expand Down Expand Up @@ -53,7 +53,7 @@
#[derive(Debug, Clone)]
pub struct ObjectCompletedMeta {
expiration_date: SystemTime,
content_location: String,
metadata: ObjectMetadata,
}

///
Expand Down Expand Up @@ -442,30 +442,32 @@
.iter()
.map(|f| (f.toi.parse().unwrap_or_default(), f.content_md5.as_ref()))
.collect();
let remove_candidates: std::collections::HashMap<u128, ObjectCompletedMeta> = self
let mut remove_candidates: std::collections::HashMap<u128, ObjectCompletedMeta> = self

Check warning on line 445 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L445

Added line #L445 was not covered by tests
.objects_completed
.iter()
.filter_map(|(toi, meta)| match files_toi.contains_key(toi) {
true => None,
false => Some((*toi, meta.clone())),
})
.filter_map(
|(toi, object_completed)| match files_toi.contains_key(toi) {
true => None,
false => Some((*toi, object_completed.clone())),

Check warning on line 451 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L449-L451

Added lines #L449 - L451 were not covered by tests
},
)
.collect();

if !remove_candidates.is_empty() {
let content_locations: std::collections::HashSet<&str> =
files.iter().map(|f| f.content_location.as_str()).collect();
let duration = std::time::Duration::from_secs(4);
for (toi, meta) in &remove_candidates {
let content_location = meta.content_location.to_string();
if !content_locations.contains(content_location.as_str())
&& meta.expiration_date > now + duration
for (toi, object_completed) in &mut remove_candidates {
if !content_locations
.contains(object_completed.metadata.content_location.as_str())
&& object_completed.expiration_date > now + duration

Check warning on line 463 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L460-L463

Added lines #L460 - L463 were not covered by tests
{
object_completed.metadata.cache_duration = Some(duration);

Check warning on line 465 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L465

Added line #L465 was not covered by tests
self.writer.set_cache_duration(
&self.endpoint,
&self.tsi,
toi,
&meta.content_location,
&duration,
&object_completed.metadata,
now,
);
}
Expand All @@ -480,24 +482,24 @@
let cache_duration = file.get_cache_duration(expiration_date, server_time);
if let Some(obj) = self.objects_completed.get_mut(&toi) {
if let Some(cache_duration) = cache_duration {
let new_duration = now
let new_expiration_date = now

Check warning on line 485 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L485

Added line #L485 was not covered by tests
.checked_add(cache_duration)
.unwrap_or(now + std::time::Duration::from_secs(3600 * 24 * 360 * 10));

let diff = match new_duration < obj.expiration_date {
true => obj.expiration_date.duration_since(new_duration),
false => new_duration.duration_since(obj.expiration_date),
let diff = match new_expiration_date < obj.expiration_date {
true => obj.expiration_date.duration_since(new_expiration_date),
false => new_expiration_date.duration_since(obj.expiration_date),

Check warning on line 491 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L489-L491

Added lines #L489 - L491 were not covered by tests
}
.unwrap();

if diff.as_secs() > 1 {
obj.expiration_date = new_duration;
obj.expiration_date = new_expiration_date;
obj.metadata.cache_duration = Some(cache_duration);

Check warning on line 497 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L496-L497

Added lines #L496 - L497 were not covered by tests
self.writer.set_cache_duration(
&self.endpoint,
&self.tsi,
&toi,
&obj.content_location,
&cache_duration,
&obj.metadata,
now,
);
}
Expand Down Expand Up @@ -581,7 +583,7 @@
obj.toi,
ObjectCompletedMeta {
expiration_date: obj.cache_expiration_date.unwrap(),
content_location: obj.content_location.as_ref().unwrap().clone(),
metadata: obj.create_meta(),
},
);
} else {
Expand Down Expand Up @@ -713,15 +715,17 @@
if let Some(fdt) = self.fdt_current.front_mut() {
if let Some(instance) = fdt.fdt_instance() {
if instance.full_fdt == Some(true) {
let duration = std::time::Duration::from_secs(0);
for obj in &self.objects_completed {
log::info!("Remove from cache {}", &obj.1.content_location.to_string());
for obj in &mut self.objects_completed {
log::info!(

Check warning on line 719 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L718-L719

Added lines #L718 - L719 were not covered by tests
"Remove from cache {}",
&obj.1.metadata.content_location.to_string()
);
obj.1.metadata.cache_duration = Some(Duration::from_secs(0));

Check warning on line 723 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L723

Added line #L723 was not covered by tests
self.writer.set_cache_duration(
&self.endpoint,
&self.tsi,
obj.0,
&obj.1.content_location,
&duration,
&obj.1.metadata,

Check warning on line 728 in src/receiver/receiver.rs

View check run for this annotation

Codecov / codecov/patch

src/receiver/receiver.rs#L728

Added line #L728 was not covered by tests
self.last_timestamp.unwrap_or_else(|| SystemTime::now()),
);
}
Expand Down
3 changes: 1 addition & 2 deletions src/receiver/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ pub trait ObjectWriterBuilder {
endpoint: &UDPEndpoint,
tsi: &u64,
toi: &u128,
content_location: &str,
duration: &Duration,
meta: &ObjectMetadata,
now: std::time::SystemTime,
);
/// Called when an FDT is received
Expand Down
3 changes: 1 addition & 2 deletions src/receiver/writer/objectwriterbuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ impl ObjectWriterBuilder for ObjectWriterBufferBuilder {
_endpoint: &UDPEndpoint,
_tsi: &u64,
_toi: &u128,
_content_location: &str,
_duration: &std::time::Duration,
_meta: &ObjectMetadata,
_now: std::time::SystemTime,
) {
}
Expand Down
3 changes: 1 addition & 2 deletions src/receiver/writer/objectwriterfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ impl ObjectWriterBuilder for ObjectWriterFSBuilder {
_endpoint: &UDPEndpoint,
_tsi: &u64,
_toi: &u128,
_content_location: &str,
_duration: &std::time::Duration,
_meta: &ObjectMetadata,
_now: std::time::SystemTime,
) {
}
Expand Down
Loading