[FFmpeg-devel,4/5] ffserver: Implement ffserver and add Makefile

Message ID 20180510154126.30789-5-klaxa1337@googlemail.com
State Superseded

Commit Message

Stephan Holljes May 10, 2018, 3:41 p.m. UTC
Signed-off-by: Stephan Holljes <klaxa1337@googlemail.com>
---
 Makefile   |  18 +++
 ffserver.c | 514 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 532 insertions(+)
 create mode 100644 Makefile
 create mode 100644 ffserver.c

Comments

Moritz Barsnick May 11, 2018, 12:49 p.m. UTC | #1
On Thu, May 10, 2018 at 17:41:25 +0200, Stephan Holljes wrote:
> +ffserver: segment.o publisher.o lavfhttpd.o ffserver.c
> +	cc -g -Wall $(LAV_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o ffserver.c
        ^ $(CC) $(CFLAGS) to be more generic and allow overriding.

The LDFLAGS "$(LAV_FLAGS) -lpthread" need to be at the end of the
command for my compile/link chains to work. *shrug*

> +clean:
> +	rm *.o ffserver

Make this "rm -f" to silence warnings about nonexistent files.

> diff --git a/ffserver.c b/ffserver.c

It may be me, but the whole thing doesn't work for me.

With any MP4 file, I get:

[AVIOContext @ 0xa3c3b40] {
        "free": 16,
        "reserved": 0,
        "wait": 0,
        "writable": 0,
        "busy": 0,
        "buffer_full": 0,
        "current_read": -1,
        "newest_write": 0,
        "oldest_write": -1
}
[mov,mp4,m4a,3gp,3g2,mj2 @ 0xa394580] Finding video stream.
[matroska @ 0xb4319700] Tag mp4a incompatible with output codec id '86018' ([255][0][0][0])
[matroska @ 0xb4319700] Error occured while writing header: Invalid data found when processing input
Segmentation fault

gdb, if it's of interest:
Program received signal SIGSEGV, Segmentation fault.
[Switching to Thread 0xb5717b90 (LWP 26045)]
0x08068d6d in compute_muxer_pkt_fields (s=0xb4c19240, st=0xfffffffe, pkt=0xffffffff) at src/libavformat/mux.c:640
640         st->internal->priv_pts->val = pkt->dts;

(ffmpeg of the same version as the libs has no issue processing the
same file to matroska.)

Using an MKV as input, it launches as such:

[AVIOContext @ 0xb441e5c0] {
        "free": 16,
        "reserved": 0,
        "wait": 0,
        "writable": 0,
        "busy": 0,
        "buffer_full": 0,
        "current_read": -1,
        "newest_write": 0,
        "oldest_write": -1
}
[matroska,webm @ 0xa166580] Finding video stream.
[AVIOContext @ 0xb441e5c0] Shutting down http server.

"Shutting down"??? And I can't connect and play.

It may just be me though. Hope this helps anyway.

Moritz
Moritz Barsnick May 11, 2018, 12:57 p.m. UTC | #2
On Fri, May 11, 2018 at 14:49:53 +0200, Moritz Barsnick wrote:
> [matroska,webm @ 0xa166580] Finding video stream.
> [AVIOContext @ 0xb441e5c0] Shutting down http server.
> 
> "Shutting down"??? And I can't connect and play.

Forget this part: my static video was too short, and I didn't realize
it gets streamed in real time rather than "served".
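
The behaviour described here appears to follow from the pacing logic in
read_thread() in the patch below: the reader starts its clock BUFFER_SECS
(30) seconds "in the past" and sleeps until each packet's pts is due, so a
file shorter than that buffer is read without waiting, after which the
publisher is flagged for shutdown. A minimal sketch of that rule follows.
The helper name pacing_delay_us() and the TIME_BASE_US constant are
hypothetical and not part of the patch:

```c
#include <assert.h>
#include <stdint.h>

#define BUFFER_SECS  30        /* same value as in the patch */
#define TIME_BASE_US 1000000   /* microseconds per second, like AV_TIME_BASE */

/* Hypothetical helper, not part of the patch: microseconds the reader still
 * has to wait before handling a packet with presentation time pts_us, given
 * elapsed_us of wall-clock time since the reader started.
 * Returns 0 when the packet is already due. */
static int64_t pacing_delay_us(int64_t pts_us, int64_t elapsed_us)
{
    int64_t now;

    assert(elapsed_us >= 0);
    /* The stream clock starts BUFFER_SECS ahead of the wall clock. */
    now = elapsed_us + (int64_t)BUFFER_SECS * TIME_BASE_US;
    return pts_us > now ? pts_us - now : 0;
}
```

With these assumptions, every packet of a 10-second clip is due immediately,
while a packet at 45 s is held back for 10 s if only 5 s have elapsed.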

Moritz
Stephan Holljes May 11, 2018, 1:05 p.m. UTC | #3
On Fri, May 11, 2018 at 2:49 PM, Moritz Barsnick <barsnick@gmx.net> wrote:
> On Thu, May 10, 2018 at 17:41:25 +0200, Stephan Holljes wrote:
>> +ffserver: segment.o publisher.o lavfhttpd.o ffserver.c
>> +     cc -g -Wall $(LAV_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o ffserver.c
>         ^ $(CC) $(CFLAGS) to be more generic and allow overriding.
>
> The LDFLAGS "$(LAV_FLAGS) -lpthread" need to be at the end of the
> command for my compile/link chains to work. *shrug*
>
>> +clean:
>> +     rm *.o ffserver
>
> Make this "rm -f" to silence warnings about non-existant files.

Ok

Hmm, that *is* a problem. Maybe I should have used more than one file
for testing. I just tested another file and it also only worked
properly on the second try. This time I hit a condition I could not
reproduce over the last few days:
>[matroska @ 0x55eb77169040] Application provided invalid, non monotonically increasing dts to muxer in stream 0: 18894 >= 18894
I already mentioned that I'm not too sure about the way I handle
timestamps. (Currently each segment has a separate dynamic array
filled with dts and pts as int64_t taken from the demuxer during
reading.)
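
The muxer message quoted in this mail reports a previous and a current dts
that are equal (18894 >= 18894). As an illustration of the condition being
complained about, here is a minimal sketch that scans a list of dts values
for one stream and reports the first entry that does not increase. The
helper first_non_monotonic_dts() is hypothetical and not part of the patch;
it also assumes the values belong to a single stream, whereas the patch
stores dts and pts for all streams interleaved in one array:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper, not part of the patch: returns the index of the first
 * entry whose dts is not strictly greater than the one before it, or -1 if
 * the whole sequence is strictly increasing. */
static int first_non_monotonic_dts(const int64_t *dts, size_t n)
{
    size_t i;

    assert(dts != NULL || n == 0);
    for (i = 1; i < n; i++) {
        /* Same comparison the log line prints: previous >= current. */
        if (dts[i - 1] >= dts[i])
            return (int)i;
    }
    return -1;
}
```

A check like this could be run per stream over the stored timestamps before
they are handed back to the muxer, to see whether the duplicate value comes
from the input or is introduced later.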

I also tested an mp4 file and I can reproduce the error message. I'm
guessing it's "only" a metadata thing, but I know ffmpeg can mux this
without problems, so I'll investigate how to do that.

Thanks for the time, I will assemble more files for testing!
Stephan Holljes May 13, 2018, 12:01 a.m. UTC | #4

I figured out how to prevent the fatal error during muxing by looking
at doc/examples/remuxing.c.
In the process I came to the conclusion that the incorrect timestamps
written to a client are most likely caused by a race condition: after
testing with a single thread and after tightening the locking, this no
longer occurs. The first hint for this came from compiling with
AddressSanitizer. However, there is still something wrong with the
timestamps. Remuxing from an mp4 file to in-memory matroska segments
yields:

> [matroska @ 0x61b00001ea80] Starting new cluster due to timestamp

many times in the output. The resulting stream also cannot be played
properly; mpv outputs:

> Invalid audio PTS: 0.023220 -> 1.024000

This does not happen when the input is a matroska file.
I will send an updated version including the fixes from the other emails.

Maybe someone can point me to some resources about timestamps and how
to deal with them? As I mentioned a few times, I'm really not sure
how to do this properly. In the meantime I'll read more of the
doc/examples code and probably ffmpeg.c as well.
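
One reading that is consistent with the mpv numbers in this mail, offered
as an assumption rather than a confirmed diagnosis: 1024 ticks at a time
base of 1/44100 are 0.023220 s, while 1024 ticks at a millisecond time base
are 1.024 s. That pattern would appear if a timestamp were copied from the
input stream's time base into the output stream's time base without
rescaling, which is the step doc/examples/remuxing.c performs for each
packet. The sketch below shows the arithmetic only; struct timebase and
rescale_ts() are hypothetical stand-ins, not libavutil API, and the
44.1 kHz and millisecond time bases are assumed, not taken from the thread:

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical stand-in for a time base: seconds per tick as num/den. */
struct timebase {
    int num;
    int den;
};

/* Convert ts from time base `from` to time base `to`, rounding to nearest.
 * Simplified sketch: assumes non-negative ts and no 64-bit overflow. */
static int64_t rescale_ts(int64_t ts, struct timebase from, struct timebase to)
{
    int64_t mul = (int64_t)from.num * to.den;
    int64_t div = (int64_t)from.den * to.num;

    assert(ts >= 0 && mul > 0 && div > 0);
    return (ts * mul + div / 2) / div;
}
```

Under these assumptions, 1024 ticks at 1/44100 rescale to 23 ticks at
1/1000, i.e. about 23 ms instead of 1.024 s.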

Thanks!
Stephan

Patch

diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..118ba27
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,18 @@ 
+all: ffserver
+LAV_FLAGS = $(shell pkg-config --libs --cflags libavformat libavcodec libavutil)
+# LAV_FLAGS = -L/usr/local/lib -lavcodec -lavformat -lavutil
+
+ffserver: segment.o publisher.o lavfhttpd.o ffserver.c
+	cc -g -Wall $(LAV_FLAGS) -lpthread -o ffserver segment.o publisher.o lavfhttpd.o ffserver.c
+
+segment.o: segment.c segment.h
+	cc -g -Wall $(LAV_FLAGS) -lpthread -c segment.c
+
+publisher.o: publisher.c publisher.h
+	cc -g -Wall $(LAV_FLAGS) -lpthread -c publisher.c
+
+lavfhttpd.o: lavfhttpd.c httpd.h
+	cc -g -Wall $(LAV_FLAGS) -lpthread -c lavfhttpd.c
+
+clean:
+	rm *.o ffserver
diff --git a/ffserver.c b/ffserver.c
new file mode 100644
index 0000000..9cbf469
--- /dev/null
+++ b/ffserver.c
@@ -0,0 +1,514 @@ 
+/*
+ * Copyright (c) 2018 Stephan Holljes
+ *
+ * This file is part of FFmpeg.
+ *
+ * FFmpeg is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * FFmpeg is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with FFmpeg; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+/**
+ * @file
+ * multimedia server based on the FFmpeg libraries
+ */
+
+#include <stdio.h>
+#include <unistd.h>
+#include <inttypes.h>
+#include <pthread.h>
+
+#include <libavutil/log.h>
+#include <libavutil/timestamp.h>
+#include <libavutil/time.h>
+#include <libavutil/opt.h>
+#include <libavformat/avformat.h>
+#include <libavcodec/avcodec.h>
+
+#include "segment.h"
+#include "publisher.h"
+#include "httpd.h"
+
+#define BUFFER_SECS 30
+#define LISTEN_TIMEOUT_MSEC 1000
+
+struct ReadInfo {
+    struct PublisherContext *pub;
+    AVFormatContext *ifmt_ctx;
+    char *in_filename;
+};
+
+struct WriteInfo {
+    struct PublisherContext *pub;
+    int thread_id;
+};
+
+struct AcceptInfo {
+    struct PublisherContext *pub;
+    struct HTTPDInterface *httpd;
+    AVFormatContext *ifmt_ctx;
+};
+
+
+int ffserver_write(void *opaque, unsigned char *buf, int buf_size)
+{
+    struct FFServerInfo *info = (struct FFServerInfo*) opaque;
+    return info->httpd->write(info->server, info->client, buf, buf_size);
+}
+
+
+void *read_thread(void *arg)
+{
+    struct ReadInfo *info = (struct ReadInfo*) arg;
+    AVFormatContext *ifmt_ctx = info->ifmt_ctx;
+    int ret, i;
+    int video_idx = -1;
+    int id = 0;
+    int64_t pts, now, start;
+    int64_t *ts;
+    struct Segment *seg = NULL;
+    AVPacket pkt;
+    AVStream *in_stream;
+    AVRational tb;
+    tb.num = 1;
+    tb.den = AV_TIME_BASE;
+    AVStream *stream;
+    AVCodecParameters *params;
+    enum AVMediaType type;
+    
+    if ((ret = avformat_find_stream_info(ifmt_ctx, NULL)) < 0) {
+        av_log(ifmt_ctx, AV_LOG_ERROR, "Could not get input stream info.\n");
+        goto end;
+    }
+    
+    av_log(ifmt_ctx, AV_LOG_INFO, "Finding video stream.\n");
+    for (i = 0; i < ifmt_ctx->nb_streams; i++) {
+        av_log(ifmt_ctx, AV_LOG_DEBUG, "Checking stream %d\n", i);
+        stream = ifmt_ctx->streams[i];
+        params = stream->codecpar;
+        type = params->codec_type;
+        if (type == AVMEDIA_TYPE_VIDEO) {
+            video_idx = i;
+            break;
+        }
+    }
+    if (video_idx == -1) {
+        av_log(ifmt_ctx, AV_LOG_ERROR, "No video stream found.\n");
+        goto end;
+    }
+    
+    
+    // All information needed to start segmenting the file is gathered now.
+    // start BUFFER_SECS seconds "in the past" to "catch up" to real-time. Has no effect on streamed sources.
+    start = av_gettime_relative() - BUFFER_SECS * AV_TIME_BASE;
+    
+    // segmenting main-loop
+    
+    for (;;) {
+        ret = av_read_frame(ifmt_ctx, &pkt);
+        if (ret < 0)
+            break;
+        
+        in_stream = ifmt_ctx->streams[pkt.stream_index];
+        if (pkt.pts == AV_NOPTS_VALUE) {
+            pkt.pts = 0;
+        }
+        if (pkt.dts == AV_NOPTS_VALUE) {
+            pkt.dts = 0;
+        }
+        
+        // current pts
+        pts = av_rescale_q(pkt.pts, in_stream->time_base, tb);
+        
+        // current stream "uptime"
+        now = av_gettime_relative() - start;
+
+        // simulate real-time reading
+        while (pts > now) {
+            usleep(1000);
+            now = av_gettime_relative() - start;
+        }
+        
+        // keyframe or first Segment
+        if ((pkt.flags & AV_PKT_FLAG_KEY && pkt.stream_index == video_idx) || !seg) {
+            if (seg) {
+                segment_close(seg);
+                publisher_push_segment(info->pub, seg);
+                av_log(NULL, AV_LOG_DEBUG, "New segment pushed.\n");
+                publish(info->pub);
+                av_log(NULL, AV_LOG_DEBUG, "Published new segment.\n");
+            }
+            segment_init(&seg, ifmt_ctx);
+            seg->id = id++;
+            av_log(NULL, AV_LOG_DEBUG, "Starting new segment, id: %d\n", seg->id);
+        }
+        
+        ts = av_dynarray2_add((void **)&seg->ts, &seg->ts_len, sizeof(int64_t),
+                              (const void *)&pkt.dts);
+        if (!ts) {
+            av_log(seg->fmt_ctx, AV_LOG_ERROR, "could not write dts.\n");
+            goto end;
+        }
+        
+        ts = av_dynarray2_add((void **)&seg->ts, &seg->ts_len, sizeof(int64_t),
+                              (const void *)&pkt.pts);
+        if (!ts) {
+            av_log(seg->fmt_ctx, AV_LOG_ERROR, "could not write pts.\n");
+            goto end;
+        }
+        ret = av_write_frame(seg->fmt_ctx, &pkt);
+        av_packet_unref(&pkt);
+        if (ret < 0) {
+            av_log(seg->fmt_ctx, AV_LOG_ERROR, "av_write_frame() failed.\n");
+            goto end;
+        }
+    }
+    
+    if (ret < 0 && ret != AVERROR_EOF) {
+        av_log(seg->fmt_ctx, AV_LOG_ERROR, "Error occurred during read: %s\n", av_err2str(ret));
+        goto end;
+    }
+
+    segment_close(seg);
+    publisher_push_segment(info->pub, seg);
+    publish(info->pub);
+
+
+end:
+    avformat_close_input(&ifmt_ctx);
+    info->pub->shutdown = 1;
+    return NULL;
+}
+
+void write_segment(struct Client *c)
+{
+    struct Segment *seg;
+    int ret;
+    int pkt_count = 0;
+    pthread_mutex_lock(&c->buffer_lock);
+    if (av_fifo_size(c->buffer) > 0) {
+        AVFormatContext *fmt_ctx;
+        AVIOContext *avio_ctx;
+        AVPacket pkt;
+        struct SegmentReadInfo info;
+        unsigned char *avio_buffer;
+        
+        av_fifo_generic_peek(c->buffer, &seg, sizeof(struct Segment*), NULL);
+        pthread_mutex_unlock(&c->buffer_lock);
+        client_set_state(c, BUSY);
+        c->current_segment_id = seg->id;
+        info.buf = seg->buf;
+        info.left = seg->size;
+        
+        if (!(fmt_ctx = avformat_alloc_context())) {
+            av_log(NULL, AV_LOG_ERROR, "Could not allocate format context\n");
+            client_disconnect(c, 0);
+            return;
+        }
+        
+        avio_buffer = (unsigned char*) av_malloc(AV_BUFSIZE);
+        avio_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 0, &info, &segment_read, NULL, NULL);
+        
+        fmt_ctx->pb = avio_ctx;
+        ret = avformat_open_input(&fmt_ctx, NULL, seg->ifmt, NULL);
+        if (ret < 0) {
+            av_log(avio_ctx, AV_LOG_ERROR, "Could not open input\n");
+            av_free(avio_ctx->buffer);
+            avio_context_free(&avio_ctx);
+            client_disconnect(c, 0);
+            return;
+        }
+        
+        ret = avformat_find_stream_info(fmt_ctx, NULL);
+        if (ret < 0) {
+            av_log(fmt_ctx, AV_LOG_ERROR, "Could not find stream information\n");
+            av_free(avio_ctx->buffer);
+            avio_context_free(&avio_ctx);
+            client_disconnect(c, 0);
+            return;
+        }
+        
+        av_log(fmt_ctx, AV_LOG_DEBUG, "Client: %d, Segment: %d\n", c->id, seg->id);
+
+        for (;;) {
+            ret = av_read_frame(fmt_ctx, &pkt);
+            if (ret < 0)
+                break;
+            
+            pkt.dts = seg->ts[pkt_count];
+            pkt.pts = seg->ts[pkt_count+1];
+            pkt_count += 2;
+            ret = av_write_frame(c->ofmt_ctx, &pkt);
+            av_packet_unref(&pkt);
+            if (ret < 0) {
+                av_log(fmt_ctx, AV_LOG_ERROR, "write_frame failed, disconnecting client: %d\n", c->id);
+                avformat_close_input(&fmt_ctx);
+                av_free(avio_ctx->buffer);
+                avio_context_free(&avio_ctx);
+                client_disconnect(c, 0);
+                return;
+            }
+        }
+        avformat_close_input(&fmt_ctx);
+        av_free(avio_ctx->buffer);
+        avformat_free_context(fmt_ctx);
+        avio_context_free(&avio_ctx);
+        pthread_mutex_lock(&c->buffer_lock);
+        av_fifo_drain(c->buffer, sizeof(struct Segment*));
+        pthread_mutex_unlock(&c->buffer_lock);
+        segment_unref(seg);
+        client_set_state(c, WRITABLE);
+    } else {
+        pthread_mutex_unlock(&c->buffer_lock);
+        client_set_state(c, WAIT);
+    }
+}
+
+void *accept_thread(void *arg)
+{
+    struct AcceptInfo *info = (struct AcceptInfo*) arg;
+    struct FFServerInfo *ffinfo = NULL;
+    char status[4096];
+    struct HTTPClient *client = NULL;
+    void *server = NULL;
+    AVIOContext *client_ctx = NULL;
+    AVFormatContext *ofmt_ctx = NULL;
+    unsigned char *avio_buffer;
+    AVOutputFormat *ofmt;
+    AVDictionary *mkvopts = NULL;
+    AVStream *in_stream, *out_stream;
+    int ret, i, reply_code;
+    struct HTTPDConfig config = {
+        .bind_address = "0",
+        .port = 8080,
+        .accept_timeout = LISTEN_TIMEOUT_MSEC,
+    };
+    
+    info->httpd->init(&server, config);
+    
+    
+    for (;;) {
+        if (info->pub->shutdown)
+            break;
+        publisher_gen_status_json(info->pub, status);
+        av_log(server, AV_LOG_INFO, "%s", status);
+        client = NULL;
+        av_log(server, AV_LOG_DEBUG, "Accepting new clients.\n");
+        reply_code = 200;
+        if (publisher_reserve_client(info->pub)) {
+            av_log(client, AV_LOG_WARNING, "No more client slots free, Returning 503.\n");
+            reply_code = 503;
+        }
+        
+        if ((ret = info->httpd->accept(server, &client, reply_code)) < 0) {
+            if (ret == HTTPD_LISTEN_TIMEOUT) {
+                publisher_cancel_reserve(info->pub);
+                continue;
+            } else if (ret == HTTPD_CLIENT_ERROR) {
+                info->httpd->close(server, client);
+            }
+            av_log(server, AV_LOG_WARNING, "Error during accept, retrying.\n");
+            publisher_cancel_reserve(info->pub);
+            continue;
+        }
+        
+        if (reply_code != 200) {
+            publisher_cancel_reserve(info->pub);
+            info->httpd->close(server, client);
+            continue;
+        }
+        
+        avio_buffer = av_malloc(AV_BUFSIZE);
+        ffinfo = av_malloc(sizeof(struct FFServerInfo));
+        ffinfo->httpd = info->httpd;
+        ffinfo->client = client;
+        ffinfo->server = server;
+        client_ctx = avio_alloc_context(avio_buffer, AV_BUFSIZE, 1, ffinfo, NULL, &ffserver_write, NULL);
+        if (!client_ctx) {
+            av_log(client, AV_LOG_ERROR, "Could not allocate AVIO context.\n");
+            publisher_cancel_reserve(info->pub);
+            info->httpd->close(server, client);
+            av_free(avio_buffer);
+            avio_context_free(&client_ctx);
+            av_free(ffinfo);
+            continue;
+        }
+        avformat_alloc_output_context2(&ofmt_ctx, NULL, "matroska", NULL);
+        if (!ofmt_ctx) {
+            av_log(client, AV_LOG_ERROR, "Could not allocate output format context.\n");
+            publisher_cancel_reserve(info->pub);
+            info->httpd->close(server, client);
+            avformat_free_context(ofmt_ctx);
+            av_free(client_ctx->buffer);
+            avio_context_free(&client_ctx);
+            av_free(ffinfo);
+            continue;
+        }
+        if ((ret = av_dict_set(&mkvopts, "live", "1", 0)) < 0) {
+            av_log(client, AV_LOG_ERROR, "Failed to set live mode for matroska: %s\n", av_err2str(ret));
+            publisher_cancel_reserve(info->pub);
+            info->httpd->close(server, client);
+            avformat_free_context(ofmt_ctx);
+            av_free(client_ctx->buffer);
+            avio_context_free(&client_ctx);
+            av_free(ffinfo);
+            continue;
+        }        
+        ofmt_ctx->flags |= AVFMT_FLAG_GENPTS;
+        ofmt = ofmt_ctx->oformat;
+        ofmt->flags &= AVFMT_NOFILE;
+        
+        for (i = 0; i < info->ifmt_ctx->nb_streams; i++) {
+            in_stream = info->ifmt_ctx->streams[i];
+            out_stream = avformat_new_stream(ofmt_ctx, NULL);
+            
+            if (!out_stream) {
+                av_log(client, AV_LOG_ERROR, "Could not allocate output stream.\n");
+                publisher_cancel_reserve(info->pub);
+                info->httpd->close(server, client);
+                avformat_free_context(ofmt_ctx);
+                av_free(client_ctx->buffer);
+                avio_context_free(&client_ctx);
+                av_free(ffinfo);
+                continue;
+            }
+            
+            ret = avcodec_parameters_copy(out_stream->codecpar, in_stream->codecpar);
+            if (ret < 0) {
+                av_log(client, AV_LOG_ERROR, "Failed to copy context from input to output stream codec context: %s.\n", av_err2str(ret));
+                publisher_cancel_reserve(info->pub);
+                info->httpd->close(server, client);
+                avformat_free_context(ofmt_ctx);
+                av_free(client_ctx->buffer);
+                avio_context_free(&client_ctx);
+                av_free(ffinfo);
+                continue;
+            }
+            if (out_stream->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
+                if (in_stream->sample_aspect_ratio.num)
+                    out_stream->sample_aspect_ratio = in_stream->sample_aspect_ratio;
+                out_stream->avg_frame_rate = in_stream->avg_frame_rate;
+                out_stream->r_frame_rate = in_stream->r_frame_rate;
+            }
+            av_dict_copy(&out_stream->metadata, in_stream->metadata, 0);
+        }
+        av_dict_copy(&ofmt_ctx->metadata, info->ifmt_ctx->metadata, 0);
+        ofmt_ctx->pb = client_ctx;
+        ret = avformat_write_header(ofmt_ctx, &mkvopts);
+        if (ret < 0) {
+            av_log(client, AV_LOG_ERROR, "Could not write header to client: %s.\n", av_err2str(ret));
+            publisher_cancel_reserve(info->pub);
+            info->httpd->close(server, client);
+            avformat_free_context(ofmt_ctx);
+            av_free(client_ctx->buffer);
+            avio_context_free(&client_ctx);
+            av_free(ffinfo);
+            continue;
+        }
+        publisher_add_client(info->pub, ofmt_ctx, ffinfo);
+        ofmt_ctx = NULL;
+        
+    }
+    av_log(server, AV_LOG_INFO, "Shutting down http server.\n");
+    info->httpd->shutdown(server);
+    av_log(NULL, AV_LOG_INFO, "Shut down http server.\n");
+    return NULL;
+}
+
+void *write_thread(void *arg)
+{
+    struct WriteInfo *info = (struct WriteInfo*) arg;
+    int i, nb_free;
+    struct Client *c;
+    for(;;) {
+        nb_free = 0;
+        usleep(500000);
+        av_log(NULL, AV_LOG_DEBUG, "Checking clients, thread: %d\n", info->thread_id);
+        for (i = 0; i < MAX_CLIENTS; i++) {
+            c = &info->pub->clients[i];
+            switch(c->state) {
+                case WRITABLE:
+                    write_segment(c);
+                    if (info->pub->shutdown && info->pub->current_segment_id == c->current_segment_id) {
+                        client_disconnect(c, 1);
+                    }
+                    continue;
+                case FREE:
+                    nb_free++;
+                default:
+                    continue;
+            }
+        }
+        if (info->pub->shutdown && nb_free == MAX_CLIENTS)
+            break;
+    }
+    
+    return NULL;
+}
+
+
+int main(int argc, char *argv[])
+{
+    struct ReadInfo rinfo;
+    struct AcceptInfo ainfo;
+    struct WriteInfo *winfos;
+    struct PublisherContext *pub;
+    int ret, i;
+    pthread_t r_thread, a_thread;
+    pthread_t *w_threads;
+    
+    AVFormatContext *ifmt_ctx = NULL;
+    
+    rinfo.in_filename = "pipe:0";
+    if (argc > 1)
+        rinfo.in_filename = argv[1];
+    
+    av_log_set_level(AV_LOG_INFO);
+    
+    if ((ret = avformat_open_input(&ifmt_ctx, rinfo.in_filename, NULL, NULL))) {
+        av_log(NULL, AV_LOG_ERROR, "main: Could not open input\n");
+        return 1;
+    }
+    
+    publisher_init(&pub);
+    
+    rinfo.ifmt_ctx = ifmt_ctx;
+    rinfo.pub = pub;
+    ainfo.ifmt_ctx = ifmt_ctx;
+    ainfo.pub = pub;
+    ainfo.httpd = &lavfhttpd;
+    
+    w_threads = (pthread_t*) av_malloc(sizeof(pthread_t) * pub->nb_threads);
+    winfos = (struct WriteInfo*) av_malloc(sizeof(struct WriteInfo) * pub->nb_threads);
+    
+    for (i = 0; i < pub->nb_threads; i++) {
+        winfos[i].pub = pub;
+        winfos[i].thread_id = i;
+        pthread_create(&w_threads[i], NULL, write_thread, &winfos[i]);
+    }
+    
+    pthread_create(&r_thread, NULL, read_thread, &rinfo);
+    
+    accept_thread(&ainfo);
+    
+    pthread_join(r_thread, NULL);
+    
+    for (i = 0; i < pub->nb_threads; i++) {
+        pthread_join(w_threads[i], NULL);
+    }
+    av_free(w_threads);
+    av_free(winfos);
+    
+    publisher_freep(&pub);
+    return 0;
+}