Skip to content

Commit 9245138

Browse files
committed
implemented blocking when queue is full (for file processing)
1 parent 4b81f55 commit 9245138

File tree

3 files changed

+54
-40
lines changed

3 files changed

+54
-40
lines changed

Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ repository = "https://github.com/insight-platform/FFmpeg-Input"
77
readme = "README.md"
88
keywords = ["FFmpeg", "Video"]
99
categories = ["computer-vision"]
10-
version = "0.1.23"
10+
version = "0.1.24"
1111
edition = "2021"
12-
license="Apache-2.0"
12+
license = "Apache-2.0"
1313
rust-version = "1.62"
1414

1515
[lib]
@@ -22,15 +22,15 @@ env_logger = "0.11"
2222
parking_lot = "0.12"
2323

2424
[dependencies.ffmpeg-next]
25-
version = "6"
25+
version = "7"
2626
features = ["default"]
2727

2828
[dependencies.pyo3]
2929
version = "0.20"
3030
features = ["extension-module"]
3131

3232
[build-dependencies]
33-
pyo3-build-config = { version = "0.20"}
33+
pyo3-build-config = { version = "0.20" }
3434

3535
[profile.release]
3636
opt-level = 3

src/lib.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,15 @@ impl Drop for FFMpegSource {
138138
}
139139
}
140140

141+
#[allow(clippy::too_many_arguments)]
141142
fn handle(
142143
uri: String,
143144
params: HashMap<String, String>,
144145
tx: Sender<VideoFrameEnvelope>,
145146
signal: Arc<Mutex<bool>>,
146147
decode: bool,
147148
autoconvert_raw_formats_to_rgb24: bool,
149+
block_if_queue_full: bool,
148150
log_level: Arc<Mutex<Option<Level>>>,
149151
) {
150152
let mut queue_full_skipped_count = 0;
@@ -317,41 +319,48 @@ fn handle(
317319
codec, fps, avg_fps, width, height, key_frame, raw_frame.len(),
318320
pts, dts, corrupted, pixel_format);
319321

320-
if !tx.is_full() {
321-
let frame_processed_ts = i64::try_from(
322-
SystemTime::now()
323-
.duration_since(SystemTime::UNIX_EPOCH)
324-
.unwrap()
325-
.as_millis(),
326-
)
327-
.expect("Milliseconds must fit i64");
328-
329-
let res = tx.send(VideoFrameEnvelope {
330-
codec,
331-
frame_width: i64::from(width),
332-
frame_height: i64::from(height),
333-
key_frame,
334-
pts,
335-
dts,
336-
corrupted,
337-
time_base: (time_base_r.0 as i64, time_base_r.1 as i64),
338-
fps,
339-
avg_fps,
340-
pixel_format,
341-
queue_full_skipped_count,
342-
payload: raw_frame,
343-
frame_received_ts,
344-
frame_processed_ts,
345-
queue_len: i64::try_from(tx.len()).unwrap(),
346-
});
347-
348-
if let Err(e) = res {
349-
error!("Unable to send data to upstream. Error is: {:?}", e);
350-
break;
322+
let frame_processed_ts = i64::try_from(
323+
SystemTime::now()
324+
.duration_since(SystemTime::UNIX_EPOCH)
325+
.unwrap()
326+
.as_millis(),
327+
)
328+
.expect("Milliseconds must fit i64");
329+
330+
let frame_envelope = VideoFrameEnvelope {
331+
codec,
332+
frame_width: i64::from(width),
333+
frame_height: i64::from(height),
334+
key_frame,
335+
pts,
336+
dts,
337+
corrupted,
338+
time_base: (time_base_r.0 as i64, time_base_r.1 as i64),
339+
fps,
340+
avg_fps,
341+
pixel_format,
342+
queue_full_skipped_count,
343+
payload: raw_frame,
344+
frame_received_ts,
345+
frame_processed_ts,
346+
queue_len: i64::try_from(tx.len()).unwrap(),
347+
};
348+
349+
if !block_if_queue_full {
350+
if !tx.is_full() {
351+
let res = tx.send(frame_envelope);
352+
353+
if let Err(e) = res {
354+
error!("Unable to send data to upstream. Error is: {:?}", e);
355+
break;
356+
}
357+
} else {
358+
dbg!("Sink queue is full, the frame is skipped.");
359+
queue_full_skipped_count += 1;
351360
}
352361
} else {
353-
warn!("Sink queue is full, the frame is skipped.");
354-
queue_full_skipped_count += 1;
362+
tx.send(frame_envelope)
363+
.expect("Unable to send data to upstream");
355364
}
356365
}
357366
}
@@ -378,6 +387,7 @@ impl FFMpegSource {
378387
queue_len = 32,
379388
decode = false,
380389
autoconvert_raw_formats_to_rgb24 = false,
390+
block_if_queue_full = false,
381391
ffmpeg_log_level = FFmpegLogLevel::Info)
382392
)]
383393
pub fn new(
@@ -386,6 +396,7 @@ impl FFMpegSource {
386396
queue_len: i64,
387397
decode: bool,
388398
autoconvert_raw_formats_to_rgb24: bool,
399+
block_if_queue_full: bool,
389400
ffmpeg_log_level: FFmpegLogLevel,
390401
) -> Self {
391402
assert!(queue_len > 0, "Queue length must be a positive number");
@@ -406,6 +417,7 @@ impl FFMpegSource {
406417
thread_exit_signal,
407418
decode,
408419
autoconvert_raw_formats_to_rgb24,
420+
block_if_queue_full,
409421
thread_ll,
410422
)
411423
}));

test.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,22 @@
11
from ffmpeg_input import FFMpegSource, FFmpegLogLevel
2+
from time import sleep
23

34
if __name__ == '__main__':
4-
s = FFMpegSource("/home/ivan/road-traffic-processed.mp4", params={"fflags": "+genpts"},
5-
queue_len=100, decode=False,
5+
s = FFMpegSource("/home/ivan/file.mp4", params={},
6+
queue_len=10, decode=False,
7+
block_if_queue_full=True,
68
ffmpeg_log_level=FFmpegLogLevel.Debug)
79
s.log_level = FFmpegLogLevel.Trace
810
while True:
911
try:
12+
sleep(1)
1013
p = s.video_frame()
1114
print("Codec: {}, Geometry: {}x{}".format(p.codec, p.frame_width, p.frame_height))
1215
print("System ts, when the frame was received from the source:", p.frame_received_ts)
1316
print("Current queue length:", p.queue_len)
1417
print("Time base, pts, dts:", p.time_base, p.pts, p.dts)
1518
print("Skipped frames because of queue overflow:", p.queue_full_skipped_count)
1619
print("Payload length:", len(p.payload_as_bytes()))
17-
break
1820
except BrokenPipeError:
1921
print("EOS")
2022
break

0 commit comments

Comments
 (0)