@@ -164,7 +164,12 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx,
FifoElem elem;
unsigned int nb_finished = 0;
- if (av_fifo_read(tq->fifo, &elem, 1) >= 0) {
+ while (av_fifo_read(tq->fifo, &elem, 1) >= 0) {
+ if (tq->finished[elem.stream_idx] & FINISHED_RECV) {
+ objpool_release(tq->obj_pool, &elem.obj);
+ continue;
+ }
+
tq->obj_move(data, elem.obj);
objpool_release(tq->obj_pool, &elem.obj);
*stream_idx = elem.stream_idx;
@@ -197,7 +202,14 @@ int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
pthread_mutex_lock(&tq->lock);
while (1) {
+ size_t can_read = av_fifo_can_read(tq->fifo);
+
ret = receive_locked(tq, stream_idx, data);
+
+ // signal other threads if the fifo state changed
+ if (can_read != av_fifo_can_read(tq->fifo))
+ pthread_cond_broadcast(&tq->cond);
+
if (ret == AVERROR(EAGAIN)) {
pthread_cond_wait(&tq->cond, &tq->lock);
continue;
@@ -206,9 +218,6 @@ int tq_receive(ThreadQueue *tq, int *stream_idx, void *data)
break;
}
- if (ret == 0)
- pthread_cond_broadcast(&tq->cond);
-
pthread_mutex_unlock(&tq->lock);
return ret;