diff --git a/libavcodec/avcodec.c b/libavcodec/avcodec.c index 214dca4566b25..6065f1b6896a7 100644 --- a/libavcodec/avcodec.c +++ b/libavcodec/avcodec.c @@ -381,10 +381,13 @@ void avcodec_flush_buffers(AVCodecContext *avctx) avci->draining = 0; avci->draining_done = 0; - av_frame_unref(avci->buffer_frame); - av_packet_unref(avci->buffer_pkt); + if (avci->buffer_frame) + av_frame_unref(avci->buffer_frame); + if (avci->buffer_pkt) + av_packet_unref(avci->buffer_pkt); - if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME) + if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME && + !avci->is_frame_mt) ff_thread_flush(avctx); else if (ffcodec(avctx->codec)->flush) ffcodec(avctx->codec)->flush(avctx); diff --git a/libavcodec/avcodec_internal.h b/libavcodec/avcodec_internal.h index 816f39ae76216..2f0aaab93bd77 100644 --- a/libavcodec/avcodec_internal.h +++ b/libavcodec/avcodec_internal.h @@ -84,16 +84,23 @@ void ff_thread_free(struct AVCodecContext *s); void ff_thread_flush(struct AVCodecContext *avctx); /** - * Submit a new frame to a decoding thread. - * Returns the next available frame in picture. *got_picture_ptr - * will be 0 if none is available. - * The return value on success is the size of the consumed packet for - * compatibility with FFCodec.decode. This means the decoder - * has to consume the full packet. + * Submit available packets for decoding to worker threads, return a + * decoded frame if available. Returns AVERROR(EAGAIN) if none is available. * - * Parameters are the same as FFCodec.decode. + * Parameters are the same as FFCodec.receive_frame. */ -int ff_thread_decode_frame(struct AVCodecContext *avctx, struct AVFrame *frame, - int *got_picture_ptr, struct AVPacket *avpkt); +int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame); + +/** + * Do the actual decoding and obtain a decoded frame from the decoder, if + * available. When frame threading is used, this is invoked by the worker + * threads, otherwise by the top layer directly. + */ +int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame); + +/** + * Get a packet for decoding. This gets invoked by the worker threads. + */ +int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt); #endif // AVCODEC_AVCODEC_INTERNAL_H diff --git a/libavcodec/decode.c b/libavcodec/decode.c index 7cc2543c5c519..8d830f3f09e35 100644 --- a/libavcodec/decode.c +++ b/libavcodec/decode.c @@ -207,6 +207,11 @@ static int decode_bsfs_init(AVCodecContext *avctx) return ret; } +#if !HAVE_THREADS +#define ff_thread_get_packet(avctx, pkt) (AVERROR_BUG) +#define ff_thread_receive_frame(avctx, frame) (AVERROR_BUG) +#endif + static int decode_get_packet(AVCodecContext *avctx, AVPacket *pkt) { AVCodecInternal *avci = avctx->internal; @@ -240,6 +245,13 @@ int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt) if (avci->draining) return AVERROR_EOF; + /* If we are a worker thread, get the next packet from the threading + * context. Otherwise we are the main (user-facing) context, so we get the + * next packet from the input filterchain. + */ + if (avctx->internal->is_frame_mt) + return ff_thread_get_packet(avctx, pkt); + while (1) { int ret = decode_get_packet(avctx, pkt); if (ret == AVERROR(EAGAIN) && @@ -413,15 +425,11 @@ static inline int decode_simple_internal(AVCodecContext *avctx, AVFrame *frame, return AVERROR_EOF; if (!pkt->data && - !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY || - avctx->active_thread_type & FF_THREAD_FRAME)) + !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY)) return AVERROR_EOF; got_frame = 0; - if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME) { - consumed = ff_thread_decode_frame(avctx, frame, &got_frame, pkt); - } else { frame->pict_type = dc->initial_pict_type; frame->flags |= dc->intra_only_flag; consumed = codec->cb.decode(avctx, frame, &got_frame, pkt); @@ -436,7 +444,6 @@ FF_DISABLE_DEPRECATION_WARNINGS FF_ENABLE_DEPRECATION_WARNINGS #endif } - } emms_c(); if (avctx->codec->type == AVMEDIA_TYPE_VIDEO) { @@ -603,12 +610,12 @@ static int decode_simple_receive_frame(AVCodecContext *avctx, AVFrame *frame) return 0; } -static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame) +int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame) { AVCodecInternal *avci = avctx->internal; DecodeContext *dc = decode_ctx(avci); const FFCodec *const codec = ffcodec(avctx->codec); - int ret, ok; + int ret; av_assert0(!frame->buf[0]); @@ -636,6 +643,20 @@ static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame) if (ret == AVERROR_EOF) avci->draining_done = 1; + return ret; +} + +static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame) +{ + AVCodecInternal *avci = avctx->internal; + DecodeContext *dc = decode_ctx(avci); + int ret, ok; + + if (avctx->active_thread_type & FF_THREAD_FRAME) + ret = ff_thread_receive_frame(avctx, frame); + else + ret = ff_decode_receive_frame_internal(avctx, frame); + /* preserve ret */ ok = detect_colorspace(avctx, frame); if (ok < 0) { @@ -2151,7 +2172,8 @@ void ff_decode_flush_buffers(AVCodecContext *avctx) dc->pts_correction_last_pts = dc->pts_correction_last_dts = INT64_MIN; - av_bsf_flush(avci->bsf); + if (avci->bsf) + av_bsf_flush(avci->bsf); dc->nb_draining_errors = 0; dc->draining_started = 0; diff --git a/libavcodec/internal.h b/libavcodec/internal.h index d7b0b9f8801af..98ab2797cef78 100644 --- a/libavcodec/internal.h +++ b/libavcodec/internal.h @@ -53,6 +53,13 @@ typedef struct AVCodecInternal { */ int is_copy; + /** + * This field is set to 1 when frame threading is being used and the parent + * AVCodecContext of this AVCodecInternal is a worker-thread context (i.e. + * one of those actually doing the decoding), 0 otherwise. + */ + int is_frame_mt; + /** * Audio encoders can set this flag during init to indicate that they * want the small last frame to be padded to a multiple of pad_samples. diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c index 85a3dce929157..019e33b7b2997 100644 --- a/libavcodec/pthread_frame.c +++ b/libavcodec/pthread_frame.c @@ -32,6 +32,7 @@ #include "hwaccel_internal.h" #include "hwconfig.h" #include "internal.h" +#include "packet_internal.h" #include "pthread_internal.h" #include "refstruct.h" #include "thread.h" @@ -64,6 +65,12 @@ enum { INITIALIZED, ///< Thread has been properly set up }; +typedef struct DecodedFrames { + AVFrame **f; + size_t nb_f; + size_t nb_f_allocated; +} DecodedFrames; + typedef struct ThreadFrameProgress { atomic_int progress[2]; } ThreadFrameProgress; @@ -88,8 +95,10 @@ typedef struct PerThreadContext { AVPacket *avpkt; ///< Input packet (for decoding) or output (for encoding). - AVFrame *frame; ///< Output frame (for decoding) or input (for encoding). - int got_frame; ///< The output of got_picture_ptr from the last avcodec_decode_video() call. + /** + * Decoded frames from a single decode iteration. + */ + DecodedFrames df; int result; ///< The result of the last codec decode/encode() call. atomic_int state; @@ -130,14 +139,17 @@ typedef struct FrameThreadContext { pthread_cond_t async_cond; int async_lock; + DecodedFrames df; + int result; + + /** + * Packet to be submitted to the next thread for decoding. + */ + AVPacket *next_pkt; + int next_decoding; ///< The next context to submit a packet to. int next_finished; ///< The next context to return output from. - int delaying; /**< - * Set for the first N packets, where N is the number of threads. - * While it is set, ff_thread_en/decode_frame won't return any results. - */ - /* hwaccel state for thread-unsafe hwaccels is temporarily stored here in * order to transfer its ownership to the next decoding thread without the * need for extra synchronization */ @@ -180,6 +192,52 @@ static void thread_set_name(PerThreadContext *p) ff_thread_setname(name); } +// get a free frame to decode into +static AVFrame *decoded_frames_get_free(DecodedFrames *df) +{ + if (df->nb_f == df->nb_f_allocated) { + AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1, + sizeof(*df->f)); + if (!tmp) + return NULL; + df->f = tmp; + + df->f[df->nb_f] = av_frame_alloc(); + if (!df->f[df->nb_f]) + return NULL; + + df->nb_f_allocated++; + } + + av_assert0(!df->f[df->nb_f]->buf[0]); + + return df->f[df->nb_f]; +} + +static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst) +{ + AVFrame *tmp_frame = df->f[0]; + av_frame_move_ref(dst, tmp_frame); + memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f)); + df->f[--df->nb_f] = tmp_frame; +} + +static void decoded_frames_flush(DecodedFrames *df) +{ + for (size_t i = 0; i < df->nb_f; i++) + av_frame_unref(df->f[i]); + df->nb_f = 0; +} + +static void decoded_frames_free(DecodedFrames *df) +{ + for (size_t i = 0; i < df->nb_f_allocated; i++) + av_frame_free(&df->f[i]); + av_freep(&df->f); + df->nb_f = 0; + df->nb_f_allocated = 0; +} + /** * Codec worker thread. * @@ -197,6 +255,8 @@ static attribute_align_arg void *frame_worker_thread(void *arg) pthread_mutex_lock(&p->mutex); while (1) { + int ret; + while (atomic_load(&p->state) == STATE_INPUT_READY && !p->die) pthread_cond_wait(&p->input_cond, &p->mutex); @@ -220,18 +280,31 @@ static attribute_align_arg void *frame_worker_thread(void *arg) p->hwaccel_serializing = 1; } - av_frame_unref(p->frame); - p->got_frame = 0; - p->frame->pict_type = p->initial_pict_type; - p->frame->flags |= p->intra_only_flag; - p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt); + ret = 0; + while (ret >= 0) { + AVFrame *frame; - if ((p->result < 0 || !p->got_frame) && p->frame->buf[0]) - av_frame_unref(p->frame); + /* get the frame which will store the output */ + frame = decoded_frames_get_free(&p->df); + if (!frame) { + p->result = AVERROR(ENOMEM); + goto alloc_fail; + } + + /* do the actual decoding */ + ret = ff_decode_receive_frame_internal(avctx, frame); + if (ret == 0) + p->df.nb_f++; + else if (ret < 0 && frame->buf[0]) + av_frame_unref(frame); + + p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret; + } if (atomic_load(&p->state) == STATE_SETTING_UP) ff_thread_finish_setup(avctx); +alloc_fail: if (p->hwaccel_serializing) { /* wipe hwaccel state for thread-unsafe hwaccels to avoid stale * pointers lying around; @@ -426,18 +499,21 @@ static int update_context_from_user(AVCodecContext *dst, const AVCodecContext *s } static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, - AVPacket *avpkt) + AVPacket *in_pkt) { FrameThreadContext *fctx = p->parent; PerThreadContext *prev_thread = fctx->prev_thread; const AVCodec *codec = p->avctx->codec; int ret; - if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY)) - return 0; - pthread_mutex_lock(&p->mutex); + av_packet_unref(p->avpkt); + av_packet_move_ref(p->avpkt, in_pkt); + + if (AVPACKET_IS_EMPTY(p->avpkt)) + p->avctx->internal->draining = 1; + ret = update_context_from_user(p->avctx, user_avctx); if (ret) { pthread_mutex_unlock(&p->mutex); @@ -448,7 +524,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, memory_order_relaxed); if (prev_thread) { - int err; if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) { pthread_mutex_lock(&prev_thread->progress_mutex); while (atomic_load(&prev_thread->state) == STATE_SETTING_UP) @@ -456,10 +531,16 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, pthread_mutex_unlock(&prev_thread->progress_mutex); } - err = update_context_from_thread(p->avctx, prev_thread->avctx, 0); - if (err) { - pthread_mutex_unlock(&p->mutex); - return err; + /* codecs without delay might not be prepared to be called repeatedly here during + * flushing (vp3/theora), and also don't need to be, since from this point on, they + * will always return EOF anyway */ + if (!p->avctx->internal->draining || + (codec->capabilities & AV_CODEC_CAP_DELAY)) { + ret = update_context_from_thread(p->avctx, prev_thread->avctx, 0); + if (ret) { + pthread_mutex_unlock(&p->mutex); + return ret; + } } } @@ -471,70 +552,47 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx, FFSWAP(void*, p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv); } - av_packet_unref(p->avpkt); - ret = av_packet_ref(p->avpkt, avpkt); - if (ret < 0) { - pthread_mutex_unlock(&p->mutex); - av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n"); - return ret; - } - atomic_store(&p->state, STATE_SETTING_UP); pthread_cond_signal(&p->input_cond); pthread_mutex_unlock(&p->mutex); fctx->prev_thread = p; - fctx->next_decoding++; + fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count; return 0; } -int ff_thread_decode_frame(AVCodecContext *avctx, - AVFrame *picture, int *got_picture_ptr, - AVPacket *avpkt) +int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame) { FrameThreadContext *fctx = avctx->internal->thread_ctx; - int finished = fctx->next_finished; - PerThreadContext *p; - int err; + int ret = 0; /* release the async lock, permitting blocked hwaccel threads to * go forward while we are in this function */ async_unlock(fctx); - /* - * Submit a packet to the next decoding thread. - */ - - p = &fctx->threads[fctx->next_decoding]; - err = submit_packet(p, avctx, avpkt); - if (err) - goto finish; - - /* - * If we're still receiving the initial packets, don't return a frame. - */ - - if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1))) - fctx->delaying = 0; + /* submit packets to threads while there are no buffered results to return */ + while (!fctx->df.nb_f && !fctx->result) { + PerThreadContext *p; - if (fctx->delaying) { - *got_picture_ptr=0; - if (avpkt->size) { - err = avpkt->size; + /* get a packet to be submitted to the next thread */ + av_packet_unref(fctx->next_pkt); + ret = ff_decode_get_packet(avctx, fctx->next_pkt); + if (ret < 0 && ret != AVERROR_EOF) goto finish; - } - } - /* - * Return the next available frame from the oldest thread. - * If we're at the end of the stream, then we have to skip threads that - * didn't output a frame/error, because we don't want to accidentally signal - * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0). - */ + ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx, + fctx->next_pkt); + if (ret < 0) + goto finish; - do { - p = &fctx->threads[finished++]; + /* do not return any frames until all threads have something to do */ + if (fctx->next_decoding != fctx->next_finished && + !avctx->internal->draining) + continue; + + p = &fctx->threads[fctx->next_finished]; + fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count; if (atomic_load(&p->state) != STATE_INPUT_READY) { pthread_mutex_lock(&p->progress_mutex); @@ -543,35 +601,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx, pthread_mutex_unlock(&p->progress_mutex); } - av_frame_move_ref(picture, p->frame); - *got_picture_ptr = p->got_frame; - picture->pkt_dts = p->avpkt->dts; - err = p->result; - - /* - * A later call with avkpt->size == 0 may loop over all threads, - * including this one, searching for a frame/error to return before being - * stopped by the "finished != fctx->next_finished" condition. - * Make sure we don't mistakenly return the same frame/error again. - */ - p->got_frame = 0; - p->result = 0; - - if (finished >= avctx->thread_count) finished = 0; - } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished); - - update_context_from_thread(avctx, p->avctx, 1); - - if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0; + update_context_from_thread(avctx, p->avctx, 1); + fctx->result = p->result; + p->result = 0; + if (p->df.nb_f) + FFSWAP(DecodedFrames, fctx->df, p->df); + } - fctx->next_finished = finished; + /* a thread may return multiple frames AND an error + * we first return all the frames, then the error */ + if (fctx->df.nb_f) { + decoded_frames_pop(&fctx->df, frame); + ret = 0; + } else { + ret = fctx->result; + fctx->result = 0; + } - /* return the size of the consumed packet if no error occurred */ - if (err >= 0) - err = avpkt->size; finish: async_lock(fctx); - return err; + return ret; } void ff_thread_report_progress(ThreadFrame *f, int n, int field) @@ -679,7 +728,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count pthread_cond_wait(&p->output_cond, &p->progress_mutex); pthread_mutex_unlock(&p->progress_mutex); } - p->got_frame = 0; } async_lock(fctx); @@ -732,6 +780,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) } ff_refstruct_unref(&ctx->internal->pool); + av_packet_free(&ctx->internal->in_pkt); av_packet_free(&ctx->internal->last_pkt_props); av_freep(&ctx->internal); av_buffer_unref(&ctx->hw_frames_ctx); @@ -739,7 +788,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) &ctx->nb_decoded_side_data); } - av_frame_free(&p->frame); + decoded_frames_free(&p->df); ff_pthread_free(p, per_thread_offsets); av_packet_free(&p->avpkt); @@ -747,6 +796,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count) av_freep(&p->avctx); } + decoded_frames_free(&fctx->df); + av_packet_free(&fctx->next_pkt); + av_freep(&fctx->threads); ff_pthread_free(fctx, thread_ctx_offsets); @@ -815,13 +867,17 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free, if (err < 0) return err; - if (!(p->frame = av_frame_alloc()) || - !(p->avpkt = av_packet_alloc())) + if (!(p->avpkt = av_packet_alloc())) return AVERROR(ENOMEM); + copy->internal->is_frame_mt = 1; if (!first) copy->internal->is_copy = 1; + copy->internal->in_pkt = av_packet_alloc(); + if (!copy->internal->in_pkt) + return AVERROR(ENOMEM); + copy->internal->last_pkt_props = av_packet_alloc(); if (!copy->internal->last_pkt_props) return AVERROR(ENOMEM); @@ -891,8 +947,11 @@ int ff_frame_thread_init(AVCodecContext *avctx) return err; } + fctx->next_pkt = av_packet_alloc(); + if (!fctx->next_pkt) + return AVERROR(ENOMEM); + fctx->async_lock = 1; - fctx->delaying = 1; if (codec->p.type == AVMEDIA_TYPE_VIDEO) avctx->delay = avctx->thread_count - 1; @@ -933,17 +992,18 @@ void ff_thread_flush(AVCodecContext *avctx) } fctx->next_decoding = fctx->next_finished = 0; - fctx->delaying = 1; fctx->prev_thread = NULL; + + decoded_frames_flush(&fctx->df); + fctx->result = 0; + for (i = 0; i < avctx->thread_count; i++) { PerThreadContext *p = &fctx->threads[i]; - // Make sure decode flush calls with size=0 won't return old frames - p->got_frame = 0; - av_frame_unref(p->frame); + + decoded_frames_flush(&p->df); p->result = 0; - if (ffcodec(avctx->codec)->flush) - ffcodec(avctx->codec)->flush(p->avctx); + avcodec_flush_buffers(p->avctx); } } @@ -1039,3 +1099,15 @@ enum ThreadingStatus ff_thread_sync_ref(AVCodecContext *avctx, size_t offset) return FF_THREAD_IS_COPY; } + +int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt) +{ + PerThreadContext *p = avctx->internal->thread_ctx; + + if (!AVPACKET_IS_EMPTY(p->avpkt)) { + av_packet_move_ref(pkt, p->avpkt); + return 0; + } + + return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN); +}