Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,121 @@ while sender.nb_objects() > 0 {
}
```

## Carouseling

The FLUTE library supports carouseling, a mechanism that continuously re-transmits files in a loop. This is useful for scenarios such as broadcasting where a receiver may join at any time and still receive the full file.

A file remains in the carousel and is re-transferred repeatedly until explicitly removed. The repetition behavior is controlled by the `CarouselRepeatMode` when creating an object, which offers two modes:


### Fixed Delay After Each Transfer

This mode waits for a fixed delay after the end of each transfer before starting the next one using `CarouselRepeatMode::DelayBetweenTransfers`.

```
| Transfer Object | Fixed Delay | Transfer Object | Fixed Delay | ...
```

```rust
use flute::sender::Sender;
use flute::sender::ObjectDesc;
use flute::sender::CarouselRepeatMode;
use flute::core::lct::Cenc;
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;

// Create UDP Socket
let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

// Create FLUTE Sender
let tsi = 1;
let oti = Default::default();
let config = Default::default();
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, tsi, &oti, &config);

// 10s delay after each transfer
let carousel_mode = CarouselRepeatMode::DelayBetweenTransfers(std::time::Duration::from_secs(10))

// Create an Object
let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
&url::Url::parse("file:///hello.txt").unwrap(), 1,
carousel_mode, None, None, None, Cenc::Null, true, None, true).unwrap();

// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
while sender.nb_objects() > 0 {
if let Some(pkt) = sender.read(SystemTime::now()) {
udp_socket.send(&pkt).unwrap();
} else {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
```

### Fix interval between 2 transfer start

`CarouselRepeatMode::IntervalBetweenStartTimes` : This mode ensures each new transfer starts at a fixed interval, regardless of the duration of the previous one.

> ⚠️ **Note**: If the transfer of an object takes longer than the specified interval, the actual interval will be longer.
> It is the application's responsibility to ensure that the FLUTE channel bitrate is high enough to meet the interval timing.

```
| Transfer Object 1 | Adaptative Delay | Transfer Object 1 | Adaptative Delay |
| ------------Fixed Interval-----------| ----------- Fixed Interval-----------|
```

```rust
use flute::sender::Sender;
use flute::sender::ObjectDesc;
use flute::sender::CarouselRepeatMode;
use flute::core::lct::Cenc;
use flute::core::UDPEndpoint;
use std::net::UdpSocket;
use std::time::SystemTime;

// Create UDP Socket
let udp_socket = UdpSocket::bind("0.0.0.0:0").unwrap();
udp_socket.connect("224.0.0.1:3400").expect("Connection failed");

// Create FLUTE Sender
let tsi = 1;
let oti = Default::default();
let config = Default::default();
let endpoint = UDPEndpoint::new(None, "224.0.0.1".to_string(), 3400);
let mut sender = Sender::new(endpoint, tsi, &oti, &config);

// Configure a fixed interval between 2 transfer start
let carousel_mode = CarouselRepeatMode::IntervalBetweenStartTimes(std::time::Duration::from_secs(10))

// Create an Object
let mut obj = ObjectDesc::create_from_buffer(b"hello world".to_vec(), "text/plain",
&url::Url::parse("file:///hello.txt").unwrap(), 1,
carousel_mode, None, None, None, Cenc::Null, true, None, true).unwrap();

// Add object(s) (files) to the FLUTE sender (priority queue 0)
sender.add_object(0, obj);

// Always call publish after adding objects
sender.publish(SystemTime::now());

// Send FLUTE packets over UDP/IP
while sender.nb_objects() > 0 {
if let Some(pkt) = sender.read(SystemTime::now()) {
udp_socket.send(&pkt).unwrap();
} else {
std::thread::sleep(std::time::Duration::from_millis(1));
}
}
```

# Python bindings

[![PyPI version](https://badge.fury.io/py/flute-alc.svg)](https://badge.fury.io/py/flute-alc)
Expand Down
13 changes: 8 additions & 5 deletions src/sender/fdt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::sender::FDTPublishMode;
use super::toiallocator::{Toi, ToiAllocator};
use super::{objectdesc, ObjectDesc};
use crate::common::{fdtinstance::FdtInstance, lct, oti};
use crate::sender::objectdesc::CarouselRepeatMode;
use crate::sender::observer;
use crate::sender::TOIMaxLength;
use crate::tools;
Expand All @@ -25,7 +26,7 @@ pub struct Fdt {
complete: Option<bool>,
cenc: lct::Cenc,
duration: std::time::Duration,
carousel: std::time::Duration,
carousel_mode: CarouselRepeatMode,
inband_sct: bool,
last_publish: Option<SystemTime>,
observers: ObserverList,
Expand All @@ -41,7 +42,7 @@ impl Fdt {
default_oti: &oti::Oti,
cenc: lct::Cenc,
duration: std::time::Duration,
carousel: std::time::Duration,
carousel_mode: CarouselRepeatMode,
inband_sct: bool,
observers: ObserverList,
toi_max_length: TOIMaxLength,
Expand All @@ -60,7 +61,7 @@ impl Fdt {
complete: None,
cenc,
duration,
carousel,
carousel_mode,
inband_sct,
last_publish: None,
observers,
Expand Down Expand Up @@ -227,7 +228,7 @@ impl Fdt {
"text/xml",
&url::Url::parse("file:///").unwrap(),
1,
Some(self.carousel),
Some(self.carousel_mode),
None,
None,
self.groups.clone(),
Expand Down Expand Up @@ -439,7 +440,9 @@ mod tests {
&oti,
lct::Cenc::Null,
std::time::Duration::from_secs(3600),
std::time::Duration::from_secs(1),
crate::sender::objectdesc::CarouselRepeatMode::DelayBetweenTransfers(
std::time::Duration::from_secs(1),
),
true,
ObserverList::new(),
crate::sender::TOIMaxLength::ToiMax112,
Expand Down
43 changes: 32 additions & 11 deletions src/sender/filedesc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::FDTPublishMode;
use crate::common::oti::SchemeSpecific;
use crate::common::{fdtinstance, oti, partition};
use crate::error::{FluteError, Result};
use crate::sender::objectdesc::CarouselRepeatMode;
use std::sync::atomic::AtomicBool;
use std::sync::RwLock;
use std::time::SystemTime;
Expand All @@ -14,7 +15,8 @@ struct TransferInfo {
transferring: bool,
transfer_count: u32,
total_nb_transfer: u64,
last_transfer: Option<SystemTime>,
last_transfer_end_time: Option<SystemTime>,
last_transfer_start_time: Option<SystemTime>,
next_transfer_timestamp: Option<SystemTime>,
packet_transmission_tick: Option<std::time::Duration>,
transfer_start_time: Option<SystemTime>,
Expand All @@ -23,6 +25,7 @@ struct TransferInfo {
impl TransferInfo {
fn init(&mut self, object: &ObjectDesc, oti: &oti::Oti, now: SystemTime) {
self.transferring = true;
self.last_transfer_start_time = Some(now);
let mut packet_transmission_tick = None;
if let Some(target_acquisition_latency) = object.target_acquisition.as_ref() {
packet_transmission_tick = match target_acquisition_latency {
Expand Down Expand Up @@ -57,7 +60,7 @@ impl TransferInfo {
self.next_transfer_timestamp = Some(now)
}

if self.transfer_count == object.max_transfer_count && object.carousel_delay.is_some() {
if self.transfer_count == object.max_transfer_count && object.carousel_mode.is_some() {
self.transfer_count = 0;
}
}
Expand All @@ -66,7 +69,7 @@ impl TransferInfo {
self.transferring = false;
self.transfer_count += 1;
self.total_nb_transfer += 1;
self.last_transfer = Some(now);
self.last_transfer_end_time = Some(now);
}

fn tick(&mut self) {
Expand Down Expand Up @@ -175,7 +178,8 @@ impl FileDesc {
transfer_info: RwLock::new(TransferInfo {
transferring: false,
transfer_count: 0,
last_transfer: None,
last_transfer_start_time: None,
last_transfer_end_time: None,
total_nb_transfer: 0,
next_transfer_timestamp: None,
packet_transmission_tick: None,
Expand Down Expand Up @@ -214,7 +218,7 @@ impl FileDesc {
if self.object.max_transfer_count > info.transfer_count {
return false;
}
self.object.carousel_delay.is_none()
self.object.carousel_mode.is_none()
}

pub fn is_transferring(&self) -> bool {
Expand All @@ -234,14 +238,15 @@ impl FileDesc {

pub fn reset_last_transfer(&self, start_time: Option<SystemTime>) {
let mut info = self.transfer_info.write().unwrap();
info.last_transfer = None;
info.last_transfer_end_time = None;
info.last_transfer_start_time = None;
if start_time.is_some() {
info.transfer_start_time = start_time;
}
}

pub fn is_last_transfer(&self) -> bool {
if self.object.carousel_delay.is_some() {
if self.object.carousel_mode.is_some() {
return false;
}

Expand Down Expand Up @@ -271,17 +276,33 @@ impl FileDesc {
}
}

if info.transferring {
return false;
}

if self.object.max_transfer_count > info.transfer_count {
return true;
}

if self.object.carousel_delay.is_none() || info.last_transfer.is_none() {
if self.object.carousel_mode.is_none()
|| info.last_transfer_end_time.is_none()
|| info.last_transfer_start_time.is_none()
{
return true;
}

let delay = self.object.carousel_delay.as_ref().unwrap();
let last_transfer = info.last_transfer.as_ref().unwrap();
now.duration_since(*last_transfer).unwrap_or_default() > *delay
let carousel_mode = self.object.carousel_mode.as_ref().unwrap();
let (last_time, interval) = match carousel_mode {
CarouselRepeatMode::DelayBetweenTransfers(interval) => {
(info.last_transfer_end_time.as_ref().unwrap(), interval)
}
CarouselRepeatMode::IntervalBetweenStartTimes(interval) => {
(info.last_transfer_start_time.as_ref().unwrap(), interval)
}
};

let last_transfer_interval = now.duration_since(*last_time).unwrap_or_default();
last_transfer_interval > *interval
}

pub fn is_published(&self) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub use objectdesc::ObjectDataSource;
pub use objectdesc::ObjectDataStream;
pub use objectdesc::ObjectDataStreamTrait;
pub use objectdesc::TargetAcquisition;
pub use objectdesc::CarouselRepeatMode;
pub use observer::Event;
pub use observer::FileInfo;
pub use observer::Subscriber;
Expand Down
Loading
Loading