• Main Page
  • Related Pages
  • Modules
  • Data Structures
  • Files
  • Examples
  • File List
  • Globals

libavcodec/pthread.c

Go to the documentation of this file.
00001 /*
00002  * Copyright (c) 2004 Roman Shaposhnik
00003  * Copyright (c) 2008 Alexander Strange (astrange@ithinksw.com)
00004  *
00005  * Many thanks to Steven M. Schultz for providing clever ideas and
00006  * to Michael Niedermayer <michaelni@gmx.at> for writing initial
00007  * implementation.
00008  *
00009  * This file is part of FFmpeg.
00010  *
00011  * FFmpeg is free software; you can redistribute it and/or
00012  * modify it under the terms of the GNU Lesser General Public
00013  * License as published by the Free Software Foundation; either
00014  * version 2.1 of the License, or (at your option) any later version.
00015  *
00016  * FFmpeg is distributed in the hope that it will be useful,
00017  * but WITHOUT ANY WARRANTY; without even the implied warranty of
00018  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00019  * Lesser General Public License for more details.
00020  *
00021  * You should have received a copy of the GNU Lesser General Public
00022  * License along with FFmpeg; if not, write to the Free Software
00023  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
00024  */
00025 
00032 #include <pthread.h>
00033 
00034 #include "avcodec.h"
00035 #include "thread.h"
00036 
00037 typedef int (action_func)(AVCodecContext *c, void *arg);
00038 typedef int (action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr);
00039 
00040 typedef struct ThreadContext {
00041     pthread_t *workers;
00042     action_func *func;
00043     action_func2 *func2;
00044     void *args;
00045     int *rets;
00046     int rets_count;
00047     int job_count;
00048     int job_size;
00049 
00050     pthread_cond_t last_job_cond;
00051     pthread_cond_t current_job_cond;
00052     pthread_mutex_t current_job_lock;
00053     int current_job;
00054     int done;
00055 } ThreadContext;
00056 
00058 #define MAX_BUFFERS (32+1)
00059 
00063 typedef struct PerThreadContext {
00064     struct FrameThreadContext *parent;
00065 
00066     pthread_t      thread;
00067     pthread_cond_t input_cond;      
00068     pthread_cond_t progress_cond;   
00069     pthread_cond_t output_cond;     
00070 
00071     pthread_mutex_t mutex;          
00072     pthread_mutex_t progress_mutex; 
00073 
00074     AVCodecContext *avctx;          
00075 
00076     AVPacket       avpkt;           
00077     int            allocated_buf_size; 
00078 
00079     AVFrame frame;                  
00080     int     got_frame;              
00081     int     result;                 
00082 
00083     enum {
00084         STATE_INPUT_READY,          
00085         STATE_SETTING_UP,           
00086         STATE_GET_BUFFER,           
00090         STATE_SETUP_FINISHED        
00091     } state;
00092 
00097     AVFrame released_buffers[MAX_BUFFERS];
00098     int     num_released_buffers;
00099 
00103     int     progress[MAX_BUFFERS][2];
00104     uint8_t progress_used[MAX_BUFFERS];
00105 
00106     AVFrame *requested_frame;       
00107 } PerThreadContext;
00108 
00112 typedef struct FrameThreadContext {
00113     PerThreadContext *threads;     
00114     PerThreadContext *prev_thread; 
00115 
00116     pthread_mutex_t buffer_mutex;  
00117 
00118     int next_decoding;             
00119     int next_finished;             
00120 
00121     int delaying;                  
00126     int die;                       
00127 } FrameThreadContext;
00128 
00129 static void* attribute_align_arg worker(void *v)
00130 {
00131     AVCodecContext *avctx = v;
00132     ThreadContext *c = avctx->thread_opaque;
00133     int our_job = c->job_count;
00134     int thread_count = avctx->thread_count;
00135     int self_id;
00136 
00137     pthread_mutex_lock(&c->current_job_lock);
00138     self_id = c->current_job++;
00139     for (;;){
00140         while (our_job >= c->job_count) {
00141             if (c->current_job == thread_count + c->job_count)
00142                 pthread_cond_signal(&c->last_job_cond);
00143 
00144             pthread_cond_wait(&c->current_job_cond, &c->current_job_lock);
00145             our_job = self_id;
00146 
00147             if (c->done) {
00148                 pthread_mutex_unlock(&c->current_job_lock);
00149                 return NULL;
00150             }
00151         }
00152         pthread_mutex_unlock(&c->current_job_lock);
00153 
00154         c->rets[our_job%c->rets_count] = c->func ? c->func(avctx, (char*)c->args + our_job*c->job_size):
00155                                                    c->func2(avctx, c->args, our_job, self_id);
00156 
00157         pthread_mutex_lock(&c->current_job_lock);
00158         our_job = c->current_job++;
00159     }
00160 }
00161 
00162 static av_always_inline void avcodec_thread_park_workers(ThreadContext *c, int thread_count)
00163 {
00164     pthread_cond_wait(&c->last_job_cond, &c->current_job_lock);
00165     pthread_mutex_unlock(&c->current_job_lock);
00166 }
00167 
00168 static void thread_free(AVCodecContext *avctx)
00169 {
00170     ThreadContext *c = avctx->thread_opaque;
00171     int i;
00172 
00173     pthread_mutex_lock(&c->current_job_lock);
00174     c->done = 1;
00175     pthread_cond_broadcast(&c->current_job_cond);
00176     pthread_mutex_unlock(&c->current_job_lock);
00177 
00178     for (i=0; i<avctx->thread_count; i++)
00179          pthread_join(c->workers[i], NULL);
00180 
00181     pthread_mutex_destroy(&c->current_job_lock);
00182     pthread_cond_destroy(&c->current_job_cond);
00183     pthread_cond_destroy(&c->last_job_cond);
00184     av_free(c->workers);
00185     av_freep(&avctx->thread_opaque);
00186 }
00187 
00188 static int avcodec_thread_execute(AVCodecContext *avctx, action_func* func, void *arg, int *ret, int job_count, int job_size)
00189 {
00190     ThreadContext *c= avctx->thread_opaque;
00191     int dummy_ret;
00192 
00193     if (!(avctx->active_thread_type&FF_THREAD_SLICE) || avctx->thread_count <= 1)
00194         return avcodec_default_execute(avctx, func, arg, ret, job_count, job_size);
00195 
00196     if (job_count <= 0)
00197         return 0;
00198 
00199     pthread_mutex_lock(&c->current_job_lock);
00200 
00201     c->current_job = avctx->thread_count;
00202     c->job_count = job_count;
00203     c->job_size = job_size;
00204     c->args = arg;
00205     c->func = func;
00206     if (ret) {
00207         c->rets = ret;
00208         c->rets_count = job_count;
00209     } else {
00210         c->rets = &dummy_ret;
00211         c->rets_count = 1;
00212     }
00213     pthread_cond_broadcast(&c->current_job_cond);
00214 
00215     avcodec_thread_park_workers(c, avctx->thread_count);
00216 
00217     return 0;
00218 }
00219 
00220 static int avcodec_thread_execute2(AVCodecContext *avctx, action_func2* func2, void *arg, int *ret, int job_count)
00221 {
00222     ThreadContext *c= avctx->thread_opaque;
00223     c->func2 = func2;
00224     return avcodec_thread_execute(avctx, NULL, arg, ret, job_count, 0);
00225 }
00226 
00227 static int thread_init(AVCodecContext *avctx)
00228 {
00229     int i;
00230     ThreadContext *c;
00231     int thread_count = avctx->thread_count;
00232 
00233     if (thread_count <= 1)
00234         return 0;
00235 
00236     c = av_mallocz(sizeof(ThreadContext));
00237     if (!c)
00238         return -1;
00239 
00240     c->workers = av_mallocz(sizeof(pthread_t)*thread_count);
00241     if (!c->workers) {
00242         av_free(c);
00243         return -1;
00244     }
00245 
00246     avctx->thread_opaque = c;
00247     c->current_job = 0;
00248     c->job_count = 0;
00249     c->job_size = 0;
00250     c->done = 0;
00251     pthread_cond_init(&c->current_job_cond, NULL);
00252     pthread_cond_init(&c->last_job_cond, NULL);
00253     pthread_mutex_init(&c->current_job_lock, NULL);
00254     pthread_mutex_lock(&c->current_job_lock);
00255     for (i=0; i<thread_count; i++) {
00256         if(pthread_create(&c->workers[i], NULL, worker, avctx)) {
00257            avctx->thread_count = i;
00258            pthread_mutex_unlock(&c->current_job_lock);
00259            ff_thread_free(avctx);
00260            return -1;
00261         }
00262     }
00263 
00264     avcodec_thread_park_workers(c, thread_count);
00265 
00266     avctx->execute = avcodec_thread_execute;
00267     avctx->execute2 = avcodec_thread_execute2;
00268     return 0;
00269 }
00270 
00278 static attribute_align_arg void *frame_worker_thread(void *arg)
00279 {
00280     PerThreadContext *p = arg;
00281     FrameThreadContext *fctx = p->parent;
00282     AVCodecContext *avctx = p->avctx;
00283     AVCodec *codec = avctx->codec;
00284 
00285     while (1) {
00286         if (p->state == STATE_INPUT_READY && !fctx->die) {
00287             pthread_mutex_lock(&p->mutex);
00288             while (p->state == STATE_INPUT_READY && !fctx->die)
00289                 pthread_cond_wait(&p->input_cond, &p->mutex);
00290             pthread_mutex_unlock(&p->mutex);
00291         }
00292 
00293         if (fctx->die) break;
00294 
00295         if (!codec->update_thread_context && avctx->thread_safe_callbacks)
00296             ff_thread_finish_setup(avctx);
00297 
00298         pthread_mutex_lock(&p->mutex);
00299         avcodec_get_frame_defaults(&p->frame);
00300         p->got_frame = 0;
00301         p->result = codec->decode(avctx, &p->frame, &p->got_frame, &p->avpkt);
00302 
00303         if (p->state == STATE_SETTING_UP) ff_thread_finish_setup(avctx);
00304 
00305         p->state = STATE_INPUT_READY;
00306 
00307         pthread_mutex_lock(&p->progress_mutex);
00308         pthread_cond_signal(&p->output_cond);
00309         pthread_mutex_unlock(&p->progress_mutex);
00310 
00311         pthread_mutex_unlock(&p->mutex);
00312     }
00313 
00314     return NULL;
00315 }
00316 
00324 static int update_context_from_thread(AVCodecContext *dst, AVCodecContext *src, int for_user)
00325 {
00326     int err = 0;
00327 
00328     if (dst != src) {
00329         dst->sub_id    = src->sub_id;
00330         dst->time_base = src->time_base;
00331         dst->width     = src->width;
00332         dst->height    = src->height;
00333         dst->pix_fmt   = src->pix_fmt;
00334 
00335         dst->coded_width  = src->coded_width;
00336         dst->coded_height = src->coded_height;
00337 
00338         dst->has_b_frames = src->has_b_frames;
00339         dst->idct_algo    = src->idct_algo;
00340         dst->slice_count  = src->slice_count;
00341 
00342         dst->bits_per_coded_sample = src->bits_per_coded_sample;
00343         dst->sample_aspect_ratio   = src->sample_aspect_ratio;
00344         dst->dtg_active_format     = src->dtg_active_format;
00345 
00346         dst->profile = src->profile;
00347         dst->level   = src->level;
00348 
00349         dst->bits_per_raw_sample = src->bits_per_raw_sample;
00350         dst->ticks_per_frame     = src->ticks_per_frame;
00351         dst->color_primaries     = src->color_primaries;
00352 
00353         dst->color_trc   = src->color_trc;
00354         dst->colorspace  = src->colorspace;
00355         dst->color_range = src->color_range;
00356         dst->chroma_sample_location = src->chroma_sample_location;
00357     }
00358 
00359     if (for_user) {
00360         dst->coded_frame   = src->coded_frame;
00361         dst->has_b_frames += src->thread_count - 1;
00362     } else {
00363         if (dst->codec->update_thread_context)
00364             err = dst->codec->update_thread_context(dst, src);
00365     }
00366 
00367     return err;
00368 }
00369 
00376 static void update_context_from_user(AVCodecContext *dst, AVCodecContext *src)
00377 {
00378 #define copy_fields(s, e) memcpy(&dst->s, &src->s, (char*)&dst->e - (char*)&dst->s);
00379     dst->flags          = src->flags;
00380 
00381     dst->draw_horiz_band= src->draw_horiz_band;
00382     dst->get_buffer     = src->get_buffer;
00383     dst->release_buffer = src->release_buffer;
00384 
00385     dst->opaque   = src->opaque;
00386 #if FF_API_HURRY_UP
00387     dst->hurry_up = src->hurry_up;
00388 #endif
00389     dst->dsp_mask = src->dsp_mask;
00390     dst->debug    = src->debug;
00391     dst->debug_mv = src->debug_mv;
00392 
00393     dst->slice_flags = src->slice_flags;
00394     dst->flags2      = src->flags2;
00395 
00396     copy_fields(skip_loop_filter, bidir_refine);
00397 
00398     dst->frame_number     = src->frame_number;
00399     dst->reordered_opaque = src->reordered_opaque;
00400 #undef copy_fields
00401 }
00402 
00403 static void free_progress(AVFrame *f)
00404 {
00405     PerThreadContext *p = f->owner->thread_opaque;
00406     int *progress = f->thread_opaque;
00407 
00408     p->progress_used[(progress - p->progress[0]) / 2] = 0;
00409 }
00410 
00412 static void release_delayed_buffers(PerThreadContext *p)
00413 {
00414     FrameThreadContext *fctx = p->parent;
00415 
00416     while (p->num_released_buffers > 0) {
00417         AVFrame *f;
00418 
00419         pthread_mutex_lock(&fctx->buffer_mutex);
00420         f = &p->released_buffers[--p->num_released_buffers];
00421         free_progress(f);
00422         f->thread_opaque = NULL;
00423 
00424         f->owner->release_buffer(f->owner, f);
00425         pthread_mutex_unlock(&fctx->buffer_mutex);
00426     }
00427 }
00428 
00429 static int submit_packet(PerThreadContext *p, AVPacket *avpkt)
00430 {
00431     FrameThreadContext *fctx = p->parent;
00432     PerThreadContext *prev_thread = fctx->prev_thread;
00433     AVCodec *codec = p->avctx->codec;
00434     uint8_t *buf = p->avpkt.data;
00435 
00436     if (!avpkt->size && !(codec->capabilities & CODEC_CAP_DELAY)) return 0;
00437 
00438     pthread_mutex_lock(&p->mutex);
00439 
00440     release_delayed_buffers(p);
00441 
00442     if (prev_thread) {
00443         int err;
00444         if (prev_thread->state == STATE_SETTING_UP) {
00445             pthread_mutex_lock(&prev_thread->progress_mutex);
00446             while (prev_thread->state == STATE_SETTING_UP)
00447                 pthread_cond_wait(&prev_thread->progress_cond, &prev_thread->progress_mutex);
00448             pthread_mutex_unlock(&prev_thread->progress_mutex);
00449         }
00450 
00451         err = update_context_from_thread(p->avctx, prev_thread->avctx, 0);
00452         if (err) {
00453             pthread_mutex_unlock(&p->mutex);
00454             return err;
00455         }
00456     }
00457 
00458     av_fast_malloc(&buf, &p->allocated_buf_size, avpkt->size + FF_INPUT_BUFFER_PADDING_SIZE);
00459     p->avpkt = *avpkt;
00460     p->avpkt.data = buf;
00461     memcpy(buf, avpkt->data, avpkt->size);
00462     memset(buf + avpkt->size, 0, FF_INPUT_BUFFER_PADDING_SIZE);
00463 
00464     p->state = STATE_SETTING_UP;
00465     pthread_cond_signal(&p->input_cond);
00466     pthread_mutex_unlock(&p->mutex);
00467 
00468     /*
00469      * If the client doesn't have a thread-safe get_buffer(),
00470      * then decoding threads call back to the main thread,
00471      * and it calls back to the client here.
00472      */
00473 
00474     if (!p->avctx->thread_safe_callbacks &&
00475          p->avctx->get_buffer != avcodec_default_get_buffer) {
00476         while (p->state != STATE_SETUP_FINISHED && p->state != STATE_INPUT_READY) {
00477             pthread_mutex_lock(&p->progress_mutex);
00478             while (p->state == STATE_SETTING_UP)
00479                 pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00480 
00481             if (p->state == STATE_GET_BUFFER) {
00482                 p->result = p->avctx->get_buffer(p->avctx, p->requested_frame);
00483                 p->state  = STATE_SETTING_UP;
00484                 pthread_cond_signal(&p->progress_cond);
00485             }
00486             pthread_mutex_unlock(&p->progress_mutex);
00487         }
00488     }
00489 
00490     fctx->prev_thread = p;
00491 
00492     return 0;
00493 }
00494 
00495 int ff_thread_decode_frame(AVCodecContext *avctx,
00496                            AVFrame *picture, int *got_picture_ptr,
00497                            AVPacket *avpkt)
00498 {
00499     FrameThreadContext *fctx = avctx->thread_opaque;
00500     int finished = fctx->next_finished;
00501     PerThreadContext *p;
00502     int err;
00503 
00504     /*
00505      * Submit a packet to the next decoding thread.
00506      */
00507 
00508     p = &fctx->threads[fctx->next_decoding];
00509     update_context_from_user(p->avctx, avctx);
00510     err = submit_packet(p, avpkt);
00511     if (err) return err;
00512 
00513     fctx->next_decoding++;
00514 
00515     /*
00516      * If we're still receiving the initial packets, don't return a frame.
00517      */
00518 
00519     if (fctx->delaying && avpkt->size) {
00520         if (fctx->next_decoding >= (avctx->thread_count-1)) fctx->delaying = 0;
00521 
00522         *got_picture_ptr=0;
00523         return 0;
00524     }
00525 
00526     /*
00527      * Return the next available frame from the oldest thread.
00528      * If we're at the end of the stream, then we have to skip threads that
00529      * didn't output a frame, because we don't want to accidentally signal
00530      * EOF (avpkt->size == 0 && *got_picture_ptr == 0).
00531      */
00532 
00533     do {
00534         p = &fctx->threads[finished++];
00535 
00536         if (p->state != STATE_INPUT_READY) {
00537             pthread_mutex_lock(&p->progress_mutex);
00538             while (p->state != STATE_INPUT_READY)
00539                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
00540             pthread_mutex_unlock(&p->progress_mutex);
00541         }
00542 
00543         *picture = p->frame;
00544         *got_picture_ptr = p->got_frame;
00545         picture->pkt_dts = p->avpkt.dts;
00546 
00547         /*
00548          * A later call with avkpt->size == 0 may loop over all threads,
00549          * including this one, searching for a frame to return before being
00550          * stopped by the "finished != fctx->next_finished" condition.
00551          * Make sure we don't mistakenly return the same frame again.
00552          */
00553         p->got_frame = 0;
00554 
00555         if (finished >= avctx->thread_count) finished = 0;
00556     } while (!avpkt->size && !*got_picture_ptr && finished != fctx->next_finished);
00557 
00558     update_context_from_thread(avctx, p->avctx, 1);
00559 
00560     if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
00561 
00562     fctx->next_finished = finished;
00563 
00564     return p->result;
00565 }
00566 
00567 void ff_thread_report_progress(AVFrame *f, int n, int field)
00568 {
00569     PerThreadContext *p;
00570     int *progress = f->thread_opaque;
00571 
00572     if (!progress || progress[field] >= n) return;
00573 
00574     p = f->owner->thread_opaque;
00575 
00576     if (f->owner->debug&FF_DEBUG_THREADS)
00577         av_log(f->owner, AV_LOG_DEBUG, "%p finished %d field %d\n", progress, n, field);
00578 
00579     pthread_mutex_lock(&p->progress_mutex);
00580     progress[field] = n;
00581     pthread_cond_broadcast(&p->progress_cond);
00582     pthread_mutex_unlock(&p->progress_mutex);
00583 }
00584 
00585 void ff_thread_await_progress(AVFrame *f, int n, int field)
00586 {
00587     PerThreadContext *p;
00588     int *progress = f->thread_opaque;
00589 
00590     if (!progress || progress[field] >= n) return;
00591 
00592     p = f->owner->thread_opaque;
00593 
00594     if (f->owner->debug&FF_DEBUG_THREADS)
00595         av_log(f->owner, AV_LOG_DEBUG, "thread awaiting %d field %d from %p\n", n, field, progress);
00596 
00597     pthread_mutex_lock(&p->progress_mutex);
00598     while (progress[field] < n)
00599         pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00600     pthread_mutex_unlock(&p->progress_mutex);
00601 }
00602 
00603 void ff_thread_finish_setup(AVCodecContext *avctx) {
00604     PerThreadContext *p = avctx->thread_opaque;
00605 
00606     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) return;
00607 
00608     pthread_mutex_lock(&p->progress_mutex);
00609     p->state = STATE_SETUP_FINISHED;
00610     pthread_cond_broadcast(&p->progress_cond);
00611     pthread_mutex_unlock(&p->progress_mutex);
00612 }
00613 
00615 static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count)
00616 {
00617     int i;
00618 
00619     for (i = 0; i < thread_count; i++) {
00620         PerThreadContext *p = &fctx->threads[i];
00621 
00622         if (p->state != STATE_INPUT_READY) {
00623             pthread_mutex_lock(&p->progress_mutex);
00624             while (p->state != STATE_INPUT_READY)
00625                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
00626             pthread_mutex_unlock(&p->progress_mutex);
00627         }
00628     }
00629 }
00630 
00631 static void frame_thread_free(AVCodecContext *avctx, int thread_count)
00632 {
00633     FrameThreadContext *fctx = avctx->thread_opaque;
00634     AVCodec *codec = avctx->codec;
00635     int i;
00636 
00637     park_frame_worker_threads(fctx, thread_count);
00638 
00639     if (fctx->prev_thread)
00640         update_context_from_thread(fctx->threads->avctx, fctx->prev_thread->avctx, 0);
00641 
00642     fctx->die = 1;
00643 
00644     for (i = 0; i < thread_count; i++) {
00645         PerThreadContext *p = &fctx->threads[i];
00646 
00647         pthread_mutex_lock(&p->mutex);
00648         pthread_cond_signal(&p->input_cond);
00649         pthread_mutex_unlock(&p->mutex);
00650 
00651         pthread_join(p->thread, NULL);
00652 
00653         if (codec->close)
00654             codec->close(p->avctx);
00655 
00656         avctx->codec = NULL;
00657 
00658         release_delayed_buffers(p);
00659     }
00660 
00661     for (i = 0; i < thread_count; i++) {
00662         PerThreadContext *p = &fctx->threads[i];
00663 
00664         avcodec_default_free_buffers(p->avctx);
00665 
00666         pthread_mutex_destroy(&p->mutex);
00667         pthread_mutex_destroy(&p->progress_mutex);
00668         pthread_cond_destroy(&p->input_cond);
00669         pthread_cond_destroy(&p->progress_cond);
00670         pthread_cond_destroy(&p->output_cond);
00671         av_freep(&p->avpkt.data);
00672 
00673         if (i)
00674             av_freep(&p->avctx->priv_data);
00675 
00676         av_freep(&p->avctx);
00677     }
00678 
00679     av_freep(&fctx->threads);
00680     pthread_mutex_destroy(&fctx->buffer_mutex);
00681     av_freep(&avctx->thread_opaque);
00682 }
00683 
00684 static int frame_thread_init(AVCodecContext *avctx)
00685 {
00686     int thread_count = avctx->thread_count;
00687     AVCodec *codec = avctx->codec;
00688     AVCodecContext *src = avctx;
00689     FrameThreadContext *fctx;
00690     int i, err = 0;
00691 
00692     if (thread_count <= 1) {
00693         avctx->active_thread_type = 0;
00694         return 0;
00695     }
00696 
00697     avctx->thread_opaque = fctx = av_mallocz(sizeof(FrameThreadContext));
00698 
00699     fctx->threads = av_mallocz(sizeof(PerThreadContext) * thread_count);
00700     pthread_mutex_init(&fctx->buffer_mutex, NULL);
00701     fctx->delaying = 1;
00702 
00703     for (i = 0; i < thread_count; i++) {
00704         AVCodecContext *copy = av_malloc(sizeof(AVCodecContext));
00705         PerThreadContext *p  = &fctx->threads[i];
00706 
00707         pthread_mutex_init(&p->mutex, NULL);
00708         pthread_mutex_init(&p->progress_mutex, NULL);
00709         pthread_cond_init(&p->input_cond, NULL);
00710         pthread_cond_init(&p->progress_cond, NULL);
00711         pthread_cond_init(&p->output_cond, NULL);
00712 
00713         p->parent = fctx;
00714         p->avctx  = copy;
00715 
00716         *copy = *src;
00717         copy->thread_opaque = p;
00718         copy->pkt = &p->avpkt;
00719 
00720         if (!i) {
00721             src = copy;
00722 
00723             if (codec->init)
00724                 err = codec->init(copy);
00725 
00726             update_context_from_thread(avctx, copy, 1);
00727         } else {
00728             copy->is_copy   = 1;
00729             copy->priv_data = av_malloc(codec->priv_data_size);
00730             memcpy(copy->priv_data, src->priv_data, codec->priv_data_size);
00731 
00732             if (codec->init_thread_copy)
00733                 err = codec->init_thread_copy(copy);
00734         }
00735 
00736         if (err) goto error;
00737 
00738         pthread_create(&p->thread, NULL, frame_worker_thread, p);
00739     }
00740 
00741     return 0;
00742 
00743 error:
00744     frame_thread_free(avctx, i+1);
00745 
00746     return err;
00747 }
00748 
00749 void ff_thread_flush(AVCodecContext *avctx)
00750 {
00751     FrameThreadContext *fctx = avctx->thread_opaque;
00752 
00753     if (!avctx->thread_opaque) return;
00754 
00755     park_frame_worker_threads(fctx, avctx->thread_count);
00756     if (fctx->prev_thread) {
00757         if (fctx->prev_thread != &fctx->threads[0])
00758             update_context_from_thread(fctx->threads[0].avctx, fctx->prev_thread->avctx, 0);
00759         if (avctx->codec->flush)
00760             avctx->codec->flush(fctx->threads[0].avctx);
00761     }
00762 
00763     fctx->next_decoding = fctx->next_finished = 0;
00764     fctx->delaying = 1;
00765     fctx->prev_thread = NULL;
00766 }
00767 
00768 static int *allocate_progress(PerThreadContext *p)
00769 {
00770     int i;
00771 
00772     for (i = 0; i < MAX_BUFFERS; i++)
00773         if (!p->progress_used[i]) break;
00774 
00775     if (i == MAX_BUFFERS) {
00776         av_log(p->avctx, AV_LOG_ERROR, "allocate_progress() overflow\n");
00777         return NULL;
00778     }
00779 
00780     p->progress_used[i] = 1;
00781 
00782     return p->progress[i];
00783 }
00784 
00785 int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f)
00786 {
00787     PerThreadContext *p = avctx->thread_opaque;
00788     int *progress, err;
00789 
00790     f->owner = avctx;
00791 
00792     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) {
00793         f->thread_opaque = NULL;
00794         return avctx->get_buffer(avctx, f);
00795     }
00796 
00797     if (p->state != STATE_SETTING_UP &&
00798         (avctx->codec->update_thread_context || !avctx->thread_safe_callbacks)) {
00799         av_log(avctx, AV_LOG_ERROR, "get_buffer() cannot be called after ff_thread_finish_setup()\n");
00800         return -1;
00801     }
00802 
00803     pthread_mutex_lock(&p->parent->buffer_mutex);
00804     f->thread_opaque = progress = allocate_progress(p);
00805 
00806     if (!progress) {
00807         pthread_mutex_unlock(&p->parent->buffer_mutex);
00808         return -1;
00809     }
00810 
00811     progress[0] =
00812     progress[1] = -1;
00813 
00814     if (avctx->thread_safe_callbacks ||
00815         avctx->get_buffer == avcodec_default_get_buffer) {
00816         err = avctx->get_buffer(avctx, f);
00817     } else {
00818         p->requested_frame = f;
00819         p->state = STATE_GET_BUFFER;
00820         pthread_mutex_lock(&p->progress_mutex);
00821         pthread_cond_signal(&p->progress_cond);
00822 
00823         while (p->state != STATE_SETTING_UP)
00824             pthread_cond_wait(&p->progress_cond, &p->progress_mutex);
00825 
00826         err = p->result;
00827 
00828         pthread_mutex_unlock(&p->progress_mutex);
00829 
00830         if (!avctx->codec->update_thread_context)
00831             ff_thread_finish_setup(avctx);
00832     }
00833 
00834     pthread_mutex_unlock(&p->parent->buffer_mutex);
00835 
00836     /*
00837      * Buffer age is difficult to keep track of between
00838      * multiple threads, and the optimizations it allows
00839      * are not worth the effort. It is disabled for now.
00840      */
00841     f->age = INT_MAX;
00842 
00843     return err;
00844 }
00845 
00846 void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f)
00847 {
00848     PerThreadContext *p = avctx->thread_opaque;
00849     FrameThreadContext *fctx;
00850 
00851     if (!(avctx->active_thread_type&FF_THREAD_FRAME)) {
00852         avctx->release_buffer(avctx, f);
00853         return;
00854     }
00855 
00856     if (p->num_released_buffers >= MAX_BUFFERS) {
00857         av_log(p->avctx, AV_LOG_ERROR, "too many thread_release_buffer calls!\n");
00858         return;
00859     }
00860 
00861     if(avctx->debug & FF_DEBUG_BUFFERS)
00862         av_log(avctx, AV_LOG_DEBUG, "thread_release_buffer called on pic %p, %d buffers used\n",
00863                                     f, f->owner->internal_buffer_count);
00864 
00865     fctx = p->parent;
00866     pthread_mutex_lock(&fctx->buffer_mutex);
00867     p->released_buffers[p->num_released_buffers++] = *f;
00868     pthread_mutex_unlock(&fctx->buffer_mutex);
00869     memset(f->data, 0, sizeof(f->data));
00870 }
00871 
00881 static void validate_thread_parameters(AVCodecContext *avctx)
00882 {
00883     int frame_threading_supported = (avctx->codec->capabilities & CODEC_CAP_FRAME_THREADS)
00884                                 && !(avctx->flags & CODEC_FLAG_TRUNCATED)
00885                                 && !(avctx->flags & CODEC_FLAG_LOW_DELAY)
00886                                 && !(avctx->flags2 & CODEC_FLAG2_CHUNKS);
00887     if (avctx->thread_count == 1) {
00888         avctx->active_thread_type = 0;
00889     } else if (frame_threading_supported && (avctx->thread_type & FF_THREAD_FRAME)) {
00890         avctx->active_thread_type = FF_THREAD_FRAME;
00891     } else if (avctx->codec->capabilities & CODEC_CAP_SLICE_THREADS &&
00892                avctx->thread_type & FF_THREAD_SLICE) {
00893         avctx->active_thread_type = FF_THREAD_SLICE;
00894     }
00895 }
00896 
00897 int ff_thread_init(AVCodecContext *avctx)
00898 {
00899     if (avctx->thread_opaque) {
00900         av_log(avctx, AV_LOG_ERROR, "avcodec_thread_init is ignored after avcodec_open\n");
00901         return -1;
00902     }
00903 
00904     if (avctx->codec) {
00905         validate_thread_parameters(avctx);
00906 
00907         if (avctx->active_thread_type&FF_THREAD_SLICE)
00908             return thread_init(avctx);
00909         else if (avctx->active_thread_type&FF_THREAD_FRAME)
00910             return frame_thread_init(avctx);
00911     }
00912 
00913     return 0;
00914 }
00915 
00916 void ff_thread_free(AVCodecContext *avctx)
00917 {
00918     if (avctx->active_thread_type&FF_THREAD_FRAME)
00919         frame_thread_free(avctx, avctx->thread_count);
00920     else
00921         thread_free(avctx);
00922 }

Generated on Fri Feb 22 2013 07:24:28 for FFmpeg by  doxygen 1.7.1