Skip to content

Commit

Permalink
demux: reclaim demux_packets to reduce memory allocator pressure
Browse files Browse the repository at this point in the history
This update introduces a demux_packet_pool, allowing for the reuse of
previously allocated packets when needed.

sizeof(AVPacket) is not a part of the lavc public ABI, which prevents us
to allocate memory in larger blocks. However, we can substantially
decrease the amount of alloc/free operations during playback by reusing
both mpv's demux_packet and AVPacket.

This adjustment addresses the root cause of issue #12076, which,
although resolved upstream, did not fully tackle the persistent problem
of allocating small blocks of aligned memory. This issue largely stems
from the FFmpeg design of the AVPacket API. After this change memory
will no longer be allocated once cache limits is reached.

The demux_packet_pool is shared as a global pool of packets for a given
MPContext.

This change significantly speeds up the demuxer deinitialization,
benefiting file switching scenarios, especially when a large demuxer
cache is used.

See: #12294
See: #12563
Signed-off-by: Kacper Michajłow <kasper93@gmail.com>
  • Loading branch information
kasper93 committed Jan 26, 2025
1 parent 3550ec5 commit 91c083f
Show file tree
Hide file tree
Showing 24 changed files with 300 additions and 56 deletions.
1 change: 1 addition & 0 deletions common/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ struct mpv_global {
struct mp_client_api *client_api;
char *configdir;
struct stats_base *stats;
struct demux_packet_pool *packet_pool;
};

#endif
5 changes: 4 additions & 1 deletion common/recorder.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/msg.h"
#include "demux/demux.h"
#include "demux/packet.h"
#include "demux/packet_pool.h"
#include "demux/stheader.h"

#include "recorder.h"
Expand All @@ -41,6 +42,7 @@
struct mp_recorder {
struct mpv_global *global;
struct mp_log *log;
struct demux_packet_pool *packet_pool;

struct mp_recorder_sink **streams;
int num_streams;
Expand Down Expand Up @@ -137,6 +139,7 @@ struct mp_recorder *mp_recorder_create(struct mpv_global *global,

priv->global = global;
priv->log = mp_log_new(priv, global->log, "recorder");
priv->packet_pool = demux_packet_pool_get(global);

if (!num_streams) {
MP_ERR(priv, "No streams.\n");
Expand Down Expand Up @@ -412,7 +415,7 @@ void mp_recorder_feed_packet(struct mp_recorder_sink *rst,
return;
}

pkt = demux_copy_packet(pkt);
pkt = demux_copy_packet(rst->owner->packet_pool, pkt);
if (!pkt)
return;
MP_TARRAY_APPEND(rst, rst->packets, rst->num_packets, pkt);
Expand Down
5 changes: 4 additions & 1 deletion demux/cache.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/msg.h"
#include "common/av_common.h"
#include "demux.h"
#include "demux/packet_pool.h"
#include "misc/io_utils.h"
#include "options/path.h"
#include "options/m_config.h"
Expand Down Expand Up @@ -54,6 +55,7 @@ const struct m_sub_options demux_cache_conf = {

struct demux_cache {
struct mp_log *log;
struct demux_packet_pool *packet_pool;
struct demux_cache_opts *opts;

char *filename;
Expand Down Expand Up @@ -97,6 +99,7 @@ struct demux_cache *demux_cache_create(struct mpv_global *global,
talloc_set_destructor(cache, cache_destroy);
cache->opts = mp_get_config_group(cache, global, &demux_cache_conf);
cache->log = log;
cache->packet_pool = demux_packet_pool_get(global);
cache->fd = -1;

char *cache_dir = cache->opts->cache_dir;
Expand Down Expand Up @@ -291,7 +294,7 @@ struct demux_packet *demux_cache_read(struct demux_cache *cache, uint64_t pos)
if (!read_raw(cache, &hd, sizeof(hd)))
return NULL;

struct demux_packet *dp = new_demux_packet(hd.data_len);
struct demux_packet *dp = new_demux_packet(cache->packet_pool, hd.data_len);
if (!dp)
goto fail;

Expand Down
21 changes: 10 additions & 11 deletions demux/demux.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

#include "stream/stream.h"
#include "demux.h"
#include "packet_pool.h"
#include "timeline.h"
#include "stheader.h"
#include "cue.h"
Expand Down Expand Up @@ -150,6 +151,7 @@ const struct m_sub_options demux_conf = {
struct demux_internal {
struct mp_log *log;
struct mpv_global *global;
struct demux_packet_pool *packet_pool;
struct stats_ctx *stats;

bool can_cache; // not a slave demuxer; caching makes sense
Expand Down Expand Up @@ -701,7 +703,7 @@ static void remove_head_packet(struct demux_queue *queue)
if (!queue->head)
queue->tail = NULL;

talloc_free(dp);
demux_packet_pool_push(queue->ds->in->packet_pool, dp);
}

static void free_index(struct demux_queue *queue)
Expand All @@ -726,13 +728,7 @@ static void clear_queue(struct demux_queue *queue)

free_index(queue);

struct demux_packet *dp = queue->head;
while (dp) {
struct demux_packet *dn = dp->next;
assert(ds->reader_head != dp);
talloc_free(dp);
dp = dn;
}
demux_packet_pool_prepend(in->packet_pool, queue->head, queue->tail);
queue->head = queue->tail = NULL;
queue->keyframe_first = NULL;
queue->keyframe_latest = NULL;
Expand Down Expand Up @@ -2043,7 +2039,7 @@ static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
}

if (drop) {
talloc_free(dp);
demux_packet_pool_push(in->packet_pool, dp);
return;
}

Expand Down Expand Up @@ -2613,7 +2609,7 @@ static struct demux_packet *read_packet_from_cache(struct demux_internal *in,
}
} else {
// The returned packet is mutated etc. and will be owned by the user.
pkt = demux_copy_packet(pkt);
pkt = demux_copy_packet(in->packet_pool, pkt);
}

return pkt;
Expand All @@ -2638,7 +2634,8 @@ static int dequeue_packet(struct demux_stream *ds, double min_pts,
if (ds->attached_picture_added)
return -1;
ds->attached_picture_added = true;
struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture);
struct demux_packet *pkt = demux_copy_packet(in->packet_pool,
ds->sh->attached_picture);
MP_HANDLE_OOM(pkt);
pkt->stream = ds->sh->index;
*res = pkt;
Expand Down Expand Up @@ -3281,6 +3278,7 @@ static struct demuxer *open_given_type(struct mpv_global *global,
.filepos = -1,
.global = global,
.log = mp_log_new(demuxer, log, desc->name),
.packet_pool = demux_packet_pool_get(global),
.glog = log,
.filename = talloc_strdup(demuxer, sinfo->filename),
.is_network = sinfo->is_network,
Expand All @@ -3297,6 +3295,7 @@ static struct demuxer *open_given_type(struct mpv_global *global,
*in = (struct demux_internal){
.global = global,
.log = demuxer->log,
.packet_pool = demux_packet_pool_get(global),
.stats = stats_ctx_create(in, global, "demuxer"),
.can_cache = params && params->is_top_level,
.can_record = params && params->stream_record,
Expand Down
1 change: 1 addition & 0 deletions demux/demux.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ typedef struct demuxer {
void *priv; // demuxer-specific internal data
struct mpv_global *global;
struct mp_log *log, *glog;
struct demux_packet_pool *packet_pool;
struct demuxer_params *params;

// internal to demux.c
Expand Down
4 changes: 2 additions & 2 deletions demux/demux_lavf.c
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ static void handle_new_stream(demuxer_t *demuxer, int i)
!(st->disposition & AV_DISPOSITION_TIMED_THUMBNAILS))
{
sh->attached_picture =
new_demux_packet_from_avpacket(&st->attached_pic);
new_demux_packet_from_avpacket(demuxer->packet_pool, &st->attached_pic);
if (sh->attached_picture) {
sh->attached_picture->pts = 0;
talloc_steal(sh, sh->attached_picture);
Expand Down Expand Up @@ -1221,7 +1221,7 @@ static bool demux_lavf_read_packet(struct demuxer *demux,
return true;
}

struct demux_packet *dp = new_demux_packet_from_avpacket(pkt);
struct demux_packet *dp = new_demux_packet_from_avpacket(demux->packet_pool, pkt);
if (!dp) {
av_packet_unref(pkt);
return true;
Expand Down
2 changes: 1 addition & 1 deletion demux/demux_mf.c
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ static bool demux_mf_read_packet(struct demuxer *demuxer,
stream_seek(stream, 0);
bstr data = stream_read_complete(stream, NULL, MF_MAX_FILE_SIZE);
if (data.len) {
demux_packet_t *dp = new_demux_packet(data.len);
demux_packet_t *dp = new_demux_packet(demuxer->packet_pool, data.len);
if (dp) {
memcpy(dp->buffer, data.start, data.len);
dp->pts = mf->curr_frame / mf->sh->codec->fps;
Expand Down
26 changes: 16 additions & 10 deletions demux/demux_mkv.c
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
#include "video/csputils.h"
#include "video/mp_image.h"
#include "demux.h"
#include "packet_pool.h"
#include "stheader.h"
#include "ebml.h"
#include "matroska.h"
Expand Down Expand Up @@ -1485,7 +1486,8 @@ static void add_coverart(struct demuxer *demuxer)
continue;
struct sh_stream *sh = demux_alloc_sh_stream(STREAM_VIDEO);
sh->codec->codec = codec;
sh->attached_picture = new_demux_packet_from(att->data, att->data_size);
sh->attached_picture = new_demux_packet_from(demuxer->packet_pool,
att->data, att->data_size);
if (sh->attached_picture) {
sh->attached_picture->pts = 0;
talloc_steal(sh, sh->attached_picture);
Expand Down Expand Up @@ -2221,7 +2223,8 @@ static void probe_x264_garbage(demuxer_t *demuxer)
if (!nblock.len)
continue;

sh->codec->first_packet = new_demux_packet_from(nblock.start, nblock.len);
sh->codec->first_packet = new_demux_packet_from(demuxer->packet_pool,
nblock.start, nblock.len);
talloc_steal(mkv_d, sh->codec->first_packet);

if (nblock.start != sblock.start)
Expand Down Expand Up @@ -2621,8 +2624,9 @@ static bool handle_realaudio(demuxer_t *demuxer, mkv_track_t *track,
goto error;
// Release all the audio packets
for (int x = 0; x < sph * w / apk_usize; x++) {
dp = new_demux_packet_from(track->audio_buf + x * apk_usize,
apk_usize);
dp = new_demux_packet_from(demuxer->packet_pool,
track->audio_buf + x * apk_usize,
apk_usize);
if (!dp)
goto error;
/* Put timestamp only on packets that correspond to original
Expand Down Expand Up @@ -2759,7 +2763,8 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
int size = dp->len;
uint8_t *parsed;
if (libav_parse_wavpack(track, dp->buffer, &parsed, &size) >= 0) {
struct demux_packet *new = new_demux_packet_from(parsed, size);
struct demux_packet *new = new_demux_packet_from(demuxer->packet_pool,
parsed, size);
if (new) {
demux_packet_copy_attribs(new, dp);
talloc_free(dp);
Expand All @@ -2771,7 +2776,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,

if (strcmp(stream->codec->codec, "prores") == 0) {
size_t newlen = dp->len + 8;
struct demux_packet *new = new_demux_packet(newlen);
struct demux_packet *new = new_demux_packet(demuxer->packet_pool, newlen);
if (new) {
AV_WB32(new->buffer + 0, newlen);
AV_WB32(new->buffer + 4, MKBETAG('i', 'c', 'p', 'f'));
Expand Down Expand Up @@ -2817,7 +2822,8 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
dp->len -= len;
dp->pos += len;
if (size) {
struct demux_packet *new = new_demux_packet_from(data, size);
struct demux_packet *new = new_demux_packet_from(demuxer->packet_pool,
data, size);
if (!new)
break;
if (copy_sidedata)
Expand All @@ -2838,7 +2844,7 @@ static void mkv_parse_and_add_packet(demuxer_t *demuxer, mkv_track_t *track,
if (dp->len) {
add_packet(demuxer, stream, dp);
} else {
talloc_free(dp);
demux_packet_pool_push(demuxer->packet_pool, dp);
}
}

Expand Down Expand Up @@ -2991,9 +2997,9 @@ static int handle_block(demuxer_t *demuxer, struct block_info *block_info)

if (block.start != nblock.start || block.len != nblock.len) {
// (avoidable copy of the entire data)
dp = new_demux_packet_from(nblock.start, nblock.len);
dp = new_demux_packet_from(demuxer->packet_pool, nblock.start, nblock.len);
} else {
dp = new_demux_packet_from_buf(data);
dp = new_demux_packet_from_buf(demuxer->packet_pool, data);
}
if (!dp)
break;
Expand Down
3 changes: 2 additions & 1 deletion demux/demux_raw.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,8 @@ static bool raw_read_packet(struct demuxer *demuxer, struct demux_packet **pkt)
if (demuxer->stream->eof)
return false;

struct demux_packet *dp = new_demux_packet(p->frame_size * p->read_frames);
struct demux_packet *dp = new_demux_packet(demuxer->packet_pool,
p->frame_size * p->read_frames);
if (!dp) {
MP_ERR(demuxer, "Can't read packet.\n");
return true;
Expand Down
Loading

0 comments on commit 91c083f

Please sign in to comment.