[FFmpeg-devel,13/13,v3] fftools/ffmpeg: convert to a threaded architecture

Message ID 20231201111621.10989-1-anton@khirnov.net
State New

Commit Message

Anton Khirnov Dec. 1, 2023, 11:15 a.m. UTC
Change the main loop and every component (demuxers, decoders, filters,
encoders, muxers) to use the previously added transcode scheduler. Every
instance of every such component was already running in a separate
thread, but now they can actually run in parallel.

Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
JEEB to be more correct and deterministic.
---
Fixed the hang. Also updated the public branch.
---
 fftools/ffmpeg.c                              | 374 +--------
 fftools/ffmpeg.h                              |  97 +--
 fftools/ffmpeg_dec.c                          | 321 ++------
 fftools/ffmpeg_demux.c                        | 268 ++++---
 fftools/ffmpeg_enc.c                          | 368 ++-------
 fftools/ffmpeg_filter.c                       | 722 +++++-------------
 fftools/ffmpeg_mux.c                          | 324 ++------
 fftools/ffmpeg_mux.h                          |  24 +-
 fftools/ffmpeg_mux_init.c                     |  88 +--
 fftools/ffmpeg_opt.c                          |   6 +-
 .../fate/ffmpeg-fix_sub_duration_heartbeat    |  36 +-
 11 files changed, 598 insertions(+), 2030 deletions(-)

Comments

Nicolas George Dec. 1, 2023, 2:24 p.m. UTC | #1
Anton Khirnov (12023-12-01):
> Change the main loop and every component (demuxers, decoders, filters,
> encoders, muxers) to use the previously added transcode scheduler. Every
> instance of every such component was already running in a separate
> thread, but now they can actually run in parallel.
> 
> Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> JEEB to be more correct and deterministic.
> ---
> Fixed the hang. Also updated the public branch.

Still breaking sub2video with time shift, see the test case I already
sent twice.
Anton Khirnov Dec. 1, 2023, 2:27 p.m. UTC | #2
Quoting Nicolas George (2023-12-01 15:24:52)
> Anton Khirnov (12023-12-01):
> > Change the main loop and every component (demuxers, decoders, filters,
> > encoders, muxers) to use the previously added transcode scheduler. Every
> > instance of every such component was already running in a separate
> > thread, but now they can actually run in parallel.
> > 
> > Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> > JEEB to be more correct and deterministic.
> > ---
> > Fixed the hang. Also updated the public branch.
> 
> Still breaking sub2video with time shift, see the test case I already
> sent twice.

See my email from Wednesday, it's not actually broken.
Nicolas George Dec. 1, 2023, 2:42 p.m. UTC | #3
Anton Khirnov (12023-12-01):
> See my email from wednesday, it's not actually broken.

I do not have a mail from you from Wednesday. When something succeeds
with the current code and fails with “Error while add the frame to
buffer source(Cannot allocate memory)”, that is broken.
Anton Khirnov Dec. 1, 2023, 2:46 p.m. UTC | #4
Quoting Nicolas George (2023-12-01 15:42:38)
> Anton Khirnov (12023-12-01):
> > See my email from wednesday, it's not actually broken.
> 
> I do not have a mail from you from Wednesday.

http://lists.ffmpeg.org/pipermail/ffmpeg-devel/2023-November/317536.html

> When something succeeds with the current code and fails with “Error
> while add the frame to buffer source(Cannot allocate memory)”, that is
> broken.

Not necessarily, when the current code is broken (which you agreed with
in the last thread).
Nicolas George Dec. 1, 2023, 2:50 p.m. UTC | #5
Anton Khirnov (12023-12-01):
> > When something succeeds with the current code and fails with “Error
> > while add the frame to buffer source(Cannot allocate memory)”, that is
> > broken.
> Not necessarily, when the current code is broken (which you agreed with
> in the last thread).

I do not know what you are talking about, I agreed to no such thing.

The test case I gave you is correct and with the current code it works.
Your changes break it, removing an important feature for users. Please
include it in your testing routine and only submit again when you have
fixed it.
Anton Khirnov Dec. 1, 2023, 2:58 p.m. UTC | #6
Quoting Nicolas George (2023-12-01 15:50:39)
> Anton Khirnov (12023-12-01):
> > > When something succeeds with the current code and fails with “Error
> > > while add the frame to buffer source(Cannot allocate memory)”, that is
> > > broken.
> > Not necessarily, when the current code is broken (which you agreed with
> > in the last thread).
> 
> I do not know what you are talking about, I agreed to no such thing.

http://lists.ffmpeg.org/pipermail/ffmpeg-devel/2023-November/316787.html

The current code is broken because its output depends on the order in
which the frames from different inputs arrive at the filtergraph. It
just so happens that it is deterministically broken currently. After
this patchset it becomes non-deterministically broken, which forces me
to do something about it.

> The test case I gave you is correct and with the current code it works.
> Your changes break it, removing an important feature for users. Please
> include it in your testing routine and only submit again when you have
> fixed it.

Your testcase offsets two streams by 60 seconds. That implies 60 seconds
of buffering. You would get this same amount of buffering in the muxer if
you did the same offsetting with transcoding or remuxing two streams
from the same source.
One can also avoid this buffering entirely by simply opening the file
twice.

So I don't think your demand is reasonable, unless you're also
suggesting a specific way of implementing this.
Nicolas George Dec. 1, 2023, 3:25 p.m. UTC | #7
Anton Khirnov (12023-12-01):
> http://lists.ffmpeg.org/pipermail/ffmpeg-devel/2023-November/316787.html

So not Wednesday but Thursday three weeks ago.

I did not agree that the current code was broken.

> The current code is broken because its output depends on the order in
> which the frames from different inputs arrive at the filtergraph. It
> just so happens that it is deterministically broken currently. After
> this patchset it becomes non-deterministically broken, which forces me
> to do something about it.

That is not true. The current code works and gives correct result if the
file is properly muxed: it cannot be said to be broken.

> Your testcase offsets two streams by 60 seconds.

Indeed.

>						   That implies 60 seconds
> of buffering. You would get this same amount of buffering in the muxer if
> you did the same offsetting with transcoding or remuxing two streams
> from the same source.
> One can also avoid this buffering entirely by simply opening the file
> twice.

You are wrong. You would be right if the offset had been in the opposite
direction. But in the case I chose, it is the subtitles stream that is
delayed, and 60 seconds of subtitles means a few dozen frames at most,
not many hundreds.

Your change to the sub2video heartbeat makes it continuous, and turns the
few dozen frames into many hundreds: this is what is breaking.

So I say it again: this test case is useful and currently works, include
it in your test case so that your patch series keeps the feature
working.

I can consider sending a patch to add it to FATE, but not before Monday.

Also, note that in the grand-parent message from the one you quoted
above, I gave you a solution to make it work. You told me that it was
already what you did, but obviously it is not, so let us resolve this
misunderstanding.
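
To put rough numbers on the "few dozen" versus "many hundreds" claim above,
here is a minimal sketch of the arithmetic. It is not taken from the patch:
the 25 fps video rate, the 20 subtitle events per minute, and the idea that a
continuous heartbeat queues one sub2video frame per video frame are all
illustrative assumptions, and both function names are hypothetical.

```shell
#!/bin/sh
# Frames queued on the subtitle input while it waits for the video input.
# Assumption: a continuous heartbeat emits one frame per video frame.
heartbeat_frames() {
    offset_seconds=$1
    video_fps=$2
    echo $(( offset_seconds * video_fps ))
}

# Sparse case: only real subtitle events are queued.
# events_per_minute is a made-up figure for illustration.
sparse_frames() {
    offset_seconds=$1
    events_per_minute=$2
    echo $(( offset_seconds * events_per_minute / 60 ))
}

echo "continuous heartbeat: $(heartbeat_frames 60 25) frames"
echo "sparse subtitles:     $(sparse_frames 60 20) frames"
```

Under these assumptions the 60-second offset queues 1500 frames with a
continuous heartbeat and 20 frames without one. The real figures depend on
the input and on how the frames are referenced in memory.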
Anton Khirnov Dec. 1, 2023, 7:49 p.m. UTC | #8
Quoting Nicolas George (2023-12-01 16:25:04)
> Anton Khirnov (12023-12-01):
> > http://lists.ffmpeg.org/pipermail/ffmpeg-devel/2023-November/316787.html
> 
> So not Wednesday but Thursday three weeks ago.

The Wednesday email was the one I linked to two emails ago. Here it is
again:
http://lists.ffmpeg.org/pipermail/ffmpeg-devel/2023-November/317536.html

> I did not agree that the current code was broken.
> 
> > The current code is broken because its output depends on the order in
> > which the frames from different inputs arrive at the filtergraph. It
> > just so happens that it is deterministically broken currently. After
> > this patchset it becomes non-deterministically broken, which forces me
> > to do something about it.
> 
> That is not true. The current code works and gives correct result if the
> file is properly muxed: it cannot be said to be broken.

I can definitely say it is broken and I already told you why. But if you
want something more specific:
* the output of your example with the current master changes depending
  on the number of decoder frame threads; my patch fixes that
* in fate-filter-overlay-dvdsub-2397 subtitles appear two frames too
  early; again, my patch fixes that

> > Your testcase offsets two streams by 60 seconds.
> 
> Indeed.
> 
> >						   That implies 60 seconds
> > of buffering. You would get this same amount of buffering in the muxer if
> > you did the same offsetting with transcoding or remuxing two streams
> > from the same source.
> > One can also avoid this buffering entirely by simply opening the file
> > twice.
> 
> You are wrong. You would be right if the offset had been in the opposite
> direction. But in the case I chose, it is the subtitles stream that is
> delayed, and 60 seconds of subtitles means a few dozen frames at most,
> not many hundreds.
> 
> Your change to the sub2video heartbeat makes it continuous, and turns the
> few dozen frames into many hundreds: this is what is breaking.
> 
> So I say it again: this test case is useful and currently works, include
> it in your test case so that your patch series keeps the feature
> working.
> 
> I can consider sending a patch to add it to FATE, but not before Monday.
> 
> Also, note that in the grand-parent message from the one you quoted
> above, I gave you a solution to make it work. You told me that it was
> already what you did, but obviously it is not, so let us resolve this
> misunderstanding.

IIUC your suggestion was to send heartbeat packets from demuxer to
decoder, then have the decoder forward them to filtergraph.

That is EXACTLY what I'm doing in the final patch, see [1]. It also does
not address this problem at all, because it is caused by the heartbeat
processing code making decisions based on
av_buffersrc_get_nb_failed_requests(), which fundamentally depends on
what frames previously arrived on the video input.

[1] https://git.khirnov.net/libav.git/tree/fftools/ffmpeg_demux.c?h=ffmpeg_threading#n527
    https://git.khirnov.net/libav.git/tree/fftools/ffmpeg_dec.c?h=ffmpeg_threading#n406
Nicolas George Dec. 4, 2023, 3:25 p.m. UTC | #9
Anton Khirnov (12023-12-01):
> I can definitely say it is broken and I already told you why. But if you
> want something more specific:
> * the output of your example with the current master changes depending
>   on the number of decoder frame threads; my patch fixes that
> * in fate-filter-overlay-dvdsub-2397 subtitles appear two frames too
>   early; again, my patch fixes that

Ok, some cases are broken. Fine, this is a hard task, some cases are
impossible. That does not allow you to break cases that are currently
working.

> IIUC your suggestion was to send heartbeat packets from demuxer to
> decoder, then have the decoder forward them to filtergraph.
> 
> That is EXACTLY what I'm doing in the final patch, see [1]. It also does
> not address this problem at all, because it is caused by the heartbeat
> processing code making decisions based on
> av_buffersrc_get_nb_failed_requests(), which fundamentally depends on
> what frames previously arrived on the video input.

Then fix it. I have given you a command that currently works and
produces valid output: make sure it still works with your changes. And
that will require sending heartbeat frames only when they are needed,
keeping the current logic in place.
Anton Khirnov Dec. 4, 2023, 4:25 p.m. UTC | #10
Quoting Nicolas George (2023-12-04 16:25:52)
> Anton Khirnov (12023-12-01):
> > I can definitely say it is broken and I already told you why. But if you
> > want something more specific:
> > * the output of your example with the current master changes depending
> >   on the number of decoder frame threads; my patch fixes that
> > * in fate-filter-overlay-dvdsub-2397 subtitles appear two frames too
> >   early; again, my patch fixes that
> 
> Ok, some cases are broken. Fine, this is a hard task, some cases are
> impossible. That does not allow you to break cases that are currently
> working.

Nothing is being broken. Your highly contrived and currently broken
testcase buffers a bounded number of extra frames in order to stop being
broken. If that extra buffering is an actual problem for someone, it can
be easily avoided by opening the file twice.

> > IIUC your suggestion was to send heartbeat packets from demuxer to
> > decoder, then have the decoder forward them to filtergraph.
> > 
> > That is EXACTLY what I'm doing in the final patch, see [1]. It also does
> > not address this problem at all, because it is caused by the heartbeat
> > processing code making decisions based on
> > av_buffersrc_get_nb_failed_requests(), which fundamentally depends on
> > what frames previously arrived on the video input.
> 
> Then fix it. I have given you a command that currently works and
> produces valid output:

As I said before, your command does NOT work. Its output changes
unpredictably depending on unrelated parameters.

> make sure it still works with your changes.

After my changes it actually does work reliably.

I maintain that your demand to "fix" your testcase (i.e. reduce its
memory consumption) is highly unreasonable – unless you specify how
exactly that is supposed to be accomplished while preserving
determinism.
Nicolas George Dec. 4, 2023, 4:37 p.m. UTC | #11
Anton Khirnov (12023-12-04):
> broken. If that extra buffering is an actual problem for someone, it can
> be easily avoided by opening the file twice.

Not a solution if the file is streamed or generated.

> As I said before, your command does NOT work. Its output changes
> unpredictably depending on unrelated parameters.

It still produces correct output in most of the cases, which is what
matters to users.

> I maintain that your demand to "fix" your testcase (i.e. reduce its
> memory consumption) is highly unreasonable –

My demand is not that you REDUCE the memory consumption, my demand is
that you DO NOT INCREASE IT HUNDREDFOLD.

That is a perfectly reasonable demand.

>						unless you specify how
> exactly that is supposed to be accomplished while preserving
> determinism.

Fixing the bugs introduced by threading is the job of the person who
wants to introduce threading. I can offer the help of my expertise about
lavfi and subtitles, of course. But your attitude to just pretend the
problem does not exist is unacceptable.
Anton Khirnov Dec. 4, 2023, 5:07 p.m. UTC | #12
Quoting Nicolas George (2023-12-04 17:37:23)
> Anton Khirnov (12023-12-04):
> > broken. If that extra buffering is an actual problem for someone, it can
> > be easily avoided by opening the file twice.
> 
> Not a solution if the file is streamed or generated.
> 
> > As I said before, your command does NOT work. Its output changes
> > unpredictably depending on unrelated parameters.
> 
> It still produces correct output in most of the cases, which is what
> matters to users.

$ for i in $(seq 1 4 64); do
.   echo -ne "$i\t";
.   ./ffmpeg -v fatal -threads $i -i sub.mkv -preset ultrafast \
.     -lavfi '[0:s]setpts=PTS+60/TB[s] ; [0:v][s]overlay' \
.     -bitexact -y -f matroska md5:
. done
1       287e929cba6c67c6ce8e35954548048d
5       7e234d83cca90d4b0ee4d563e21a8bd8
9       abfd093a36b1661db022616d97c45fad
13      f6be9d63a7dce69cd16581895923b196
17      2c7144c01f6294e65e305f602b58a718
21      99564d81199a1f453ccff24ac2db7eac
25      02ec0aadb03a0205ccacb4c873ee9ad9
29      76643205916887fd444525ea8bb610fb
33      6267a2baeb1554bc5f219890ac8b37aa
37      f955daadf62c91ddce5705561e32804f
41      523c61ebad03b6ba297812cb7939b679
45      96c48966f459442707f29f854aa59c3b
49      ad92b3f79c7bb31fdc272b2a81985334
53      362a7d6c0a681a049adf4802e0dc8771
57      1529ffd4f487ea13b9a37f4f1b9879eb
61      53db192ac534d348b3ed63fdcbac945a

Which of these are you saying is correct?

> > I maintain that your demand to "fix" your testcase (i.e. reduce its
> > memory consumption) is highly unreasonable –
> 
> My demand is not that you REDUCE the memory consumption, my demand is
> that you DO NOT INCREASE IT HUNDREDFOLD.
> 
> That is a perfectly reasonable demand.
> 
> >						unless you specify how
> > exactly that is supposed to be accomplished while preserving
> > determinism.
> 
> Fixing the bugs introduced by threading

The only bug that's been established to exist so far is in your
heartbeat code, which produces random output as per above.

Buffering is by itself not a bug, otherwise you'd have to say the lavf
interleaving queue is a bug.

So for the last time - either suggest a specific and practical way of
reducing memory consumption or stop interfering with my work.
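
The determinism check in the loop above can be reduced to a single number. The
following is a minimal sketch, not part of the patch series: the helper name
`count_distinct_md5` is hypothetical, and it assumes input lines of the form
"thread count, whitespace, checksum", as printed by that loop.

```shell
#!/bin/sh
# Reads "<threads> <md5>" lines on stdin and prints how many distinct
# checksums were seen. A result of 1 means every run produced the same bytes.
count_distinct_md5() {
    awk 'NF >= 2 { seen[$2] = 1 } END { n = 0; for (k in seen) n++; print n }'
}

# Stand-in data, so the helper can be tried without ffmpeg or sub.mkv.
printf '1\taaa\n5\tbbb\n9\taaa\n' | count_distinct_md5
```

Piping the output of the loop into this helper would print 16 for the results
listed above, since all sixteen checksums differ.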
Nicolas George Dec. 6, 2023, 12:55 p.m. UTC | #13
Anton Khirnov (12023-12-04):
> Which of these are you saying is correct?

I do not know? Do you think I am able to reverse MD5 mentally? I am
flattered, but I am sorry to confess I am not.

Why do you not look at the resulting videos to judge for yourself? But
to do that, you will need to remember (or learn) two things:

First, most people do not have that many CPU threads available, and if
they do they will spend them on encoding more than decoding.

Second, and most important: for subtitles, in many many cases, a few
frames of shift do not matter because the timing in the source material
is not that accurate.

So the answer to your question is: probably most of the ones generated
with a sane number of threads are correct, in the sense that the result
is within the acceptable accuracy of subtitles sync and useful for the
user.

Of course, if the use case is one where perfect accuracy is necessary,
users need to revert to a slower and more bulky procedure (like you
suggested: open the file twice, which might require storing it entirely)
to get it.

So really, what you pretend is not breaking anything is really removing
one of the options currently available to users in the compromise
between speed, latency and accuracy.

So I demand you stop pretending you are not breaking anything, stop
pretending it is currently broken, just so you can move forward without
bothering to search for a solution: that starts to feel like laziness
and it always felt like rudeness because I spent a lot of effort in
getting this to work in the cases where it can.

> The only bug that's been established to exist so far is in your
> heartbeat code, which produces random output as per above.

As I explained many times, this is not a bug.

> Buffering is by itself not a bug, otherwise you'd have to say the lavf
> interleaving queue is a bug.

Once again, buffering thousands of frames and crashing because out of
memory when the current code succeeds and produces a useful result is a
regression and the patch series cannot be applied until that regression
is fixed.

> So for the last time - either suggest a specific and practical way of
> reducing memory consumption or stop interfering with my work.

The specific and practical way is to leave the current logic in place.
There might be a few tweaks to make it more accurate, like looking into
this comment:

    /* subtitles seem to be usually muxed ahead of other streams;
       if not, subtracting a larger time here is necessary */
    pts2 = av_rescale_q(pts, tb, ifp->time_base) - 1;

But first, we need you to stop behaving as if my previous efforts did
not matter just because they do not overlap with your narrow use cases.
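
For readers unfamiliar with the quoted line, here is a minimal sketch of what
the rescale and the "- 1" do numerically. The `rescale` function is a
simplified, hypothetical stand-in for av_rescale_q(): it handles only
non-negative values, has no overflow protection, and rounds to nearest with
halves rounded up. Both time bases (1/1000 for the source, 1/25 for the filter
input) are illustrative assumptions.

```shell
#!/bin/sh
# rescale PTS SRC_NUM SRC_DEN DST_NUM DST_DEN
# Converts a non-negative timestamp from time base SRC_NUM/SRC_DEN
# to time base DST_NUM/DST_DEN, rounding to nearest.
rescale() {
    num=$(( $2 * $5 ))
    den=$(( $3 * $4 ))
    echo $(( ($1 * num + den / 2) / den ))
}

pts=1500    # 1.5 s expressed in a 1/1000 time base
# Mirror of the quoted line: rescale, then step back one destination tick.
pts2=$(( $(rescale "$pts" 1 1000 1 25) - 1 ))
echo "$pts2"
```

With a 1/25 time base, one tick is 40 ms, so the "- 1" moves the heartbeat
timestamp back by one video frame at 25 fps. "Subtracting a larger time", as
the comment puts it, would mean stepping back by more ticks.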
James Almer Dec. 6, 2023, 1:21 p.m. UTC | #14
On 12/6/2023 9:55 AM, Nicolas George wrote:
> Anton Khirnov (12023-12-04):
>> Which of these are you saying is correct?
> 
> I do not know? Do you think I am able to reverse MD5 mentally? I am
> flattered, but I am sorry to confess I am not.
> 
> Why do you not look at the resulting videos to judge for yourself? But

I honestly can't believe you're arguing this. At this point you're just 
being defensive of your position without really taking into account what 
you were challenged with.

> to do that, you will need to remember (or learn) two things:

And being condescending will not help your case.

> 
> First, most people do not have that many CPU threads available, and if
> they do they will spend them on encoding more than decoding.
> 
> Second, and most important: for subtitles, in many many cases, a few
> frames of shift do not matter because the timing in the source material
> is not that accurate.
> 
> So the answer to your question is: probably most of the ones generated
> with a sane number of threads are correct, in the sense that the result
> is within the acceptable accuracy of subtitles sync and useful for the
> user.

How can you argue it's fine when you request bitexact output and do NOT 
get bitexact output? Go ahead and add that command line as a FATE test. 
See the runners turn yellow. Will you argue it's fine and not broken?

Number of threads should not matter, the output has to be deterministic. 
Saying "Maybe the user likes what he gets. Varying amount of artifacts 
here and there or a few frames of shift here and there of difference 
between runs. It's fine!" is laughable.

> 
> Of course, if the use case is one where perfect accuracy is necessary,
> users need to revert to a slower and more bulky procedure (like you
> suggested: open the file twice, which might require storing it entirely)
> to get it.
> 
> So really, what you pretend is not breaking anything is really removing
> one of the options currently available to users in the compromise
> between speed, latency and accuracy.
> 
> So I demand you stop pretending you are not breaking anything, stop
> pretending it is currently broken, just so you can move forward without
> bothering to search for a solution: that starts to feel like laziness
> and it always felt like rudeness because I spent a lot of effort in
> getting this to work in the cases where it can.
> 
>> The only bug that's been established to exist so far is in your
>> heartbeat code, which produces random output as per above.
> 
> As I explained many times, this is not a bug.

If I request -bitexact, I want bitexact output, regardless of running on 
a core i3 or a Threadripper. There's nothing more to it.

> 
>> Buffering is by itself not a bug, otherwise you'd have to say the lavf
>> interleaving queue is a bug.
> 
> Once again, buffering thousands of frames and crashing because out of
> memory when the current code succeeds and produces a useful result is a
> regression and the patch series cannot be applied until that regression
> is fixed.

Calling random output that happens to be "acceptable" within the 
subjective expectations of the user as useful sounds to me like you're 
trying to find an excuse to keep buggy code with unpredictable results 
around, just because it's been there for a long time.

> 
>> So for the last time - either suggest a specific and practical way of
>> reducing memory consumption or stop interfering with my work.
> 
> The specific and practical way is to leave the current logic in place.
> There might be a few tweaks to make it more accurate, like looking into
> this comment:
> 
>      /* subtitles seem to be usually muxed ahead of other streams;
>         if not, subtracting a larger time here is necessary */
>      pts2 = av_rescale_q(pts, tb, ifp->time_base) - 1;
> 
> But first, we need you to stop behaving as if my previous efforts did
> not matter just because they do not overlap with your narrow use cases.

Your previous efforts mattered, but evidently did not yield completely 
acceptable results, and this overhaul has exposed it.

So, like Anton has asked several times, suggest a way to keep 
deterministic and bitexact output without exponentially increasing 
memory consumption due to buffering.
Nicolas George Dec. 6, 2023, 1:38 p.m. UTC | #15
James Almer (12023-12-06):
> I honestly can't believe you're arguing this.

Yet I do, so I suggest you think a little harder to understand why I do.

> And being condescending will not help your case.

Can you tell that to Anton too please?

> If I request -bitexact, I want bitexact output, regardless of running on a
> core i3 or a Threadripper. There's nothing more to it.

I had not noticed the -bitexact on the test command line. I will grant
the change is acceptable if bit-exact is requested.

> Calling random output that happens to be "acceptable" within the subjective
> expectations of the user as useful sounds to me like you're trying to find
> an excuse to keep buggy code with unpredictable results around, just because
> it's been there for a long time.

Well, you are wrong, and what I explained is the real reason: most
subtitles are not timed that accurately. The subtitles on HBO's Last
Week Tonight, for example, can randomly lag or be early by several
seconds. Even serious subtitles, like the ones for scripted shows on
Netflix/Amazon/Crunchyroll/whatever vary by a few tenths of seconds,
i.e. several frames.

And I have used this code. And I look carefully at subtitles. If the
result was lower quality than the source material, I would have noticed
and I would have endeavored to fix it. There never was need.

Now, can Anton claim similar experience working with subtitles from the
real world? Most of this discussion points to the answer being no.

> So, like Anton has asked several times, suggest a way to keep deterministic
> and bitexact output without exponentially increasing memory consumption due
> to buffering.

I will spend time and effort searching for a solution when we agree to
work together.

“Do this or I will break your code” is an unacceptable behavior, whether
it is directed at me or at Paul or at anybody else, and I do not spend
effort when unacceptable behavior is tolerated.
Paul B Mahol Dec. 7, 2023, 5:26 p.m. UTC | #16
From ffmpeg version 3.4 to version 6.1, demuxing truehd files (-c:a copy)
became about 2x slower.
But the simple transcode example from doc/examples is still several times
faster than that.

I bet that either mutexes and condition variables are a far from perfect
solution, or the fftools/ code is buggy.

This is similar to the performance drop with -lavfi sources, but it
affects more users: anyone working with truehd or any other format with
small packets.
Paul B Mahol Dec. 21, 2023, 11:53 a.m. UTC | #17
I found that increasing the thread queue size for frames/packets in
fftools/ from 1 to a value greater than 1 speeds up decoding by 10%.
Values greater than 2 do not seem to make much further difference.

Still, the current state is sub-optimal.
Anton Khirnov Dec. 22, 2023, 10:26 a.m. UTC | #18
Quoting Paul B Mahol (2023-12-21 12:53:58)
> On Thu, Dec 7, 2023 at 6:26 PM Paul B Mahol <onemda@gmail.com> wrote:
> 
> >
> >
> > On Wed, Dec 6, 2023 at 2:38 PM Nicolas George <george@nsup.org> wrote:
> >
> >> James Almer (12023-12-06):
> >> > I honestly can't believe you're arguing this.
> >>
> >> Yet I do, so I suggest you think a little harder to understand why I do.
> >>
> >> > And being condescending will not help your case.
> >>
> >> Can you tell that to Anton too please?
> >>
> >> > If I request -bitexact, I want bitexact output, regardless of running on
> >> > a core i3 or a Threadripper. There's nothing more to it.
> >>
> >> I had not noticed the -bitexact on the test command line. I will grant
> >> the change is acceptable if bit-exact is requested.
> >>
> >> > Calling random output that happens to be "acceptable" within the subjective
> >> > expectations of the user as useful sounds to me like you're trying to find
> >> > an excuse to keep buggy code with unpredictable results around, just because
> >> > it's been there for a long time.
> >>
> >> Well, you are wrong, and what I explained is the real reason: most
> >> subtitles are not timed that accurately. The subtitles on HBO's Last
> >> Week Tonight, for example, can randomly lag or be early by several
> >> seconds. Even serious subtitles, like the ones for scripted shows on
> >> Netflix/Amazon/Crunchyroll/whatever vary by a few tenths of seconds,
> >> i.e. several frames.
> >>
> >> And I have used this code. And I look carefully at subtitles. If the
> >> result was lower quality than the source material, I would have noticed
> >> and I would have endeavored to fix it. There never was need.
> >>
> >> Now, can Anton claim similar experience working with subtitles from the
> >> real world? Most of this discussion points to the answer being no.
> >>
> >> > So, like Anton has asked several times, suggest a way to keep
> >> > deterministic and bitexact output without exponentially increasing memory
> >> > consumption due to buffering.
> >>
> >> I will spend time and effort searching for a solution when we agree to
> >> work together.
> >>
> >> “Do this or I will break your code” is an unacceptable behavior, whether
> >> it is directed at me or at Paul or at anybody else, and I do not spend
> >> effort when unacceptable behavior is tolerated.
> >>
> >>
> > From ffmpeg 3.4 to ffmpeg 6.1, the speed of demuxing truehd files
> > (-c:a copy) dropped by a factor of 2.
> > But the simple transcode example from doc/examples is still several times
> > faster than that.
> >
> > I bet that using mutexes and condition variables is far from a perfect
> > solution, or that the fftools/ code is buggy.
> >
> > This is similar to the performance drop with -lavfi sources, but it affects
> > more users: anyone working with truehd or any other small-packet format.
> >
> 
> I found that increasing the thread queue size for frames/packets in
> fftools/ from 1 to more than 1 speeds up decoding by 10%.
> Values greater than 2 do not seem to make much difference.
> 
> Still, the current state is sub-optimal.

It most likely is, but I expect the optimal value would depend on the
specific configuration. More testing welcome.
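
For anyone who wants to experiment with the queue-size effect discussed
above, here is a minimal sketch of a bounded queue between one producer and
one consumer thread, built on a mutex and two condition variables. It is an
illustration only: BoundedQueue, PipelineArgs, bq_send(), bq_receive() and
run_pipeline() are hypothetical names, not the fftools ThreadQueue or
Scheduler API, and the payload is a plain int rather than a packet or frame.
Build with -pthread.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical bounded FIFO of ints. This is NOT the fftools ThreadQueue or
 * Scheduler API; all names here are made up for illustration. */
typedef struct BoundedQueue {
    int            *items;
    size_t          capacity, count, head;
    unsigned long   producer_waits; /* sends that found the queue full */
    pthread_mutex_t lock;
    pthread_cond_t  not_full, not_empty;
} BoundedQueue;

typedef struct PipelineArgs {
    BoundedQueue *q;
    int           nb_items;
    long          sum; /* written by the consumer thread only */
} PipelineArgs;

/* Block until there is room, then append one item. */
static void bq_send(BoundedQueue *q, int v)
{
    pthread_mutex_lock(&q->lock);
    if (q->count == q->capacity)
        q->producer_waits++;
    while (q->count == q->capacity)
        pthread_cond_wait(&q->not_full, &q->lock);
    q->items[(q->head + q->count) % q->capacity] = v;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Block until an item is available, then remove and return the oldest one. */
static int bq_receive(BoundedQueue *q)
{
    int v;

    pthread_mutex_lock(&q->lock);
    while (!q->count)
        pthread_cond_wait(&q->not_empty, &q->lock);
    v = q->items[q->head];
    q->head = (q->head + 1) % q->capacity;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return v;
}

static void *producer(void *arg)
{
    PipelineArgs *a = arg;
    for (int i = 0; i < a->nb_items; i++)
        bq_send(a->q, i);
    return NULL;
}

static void *consumer(void *arg)
{
    PipelineArgs *a = arg;
    for (int i = 0; i < a->nb_items; i++)
        a->sum += bq_receive(a->q);
    return NULL;
}

/* Push nb_items integers (0 .. nb_items-1) through a queue of the given
 * capacity. Returns the sum seen by the consumer, or -1 if allocation fails.
 * If waits is non-NULL, it receives the number of sends that had to block. */
long run_pipeline(size_t capacity, int nb_items, unsigned long *waits)
{
    BoundedQueue q = { .capacity = capacity };
    PipelineArgs a = { .q = &q, .nb_items = nb_items };
    pthread_t prod, cons;

    assert(capacity > 0 && nb_items >= 0);
    q.items = calloc(capacity, sizeof(*q.items));
    if (!q.items)
        return -1;
    pthread_mutex_init(&q.lock, NULL);
    pthread_cond_init(&q.not_full, NULL);
    pthread_cond_init(&q.not_empty, NULL);

    /* thread creation failure is treated as fatal to keep the sketch short */
    if (pthread_create(&prod, NULL, producer, &a) ||
        pthread_create(&cons, NULL, consumer, &a))
        abort();
    pthread_join(prod, NULL);
    pthread_join(cons, NULL);

    if (waits)
        *waits = q.producer_waits;
    pthread_cond_destroy(&q.not_empty);
    pthread_cond_destroy(&q.not_full);
    pthread_mutex_destroy(&q.lock);
    free(q.items);
    return a.sum;
}
```

Calling run_pipeline(1, n, &waits) and run_pipeline(2, n, &waits) with the
same n and comparing waits and wall-clock time gives a rough idea of how
often a capacity of 1 forces the two threads into lockstep. The numbers
depend on the machine and on thread scheduling, and a toy int queue says
nothing definite about real packet or frame workloads.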

Patch

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index b8a97258a0..30b594fd97 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -117,7 +117,7 @@  typedef struct BenchmarkTimeStamps {
 static BenchmarkTimeStamps get_benchmark_time_stamps(void);
 static int64_t getmaxrss(void);
 
-unsigned nb_output_dumped = 0;
+atomic_uint nb_output_dumped = 0;
 
 static BenchmarkTimeStamps current_time;
 AVIOContext *progress_avio = NULL;
@@ -138,30 +138,6 @@  static struct termios oldtty;
 static int restore_tty;
 #endif
 
-/* sub2video hack:
-   Convert subtitles to video with alpha to insert them in filter graphs.
-   This is a temporary solution until libavfilter gets real subtitles support.
- */
-
-static void sub2video_heartbeat(InputFile *infile, int64_t pts, AVRational tb)
-{
-    /* When a frame is read from a file, examine all sub2video streams in
-       the same file and send the sub2video frame again. Otherwise, decoded
-       video frames could be accumulating in the filter graph while a filter
-       (possibly overlay) is desperately waiting for a subtitle frame. */
-    for (int i = 0; i < infile->nb_streams; i++) {
-        InputStream *ist = infile->streams[i];
-
-        if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        for (int j = 0; j < ist->nb_filters; j++)
-            ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
-    }
-}
-
-/* end of sub2video hack */
-
 static void term_exit_sigsafe(void)
 {
 #if HAVE_TERMIOS_H
@@ -499,23 +475,13 @@  void update_benchmark(const char *fmt, ...)
     }
 }
 
-void close_output_stream(OutputStream *ost)
-{
-    OutputFile *of = output_files[ost->file_index];
-    ost->finished |= ENCODER_FINISHED;
-
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-}
-
-static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time)
+static void print_report(int is_last_report, int64_t timer_start, int64_t cur_time, int64_t pts)
 {
     AVBPrint buf, buf_script;
     int64_t total_size = of_filesize(output_files[0]);
     int vid;
     double bitrate;
     double speed;
-    int64_t pts = AV_NOPTS_VALUE;
     static int64_t last_time = -1;
     static int first_report = 1;
     uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
@@ -533,7 +499,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             last_time = cur_time;
         }
         if (((cur_time - last_time) < stats_period && !first_report) ||
-            (first_report && nb_output_dumped < nb_output_files))
+            (first_report && atomic_load(&nb_output_dumped) < nb_output_files))
             return;
         last_time = cur_time;
     }
@@ -544,7 +510,7 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
     av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
     av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA : -1;
+        const float q = ost->enc ? atomic_load(&ost->quality) / (float) FF_QP2LAMBDA : -1;
 
         if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
             av_bprintf(&buf, "q=%2.1f ", q);
@@ -565,22 +531,18 @@  static void print_report(int is_last_report, int64_t timer_start, int64_t cur_ti
             if (is_last_report)
                 av_bprintf(&buf, "L");
 
-            nb_frames_dup  = ost->filter->nb_frames_dup;
-            nb_frames_drop = ost->filter->nb_frames_drop;
+            nb_frames_dup  = atomic_load(&ost->filter->nb_frames_dup);
+            nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
 
             vid = 1;
         }
-        /* compute min output value */
-        if (ost->last_mux_dts != AV_NOPTS_VALUE) {
-            if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
-                pts = ost->last_mux_dts;
-            if (copy_ts) {
-                if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
-                    copy_ts_first_pts = pts;
-                if (copy_ts_first_pts != AV_NOPTS_VALUE)
-                    pts -= copy_ts_first_pts;
-            }
-        }
+    }
+
+    if (copy_ts) {
+        if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
+            copy_ts_first_pts = pts;
+        if (copy_ts_first_pts != AV_NOPTS_VALUE)
+            pts -= copy_ts_first_pts;
     }
 
     us    = FFABS64U(pts) % AV_TIME_BASE;
@@ -783,81 +745,6 @@  int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy)
     return 0;
 }
 
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt)
-{
-    OutputFile *of = output_files[ost->file_index];
-    int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
-                                      AV_TIME_BASE_Q);
-
-    if (!ost->fix_sub_duration_heartbeat || !(pkt->flags & AV_PKT_FLAG_KEY))
-        // we are only interested in heartbeats on streams configured, and
-        // only on random access points.
-        return 0;
-
-    for (int i = 0; i < of->nb_streams; i++) {
-        OutputStream *iter_ost = of->streams[i];
-        InputStream  *ist      = iter_ost->ist;
-        int ret = AVERROR_BUG;
-
-        if (iter_ost == ost || !ist || !ist->decoding_needed ||
-            ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
-            // We wish to skip the stream that causes the heartbeat,
-            // output streams without an input stream, streams not decoded
-            // (as fix_sub_duration is only done for decoded subtitles) as
-            // well as non-subtitle streams.
-            continue;
-
-        if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
-            return ret;
-    }
-
-    return 0;
-}
-
-/* pkt = NULL means EOF (needed to flush decoder buffers) */
-static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    InputFile *f = input_files[ist->file_index];
-    int64_t dts_est = AV_NOPTS_VALUE;
-    int ret = 0;
-    int eof_reached = 0;
-
-    if (ist->decoding_needed) {
-        ret = dec_packet(ist, pkt, no_eof);
-        if (ret < 0 && ret != AVERROR_EOF)
-            return ret;
-    }
-    if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
-        eof_reached = 1;
-
-    if (pkt && pkt->opaque_ref) {
-        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
-        dts_est = pd->dts_est;
-    }
-
-    if (f->recording_time != INT64_MAX) {
-        int64_t start_time = 0;
-        if (copy_ts) {
-            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
-            start_time += start_at_zero ? 0 : f->start_time_effective;
-        }
-        if (dts_est >= f->recording_time + start_time)
-            pkt = NULL;
-    }
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (ost->enc || (!pkt && no_eof))
-            continue;
-
-        ret = of_streamcopy(ost, pkt, dts_est);
-        if (ret < 0)
-            return ret;
-    }
-
-    return !eof_reached;
-}
-
 static void print_stream_maps(void)
 {
     av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
@@ -934,43 +821,6 @@  static void print_stream_maps(void)
     }
 }
 
-/**
- * Select the output stream to process.
- *
- * @retval 0 an output stream was selected
- * @retval AVERROR(EAGAIN) need to wait until more input is available
- * @retval AVERROR_EOF no more streams need output
- */
-static int choose_output(OutputStream **post)
-{
-    int64_t opts_min = INT64_MAX;
-    OutputStream *ost_min = NULL;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        int64_t opts;
-
-        if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
-            opts = ost->filter->last_pts;
-        } else {
-            opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
-                   INT64_MIN : ost->last_mux_dts;
-        }
-
-        if (!ost->initialized && !ost->finished) {
-            ost_min = ost;
-            break;
-        }
-        if (!ost->finished && opts < opts_min) {
-            opts_min = opts;
-            ost_min  = ost;
-        }
-    }
-    if (!ost_min)
-        return AVERROR_EOF;
-    *post = ost_min;
-    return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
-}
-
 static void set_tty_echo(int on)
 {
 #if HAVE_TERMIOS_H
@@ -1042,149 +892,21 @@  static int check_keyboard_interaction(int64_t cur_time)
     return 0;
 }
 
-static void reset_eagain(void)
-{
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
-        ost->unavailable = 0;
-}
-
-static void decode_flush(InputFile *ifile)
-{
-    for (int i = 0; i < ifile->nb_streams; i++) {
-        InputStream *ist = ifile->streams[i];
-
-        if (ist->discard || !ist->decoding_needed)
-            continue;
-
-        dec_packet(ist, NULL, 1);
-    }
-}
-
-/*
- * Return
- * - 0 -- one packet was read and processed
- * - AVERROR(EAGAIN) -- no packets were available for selected file,
- *   this function should be called again
- * - AVERROR_EOF -- this function should not be called again
- */
-static int process_input(int file_index, AVPacket *pkt)
-{
-    InputFile *ifile = input_files[file_index];
-    InputStream *ist;
-    int ret, i;
-
-    ret = ifile_get_packet(ifile, pkt);
-
-    if (ret == 1) {
-        /* the input file is looped: flush the decoders */
-        decode_flush(ifile);
-        return AVERROR(EAGAIN);
-    }
-    if (ret < 0) {
-        if (ret != AVERROR_EOF) {
-            av_log(ifile, AV_LOG_ERROR,
-                   "Error retrieving a packet from demuxer: %s\n", av_err2str(ret));
-            if (exit_on_error)
-                return ret;
-        }
-
-        for (i = 0; i < ifile->nb_streams; i++) {
-            ist = ifile->streams[i];
-            if (!ist->discard) {
-                ret = process_input_packet(ist, NULL, 0);
-                if (ret>0)
-                    return 0;
-                else if (ret < 0)
-                    return ret;
-            }
-
-            /* mark all outputs that don't go through lavfi as finished */
-            for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-                OutputStream *ost = ist->outputs[oidx];
-                OutputFile    *of = output_files[ost->file_index];
-
-                ret = of_output_packet(of, ost, NULL);
-                if (ret < 0)
-                    return ret;
-            }
-        }
-
-        ifile->eof_reached = 1;
-        return AVERROR(EAGAIN);
-    }
-
-    reset_eagain();
-
-    ist = ifile->streams[pkt->stream_index];
-
-    sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
-
-    ret = process_input_packet(ist, pkt, 0);
-
-    av_packet_unref(pkt);
-
-    return ret < 0 ? ret : 0;
-}
-
-/**
- * Run a single step of transcoding.
- *
- * @return  0 for success, <0 for error
- */
-static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
-{
-    InputStream  *ist = NULL;
-    int ret;
-
-    if (ost->filter) {
-        if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
-            return ret;
-        if (!ist)
-            return 0;
-    } else {
-        ist = ost->ist;
-        av_assert0(ist);
-    }
-
-    ret = process_input(ist->file_index, demux_pkt);
-    if (ret == AVERROR(EAGAIN)) {
-        return 0;
-    }
-
-    if (ret < 0)
-        return ret == AVERROR_EOF ? 0 : ret;
-
-    // process_input() above might have caused output to become available
-    // in multiple filtergraphs, so we process all of them
-    for (int i = 0; i < nb_filtergraphs; i++) {
-        ret = reap_filters(filtergraphs[i], 0);
-        if (ret < 0)
-            return ret;
-    }
-
-    return 0;
-}
-
 /*
  * The following code is the main loop of the file converter
  */
-static int transcode(Scheduler *sch, int *err_rate_exceeded)
+static int transcode(Scheduler *sch)
 {
     int ret = 0, i;
-    InputStream *ist;
-    int64_t timer_start;
-    AVPacket *demux_pkt = NULL;
+    int64_t timer_start, transcode_ts = 0;
 
     print_stream_maps();
 
-    *err_rate_exceeded = 0;
     atomic_store(&transcode_init_done, 1);
 
-    demux_pkt = av_packet_alloc();
-    if (!demux_pkt) {
-        ret = AVERROR(ENOMEM);
-        goto fail;
-    }
+    ret = sch_start(sch);
+    if (ret < 0)
+        return ret;
 
     if (stdin_interaction) {
         av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
@@ -1192,8 +914,7 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
 
     timer_start = av_gettime_relative();
 
-    while (!received_sigterm) {
-        OutputStream *ost;
+    while (!sch_wait(sch, stats_period, &transcode_ts)) {
         int64_t cur_time= av_gettime_relative();
 
         /* if 'q' pressed, exits */
@@ -1201,49 +922,11 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
             if (check_keyboard_interaction(cur_time) < 0)
                 break;
 
-        ret = choose_output(&ost);
-        if (ret == AVERROR(EAGAIN)) {
-            reset_eagain();
-            av_usleep(10000);
-            ret = 0;
-            continue;
-        } else if (ret < 0) {
-            av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write to, finishing.\n");
-            ret = 0;
-            break;
-        }
-
-        ret = transcode_step(ost, demux_pkt);
-        if (ret < 0 && ret != AVERROR_EOF) {
-            av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
-            break;
-        }
-
         /* dump report by using the output first video and audio streams */
-        print_report(0, timer_start, cur_time);
+        print_report(0, timer_start, cur_time, transcode_ts);
     }
 
-    /* at the end of stream, we must flush the decoder buffers */
-    for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
-        float err_rate;
-
-        if (!input_files[ist->file_index]->eof_reached) {
-            int err = process_input_packet(ist, NULL, 0);
-            ret = err_merge(ret, err);
-        }
-
-        err_rate = (ist->frames_decoded || ist->decode_errors) ?
-                   ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
-        if (err_rate > max_error_rate) {
-            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
-                   err_rate, max_error_rate);
-            *err_rate_exceeded = 1;
-        } else if (err_rate)
-            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
-    }
-    ret = err_merge(ret, enc_flush());
-
-    term_exit();
+    ret = sch_stop(sch);
 
     /* write the trailer if needed */
     for (i = 0; i < nb_output_files; i++) {
@@ -1251,11 +934,10 @@  static int transcode(Scheduler *sch, int *err_rate_exceeded)
         ret = err_merge(ret, err);
     }
 
-    /* dump report by using the first video and audio streams */
-    print_report(1, timer_start, av_gettime_relative());
+    term_exit();
 
-fail:
-    av_packet_free(&demux_pkt);
+    /* dump report by using the first video and audio streams */
+    print_report(1, timer_start, av_gettime_relative(), transcode_ts);
 
     return ret;
 }
@@ -1308,7 +990,7 @@  int main(int argc, char **argv)
 {
     Scheduler *sch = NULL;
 
-    int ret, err_rate_exceeded;
+    int ret;
     BenchmarkTimeStamps ti;
 
     init_dynload();
@@ -1350,7 +1032,7 @@  int main(int argc, char **argv)
     }
 
     current_time = ti = get_benchmark_time_stamps();
-    ret = transcode(sch, &err_rate_exceeded);
+    ret = transcode(sch);
     if (ret >= 0 && do_benchmark) {
         int64_t utime, stime, rtime;
         current_time = get_benchmark_time_stamps();
@@ -1362,8 +1044,8 @@  int main(int argc, char **argv)
                utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
     }
 
-    ret = received_nb_signals ? 255 :
-          err_rate_exceeded   ?  69 : ret;
+    ret = received_nb_signals                 ? 255 :
+          (ret == FFMPEG_ERROR_RATE_EXCEEDED) ?  69 : ret;
 
 finish:
     if (ret == AVERROR_EXIT)
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index a89038b765..ba82b7490d 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -61,6 +61,8 @@ 
 #define FFMPEG_OPT_TOP 1
 #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
 
+#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
+
 enum VideoSyncMethod {
     VSYNC_AUTO = -1,
     VSYNC_PASSTHROUGH,
@@ -82,13 +84,16 @@  enum HWAccelID {
 };
 
 enum FrameOpaque {
-    FRAME_OPAQUE_REAP_FILTERS = 1,
-    FRAME_OPAQUE_CHOOSE_INPUT,
-    FRAME_OPAQUE_SUB_HEARTBEAT,
+    FRAME_OPAQUE_SUB_HEARTBEAT = 1,
     FRAME_OPAQUE_EOF,
     FRAME_OPAQUE_SEND_COMMAND,
 };
 
+enum PacketOpaque {
+    PKT_OPAQUE_SUB_HEARTBEAT = 1,
+    PKT_OPAQUE_FIX_SUB_DURATION,
+};
+
 typedef struct HWDevice {
     const char *name;
     enum AVHWDeviceType type;
@@ -309,11 +314,8 @@  typedef struct OutputFilter {
 
     enum AVMediaType     type;
 
-    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */
-    int64_t last_pts;
-
-    uint64_t nb_frames_dup;
-    uint64_t nb_frames_drop;
+    atomic_uint_least64_t nb_frames_dup;
+    atomic_uint_least64_t nb_frames_drop;
 } OutputFilter;
 
 typedef struct FilterGraph {
@@ -426,11 +428,6 @@  typedef struct InputFile {
 
     float readrate;
     int accurate_seek;
-
-    /* when looping the input file, this queue is used by decoders to report
-     * the last frame timestamp back to the demuxer thread */
-    AVThreadMessageQueue *audio_ts_queue;
-    int                   audio_ts_queue_size;
 } InputFile;
 
 enum forced_keyframes_const {
@@ -532,8 +529,6 @@  typedef struct OutputStream {
     InputStream *ist;
 
     AVStream *st;            /* stream in the output file */
-    /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q */
-    int64_t last_mux_dts;
 
     AVRational enc_timebase;
 
@@ -578,13 +573,6 @@  typedef struct OutputStream {
     AVDictionary *sws_dict;
     AVDictionary *swr_opts;
     char *apad;
-    OSTFinished finished;        /* no more packets should be written for this stream */
-    int unavailable;                     /* true if the steram is unavailable (possibly temporarily) */
-
-    // init_output_stream() has been called for this stream
-    // The encoder and the bitstream filters have been initialized and the stream
-    // parameters are set in the AVStream.
-    int initialized;
 
     const char *attachment_filename;
 
@@ -598,9 +586,8 @@  typedef struct OutputStream {
     uint64_t samples_encoded;
 
     /* packet quality factor */
-    int quality;
+    atomic_int quality;
 
-    int sq_idx_encode;
     int sq_idx_mux;
 
     EncStats enc_stats_pre;
@@ -658,7 +645,6 @@  extern FilterGraph **filtergraphs;
 extern int        nb_filtergraphs;
 
 extern char *vstats_filename;
-extern char *sdp_filename;
 
 extern float dts_delta_threshold;
 extern float dts_error_threshold;
@@ -691,7 +677,7 @@  extern const AVIOInterruptCB int_cb;
 extern const OptionDef options[];
 extern HWDevice *filter_hw_device;
 
-extern unsigned nb_output_dumped;
+extern atomic_uint nb_output_dumped;
 
 extern int ignore_unknown_streams;
 extern int copy_unknown_streams;
@@ -737,10 +723,6 @@  FrameData *frame_data(AVFrame *frame);
 
 const FrameData *frame_data_c(AVFrame *frame);
 
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference);
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb);
-
 /**
  * Set up fallback filtering parameters from a decoder context. They will only
  * be used if no frames are ever sent on this input, otherwise the actual
@@ -761,26 +743,9 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch);
 
 void fg_free(FilterGraph **pfg);
 
-/**
- * Perform a step of transcoding for the specified filter graph.
- *
- * @param[in]  graph     filter graph to consider
- * @param[out] best_ist  input stream where a frame would allow to continue
- * @return  0 for success, <0 for error
- */
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters);
 
-/**
- * Get and encode new output from specified filtergraph, without causing
- * activity.
- *
- * @return  0 for success, <0 for severe errors
- */
-int reap_filters(FilterGraph *fg, int flush);
-
 int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
 
 void enc_stats_write(OutputStream *ost, EncStats *es,
@@ -807,25 +772,11 @@  int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input);
 int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
 void dec_free(Decoder **pdec);
 
-/**
- * Submit a packet for decoding
- *
- * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and
- * mark all downstreams as finished.
- *
- * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush
- * decoders and await further input.
- */
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
-
 int enc_alloc(Encoder **penc, const AVCodec *codec,
               Scheduler *sch, unsigned sch_idx);
 void enc_free(Encoder **penc);
 
-int enc_open(OutputStream *ost, const AVFrame *frame);
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub);
-int enc_frame(OutputStream *ost, AVFrame *frame);
-int enc_flush(void);
+int enc_open(void *opaque, const AVFrame *frame);
 
 /*
  * Initialize muxing state for the given stream, should be called
@@ -840,30 +791,11 @@  void of_free(OutputFile **pof);
 
 void of_enc_stats_close(void);
 
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
-
-/**
- * @param dts predicted packet dts in AV_TIME_BASE_Q
- */
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
-
 int64_t of_filesize(OutputFile *of);
 
 int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
 void ifile_close(InputFile **f);
 
-/**
- * Get next input packet from the demuxer.
- *
- * @param pkt the packet is written here when this function returns 0
- * @return
- * - 0 when a packet has been read successfully
- * - 1 when stream end was reached, but the stream is looped;
- *     caller should flush decoders and read from this demuxer again
- * - a negative error code on failure
- */
-int ifile_get_packet(InputFile *f, AVPacket *pkt);
-
 int ist_output_add(InputStream *ist, OutputStream *ost);
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
 
@@ -880,9 +812,6 @@  InputStream *ist_iter(InputStream *prev);
  * pass NULL to start iteration */
 OutputStream *ost_iter(OutputStream *prev);
 
-void close_output_stream(OutputStream *ost);
-int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt);
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
 void update_benchmark(const char *fmt, ...);
 
 #define SPECIFIER_OPT_FMT_str  "%s"
diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
index 90ea0d6d93..5dde82a276 100644
--- a/fftools/ffmpeg_dec.c
+++ b/fftools/ffmpeg_dec.c
@@ -54,24 +54,6 @@  struct Decoder {
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending coded packets from the main thread to
-     * the decoder thread.
-     *
-     * An empty packet is sent to flush the decoder without terminating
-     * decoding.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending decoded frames from the decoder thread
-     * to the main thread.
-     *
-     * An empty frame is sent to signal that a single packet has been fully
-     * processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -80,24 +62,6 @@  typedef struct DecThreadContext {
     AVPacket        *pkt;
 } DecThreadContext;
 
-static int dec_thread_stop(Decoder *d)
-{
-    void *ret;
-
-    if (!d->queue_in)
-        return 0;
-
-    tq_send_finish(d->queue_in, 0);
-    tq_receive_finish(d->queue_out, 0);
-
-    pthread_join(d->thread, &ret);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-
-    return (intptr_t)ret;
-}
-
 void dec_free(Decoder **pdec)
 {
     Decoder *dec = *pdec;
@@ -105,8 +69,6 @@  void dec_free(Decoder **pdec)
     if (!dec)
         return;
 
-    dec_thread_stop(dec);
-
     av_frame_free(&dec->frame);
     av_packet_free(&dec->pkt);
 
@@ -148,25 +110,6 @@  fail:
     return AVERROR(ENOMEM);
 }
 
-static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
-{
-    int i, ret = 0;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
-                                 i < ist->nb_filters - 1 ||
-                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
-        if (ret == AVERROR_EOF)
-            ret = 0; /* ignore */
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Failed to inject frame into filter network: %s\n", av_err2str(ret));
-            break;
-        }
-    }
-    return ret;
-}
-
 static AVRational audio_samplerate_update(void *logctx, Decoder *d,
                                           const AVFrame *frame)
 {
@@ -421,28 +364,14 @@  static int process_subtitle(InputStream *ist, AVFrame *frame)
     if (!subtitle)
         return 0;
 
-    ret = send_frame_to_filters(ist, frame);
+    ret = sch_dec_send(d->sch, d->sch_idx, frame);
     if (ret < 0)
-        return ret;
+        av_frame_unref(frame);
 
-    subtitle = (AVSubtitle*)frame->buf[0]->data;
-    if (!subtitle->num_rects)
-        return 0;
-
-    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
-        OutputStream *ost = ist->outputs[oidx];
-        if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
-            continue;
-
-        ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
-        if (ret < 0)
-            return ret;
-    }
-
-    return 0;
+    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
 }
 
-int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
+static int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
 {
     Decoder *d = ist->decoder;
     int ret = AVERROR_BUG;
@@ -468,12 +397,24 @@  int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
 static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
                                AVFrame *frame)
 {
-    Decoder          *d = ist->decoder;
+    Decoder *d = ist->decoder;
     AVPacket *flush_pkt = NULL;
     AVSubtitle subtitle;
     int got_output;
     int ret;
 
+    if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
+        frame->pts       = pkt->pts;
+        frame->time_base = pkt->time_base;
+        frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+    } else if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION) {
+        return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts, pkt->time_base,
+                                                            AV_TIME_BASE_Q));
+    }
+
     if (!pkt) {
         flush_pkt = av_packet_alloc();
         if (!flush_pkt)
@@ -496,7 +437,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
 
     ist->frames_decoded++;
 
-    // XXX the queue for transferring data back to the main thread runs
+    // XXX the queue for transferring data to consumers runs
     // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
     // inside the frame
     // eventually, subtitles should be switched to use AVFrames natively
@@ -509,26 +450,7 @@  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
     frame->width  = ist->dec_ctx->width;
     frame->height = ist->dec_ctx->height;
 
-    ret = tq_send(d->queue_out, 0, frame);
-    if (ret < 0)
-        av_frame_unref(frame);
-
-    return ret;
-}
-
-static int send_filter_eof(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    int i, ret;
-
-    for (i = 0; i < ist->nb_filters; i++) {
-        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
-                          d->last_frame_pts + d->last_frame_duration_est;
-        ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb);
-        if (ret < 0)
-            return ret;
-    }
-    return 0;
+    return process_subtitle(ist, frame);
 }
 
 static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
@@ -635,9 +557,11 @@  static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
 
         ist->frames_decoded++;
 
-        ret = tq_send(d->queue_out, 0, frame);
-        if (ret < 0)
-            return ret;
+        ret = sch_dec_send(d->sch, d->sch_idx, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
+        }
     }
 }
 
@@ -679,7 +603,6 @@  fail:
 void *decoder_thread(void *arg)
 {
     InputStream *ist = arg;
-    InputFile *ifile = input_files[ist->file_index];
     Decoder       *d = ist->decoder;
     DecThreadContext dt;
     int ret = 0, input_status = 0;
@@ -691,19 +614,31 @@  void *decoder_thread(void *arg)
     dec_thread_set_name(ist);
 
     while (!input_status) {
-        int dummy, flush_buffers;
+        int flush_buffers, have_data;
 
-        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
-        flush_buffers = input_status >= 0 && !dt.pkt->buf;
-        if (!dt.pkt->buf)
+        input_status  = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
+        have_data     = input_status >= 0 &&
+            (dt.pkt->buf || dt.pkt->side_data_elems ||
+             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
+             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
+        flush_buffers = input_status >= 0 && !have_data;
+        if (!have_data)
             av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n",
                    flush_buffers ? "flush" : "EOF");
 
-        ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
+        ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
 
         av_packet_unref(dt.pkt);
         av_frame_unref(dt.frame);
 
+        // AVERROR_EOF  - EOF from the decoder
+        // AVERROR_EXIT - EOF from the scheduler
+        // we treat them differently when flushing
+        if (ret == AVERROR_EXIT) {
+            ret = AVERROR_EOF;
+            flush_buffers = 0;
+        }
+
         if (ret == AVERROR_EOF) {
             av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
                    flush_buffers ? "resetting" : "finishing");
@@ -711,11 +646,10 @@  void *decoder_thread(void *arg)
             if (!flush_buffers)
                 break;
 
-            /* report last frame duration to the demuxer thread */
+            /* report last frame duration to the scheduler */
             if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
-                Timestamp ts = { .ts = d->last_frame_pts + d->last_frame_duration_est,
-                                 .tb = d->last_frame_tb };
-                av_thread_message_queue_send(ifile->audio_ts_queue, &ts, 0);
+                dt.pkt->pts       = d->last_frame_pts + d->last_frame_duration_est;
+                dt.pkt->time_base = d->last_frame_tb;
             }
 
             avcodec_flush_buffers(ist->dec_ctx);
@@ -724,149 +658,47 @@  void *decoder_thread(void *arg)
                    av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the entire packet was processed
-        ret = tq_send(d->queue_out, 0, dt.frame);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
+    // on success send EOF timestamp to our downstreams
+    if (ret >= 0) {
+        float err_rate;
+
+        av_frame_unref(dt.frame);
+
+        dt.frame->opaque    = (void*)(intptr_t)FRAME_OPAQUE_EOF;
+        dt.frame->pts       = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE :
+                              d->last_frame_pts + d->last_frame_duration_est;
+        dt.frame->time_base = d->last_frame_tb;
+
+        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
+        if (ret < 0 && ret != AVERROR_EOF) {
+            av_log(NULL, AV_LOG_FATAL,
+                   "Error signalling EOF timestamp: %s\n", av_err2str(ret));
+            goto finish;
+        }
+        ret = 0;
+
+        err_rate = (ist->frames_decoded || ist->decode_errors) ?
+                   (float)ist->decode_errors / (ist->frames_decoded + ist->decode_errors) : 0.f;
+        if (err_rate > max_error_rate) {
+            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds maximum %g\n",
+                   err_rate, max_error_rate);
+            ret = FFMPEG_ERROR_RATE_EXCEEDED;
+        } else if (err_rate)
+            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n", err_rate);
+    }
+
 finish:
-    tq_receive_finish(d->queue_in,  0);
-    tq_send_finish   (d->queue_out, 0);
-
-    // make sure the demuxer does not get stuck waiting for audio durations
-    // that will never arrive
-    if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
-        av_thread_message_queue_set_err_recv(ifile->audio_ts_queue, AVERROR_EOF);
-
     dec_thread_uninit(&dt);
 
-    av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
-{
-    Decoder *d = ist->decoder;
-    int ret = 0, thread_ret;
-
-    // thread already joined
-    if (!d->queue_in)
-        return AVERROR_EOF;
-
-    // send the packet/flush request/EOF to the decoder thread
-    if (pkt || no_eof) {
-        av_packet_unref(d->pkt);
-
-        if (pkt) {
-            ret = av_packet_ref(d->pkt, pkt);
-            if (ret < 0)
-                goto finish;
-        }
-
-        ret = tq_send(d->queue_in, 0, d->pkt);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(d->queue_in, 0);
-
-    // retrieve all decoded data for the packet
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(d->queue_out, &dummy, d->frame);
-        if (ret < 0)
-            goto finish;
-
-        // packet fully processed
-        if (!d->frame->buf[0])
-            return 0;
-
-        // process the decoded frame
-        if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
-            ret = process_subtitle(ist, d->frame);
-        } else {
-            ret = send_frame_to_filters(ist, d->frame);
-        }
-        av_frame_unref(d->frame);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = dec_thread_stop(d);
-    if (thread_ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-    // non-EOF errors here are all fatal
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to our downstreams
-    ret = send_filter_eof(ist);
-    if (ret < 0) {
-        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
-        return ret;
-    }
-
-    return AVERROR_EOF;
-}
-
-static int dec_thread_start(InputStream *ist)
-{
-    Decoder *d = ist->decoder;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->queue_in = tq_alloc(1, 1, op, pkt_move);
-    if (!d->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    d->queue_out = tq_alloc(1, 4, op, frame_move);
-    if (!d->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&d->queue_in);
-    tq_free(&d->queue_out);
-    return ret;
-}
-
 static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts)
 {
     InputStream *ist = s->opaque;
@@ -1118,12 +950,5 @@  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx)
     if (ret < 0)
         return ret;
 
-    ret = dec_thread_start(ist);
-    if (ret < 0) {
-        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     return 0;
 }
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 2234dbe076..91cd7a1125 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -22,8 +22,6 @@ 
 #include "ffmpeg.h"
 #include "ffmpeg_sched.h"
 #include "ffmpeg_utils.h"
-#include "objpool.h"
-#include "thread_queue.h"
 
 #include "libavutil/avassert.h"
 #include "libavutil/avstring.h"
@@ -35,7 +33,6 @@ 
 #include "libavutil/pixdesc.h"
 #include "libavutil/time.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -66,7 +63,11 @@  typedef struct DemuxStream {
 
     double ts_scale;
 
+    // scheduler returned EOF for this stream
+    int finished;
+
     int streamcopy_needed;
+    int have_sub2video;
 
     int wrap_correction_done;
     int saw_first_ts;
@@ -101,6 +102,7 @@  typedef struct Demuxer {
 
     /* number of times input stream should be looped */
     int loop;
+    int have_audio_dec;
     /* duration of the looped segment of the input file */
     Timestamp duration;
     /* pts with the smallest/largest values ever seen */
@@ -113,11 +115,12 @@  typedef struct Demuxer {
     double readrate_initial_burst;
 
     Scheduler            *sch;
-    ThreadQueue          *thread_queue;
-    int                   thread_queue_size;
-    pthread_t             thread;
+
+    AVPacket             *pkt_heartbeat;
 
     int                   read_started;
+    int                   nb_streams_used;
+    int                   nb_streams_finished;
 } Demuxer;
 
 static DemuxStream *ds_from_ist(InputStream *ist)
@@ -153,7 +156,7 @@  static void report_new_stream(Demuxer *d, const AVPacket *pkt)
     d->nb_streams_warn = pkt->stream_index + 1;
 }
 
-static int seek_to_start(Demuxer *d)
+static int seek_to_start(Demuxer *d, Timestamp end_pts)
 {
     InputFile    *ifile = &d->f;
     AVFormatContext *is = ifile->ctx;
@@ -163,21 +166,10 @@  static int seek_to_start(Demuxer *d)
     if (ret < 0)
         return ret;
 
-    if (ifile->audio_ts_queue_size) {
-        int got_ts = 0;
-
-        while (got_ts < ifile->audio_ts_queue_size) {
-            Timestamp ts;
-            ret = av_thread_message_queue_recv(ifile->audio_ts_queue, &ts, 0);
-            if (ret < 0)
-                return ret;
-            got_ts++;
-
-            if (d->max_pts.ts == AV_NOPTS_VALUE ||
-                av_compare_ts(d->max_pts.ts, d->max_pts.tb, ts.ts, ts.tb) < 0)
-                d->max_pts = ts;
-        }
-    }
+    if (end_pts.ts != AV_NOPTS_VALUE &&
+        (d->max_pts.ts == AV_NOPTS_VALUE ||
+         av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, end_pts.tb) < 0))
+        d->max_pts = end_pts;
 
     if (d->max_pts.ts != AV_NOPTS_VALUE) {
         int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : d->min_pts.ts;
@@ -404,7 +396,7 @@  static int ts_fixup(Demuxer *d, AVPacket *pkt)
     duration = av_rescale_q(d->duration.ts, d->duration.tb, pkt->time_base);
     if (pkt->pts != AV_NOPTS_VALUE) {
         // audio decoders take precedence for estimating total file duration
-        int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : pkt->duration;
+        int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration;
 
         pkt->pts += duration;
 
@@ -440,7 +432,7 @@  static int ts_fixup(Demuxer *d, AVPacket *pkt)
     return 0;
 }
 
-static int input_packet_process(Demuxer *d, AVPacket *pkt)
+static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned *send_flags)
 {
     InputFile     *f = &d->f;
     InputStream *ist = f->streams[pkt->stream_index];
@@ -451,6 +443,16 @@  static int input_packet_process(Demuxer *d, AVPacket *pkt)
     if (ret < 0)
         return ret;
 
+    if (f->recording_time != INT64_MAX) {
+        int64_t start_time = 0;
+        if (copy_ts) {
+            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0;
+            start_time += start_at_zero ? 0 : f->start_time_effective;
+        }
+        if (ds->dts >= f->recording_time + start_time)
+            *send_flags |= DEMUX_SEND_STREAMCOPY_EOF;
+    }
+
     ds->data_size += pkt->size;
     ds->nb_packets++;
 
@@ -465,6 +467,8 @@  static int input_packet_process(Demuxer *d, AVPacket *pkt)
                av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
     }
 
+    pkt->stream_index = ds->sch_idx_stream;
+
     return 0;
 }
 
@@ -488,6 +492,65 @@  static void readrate_sleep(Demuxer *d)
     }
 }
 
+static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags,
+                   const char *pkt_desc)
+{
+    int ret;
+
+    ret = sch_demux_send(d->sch, d->f.index, pkt, flags);
+    if (ret == AVERROR_EOF) {
+        av_packet_unref(pkt);
+
+        av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are done\n");
+        ds->finished = 1;
+
+        if (++d->nb_streams_finished == d->nb_streams_used) {
+            av_log(d, AV_LOG_VERBOSE, "All consumers are done\n");
+            return AVERROR_EOF;
+        }
+    } else if (ret < 0) {
+        if (ret != AVERROR_EXIT)
+            av_log(d, AV_LOG_ERROR,
+                   "Unable to send %s packet to consumers: %s\n",
+                   pkt_desc, av_err2str(ret));
+        return ret;
+    }
+
+    return 0;
+}
+
+static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned flags)
+{
+    InputFile  *f = &d->f;
+    int ret;
+
+    // send heartbeat for sub2video streams
+    if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) {
+        for (int i = 0; i < f->nb_streams; i++) {
+            DemuxStream *ds1 = ds_from_ist(f->streams[i]);
+
+            if (ds1->finished || !ds1->have_sub2video)
+                continue;
+
+            d->pkt_heartbeat->pts          = pkt->pts;
+            d->pkt_heartbeat->time_base    = pkt->time_base;
+            d->pkt_heartbeat->stream_index = ds1->sch_idx_stream;
+            d->pkt_heartbeat->opaque       = (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT;
+
+            ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat");
+            if (ret < 0)
+                return ret;
+        }
+    }
+
+    ret = do_send(d, ds, pkt, flags, "demuxed");
+    if (ret < 0)
+        return ret;
+
+
+    return 0;
+}
+
 static void discard_unused_programs(InputFile *ifile)
 {
     for (int j = 0; j < ifile->ctx->nb_programs; j++) {
@@ -527,9 +590,13 @@  static void *input_thread(void *arg)
 
     discard_unused_programs(f);
 
+    d->read_started    = 1;
     d->wallclock_start = av_gettime_relative();
 
     while (1) {
+        DemuxStream *ds;
+        unsigned send_flags = 0;
+
         ret = av_read_frame(f->ctx, pkt);
 
         if (ret == AVERROR(EAGAIN)) {
@@ -538,11 +605,13 @@  static void *input_thread(void *arg)
         }
         if (ret < 0) {
             if (d->loop) {
-                /* signal looping to the consumer thread */
+                /* signal looping to our consumers */
                 pkt->stream_index = -1;
-                ret = tq_send(d->thread_queue, 0, pkt);
+
+                ret = sch_demux_send(d->sch, f->index, pkt, 0);
                 if (ret >= 0)
-                    ret = seek_to_start(d);
+                    ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts,
+                                                        .tb = pkt->time_base });
                 if (ret >= 0)
                     continue;
 
@@ -551,9 +620,11 @@  static void *input_thread(void *arg)
 
             if (ret == AVERROR_EOF)
                 av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
-            else
+            else {
                 av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
                        av_err2str(ret));
+                ret = exit_on_error ? ret : 0;
+            }
 
             break;
         }
@@ -565,8 +636,9 @@  static void *input_thread(void *arg)
 
         /* the following test is needed in case new streams appear
            dynamically in stream : we ignore them */
-        if (pkt->stream_index >= f->nb_streams ||
-            f->streams[pkt->stream_index]->discard) {
+        ds = pkt->stream_index < f->nb_streams ?
+             ds_from_ist(f->streams[pkt->stream_index]) : NULL;
+        if (!ds || ds->ist.discard || ds->finished) {
             report_new_stream(d, pkt);
             av_packet_unref(pkt);
             continue;
@@ -583,122 +655,26 @@  static void *input_thread(void *arg)
             }
         }
 
-        ret = input_packet_process(d, pkt);
+        ret = input_packet_process(d, pkt, &send_flags);
         if (ret < 0)
             break;
 
         if (f->readrate)
             readrate_sleep(d);
 
-        ret = tq_send(d->thread_queue, 0, pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(f, AV_LOG_ERROR,
-                       "Unable to send packet to main thread: %s\n",
-                       av_err2str(ret));
+        ret = demux_send(d, ds, pkt, send_flags);
+        if (ret < 0)
             break;
-        }
     }
 
+    // EOF/EXIT is normal termination
+    if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
+        ret = 0;
+
 finish:
-    av_assert0(ret < 0);
-    tq_send_finish(d->thread_queue, 0);
-
     av_packet_free(&pkt);
 
-    av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
-
-    return NULL;
-}
-
-static void thread_stop(Demuxer *d)
-{
-    InputFile *f = &d->f;
-
-    if (!d->thread_queue)
-        return;
-
-    tq_receive_finish(d->thread_queue, 0);
-
-    pthread_join(d->thread, NULL);
-
-    tq_free(&d->thread_queue);
-
-    av_thread_message_queue_free(&f->audio_ts_queue);
-}
-
-static int thread_start(Demuxer *d)
-{
-    int ret;
-    InputFile *f = &d->f;
-    ObjPool *op;
-
-    if (d->thread_queue_size <= 0)
-        d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
-    if (!d->thread_queue) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    if (d->loop) {
-        int nb_audio_dec = 0;
-
-        for (int i = 0; i < f->nb_streams; i++) {
-            InputStream *ist = f->streams[i];
-            nb_audio_dec += !!(ist->decoding_needed &&
-                               ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO);
-        }
-
-        if (nb_audio_dec) {
-            ret = av_thread_message_queue_alloc(&f->audio_ts_queue,
-                                                nb_audio_dec, sizeof(Timestamp));
-            if (ret < 0)
-                goto fail;
-            f->audio_ts_queue_size = nb_audio_dec;
-        }
-    }
-
-    if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
-        av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
-        ret = AVERROR(ret);
-        goto fail;
-    }
-
-    d->read_started = 1;
-
-    return 0;
-fail:
-    tq_free(&d->thread_queue);
-    return ret;
-}
-
-int ifile_get_packet(InputFile *f, AVPacket *pkt)
-{
-    Demuxer *d = demuxer_from_ifile(f);
-    int ret, dummy;
-
-    if (!d->thread_queue) {
-        ret = thread_start(d);
-        if (ret < 0)
-            return ret;
-    }
-
-    ret = tq_receive(d->thread_queue, &dummy, pkt);
-    if (ret < 0)
-        return ret;
-
-    if (pkt->stream_index == -1) {
-        av_assert0(!pkt->data && !pkt->side_data_elems);
-        return 1;
-    }
-
-    return 0;
+    return (void*)(intptr_t)ret;
 }
 
 static void demux_final_stats(Demuxer *d)
@@ -769,8 +745,6 @@  void ifile_close(InputFile **pf)
     if (!f)
         return;
 
-    thread_stop(d);
-
     if (d->read_started)
         demux_final_stats(d);
 
@@ -780,6 +754,8 @@  void ifile_close(InputFile **pf)
 
     avformat_close_input(&f->ctx);
 
+    av_packet_free(&d->pkt_heartbeat);
+
     av_freep(pf);
 }
 
@@ -802,7 +778,11 @@  static int ist_use(InputStream *ist, int decoding_needed)
         ds->sch_idx_stream = ret;
     }
 
-    ist->discard          = 0;
+    if (ist->discard) {
+        ist->discard = 0;
+        d->nb_streams_used++;
+    }
+
     ist->st->discard      = ist->user_set_discard;
     ist->decoding_needed |= decoding_needed;
     ds->streamcopy_needed |= !decoding_needed;
@@ -823,6 +803,8 @@  static int ist_use(InputStream *ist, int decoding_needed)
         ret = dec_open(ist, d->sch, ds->sch_idx_dec);
         if (ret < 0)
             return ret;
+
+        d->have_audio_dec |= is_audio;
     }
 
     return 0;
@@ -848,6 +830,7 @@  int ist_output_add(InputStream *ist, OutputStream *ost)
 
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
 {
+    Demuxer      *d = demuxer_from_ifile(input_files[ist->file_index]);
     DemuxStream *ds = ds_from_ist(ist);
     int ret;
 
@@ -866,6 +849,15 @@  int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple)
     if (ret < 0)
         return ret;
 
+    if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) {
+        if (!d->pkt_heartbeat) {
+            d->pkt_heartbeat = av_packet_alloc();
+            if (!d->pkt_heartbeat)
+                return AVERROR(ENOMEM);
+        }
+        ds->have_sub2video = 1;
+    }
+
     return ds->sch_idx_dec;
 }
 
@@ -1607,8 +1599,6 @@  int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                "since neither -readrate nor -re were given\n");
     }
 
-    d->thread_queue_size = o->thread_queue_size;
-
     /* Add all the streams from the given input file to the demuxer */
     for (int i = 0; i < ic->nb_streams; i++) {
         ret = ist_add(o, d, ic->streams[i]);
diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c
index 9871381c0e..9383b167f7 100644
--- a/fftools/ffmpeg_enc.c
+++ b/fftools/ffmpeg_enc.c
@@ -41,12 +41,6 @@ 
 #include "libavformat/avformat.h"
 
 struct Encoder {
-    AVFrame *sq_frame;
-
-    // packet for receiving encoded output
-    AVPacket *pkt;
-    AVFrame  *sub_frame;
-
     // combined size of all the packets received from the encoder
     uint64_t data_size;
 
@@ -54,25 +48,9 @@  struct Encoder {
     uint64_t packets_encoded;
 
     int opened;
-    int finished;
 
     Scheduler      *sch;
     unsigned        sch_idx;
-
-    pthread_t       thread;
-    /**
-     * Queue for sending frames from the main thread to
-     * the encoder thread.
-     */
-    ThreadQueue    *queue_in;
-    /**
-     * Queue for sending encoded packets from the encoder thread
-     * to the main thread.
-     *
-     * An empty packet is sent to signal that a previously sent
-     * frame has been fully processed.
-     */
-    ThreadQueue    *queue_out;
 };
 
 // data that is local to the decoder thread and not visible outside of it
@@ -81,24 +59,6 @@  typedef struct EncoderThread {
     AVPacket  *pkt;
 } EncoderThread;
 
-static int enc_thread_stop(Encoder *e)
-{
-    void *ret;
-
-    if (!e->queue_in)
-        return 0;
-
-    tq_send_finish(e->queue_in, 0);
-    tq_receive_finish(e->queue_out, 0);
-
-    pthread_join(e->thread, &ret);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void enc_free(Encoder **penc)
 {
     Encoder *enc = *penc;
@@ -106,13 +66,6 @@  void enc_free(Encoder **penc)
     if (!enc)
         return;
 
-    enc_thread_stop(enc);
-
-    av_frame_free(&enc->sq_frame);
-    av_frame_free(&enc->sub_frame);
-
-    av_packet_free(&enc->pkt);
-
     av_freep(penc);
 }
 
@@ -127,25 +80,12 @@  int enc_alloc(Encoder **penc, const AVCodec *codec,
     if (!enc)
         return AVERROR(ENOMEM);
 
-    if (codec->type == AVMEDIA_TYPE_SUBTITLE) {
-        enc->sub_frame = av_frame_alloc();
-        if (!enc->sub_frame)
-            goto fail;
-    }
-
-    enc->pkt = av_packet_alloc();
-    if (!enc->pkt)
-        goto fail;
-
     enc->sch     = sch;
     enc->sch_idx = sch_idx;
 
     *penc = enc;
 
     return 0;
-fail:
-    enc_free(&enc);
-    return AVERROR(ENOMEM);
 }
 
 static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef *frames_ref)
@@ -224,52 +164,9 @@  static int set_encoder_id(OutputFile *of, OutputStream *ost)
     return 0;
 }
 
-static int enc_thread_start(OutputStream *ost)
-{
-    Encoder *e = ost->enc;
-    ObjPool *op;
-    int ret = 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    e->queue_in = tq_alloc(1, 1, op, frame_move);
-    if (!e->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    op = objpool_alloc_packets();
-    if (!op)
-        goto fail;
-
-    e->queue_out = tq_alloc(1, 4, op, pkt_move);
-    if (!e->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&e->thread, NULL, encoder_thread, ost);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n",
-               av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&e->queue_in);
-    tq_free(&e->queue_out);
-    return ret;
-}
-
-int enc_open(OutputStream *ost, const AVFrame *frame)
+int enc_open(void *opaque, const AVFrame *frame)
 {
+    OutputStream *ost = opaque;
     InputStream *ist = ost->ist;
     Encoder              *e = ost->enc;
     AVCodecContext *enc_ctx = ost->enc_ctx;
@@ -277,6 +174,7 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     const AVCodec      *enc = enc_ctx->codec;
     OutputFile      *of = output_files[ost->file_index];
     FrameData *fd;
+    int frame_samples = 0;
     int ret;
 
     if (e->opened)
@@ -420,17 +318,8 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
 
     e->opened = 1;
 
-    if (ost->sq_idx_encode >= 0) {
-        e->sq_frame = av_frame_alloc();
-        if (!e->sq_frame)
-            return AVERROR(ENOMEM);
-    }
-
-    if (ost->enc_ctx->frame_size) {
-        av_assert0(ost->sq_idx_encode >= 0);
-        sq_frame_samples(output_files[ost->file_index]->sq_encode,
-                         ost->sq_idx_encode, ost->enc_ctx->frame_size);
-    }
+    if (ost->enc_ctx->frame_size)
+        frame_samples = ost->enc_ctx->frame_size;
 
     ret = check_avoptions(ost->encoder_opts);
     if (ret < 0)
@@ -476,18 +365,11 @@  int enc_open(OutputStream *ost, const AVFrame *frame)
     if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0)
         ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1});
 
-    ret = enc_thread_start(ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n",
-               av_err2str(ret));
-        return ret;
-    }
-
     ret = of_stream_init(of, ost);
     if (ret < 0)
         return ret;
 
-    return 0;
+    return frame_samples;
 }
 
 static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb)
@@ -514,8 +396,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n");
         return exit_on_error ? AVERROR(EINVAL) : 0;
     }
-    if (ost->finished ||
-        (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time))
+    if (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)
         return 0;
 
     enc = ost->enc_ctx;
@@ -579,7 +460,7 @@  static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *
         }
         pkt->dts = pkt->pts;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -671,10 +552,13 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     int64_t frame_number;
     double ti1, bitrate, avg_bitrate;
     double psnr_val = -1;
+    int quality;
 
-    ost->quality   = sd ? AV_RL32(sd) : -1;
+    quality        = sd ? AV_RL32(sd) : -1;
     pict_type      = sd ? sd[4] : AV_PICTURE_TYPE_NONE;
 
+    atomic_store(&ost->quality, quality);
+
     if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) {
         // FIXME the scaling assumes 8bit
         double error = AV_RL64(sd + 8) / (enc->width * enc->height * 255.0 * 255.0);
@@ -697,10 +581,10 @@  static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_
     frame_number = e->packets_encoded;
     if (vstats_version <= 1) {
         fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     } else  {
         fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f ", ost->file_index, ost->index, frame_number,
-                ost->quality / (float)FF_QP2LAMBDA);
+                quality / (float)FF_QP2LAMBDA);
     }
 
     if (psnr_val >= 0)
@@ -801,18 +685,11 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
                    av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base));
         }
 
-        if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Subtitle heartbeat logic failed in %s! (%s)\n",
-                   __func__, av_err2str(ret));
-            return ret;
-        }
-
         e->data_size += pkt->size;
 
         e->packets_encoded++;
 
-        ret = tq_send(e->queue_out, 0, pkt);
+        ret = sch_enc_send(e->sch, e->sch_idx, pkt);
         if (ret < 0) {
             av_packet_unref(pkt);
             return ret;
@@ -822,50 +699,6 @@  static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame,
     av_assert0(0);
 }
 
-static int submit_encode_frame(OutputFile *of, OutputStream *ost,
-                               AVFrame *frame, AVPacket *pkt)
-{
-    Encoder *e = ost->enc;
-    int ret;
-
-    if (ost->sq_idx_encode < 0)
-        return encode_frame(of, ost, frame, pkt);
-
-    if (frame) {
-        ret = av_frame_ref(e->sq_frame, frame);
-        if (ret < 0)
-            return ret;
-        frame = e->sq_frame;
-    }
-
-    ret = sq_send(of->sq_encode, ost->sq_idx_encode,
-                  SQFRAME(frame));
-    if (ret < 0) {
-        if (frame)
-            av_frame_unref(frame);
-        if (ret != AVERROR_EOF)
-            return ret;
-    }
-
-    while (1) {
-        AVFrame *enc_frame = e->sq_frame;
-
-        ret = sq_receive(of->sq_encode, ost->sq_idx_encode,
-                               SQFRAME(enc_frame));
-        if (ret == AVERROR_EOF) {
-            enc_frame = NULL;
-        } else if (ret < 0) {
-            return (ret == AVERROR(EAGAIN)) ? 0 : ret;
-        }
-
-        ret = encode_frame(of, ost, enc_frame, pkt);
-        if (enc_frame)
-            av_frame_unref(enc_frame);
-        if (ret < 0)
-            return ret;
-    }
-}
-
 static int do_audio_out(OutputFile *of, OutputStream *ost,
                         AVFrame *frame, AVPacket *pkt)
 {
@@ -881,7 +714,7 @@  static int do_audio_out(OutputFile *of, OutputStream *ost,
     if (!check_recording_time(ost, frame->pts, frame->time_base))
         return AVERROR_EOF;
 
-    return submit_encode_frame(of, ost, frame, pkt);
+    return encode_frame(of, ost, frame, pkt);
 }
 
 static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf,
@@ -949,7 +782,7 @@  static int do_video_out(OutputFile *of, OutputStream *ost,
     }
 #endif
 
-    return submit_encode_frame(of, ost, in_picture, pkt);
+    return encode_frame(of, ost, in_picture, pkt);
 }
 
 static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
@@ -958,9 +791,12 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
     enum AVMediaType type = ost->type;
 
     if (type == AVMEDIA_TYPE_SUBTITLE) {
+        const AVSubtitle *subtitle = frame && frame->buf[0] ?
+                                     (AVSubtitle*)frame->buf[0]->data : NULL;
+
         // no flushing for subtitles
-        return frame ?
-               do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0;
+        return subtitle && subtitle->num_rects ?
+               do_subtitle_out(of, ost, subtitle, pkt) : 0;
     }
 
     if (frame) {
@@ -968,7 +804,7 @@  static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt)
                                               do_audio_out(of, ost, frame, pkt);
     }
 
-    return submit_encode_frame(of, ost, NULL, pkt);
+    return encode_frame(of, ost, NULL, pkt);
 }
 
 static void enc_thread_set_name(const OutputStream *ost)
@@ -1009,24 +845,50 @@  fail:
 void *encoder_thread(void *arg)
 {
     OutputStream *ost = arg;
-    OutputFile    *of = output_files[ost->file_index];
     Encoder        *e = ost->enc;
     EncoderThread et;
     int ret = 0, input_status = 0;
+    int name_set = 0;
 
     ret = enc_thread_init(&et);
     if (ret < 0)
         goto finish;
 
-    enc_thread_set_name(ost);
+    /* Open the subtitle encoders immediately. AVFrame-based encoders
+     * are opened through a callback from the scheduler once they get
+     * their first frame.
+     *
+     * N.B.: because the callback is called from a different thread,
+     * enc_ctx MUST NOT be accessed before sch_enc_receive() returns
+     * for the first time for audio/video. */
+    if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != AVMEDIA_TYPE_AUDIO) {
+        ret = enc_open(ost, NULL);
+        if (ret < 0)
+            goto finish;
+    }
 
     while (!input_status) {
-        int dummy;
-
-        input_status = tq_receive(e->queue_in, &dummy, et.frame);
-        if (input_status < 0)
+        input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n");
 
+            if (!e->opened) {
+                av_log(ost, AV_LOG_ERROR, "Could not open encoder before EOF\n");
+                ret = AVERROR(EINVAL);
+                goto finish;
+            }
+        } else if (input_status < 0) {
+            ret = input_status;
+            av_log(ost, AV_LOG_ERROR, "Error receiving a frame for encoding: %s\n",
+                   av_err2str(ret));
+            goto finish;
+        }
+
+        if (!name_set) {
+            enc_thread_set_name(ost);
+            name_set = 1;
+        }
+
         ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt);
 
         av_packet_unref(et.pkt);
@@ -1040,15 +902,6 @@  void *encoder_thread(void *arg)
                        av_err2str(ret));
             break;
         }
-
-        // signal to the consumer thread that the frame was encoded
-        ret = tq_send(e->queue_out, 0, et.pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
-                av_log(ost, AV_LOG_ERROR,
-                       "Error communicating with the main thread\n");
-            break;
-        }
     }
 
     // EOF is normal thread termination
@@ -1056,118 +909,7 @@  void *encoder_thread(void *arg)
         ret = 0;
 
 finish:
-    if (ost->sq_idx_encode >= 0)
-        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-
-    tq_receive_finish(e->queue_in,  0);
-    tq_send_finish   (e->queue_out, 0);
-
     enc_thread_uninit(&et);
 
-    av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n");
-
     return (void*)(intptr_t)ret;
 }
-
-int enc_frame(OutputStream *ost, AVFrame *frame)
-{
-    OutputFile *of = output_files[ost->file_index];
-    Encoder     *e = ost->enc;
-    int ret, thread_ret;
-
-    ret = enc_open(ost, frame);
-    if (ret < 0)
-        return ret;
-
-    if (!e->queue_in)
-        return AVERROR_EOF;
-
-    // send the frame/EOF to the encoder thread
-    if (frame) {
-        ret = tq_send(e->queue_in, 0, frame);
-        if (ret < 0)
-            goto finish;
-    } else
-        tq_send_finish(e->queue_in, 0);
-
-    // retrieve all encoded data for the frame
-    while (1) {
-        int dummy;
-
-        ret = tq_receive(e->queue_out, &dummy, e->pkt);
-        if (ret < 0)
-            break;
-
-        // frame fully encoded
-        if (!e->pkt->data && !e->pkt->side_data_elems)
-            return 0;
-
-        // process the encoded packet
-        ret = of_output_packet(of, ost, e->pkt);
-        if (ret < 0)
-            goto finish;
-    }
-
-finish:
-    thread_ret = enc_thread_stop(e);
-    if (thread_ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n",
-               av_err2str(thread_ret));
-        ret = err_merge(ret, thread_ret);
-    }
-
-    if (ret < 0 && ret != AVERROR_EOF)
-        return ret;
-
-    // signal EOF to the muxer
-    return of_output_packet(of, ost, NULL);
-}
-
-int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub)
-{
-    Encoder *e = ost->enc;
-    AVFrame *f = e->sub_frame;
-    int ret;
-
-    // XXX the queue for transferring data to the encoder thread runs
-    // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put
-    // that inside the frame
-    // eventually, subtitles should be switched to use AVFrames natively
-    ret = subtitle_wrap_frame(f, sub, 1);
-    if (ret < 0)
-        return ret;
-
-    ret = enc_frame(ost, f);
-    av_frame_unref(f);
-
-    return ret;
-}
-
-int enc_flush(void)
-{
-    int ret = 0;
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        OutputFile      *of = output_files[ost->file_index];
-        if (ost->sq_idx_encode >= 0)
-            sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
-    }
-
-    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
-        Encoder          *e = ost->enc;
-        AVCodecContext *enc = ost->enc_ctx;
-        int err;
-
-        if (!enc || !e->opened ||
-            (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO))
-            continue;
-
-        err = enc_frame(ost, NULL);
-        if (err != AVERROR_EOF && ret < 0)
-            ret = err_merge(ret, err);
-
-        av_assert0(!e->queue_in);
-    }
-
-    return ret;
-}
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 635b1b0b6e..ada235b084 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -21,8 +21,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg.h"
-#include "ffmpeg_utils.h"
-#include "thread_queue.h"
 
 #include "libavfilter/avfilter.h"
 #include "libavfilter/buffersink.h"
@@ -53,10 +51,11 @@  typedef struct FilterGraphPriv {
     // true when the filtergraph contains only meta filters
     // that do not modify the frame data
     int is_meta;
+    // source filters are present in the graph
+    int have_sources;
     int disable_conversions;
 
-    int nb_inputs_bound;
-    int nb_outputs_bound;
+    unsigned nb_outputs_done;
 
     const char *graph_desc;
 
@@ -67,41 +66,6 @@  typedef struct FilterGraphPriv {
 
     Scheduler       *sch;
     unsigned         sch_idx;
-
-    pthread_t        thread;
-    /**
-     * Queue for sending frames from the main thread to the filtergraph. Has
-     * nb_inputs+1 streams - the first nb_inputs stream correspond to
-     * filtergraph inputs. Frames on those streams may have their opaque set to
-     * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the
-     *   EOF event for the correspondint stream. Will be immediately followed by
-     *   this stream being send-closed.
-     * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of
-     *   a subtitle heartbeat event. Will only be sent for sub2video streams.
-     *
-     * The last stream is "control" - the main thread sends empty AVFrames with
-     * opaque set to
-     * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame available
-     *   from filtergraph outputs. These frames are sent to corresponding
-     *   streams in queue_out. Finally an empty frame is sent to the control
-     *   stream in queue_out.
-     * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames are
-     *   available the terminating empty frame's opaque will contain the index+1
-     *   of the filtergraph input to which more input frames should be supplied.
-     */
-    ThreadQueue     *queue_in;
-    /**
-     * Queue for sending frames from the filtergraph back to the main thread.
-     * Has nb_outputs+1 streams - the first nb_outputs stream correspond to
-     * filtergraph outputs.
-     *
-     * The last stream is "control" - see documentation for queue_in for more
-     * details.
-     */
-    ThreadQueue     *queue_out;
-    // submitting frames to filter thread returned EOF
-    // this only happens on thread exit, so is not per-input
-    int              eof_in;
 } FilterGraphPriv;
 
 static FilterGraphPriv *fgp_from_fg(FilterGraph *fg)
@@ -123,6 +87,9 @@  typedef struct FilterGraphThread {
     // The output index is stored in frame opaque.
     AVFifo  *frame_queue_out;
 
+    // index of the next input to request from the scheduler
+    unsigned next_in;
+    // set to 1 after at least one frame passed through this output
     int      got_frame;
 
     // EOF status of each input/output, as received by the thread
@@ -253,9 +220,6 @@  typedef struct OutputFilterPriv {
     int64_t ts_offset;
     int64_t next_pts;
     FPSConvContext fps;
-
-    // set to 1 after at least one frame passed through this output
-    int got_frame;
 } OutputFilterPriv;
 
 static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter)
@@ -653,57 +617,6 @@  static int ifilter_has_all_input_formats(FilterGraph *fg)
 
 static void *filter_thread(void *arg);
 
-// start the filtering thread once all inputs and outputs are bound
-static int fg_thread_try_start(FilterGraphPriv *fgp)
-{
-    FilterGraph *fg = &fgp->fg;
-    ObjPool *op;
-    int ret = 0;
-
-    if (fgp->nb_inputs_bound  < fg->nb_inputs ||
-        fgp->nb_outputs_bound < fg->nb_outputs)
-        return 0;
-
-    op = objpool_alloc_frames();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move);
-    if (!fgp->queue_in) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    // at least one output is mandatory
-    op = objpool_alloc_frames();
-    if (!op)
-        goto fail;
-
-    fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move);
-    if (!fgp->queue_out) {
-        objpool_free(&op);
-        goto fail;
-    }
-
-    ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp);
-    if (ret) {
-        ret = AVERROR(ret);
-        av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n",
-               fg->index, av_err2str(ret));
-        goto fail;
-    }
-
-    return 0;
-fail:
-    if (ret >= 0)
-        ret = AVERROR(ENOMEM);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return ret;
-}
-
 static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in)
 {
     AVFilterContext *ctx = inout->filter_ctx;
@@ -729,7 +642,6 @@  static OutputFilter *ofilter_alloc(FilterGraph *fg)
     ofilter->graph    = fg;
     ofp->format       = -1;
     ofp->index        = fg->nb_outputs - 1;
-    ofilter->last_pts = AV_NOPTS_VALUE;
 
     return ofilter;
 }
@@ -760,10 +672,7 @@  static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist)
             return AVERROR(ENOMEM);
     }
 
-    fgp->nb_inputs_bound++;
-    av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost)
@@ -902,10 +811,7 @@  int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost,
     if (ret < 0)
         return ret;
 
-    fgp->nb_outputs_bound++;
-    av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs);
-
-    return fg_thread_try_start(fgp);
+    return 0;
 }
 
 static InputFilter *ifilter_alloc(FilterGraph *fg)
@@ -935,34 +841,6 @@  static InputFilter *ifilter_alloc(FilterGraph *fg)
     return ifilter;
 }
 
-static int fg_thread_stop(FilterGraphPriv *fgp)
-{
-    void *ret;
-
-    if (!fgp->queue_in)
-        return 0;
-
-    for (int i = 0; i <= fgp->fg.nb_inputs; i++) {
-        InputFilterPriv *ifp = i < fgp->fg.nb_inputs ?
-                               ifp_from_ifilter(fgp->fg.inputs[i]) : NULL;
-
-        if (ifp)
-            ifp->eof = 1;
-
-        tq_send_finish(fgp->queue_in, i);
-    }
-
-    for (int i = 0; i <= fgp->fg.nb_outputs; i++)
-        tq_receive_finish(fgp->queue_out, i);
-
-    pthread_join(fgp->thread, &ret);
-
-    tq_free(&fgp->queue_in);
-    tq_free(&fgp->queue_out);
-
-    return (int)(intptr_t)ret;
-}
-
 void fg_free(FilterGraph **pfg)
 {
     FilterGraph *fg = *pfg;
@@ -972,8 +850,6 @@  void fg_free(FilterGraph **pfg)
         return;
     fgp = fgp_from_fg(fg);
 
-    fg_thread_stop(fgp);
-
     avfilter_graph_free(&fg->graph);
     for (int j = 0; j < fg->nb_inputs; j++) {
         InputFilter *ifilter = fg->inputs[j];
@@ -1072,6 +948,15 @@  int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch)
     if (ret < 0)
         goto fail;
 
+    for (unsigned i = 0; i < graph->nb_filters; i++) {
+        const AVFilter *f = graph->filters[i]->filter;
+        if (!avfilter_filter_pad_count(f, 0) &&
+            !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) {
+            fgp->have_sources = 1;
+            break;
+        }
+    }
+
     for (AVFilterInOut *cur = inputs; cur; cur = cur->next) {
         InputFilter *const ifilter = ifilter_alloc(fg);
         InputFilterPriv       *ifp;
@@ -1800,6 +1685,7 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
     AVBufferRef *hw_device;
     AVFilterInOut *inputs, *outputs, *cur;
     int ret, i, simple = filtergraph_is_simple(fg);
+    int have_input_eof = 0;
     const char *graph_desc = fgp->graph_desc;
 
     cleanup_filtergraph(fg);
@@ -1922,11 +1808,18 @@  static int configure_filtergraph(FilterGraph *fg, const FilterGraphThread *fgt)
             ret = av_buffersrc_add_frame(ifp->filter, NULL);
             if (ret < 0)
                 goto fail;
+            have_input_eof = 1;
         }
     }
 
-    return 0;
+    if (have_input_eof) {
+        // make sure the EOF propagates to the end of the graph
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF)
+            goto fail;
+    }
 
+    return 0;
 fail:
     cleanup_filtergraph(fg);
     return ret;
@@ -2182,7 +2075,7 @@  static void video_sync_process(OutputFilterPriv *ofp, AVFrame *frame,
                                                 fps->frames_prev_hist[2]);
 
         if (!*nb_frames && fps->last_dropped) {
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             fps->last_dropped++;
         }
 
@@ -2260,21 +2153,23 @@  finish:
     fps->frames_prev_hist[0] = *nb_frames_prev;
 
     if (*nb_frames_prev == 0 && fps->last_dropped) {
-        ofilter->nb_frames_drop++;
+        atomic_fetch_add(&ofilter->nb_frames_drop, 1);
         av_log(ost, AV_LOG_VERBOSE,
                "*** dropping frame %"PRId64" at ts %"PRId64"\n",
                fps->frame_number, fps->last_frame->pts);
     }
     if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > *nb_frames_prev)) {
+        uint64_t nb_frames_dup;
         if (*nb_frames > dts_error_threshold * 30) {
             av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too large, skipping\n", *nb_frames - 1);
-            ofilter->nb_frames_drop++;
+            atomic_fetch_add(&ofilter->nb_frames_drop, 1);
             *nb_frames = 0;
             return;
         }
-        ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev);
+        nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup,
+                                         *nb_frames - (*nb_frames_prev && fps->last_dropped) - (*nb_frames > *nb_frames_prev));
         av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - 1);
-        if (ofilter->nb_frames_dup > fps->dup_warning) {
+        if (nb_frames_dup > fps->dup_warning) {
             av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames duplicated\n", fps->dup_warning);
             fps->dup_warning *= 10;
         }
@@ -2284,8 +2179,57 @@  finish:
     fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY);
 }
 
+static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt)
+{
+    FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph);
+    int ret;
+
+    // we are finished and no frames were ever seen at this output,
+    // at least initialize the encoder with a dummy frame
+    if (!fgt->got_frame) {
+        AVFrame *frame = fgt->frame;
+        FrameData *fd;
+
+        frame->time_base   = ofp->tb_out;
+        frame->format      = ofp->format;
+
+        frame->width               = ofp->width;
+        frame->height              = ofp->height;
+        frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
+
+        frame->sample_rate = ofp->sample_rate;
+        if (ofp->ch_layout.nb_channels) {
+            ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
+            if (ret < 0)
+                return ret;
+        }
+
+        fd = frame_data(frame);
+        if (!fd)
+            return AVERROR(ENOMEM);
+
+        fd->frame_rate_filter = ofp->fps.framerate;
+
+        av_assert0(!frame->buf[0]);
+
+        av_log(ofp->ofilter.ost, AV_LOG_WARNING,
+               "No filtered frames for output stream, trying to "
+               "initialize anyway.\n");
+
+        ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame);
+        if (ret < 0) {
+            av_frame_unref(frame);
+            return ret;
+        }
+    }
+
+    fgt->eof_out[ofp->index] = 1;
+
+    return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL);
+}
+
 static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                           AVFrame *frame, int buffer)
+                           AVFrame *frame)
 {
     FilterGraphPriv  *fgp = fgp_from_fg(ofp->ofilter.graph);
     AVFrame   *frame_prev = ofp->fps.last_frame;
@@ -2332,28 +2276,17 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
             frame_out = frame;
         }
 
-        if (buffer) {
-            AVFrame *f = av_frame_alloc();
-
-            if (!f) {
-                av_frame_unref(frame_out);
-                return AVERROR(ENOMEM);
-            }
-
-            av_frame_move_ref(f, frame_out);
-            f->opaque = (void*)(intptr_t)ofp->index;
-
-            ret = av_fifo_write(fgt->frame_queue_out, &f, 1);
-            if (ret < 0) {
-                av_frame_free(&f);
-                return AVERROR(ENOMEM);
-            }
-        } else {
-            // return the frame to the main thread
-            ret = tq_send(fgp->queue_out, ofp->index, frame_out);
+        {
+            // send the frame to consumers
+            ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out);
             if (ret < 0) {
                 av_frame_unref(frame_out);
-                fgt->eof_out[ofp->index] = 1;
+
+                if (!fgt->eof_out[ofp->index]) {
+                    fgt->eof_out[ofp->index] = 1;
+                    fgp->nb_outputs_done++;
+                }
+
                 return ret == AVERROR_EOF ? 0 : ret;
             }
         }
@@ -2374,16 +2307,14 @@  static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         av_frame_move_ref(frame_prev, frame);
     }
 
-    if (!frame) {
-        tq_send_finish(fgp->queue_out, ofp->index);
-        fgt->eof_out[ofp->index] = 1;
-    }
+    if (!frame)
+        return close_output(ofp, fgt);
 
     return 0;
 }
 
 static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
-                          AVFrame *frame,  int buffer)
+                          AVFrame *frame)
 {
     FilterGraphPriv    *fgp = fgp_from_fg(ofp->ofilter.graph);
     OutputStream       *ost = ofp->ofilter.ost;
@@ -2393,8 +2324,8 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
 
     ret = av_buffersink_get_frame_flags(filter, frame,
                                         AV_BUFFERSINK_FLAG_NO_REQUEST);
-    if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) {
-        ret = fg_output_frame(ofp, fgt, NULL, buffer);
+    if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) {
+        ret = fg_output_frame(ofp, fgt, NULL);
         return (ret < 0) ? ret : 1;
     } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) {
         return 1;
@@ -2448,7 +2379,7 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
         fd->frame_rate_filter = ofp->fps.framerate;
     }
 
-    ret = fg_output_frame(ofp, fgt, frame, buffer);
+    ret = fg_output_frame(ofp, fgt, frame);
     av_frame_unref(frame);
     if (ret < 0)
         return ret;
@@ -2456,44 +2387,68 @@  static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt,
     return 0;
 }
 
-/* retrieve all frames available at filtergraph outputs and either send them to
- * the main thread (buffer=0) or buffer them for later (buffer=1) */
+/* retrieve all frames available at filtergraph outputs
+ * and send them to consumers */
 static int read_frames(FilterGraph *fg, FilterGraphThread *fgt,
-                       AVFrame *frame, int buffer)
+                       AVFrame *frame)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
-    int ret = 0;
+    int did_step = 0;
 
-    if (!fg->graph)
-        return 0;
-
-    // process buffered frames
-    if (!buffer) {
-        AVFrame *f;
-
-        while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) {
-            int out_idx = (intptr_t)f->opaque;
-            f->opaque = NULL;
-            ret = tq_send(fgp->queue_out, out_idx, f);
-            av_frame_free(&f);
-            if (ret < 0 && ret != AVERROR_EOF)
-                return ret;
+    // graph not configured, just select the input to request
+    if (!fg->graph) {
+        for (int i = 0; i < fg->nb_inputs; i++) {
+            InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]);
+            if (ifp->format < 0 && !fgt->eof_in[i]) {
+                fgt->next_in = i;
+                return 0;
+            }
         }
+
+        // This state - graph is not configured, but all inputs are either
+        // initialized or EOF - should be unreachable because sending EOF to a
+        // filter without even a fallback format should fail
+        av_assert0(0);
+        return AVERROR_BUG;
     }
 
-    /* Reap all buffers present in the buffer sinks */
-    for (int i = 0; i < fg->nb_outputs; i++) {
-        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
-        int ret = 0;
+    while (fgp->nb_outputs_done < fg->nb_outputs) {
+        int ret;
 
-        while (!ret) {
-            ret = fg_output_step(ofp, fgt, frame, buffer);
-            if (ret < 0)
-                return ret;
+        ret = avfilter_graph_request_oldest(fg->graph);
+        if (ret == AVERROR(EAGAIN)) {
+            fgt->next_in = choose_input(fg, fgt);
+            break;
+        } else if (ret < 0) {
+            if (ret == AVERROR_EOF)
+                av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, finishing\n");
+            else
+                av_log(fg, AV_LOG_ERROR,
+                       "Error requesting a frame from the filtergraph: %s\n",
+                       av_err2str(ret));
+            return ret;
         }
-    }
+        fgt->next_in = fg->nb_inputs;
 
-    return 0;
+        // return after one iteration, so that scheduler can rate-control us
+        if (did_step && fgp->have_sources)
+            return 0;
+
+        /* Reap all buffers present in the buffer sinks */
+        for (int i = 0; i < fg->nb_outputs; i++) {
+            OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+            ret = 0;
+            while (!ret) {
+                ret = fg_output_step(ofp, fgt, frame);
+                if (ret < 0)
+                    return ret;
+            }
+        }
+        did_step = 1;
+    }
+
+    return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0;
 }
 
 static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
@@ -2571,6 +2526,9 @@  static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter,
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     int ret;
 
+    if (fgt->eof_in[ifp->index])
+        return 0;
+
     fgt->eof_in[ifp->index] = 1;
 
     if (ifp->filter) {
@@ -2672,7 +2630,7 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
             return ret;
         }
 
-        ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0;
+        ret = fg->graph ? read_frames(fg, fgt, tmp) : 0;
         av_frame_free(&tmp);
         if (ret < 0)
             return ret;
@@ -2705,82 +2663,6 @@  static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
     return 0;
 }
 
-static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt,
-                       AVFrame *frame)
-{
-    const enum FrameOpaque msg = (intptr_t)frame->opaque;
-    FilterGraph            *fg = &fgp->fg;
-    int              graph_eof = 0;
-    int ret;
-
-    frame->opaque = NULL;
-    av_assert0(msg > 0);
-    av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]);
-
-    if (!fg->graph) {
-        // graph not configured yet, ignore all messages other than choosing
-        // the input to read from
-        if (msg != FRAME_OPAQUE_CHOOSE_INPUT) {
-            av_frame_unref(frame);
-            goto done;
-        }
-
-        for (int i = 0; i < fg->nb_inputs; i++) {
-            InputFilter *ifilter = fg->inputs[i];
-            InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-            if (ifp->format < 0 && !fgt->eof_in[i]) {
-                frame->opaque = (void*)(intptr_t)(i + 1);
-                goto done;
-            }
-        }
-
-        // This state - graph is not configured, but all inputs are either
-        // initialized or EOF - should be unreachable because sending EOF to a
-        // filter without even a fallback format should fail
-        av_assert0(0);
-        return AVERROR_BUG;
-    }
-
-    if (msg == FRAME_OPAQUE_SEND_COMMAND) {
-        FilterCommand *fc = (FilterCommand*)frame->buf[0]->data;
-        send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters);
-        av_frame_unref(frame);
-        goto done;
-    }
-
-    if (msg == FRAME_OPAQUE_CHOOSE_INPUT) {
-        ret = avfilter_graph_request_oldest(fg->graph);
-
-        graph_eof = ret == AVERROR_EOF;
-
-        if (ret == AVERROR(EAGAIN)) {
-            frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1);
-            goto done;
-        } else if (ret < 0 && !graph_eof)
-            return ret;
-    }
-
-    ret = read_frames(fg, fgt, frame, 0);
-    if (ret < 0) {
-        av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for encoding\n");
-        return ret;
-    }
-
-    if (graph_eof)
-        return AVERROR_EOF;
-
-    // signal to the main thread that we are done processing the message
-done:
-    ret = tq_send(fgp->queue_out, fg->nb_outputs, frame);
-    if (ret < 0) {
-        if (ret != AVERROR_EOF)
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
-        return ret;
-    }
-
-    return 0;
-}
-
 static void fg_thread_set_name(const FilterGraph *fg)
 {
     char name[16];
@@ -2867,294 +2749,94 @@  static void *filter_thread(void *arg)
         InputFilter *ifilter;
         InputFilterPriv *ifp;
         enum FrameOpaque o;
-        int input_idx, eof_frame;
+        unsigned input_idx = fgt.next_in;
 
-        input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-        if (input_idx < 0 ||
-            (input_idx == fg->nb_inputs && input_status < 0)) {
+        input_status = sch_filter_receive(fgp->sch, fgp->sch_idx,
+                                          &input_idx, fgt.frame);
+        if (input_status == AVERROR_EOF) {
             av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n");
             break;
+        } else if (input_status == AVERROR(EAGAIN)) {
+            // should only happen when we didn't request any input
+            av_assert0(input_idx == fg->nb_inputs);
+            goto read_frames;
         }
+        av_assert0(input_status >= 0);
+
+        o = (intptr_t)fgt.frame->opaque;
 
         o = (intptr_t)fgt.frame->opaque;
 
         // message on the control stream
         if (input_idx == fg->nb_inputs) {
-            ret = msg_process(fgp, &fgt, fgt.frame);
-            if (ret < 0)
-                goto finish;
+            FilterCommand *fc;
 
+            av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && fgt.frame->buf[0]);
+
+            fc = (FilterCommand*)fgt.frame->buf[0]->data;
+            send_command(fg, fc->time, fc->target, fc->command, fc->arg,
+                         fc->all_filters);
+            av_frame_unref(fgt.frame);
             continue;
         }
 
         // we received an input frame or EOF
         ifilter   = fg->inputs[input_idx];
         ifp       = ifp_from_ifilter(ifilter);
-        eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF;
+
         if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) {
             int hb_frame = input_status >= 0 && o == FRAME_OPAQUE_SUB_HEARTBEAT;
             ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || hb_frame) ? fgt.frame : NULL);
-        } else if (input_status >= 0 && fgt.frame->buf[0]) {
+        } else if (fgt.frame->buf[0]) {
             ret = send_frame(fg, &fgt, ifilter, fgt.frame);
         } else {
-            int64_t   pts = input_status >= 0 ? fgt.frame->pts : AV_NOPTS_VALUE;
-            AVRational tb = input_status >= 0 ? fgt.frame->time_base : (AVRational){ 1, 1 };
-            ret = send_eof(&fgt, ifilter, pts, tb);
+            av_assert1(o == FRAME_OPAQUE_EOF);
+            ret = send_eof(&fgt, ifilter, fgt.frame->pts, fgt.frame->time_base);
         }
         av_frame_unref(fgt.frame);
         if (ret < 0)
+            goto finish;
+
+read_frames:
+        // retrieve all newly available frames
+        ret = read_frames(fg, &fgt, fgt.frame);
+        if (ret == AVERROR_EOF) {
+            av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n");
             break;
-
-        if (eof_frame) {
-            // an EOF frame is immediately followed by sender closing
-            // the corresponding stream, so retrieve that event
-            input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame);
-            av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index);
-        }
-
-        // signal to the main thread that we are done
-        ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame);
-        if (ret < 0) {
-            if (ret == AVERROR_EOF)
-                break;
-
-            av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n");
+        } else if (ret < 0) {
+            av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n",
+                   av_err2str(ret));
             goto finish;
         }
     }
 
+    for (unsigned i = 0; i < fg->nb_outputs; i++) {
+        OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]);
+
+        if (fgt.eof_out[i])
+            continue;
+
+        ret = fg_output_frame(ofp, &fgt, NULL);
+        if (ret < 0)
+            goto finish;
+    }
+
 finish:
     // EOF is normal termination
     if (ret == AVERROR_EOF)
         ret = 0;
 
-    for (int i = 0; i <= fg->nb_inputs; i++)
-        tq_receive_finish(fgp->queue_in, i);
-    for (int i = 0; i <= fg->nb_outputs; i++)
-        tq_send_finish(fgp->queue_out, i);
-
     fg_thread_uninit(&fgt);
 
-    av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter,
-                             AVFrame *frame, enum FrameOpaque type)
-{
-    InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
-    int output_idx, ret;
-
-    if (ifp->eof) {
-        av_frame_unref(frame);
-        return AVERROR_EOF;
-    }
-
-    frame->opaque = (void*)(intptr_t)type;
-
-    ret = tq_send(fgp->queue_in, ifp->index, frame);
-    if (ret < 0) {
-        ifp->eof = 1;
-        av_frame_unref(frame);
-        return ret;
-    }
-
-    if (type == FRAME_OPAQUE_EOF)
-        tq_send_finish(fgp->queue_in, ifp->index);
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
-
-    return ret;
-}
-
-int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    if (keep_reference) {
-        ret = av_frame_ref(fgp->frame, frame);
-        if (ret < 0)
-            return ret;
-    } else
-        av_frame_move_ref(fgp->frame, frame);
-
-    return thread_send_frame(fgp, ifilter, fgp->frame, 0);
-}
-
-int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-    int ret;
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF);
-
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph);
-
-    fgp->frame->pts       = pts;
-    fgp->frame->time_base = tb;
-
-    thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT);
-}
-
-int fg_transcode_step(FilterGraph *graph, InputStream **best_ist)
-{
-    FilterGraphPriv *fgp = fgp_from_fg(graph);
-    int ret, got_frames = 0;
-
-    if (fgp->eof_in)
-        return AVERROR_EOF;
-
-    // signal to the filtering thread to return all frames it can
-    av_assert0(!fgp->frame->buf[0]);
-    fgp->frame->opaque = (void*)(intptr_t)(best_ist             ?
-                                           FRAME_OPAQUE_CHOOSE_INPUT :
-                                           FRAME_OPAQUE_REAP_FILTERS);
-
-    ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        goto finish;
-    }
-
-    while (1) {
-        OutputFilter *ofilter;
-        OutputFilterPriv *ofp;
-        OutputStream *ost;
-        int output_idx;
-
-        ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-
-        // EOF on the whole queue or the control stream
-        if (output_idx < 0 ||
-            (ret < 0 && output_idx == graph->nb_outputs))
-            goto finish;
-
-        // EOF for a specific stream
-        if (ret < 0) {
-            ofilter = graph->outputs[output_idx];
-            ofp     = ofp_from_ofilter(ofilter);
-
-            // we are finished and no frames were ever seen at this output,
-            // at least initialize the encoder with a dummy frame
-            if (!ofp->got_frame) {
-                AVFrame *frame = fgp->frame;
-                FrameData *fd;
-
-                frame->time_base   = ofp->tb_out;
-                frame->format      = ofp->format;
-
-                frame->width               = ofp->width;
-                frame->height              = ofp->height;
-                frame->sample_aspect_ratio = ofp->sample_aspect_ratio;
-
-                frame->sample_rate = ofp->sample_rate;
-                if (ofp->ch_layout.nb_channels) {
-                    ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout);
-                    if (ret < 0)
-                        return ret;
-                }
-
-                fd = frame_data(frame);
-                if (!fd)
-                    return AVERROR(ENOMEM);
-
-                fd->frame_rate_filter = ofp->fps.framerate;
-
-                av_assert0(!frame->buf[0]);
-
-                av_log(ofilter->ost, AV_LOG_WARNING,
-                       "No filtered frames for output stream, trying to "
-                       "initialize anyway.\n");
-
-                enc_open(ofilter->ost, frame);
-                av_frame_unref(frame);
-            }
-
-            close_output_stream(graph->outputs[output_idx]->ost);
-            continue;
-        }
-
-        // request was fully processed by the filtering thread,
-        // return the input stream to read from, if needed
-        if (output_idx == graph->nb_outputs) {
-            int input_idx = (intptr_t)fgp->frame->opaque - 1;
-            av_assert0(input_idx <= graph->nb_inputs);
-
-            if (best_ist) {
-                *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ?
-                            ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL;
-
-                if (input_idx < 0 && !got_frames) {
-                    for (int i = 0; i < graph->nb_outputs; i++)
-                        graph->outputs[i]->ost->unavailable = 1;
-                }
-            }
-            break;
-        }
-
-        // got a frame from the filtering thread, send it for encoding
-        ofilter = graph->outputs[output_idx];
-        ost     = ofilter->ost;
-        ofp     = ofp_from_ofilter(ofilter);
-
-        if (ost->finished) {
-            av_frame_unref(fgp->frame);
-            tq_receive_finish(fgp->queue_out, output_idx);
-            continue;
-        }
-
-        if (fgp->frame->pts != AV_NOPTS_VALUE) {
-            ofilter->last_pts = av_rescale_q(fgp->frame->pts,
-                                             fgp->frame->time_base,
-                                             AV_TIME_BASE_Q);
-        }
-
-        ret = enc_frame(ost, fgp->frame);
-        av_frame_unref(fgp->frame);
-        if (ret < 0)
-            goto finish;
-
-        ofp->got_frame = 1;
-        got_frames     = 1;
-    }
-
-finish:
-    if (ret < 0) {
-        fgp->eof_in = 1;
-        for (int i = 0; i < graph->nb_outputs; i++)
-            close_output_stream(graph->outputs[i]->ost);
-    }
-
-    return ret;
-}
-
-int reap_filters(FilterGraph *fg, int flush)
-{
-    return fg_transcode_step(fg, NULL);
-}
-
 void fg_send_command(FilterGraph *fg, double time, const char *target,
                      const char *command, const char *arg, int all_filters)
 {
     FilterGraphPriv *fgp = fgp_from_fg(fg);
     AVBufferRef *buf;
     FilterCommand *fc;
-    int output_idx, ret;
-
-    if (!fgp->queue_in)
-        return;
 
     fc = av_mallocz(sizeof(*fc));
     if (!fc)
@@ -3180,13 +2862,5 @@  void fg_send_command(FilterGraph *fg, double time, const char *target,
     fgp->frame->buf[0] = buf;
     fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND;
 
-    ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame);
-    if (ret < 0) {
-        av_frame_unref(fgp->frame);
-        return;
-    }
-
-    // wait for the frame to be processed
-    ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame);
-    av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF);
+    sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame);
 }
diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c
index ef5c2f60e0..067dc65d4e 100644
--- a/fftools/ffmpeg_mux.c
+++ b/fftools/ffmpeg_mux.c
@@ -23,16 +23,13 @@ 
 #include "ffmpeg.h"
 #include "ffmpeg_mux.h"
 #include "ffmpeg_utils.h"
-#include "objpool.h"
 #include "sync_queue.h"
-#include "thread_queue.h"
 
 #include "libavutil/fifo.h"
 #include "libavutil/intreadwrite.h"
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -41,10 +38,9 @@ 
 
 typedef struct MuxThreadContext {
     AVPacket *pkt;
+    AVPacket *fix_sub_duration_pkt;
 } MuxThreadContext;
 
-int want_sdp = 1;
-
 static Muxer *mux_from_of(OutputFile *of)
 {
     return (Muxer*)of;
@@ -207,14 +203,41 @@  static int sync_queue_process(Muxer *mux, OutputStream *ost, AVPacket *pkt, int
     return 0;
 }
 
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt);
+
 /* apply the output bitstream filters */
-static int mux_packet_filter(Muxer *mux, OutputStream *ost,
-                             AVPacket *pkt, int *stream_eof)
+static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt,
+                             OutputStream *ost, AVPacket *pkt, int *stream_eof)
 {
     MuxStream *ms = ms_from_ost(ost);
     const char *err_msg;
     int ret = 0;
 
+    if (pkt && !ost->enc) {
+        ret = of_streamcopy(ost, pkt);
+        if (ret == AVERROR(EAGAIN))
+            return 0;
+        else if (ret == AVERROR_EOF) {
+            av_packet_unref(pkt);
+            pkt = NULL;
+            ret = 0;
+        } else if (ret < 0)
+            goto fail;
+    }
+
+    // emit heartbeat for -fix_sub_duration;
+    // we are only interested in heartbeats on random access points.
+    if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) {
+        mt->fix_sub_duration_pkt->opaque    = (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION;
+        mt->fix_sub_duration_pkt->pts       = pkt->pts;
+        mt->fix_sub_duration_pkt->time_base = pkt->time_base;
+
+        ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx,
+                                    mt->fix_sub_duration_pkt);
+        if (ret < 0)
+            goto fail;
+    }
+
     if (ms->bsf_ctx) {
         int bsf_eof = 0;
 
@@ -278,6 +301,7 @@  static void thread_set_name(OutputFile *of)
 static void mux_thread_uninit(MuxThreadContext *mt)
 {
     av_packet_free(&mt->pkt);
+    av_packet_free(&mt->fix_sub_duration_pkt);
 
     memset(mt, 0, sizeof(*mt));
 }
@@ -290,6 +314,10 @@  static int mux_thread_init(MuxThreadContext *mt)
     if (!mt->pkt)
         goto fail;
 
+    mt->fix_sub_duration_pkt = av_packet_alloc();
+    if (!mt->fix_sub_duration_pkt)
+        goto fail;
+
     return 0;
 
 fail:
@@ -316,19 +344,22 @@  void *muxer_thread(void *arg)
         OutputStream *ost;
         int stream_idx, stream_eof = 0;
 
-        ret = tq_receive(mux->tq, &stream_idx, mt.pkt);
+        ret = sch_mux_receive(mux->sch, of->index, mt.pkt);
+        stream_idx = mt.pkt->stream_index;
         if (stream_idx < 0) {
             av_log(mux, AV_LOG_VERBOSE, "All streams finished\n");
             ret = 0;
             break;
         }
 
-        ost = of->streams[stream_idx];
-        ret = mux_packet_filter(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
+        ost = of->streams[mux->sch_stream_idx[stream_idx]];
+        mt.pkt->stream_index = ost->index;
+
+        ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, &stream_eof);
         av_packet_unref(mt.pkt);
         if (ret == AVERROR_EOF) {
             if (stream_eof) {
-                tq_receive_finish(mux->tq, stream_idx);
+                sch_mux_receive_finish(mux->sch, of->index, stream_idx);
             } else {
                 av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n");
                 ret = 0;
@@ -343,243 +374,55 @@  void *muxer_thread(void *arg)
 finish:
     mux_thread_uninit(&mt);
 
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_receive_finish(mux->tq, i);
-
-    av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n");
-
     return (void*)(intptr_t)ret;
 }
 
-static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt)
-{
-    int ret = 0;
-
-    if (!pkt || ost->finished & MUXER_FINISHED)
-        goto finish;
-
-    ret = tq_send(mux->tq, ost->index, pkt);
-    if (ret < 0)
-        goto finish;
-
-    return 0;
-
-finish:
-    if (pkt)
-        av_packet_unref(pkt);
-
-    ost->finished |= MUXER_FINISHED;
-    tq_send_finish(mux->tq, ost->index);
-    return ret == AVERROR_EOF ? 0 : ret;
-}
-
-static int queue_packet(OutputStream *ost, AVPacket *pkt)
-{
-    MuxStream *ms = ms_from_ost(ost);
-    AVPacket *tmp_pkt = NULL;
-    int ret;
-
-    if (!av_fifo_can_write(ms->muxing_queue)) {
-        size_t cur_size = av_fifo_can_read(ms->muxing_queue);
-        size_t pkt_size = pkt ? pkt->size : 0;
-        unsigned int are_we_over_size =
-            (ms->muxing_queue_data_size + pkt_size) > ms->muxing_queue_data_threshold;
-        size_t limit    = are_we_over_size ? ms->max_muxing_queue_size : SIZE_MAX;
-        size_t new_size = FFMIN(2 * cur_size, limit);
-
-        if (new_size <= cur_size) {
-            av_log(ost, AV_LOG_ERROR,
-                   "Too many packets buffered for output stream %d:%d.\n",
-                   ost->file_index, ost->st->index);
-            return AVERROR(ENOSPC);
-        }
-        ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size);
-        if (ret < 0)
-            return ret;
-    }
-
-    if (pkt) {
-        ret = av_packet_make_refcounted(pkt);
-        if (ret < 0)
-            return ret;
-
-        tmp_pkt = av_packet_alloc();
-        if (!tmp_pkt)
-            return AVERROR(ENOMEM);
-
-        av_packet_move_ref(tmp_pkt, pkt);
-        ms->muxing_queue_data_size += tmp_pkt->size;
-    }
-    av_fifo_write(ms->muxing_queue, &tmp_pkt, 1);
-
-    return 0;
-}
-
-static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost)
-{
-    int ret;
-
-    if (mux->tq) {
-        return thread_submit_packet(mux, ost, pkt);
-    } else {
-        /* the muxer is not initialized yet, buffer the packet */
-        ret = queue_packet(ost, pkt);
-        if (ret < 0) {
-            if (pkt)
-                av_packet_unref(pkt);
-            return ret;
-        }
-    }
-
-    return 0;
-}
-
-int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt)
-{
-    Muxer *mux = mux_from_of(of);
-    int ret = 0;
-
-    if (pkt && pkt->dts != AV_NOPTS_VALUE)
-        ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, AV_TIME_BASE_Q);
-
-    ret = submit_packet(mux, pkt, ost);
-    if (ret < 0) {
-        av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the muxer: %s",
-               av_err2str(ret));
-        return ret;
-    }
-
-    return 0;
-}
-
-int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts)
+static int of_streamcopy(OutputStream *ost, AVPacket *pkt)
 {
     OutputFile *of = output_files[ost->file_index];
     MuxStream  *ms = ms_from_ost(ost);
+    DemuxPktData *pd = pkt->opaque_ref ? (DemuxPktData*)pkt->opaque_ref->data : NULL;
+    int64_t      dts = pd ? pd->dts_est : AV_NOPTS_VALUE;
     int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time;
     int64_t ts_offset;
-    AVPacket *opkt = ms->pkt;
-    int ret;
-
-    av_packet_unref(opkt);
 
     if (of->recording_time != INT64_MAX &&
         dts >= of->recording_time + start_time)
-        pkt = NULL;
-
-    // EOF: flush output bitstream filters.
-    if (!pkt)
-        return of_output_packet(of, ost, NULL);
+        return AVERROR_EOF;
 
     if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) &&
         !ms->copy_initial_nonkeyframes)
-        return 0;
+        return AVERROR(EAGAIN);
 
     if (!ms->streamcopy_started) {
         if (!ms->copy_prior_start &&
             (pkt->pts == AV_NOPTS_VALUE ?
              dts < ms->ts_copy_start :
              pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base)))
-            return 0;
+            return AVERROR(EAGAIN);
 
         if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time)
-            return 0;
+            return AVERROR(EAGAIN);
     }
 
-    ret = av_packet_ref(opkt, pkt);
-    if (ret < 0)
-        return ret;
-
-    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base);
+    ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base);
 
     if (pkt->pts != AV_NOPTS_VALUE)
-        opkt->pts -= ts_offset;
+        pkt->pts -= ts_offset;
 
     if (pkt->dts == AV_NOPTS_VALUE) {
-        opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base);
+        pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base);
     } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
-        opkt->pts = opkt->dts - ts_offset;
-    }
-    opkt->dts -= ts_offset;
-
-    {
-        int ret = trigger_fix_sub_duration_heartbeat(ost, pkt);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR,
-                   "Subtitle heartbeat logic failed in %s! (%s)\n",
-                   __func__, av_err2str(ret));
-            return ret;
-        }
+        pkt->pts = pkt->dts - ts_offset;
     }
 
-    ret = of_output_packet(of, ost, opkt);
-    if (ret < 0)
-        return ret;
+    pkt->dts -= ts_offset;
 
     ms->streamcopy_started = 1;
 
     return 0;
 }
 
-static int thread_stop(Muxer *mux)
-{
-    void *ret;
-
-    if (!mux || !mux->tq)
-        return 0;
-
-    for (unsigned int i = 0; i < mux->fc->nb_streams; i++)
-        tq_send_finish(mux->tq, i);
-
-    pthread_join(mux->thread, &ret);
-
-    tq_free(&mux->tq);
-
-    return (int)(intptr_t)ret;
-}
-
-static int thread_start(Muxer *mux)
-{
-    AVFormatContext *fc = mux->fc;
-    ObjPool *op;
-    int ret;
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move);
-    if (!mux->tq) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-    ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux);
-    if (ret) {
-        tq_free(&mux->tq);
-        return AVERROR(ret);
-    }
-
-    /* flush the muxing queues */
-    for (int i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = mux->of.streams[i];
-        MuxStream     *ms = ms_from_ost(ost);
-        AVPacket *pkt;
-
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) {
-            ret = thread_submit_packet(mux, ost, pkt);
-            if (pkt) {
-                ms->muxing_queue_data_size -= pkt->size;
-                av_packet_free(&pkt);
-            }
-            if (ret < 0)
-                return ret;
-        }
-    }
-
-    return 0;
-}
-
 int print_sdp(const char *filename);
 
 int print_sdp(const char *filename)
@@ -590,11 +433,6 @@  int print_sdp(const char *filename)
     AVIOContext *sdp_pb;
     AVFormatContext **avc;
 
-    for (i = 0; i < nb_output_files; i++) {
-        if (!mux_from_of(output_files[i])->header_written)
-            return 0;
-    }
-
     avc = av_malloc_array(nb_output_files, sizeof(*avc));
     if (!avc)
         return AVERROR(ENOMEM);
@@ -629,25 +467,17 @@  int print_sdp(const char *filename)
         avio_closep(&sdp_pb);
     }
 
-    // SDP successfully written, allow muxer threads to start
-    ret = 1;
-
 fail:
     av_freep(&avc);
     return ret;
 }
 
-int mux_check_init(Muxer *mux)
+int mux_check_init(void *arg)
 {
+    Muxer     *mux = arg;
     OutputFile *of = &mux->of;
     AVFormatContext *fc = mux->fc;
-    int ret, i;
-
-    for (i = 0; i < fc->nb_streams; i++) {
-        OutputStream *ost = of->streams[i];
-        if (!ost->initialized)
-            return 0;
-    }
+    int ret;
 
     ret = avformat_write_header(fc, &mux->opts);
     if (ret < 0) {
@@ -659,27 +489,7 @@  int mux_check_init(Muxer *mux)
     mux->header_written = 1;
 
     av_dump_format(fc, of->index, fc->url, 1);
-    nb_output_dumped++;
-
-    if (sdp_filename || want_sdp) {
-        ret = print_sdp(sdp_filename);
-        if (ret < 0) {
-            av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n");
-            return ret;
-        } else if (ret == 1) {
-            /* SDP is written only after all the muxers are ready, so now we
-             * start ALL the threads */
-            for (i = 0; i < nb_output_files; i++) {
-                ret = thread_start(mux_from_of(output_files[i]));
-                if (ret < 0)
-                    return ret;
-            }
-        }
-    } else {
-        ret = thread_start(mux_from_of(of));
-        if (ret < 0)
-            return ret;
-    }
+    atomic_fetch_add(&nb_output_dumped, 1);
 
     return 0;
 }
@@ -736,9 +546,10 @@  int of_stream_init(OutputFile *of, OutputStream *ost)
                                          ost->st->time_base);
     }
 
-    ost->initialized = 1;
+    if (ms->sch_idx >= 0)
+        return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx);
 
-    return mux_check_init(mux);
+    return 0;
 }
 
 static int check_written(OutputFile *of)
@@ -852,15 +663,13 @@  int of_write_trailer(OutputFile *of)
     AVFormatContext *fc = mux->fc;
     int ret, mux_result = 0;
 
-    if (!mux->tq) {
+    if (!mux->header_written) {
         av_log(mux, AV_LOG_ERROR,
                "Nothing was written into output file, because "
                "at least one of its streams received no packets.\n");
         return AVERROR(EINVAL);
     }
 
-    mux_result = thread_stop(mux);
-
     ret = av_write_trailer(fc);
     if (ret < 0) {
         av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret));
@@ -905,13 +714,6 @@  static void ost_free(OutputStream **post)
         ost->logfile = NULL;
     }
 
-    if (ms->muxing_queue) {
-        AVPacket *pkt;
-        while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0)
-            av_packet_free(&pkt);
-        av_fifo_freep2(&ms->muxing_queue);
-    }
-
     avcodec_parameters_free(&ost->par_in);
 
     av_bsf_free(&ms->bsf_ctx);
@@ -976,8 +778,6 @@  void of_free(OutputFile **pof)
         return;
     mux = mux_from_of(of);
 
-    thread_stop(mux);
-
     sq_free(&of->sq_encode);
     sq_free(&mux->sq_mux);
 
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index eee2b2cb07..5d7cf3fa76 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -25,7 +25,6 @@ 
 #include <stdint.h>
 
 #include "ffmpeg_sched.h"
-#include "thread_queue.h"
 
 #include "libavformat/avformat.h"
 
@@ -33,7 +32,6 @@ 
 
 #include "libavutil/dict.h"
 #include "libavutil/fifo.h"
-#include "libavutil/thread.h"
 
 typedef struct MuxStream {
     OutputStream ost;
@@ -41,9 +39,6 @@  typedef struct MuxStream {
     // name used for logging
     char log_name[32];
 
-    /* the packets are buffered here until the muxer is ready to be initialized */
-    AVFifo *muxing_queue;
-
     AVBSFContext *bsf_ctx;
     AVPacket     *bsf_pkt;
 
@@ -57,17 +52,6 @@  typedef struct MuxStream {
 
     int64_t max_frames;
 
-    /*
-     * The size of the AVPackets' buffers in queue.
-     * Updated when a packet is either pushed or pulled from the queue.
-     */
-    size_t muxing_queue_data_size;
-
-    int max_muxing_queue_size;
-
-    /* Threshold after which max_muxing_queue_size will be in effect */
-    size_t muxing_queue_data_threshold;
-
     // timestamp from which the streamcopied streams should start,
     // in AV_TIME_BASE_Q;
     // everything before it should be discarded
@@ -106,9 +90,6 @@  typedef struct Muxer {
     int         *sch_stream_idx;
     int       nb_sch_stream_idx;
 
-    pthread_t    thread;
-    ThreadQueue *tq;
-
     AVDictionary *opts;
 
     int thread_queue_size;
@@ -122,10 +103,7 @@  typedef struct Muxer {
     AVPacket *sq_pkt;
 } Muxer;
 
-/* whether we want to print an SDP, set in of_open() */
-extern int want_sdp;
-
-int mux_check_init(Muxer *mux);
+int mux_check_init(void *arg);
 
 static MuxStream *ms_from_ost(OutputStream *ost)
 {
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 534b4379c7..6459296ab0 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -924,13 +924,6 @@  static int new_stream_audio(Muxer *mux, const OptionsContext *o,
     return 0;
 }
 
-static int new_stream_attachment(Muxer *mux, const OptionsContext *o,
-                                 OutputStream *ost)
-{
-    ost->finished    = 1;
-    return 0;
-}
-
 static int new_stream_subtitle(Muxer *mux, const OptionsContext *o,
                                OutputStream *ost)
 {
@@ -1168,9 +1161,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (!ost->par_in)
         return AVERROR(ENOMEM);
 
-    ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0);
-    if (!ms->muxing_queue)
-        return AVERROR(ENOMEM);
     ms->last_mux_dts = AV_NOPTS_VALUE;
 
     ost->st         = st;
@@ -1190,7 +1180,8 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
         if (!ost->enc_ctx)
             return AVERROR(ENOMEM);
 
-        ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL);
+        ret = sch_add_enc(mux->sch, encoder_thread, ost,
+                          ost->type == AVMEDIA_TYPE_SUBTITLE ? NULL : enc_open);
         if (ret < 0)
             return ret;
         ms->sch_idx_enc = ret;
@@ -1414,9 +1405,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
 
         sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx,
                                  max_muxing_queue_size, muxing_queue_data_threshold);
-
-        ms->max_muxing_queue_size       = max_muxing_queue_size;
-        ms->muxing_queue_data_threshold = muxing_queue_data_threshold;
     }
 
     MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample,
@@ -1434,8 +1422,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     if (ost->enc_ctx && av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24)
         av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0);
 
-    ost->last_mux_dts = AV_NOPTS_VALUE;
-
     MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i,
                          ms->copy_initial_nonkeyframes, oc, st);
 
@@ -1443,7 +1429,6 @@  static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type,
     case AVMEDIA_TYPE_VIDEO:      ret = new_stream_video     (mux, o, ost); break;
     case AVMEDIA_TYPE_AUDIO:      ret = new_stream_audio     (mux, o, ost); break;
     case AVMEDIA_TYPE_SUBTITLE:   ret = new_stream_subtitle  (mux, o, ost); break;
-    case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, ost); break;
     }
     if (ret < 0)
         return ret;
@@ -1938,7 +1923,6 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
         MuxStream     *ms = ms_from_ost(ost);
         enum AVMediaType type = ost->type;
 
-        ost->sq_idx_encode = -1;
         ost->sq_idx_mux    = -1;
 
         nb_interleaved += IS_INTERLEAVED(type);
@@ -1961,11 +1945,17 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
      * - at least one encoded audio/video stream is frame-limited, since
      *   that has similar semantics to 'shortest'
      * - at least one audio encoder requires constant frame sizes
+     *
+     * Note that encoding sync queues are handled in the scheduler, because
+     * different encoders run in different threads and need external
+     * synchronization, while muxer sync queues can be handled inside the muxer
      */
     if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) {
-        of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux);
-        if (!of->sq_encode)
-            return AVERROR(ENOMEM);
+        int sq_idx, ret;
+
+        sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux);
+        if (sq_idx < 0)
+            return sq_idx;
 
         for (int i = 0; i < oc->nb_streams; i++) {
             OutputStream *ost = of->streams[i];
@@ -1975,13 +1965,11 @@  static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u
             if (!IS_AV_ENC(ost, type))
                 continue;
 
-            ost->sq_idx_encode = sq_add_stream(of->sq_encode,
-                                               of->shortest || ms->max_frames < INT64_MAX);
-            if (ost->sq_idx_encode < 0)
-                return ost->sq_idx_encode;
-
-            if (ms->max_frames != INT64_MAX)
-                sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames);
+            ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc,
+                                 of->shortest || ms->max_frames < INT64_MAX,
+                                 ms->max_frames);
+            if (ret < 0)
+                return ret;
         }
     }
 
@@ -2652,23 +2640,6 @@  static int validate_enc_avopt(Muxer *mux, const AVDictionary *codec_avopt)
     return 0;
 }
 
-static int init_output_stream_nofilter(OutputStream *ost)
-{
-    int ret = 0;
-
-    if (ost->enc_ctx) {
-        ret = enc_open(ost, NULL);
-        if (ret < 0)
-            return ret;
-    } else {
-        ret = of_stream_init(output_files[ost->file_index], ost);
-        if (ret < 0)
-            return ret;
-    }
-
-    return ret;
-}
-
 static const char *output_file_item_name(void *obj)
 {
     const Muxer *mux = obj;
@@ -2751,8 +2722,6 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     av_strlcat(mux->log_name, "/",               sizeof(mux->log_name));
     av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name));
 
-    if (strcmp(oc->oformat->name, "rtp"))
-        want_sdp = 0;
 
     of->format = oc->oformat;
     if (recording_time != INT64_MAX)
@@ -2768,7 +2737,7 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                                            AVFMT_FLAG_BITEXACT);
     }
 
-    err = sch_add_mux(sch, muxer_thread, NULL, mux,
+    err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
                       !strcmp(oc->oformat->name, "rtp"));
     if (err < 0)
         return err;
@@ -2854,26 +2823,15 @@  int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
 
     of->url        = filename;
 
-    /* initialize stream copy and subtitle/data streams.
-     * Encoded AVFrame based streams will get initialized when the first AVFrame
-     * is received in do_video_out
-     */
+    /* initialize streamcopy streams. */
     for (int i = 0; i < of->nb_streams; i++) {
         OutputStream *ost = of->streams[i];
 
-        if (ost->filter)
-            continue;
-
-        err = init_output_stream_nofilter(ost);
-        if (err < 0)
-            return err;
-    }
-
-    /* write the header for files with no streams */
-    if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) {
-        int ret = mux_check_init(mux);
-        if (ret < 0)
-            return ret;
+        if (!ost->enc) {
+            err = of_stream_init(of, ost);
+            if (err < 0)
+                return err;
+        }
     }
 
     return 0;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index d463306546..6177a96a4e 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -64,7 +64,6 @@  const char *const opt_name_top_field_first[]                  = {"top", NULL};
 HWDevice *filter_hw_device;
 
 char *vstats_filename;
-char *sdp_filename;
 
 float audio_drift_threshold = 0.1;
 float dts_delta_threshold   = 10;
@@ -580,9 +579,8 @@  fail:
 
 static int opt_sdp_file(void *optctx, const char *opt, const char *arg)
 {
-    av_free(sdp_filename);
-    sdp_filename = av_strdup(arg);
-    return 0;
+    Scheduler *sch = optctx;
+    return sch_sdp_filename(sch, arg);
 }
 
 #if CONFIG_VAAPI
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index 957a410921..bc9b833799 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -1,48 +1,40 @@ 
 1
-00:00:00,968 --> 00:00:01,001
+00:00:00,968 --> 00:00:01,168
 <font face="Monospace">{\an7}(</font>
 
 2
-00:00:01,001 --> 00:00:01,168
-<font face="Monospace">{\an7}(</font>
-
-3
 00:00:01,168 --> 00:00:01,368
 <font face="Monospace">{\an7}(<i> inaudibl</i></font>
 
-4
+3
 00:00:01,368 --> 00:00:01,568
 <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font>
 
-5
+4
 00:00:01,568 --> 00:00:02,002
 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
 
+5
+00:00:02,002 --> 00:00:03,103
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
+
 6
-00:00:02,002 --> 00:00:03,003
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-7
-00:00:03,003 --> 00:00:03,103
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font>
-
-8
 00:00:03,103 --> 00:00:03,303
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >></font>
 
-9
+7
 00:00:03,303 --> 00:00:03,503
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety rema</font>
 
-10
+8
 00:00:03,504 --> 00:00:03,704
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our numb</font>
 
-11
+9
 00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
+<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our number one</font>