
[FFmpeg-devel,v2,02/03] libavdevice/avfoundation.m: Replace mutex-based concurrency handling in avfoundation.m by a thread-safe fifo queue with maximum length.

Message ID 1299D3A3-E37C-4EE2-9F00-BCBA46FAD441@rastageeks.org
State New
Series [FFmpeg-devel,v2,01/03] libavdevice/avfoundation.m: use AudioConvert, extend supported formats

Checks

Context              Check    Description
andriy/configurex86  warning  Failed to apply patch
andriy/configureppc  warning  Failed to apply patch

Commit Message

Romain Beauxis Dec. 13, 2021, 4:39 p.m. UTC
This is the second patch in a series of three that clean up and enhance the
AVFoundation implementation in libavdevice.

This patch fixes the concurrency model. AVFoundation delivers captured frames
on its own producer thread, while FFmpeg consumes them on a separate thread.

The existing implementation stores only the most recently delivered frame and
uses a mutex to guard concurrent access. However, this means incoming frames
can be dropped while the FFmpeg thread is accessing the latest frame, even
when that thread would otherwise catch up and process frames fast enough.

This patch changes the implementation to use a buffer queue with a maximum
length and encapsulated thread safety. This greatly simplifies the calling
code and lets the consuming thread process all frames concurrently with the
producing thread, while the length cap prevents unbounded memory growth.
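A queue with a maximum length and encapsulated thread safety can be sketched in plain C as a mutex-protected ring buffer that drops the oldest entry when full. This is an illustrative assumption, not the patch's Objective-C implementation: all names are hypothetical, QUEUE_CAPACITY mirrors MAX_QUEUED_OBJECTS, and the release callback stands in for CFRelease().

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>   /* free() is a valid release callback for malloc'd items */

/* Hypothetical plain-C model of a bounded, thread-safe FIFO; not the
 * patch's Objective-C class. QUEUE_CAPACITY mirrors MAX_QUEUED_OBJECTS. */
#define QUEUE_CAPACITY 10

/* Called on every item the queue discards; stands in for CFRelease(). */
typedef void (*release_fn)(void *item);

typedef struct {
    pthread_mutex_t lock;
    void *items[QUEUE_CAPACITY];
    size_t head;        /* index of the oldest item */
    size_t count;       /* number of items currently queued */
    release_fn release;
} bounded_queue;

static void bq_init(bounded_queue *q, release_fn release) {
    assert(release != NULL);
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
    q->release = release;
}

/* Producer: append an item; when full, release and drop the oldest first,
 * so the queue never holds more than QUEUE_CAPACITY items. */
static void bq_push(bounded_queue *q, void *item) {
    pthread_mutex_lock(&q->lock);
    if (q->count == QUEUE_CAPACITY) {
        q->release(q->items[q->head]);
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
    }
    q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
    q->count++;
    pthread_mutex_unlock(&q->lock);
}

/* Consumer: the emptiness check and the removal happen under one lock.
 * Returns false (leaving *out untouched) if the queue is empty. */
static bool bq_try_pop(bounded_queue *q, void **out) {
    pthread_mutex_lock(&q->lock);
    bool ok = q->count > 0;
    if (ok) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}

/* Teardown: release whatever is still queued, then destroy the lock. */
static void bq_destroy(bounded_queue *q) {
    void *item;
    while (bq_try_pop(q, &item))
        q->release(item);
    pthread_mutex_destroy(&q->lock);
}
```

Releasing the dropped entry inside bq_push matters: each entry was retained when it was enqueued, so discarding it without a matching release would leak it. Folding the emptiness check into bq_try_pop also means callers never need a separate count.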

Signed-off-by: Romain Beauxis <toots@rastageeks.org>
---
libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
1 file changed, 127 insertions(+), 93 deletions(-)

Comments

Marvin Scholz Dec. 13, 2021, 6:56 p.m. UTC | #1
On 13 Dec 2021, at 17:39, Romain Beauxis wrote:

> This is the second patch in a series of three that clean up and enhance the
> AVFoundation implementation in libavdevice.
>
> This patch fixes the concurrency model. AVFoundation delivers captured frames
> on its own producer thread, while FFmpeg consumes them on a separate thread.
>
> The existing implementation stores only the most recently delivered frame and
> uses a mutex to guard concurrent access. However, this means incoming frames
> can be dropped while the FFmpeg thread is accessing the latest frame, even
> when that thread would otherwise catch up and process frames fast enough.
>
> This patch changes the implementation to use a buffer queue with a maximum
> length and encapsulated thread safety. This greatly simplifies the calling
> code and lets the consuming thread process all frames concurrently with the
> producing thread, while the length cap prevents unbounded memory growth.

Couldn't this just use CMSimpleQueue 
https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
or CMBufferQueue?

The implementation of the queue in this patch does not seem right, see 
review below.

>
> Signed-off-by: Romain Beauxis <toots@rastageeks.org>
> ---
> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
> 1 file changed, 127 insertions(+), 93 deletions(-)
>
> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
> index 79c9207cfa..95414fd16a 100644
> --- a/libavdevice/avfoundation.m
> +++ b/libavdevice/avfoundation.m
> @@ -26,7 +26,6 @@
>  */
>
> #import <AVFoundation/AVFoundation.h>
> -#include <pthread.h>
>
> #include "libavutil/channel_layout.h"
> #include "libavutil/pixdesc.h"
> @@ -80,13 +79,97 @@
>     { AV_PIX_FMT_NONE, 0 }
> };
>
> +#define MAX_QUEUED_OBJECTS 10
> +
> +@interface AvdeviceAvfoundationBuffer : NSObject
> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
> +- (CMSampleBufferRef) getCMSampleBuffer;
> +@end
> +
> +@implementation AvdeviceAvfoundationBuffer {
> +    CMSampleBufferRef sampleBuffer;
> +}
> +
> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
> +}
> +
> +- (id) init:(CMSampleBufferRef)buffer {
> +    sampleBuffer = buffer;
> +    return self;
> +}
> +
> +- (CMSampleBufferRef) getCMSampleBuffer {
> +    return sampleBuffer;
> +}
> +@end
> +
> +@interface AvdeviceAvfoundationBufferQueue : NSObject
> +- (CMSampleBufferRef) dequeue;
> +- (NSUInteger) count;
> +- (void) enqueue:(CMSampleBufferRef)obj;
> +@end
> +
> +@implementation AvdeviceAvfoundationBufferQueue {
> +    NSLock *mutex;
> +    NSMutableArray *queue;
> +}
> +
> +- (id) init {
> +    mutex = [[[NSLock alloc] init] retain];
> +    queue = [[[NSMutableArray alloc] init] retain];
> +    return self;
> +}
> +
> +- (oneway void) release {
> +    NSEnumerator *enumerator = [queue objectEnumerator];
> +    AvdeviceAvfoundationBuffer *buffer;
> +
> +    while (buffer = [enumerator nextObject]) {
> +        CFRelease([buffer getCMSampleBuffer]);
> +    }
> +
> +    [mutex release];
> +    [queue release];
> +}

Shouldn't this be done in dealloc instead of release?
Especially as retain is not overridden, so this seems
like it could lead to over-releasing resources.
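The distinction can be shown with a toy C model of manual reference counting. It is an assumption for illustration, not how NSObject is implemented: release only decrements the count, and cleanup runs once, when the count reaches zero, which is the job of -dealloc. The hypothetical rc_release_override models an override like the one quoted above, which frees resources on every call and never forwards to the superclass, so the count never drops.

```c
#include <assert.h>

/* Toy model of manual reference counting; hypothetical, not NSObject. */
typedef struct {
    int retain_count;
    int cleanups;   /* times the owned resources were freed */
} refcounted;

static void rc_init(refcounted *o) {
    o->retain_count = 1;   /* alloc/init hands back one owned reference */
    o->cleanups = 0;
}

static void rc_retain(refcounted *o) {
    o->retain_count++;
}

/* Plays the role of -dealloc: frees owned resources exactly once. */
static void rc_dealloc(refcounted *o) {
    o->cleanups++;
}

/* Correct release: decrement, and dealloc only when no owner is left. */
static void rc_release(refcounted *o) {
    assert(o->retain_count > 0);
    if (--o->retain_count == 0)
        rc_dealloc(o);
}

/* Models the overridden -release: frees resources on every call and never
 * decrements the count, so a second owner triggers a second free and the
 * object itself is never deallocated. */
static void rc_release_override(refcounted *o) {
    o->cleanups++;
}
```

Under manual reference counting in Objective-C, the usual shape is to move this cleanup into -dealloc, end it with [super dealloc], and leave -release alone.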

> +
> +- (NSUInteger) count {
> +    [mutex lock];
> +    NSUInteger c = [queue count];
> +    [mutex unlock];
> +    return c;
> +}

This does not look right, the count can change after it is returned
and the caller does not hold a lock to prevent this.
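The concern is a check-then-act race. The C sketch below replays, on a single thread, one interleaving in which two consumers both observe a count of 1 before either dequeues; the queue and its names are hypothetical stand-ins for the Objective-C class. The second consumer acts on a stale count and finds the queue empty, which is why a dequeue that reports emptiness itself is the safer primitive.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

/* Hypothetical mutex-protected FIFO of ints standing in for the patch's
 * Objective-C queue; fixed capacity to keep the sketch short. */
#define CAP 8

typedef struct {
    pthread_mutex_t lock;
    int items[CAP];
    size_t head;
    size_t count;
} int_queue;

static void iq_init(int_queue *q) {
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
}

static void iq_push(int_queue *q, int v) {
    pthread_mutex_lock(&q->lock);
    assert(q->count < CAP);
    q->items[(q->head + q->count) % CAP] = v;
    q->count++;
    pthread_mutex_unlock(&q->lock);
}

/* Like the patch's -count: correct at the moment of the read, but the
 * lock is released before the caller can act on the value. */
static size_t iq_count(int_queue *q) {
    pthread_mutex_lock(&q->lock);
    size_t c = q->count;
    pthread_mutex_unlock(&q->lock);
    return c;
}

/* Check and removal under one lock: returns 1 and sets *out, or 0 if empty. */
static int iq_try_pop(int_queue *q, int *out) {
    pthread_mutex_lock(&q->lock);
    int ok = q->count > 0;
    if (ok) {
        *out = q->items[q->head];
        q->head = (q->head + 1) % CAP;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return ok;
}
```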

> +
> +- (CMSampleBufferRef) dequeue {
> +    [mutex lock];
> +
> +    if ([queue count] < 1) {
> +      [mutex unlock];
> +      return nil;
> +    }
> +
> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
> +    [queue removeObjectAtIndex:0];
> +    [mutex unlock];
> +
> +    return sampleBuffer;
> +}
> +
> +- (void) enqueue:(CMSampleBufferRef)buffer {
> +    [mutex lock];
> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
> +      [queue removeObjectAtIndex:0];
> +    }
> +    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
> +    [mutex unlock];
> +}
> +@end
> +
> typedef struct
> {
>     AVClass*        class;
>
> -    int             frames_captured;
> -    int             audio_frames_captured;
> -    pthread_mutex_t frame_lock;
>     id              avf_delegate;
>     id              avf_audio_delegate;
>
> @@ -121,8 +204,8 @@
>     AVCaptureSession         *capture_session;
>     AVCaptureVideoDataOutput *video_output;
>     AVCaptureAudioDataOutput *audio_output;
> -    CMSampleBufferRef         current_frame;
> -    CMSampleBufferRef         current_audio_frame;
> +    AvdeviceAvfoundationBufferQueue *audio_frames;
> +    AvdeviceAvfoundationBufferQueue *video_frames;
>
>     AVCaptureDevice          *observed_device;
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
> @@ -131,16 +214,6 @@
>     int                      observed_quit;
> } AVFContext;
>
> -static void lock_frames(AVFContext* ctx)
> -{
> -    pthread_mutex_lock(&ctx->frame_lock);
> -}
> -
> -static void unlock_frames(AVFContext* ctx)
> -{
> -    pthread_mutex_unlock(&ctx->frame_lock);
> -}
> -
> /** FrameReciever class - delegate for AVCaptureSession
>  */
> @interface AVFFrameReceiver : NSObject
> @@ -218,17 +291,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>   didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>          fromConnection:(AVCaptureConnection *)connection
> {
> -    lock_frames(_context);
> -
> -    if (_context->current_frame != nil) {
> -        CFRelease(_context->current_frame);
> -    }
> -
> -    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
> -
> -    unlock_frames(_context);
> -
> -    ++_context->frames_captured;
> +    [_context->video_frames enqueue:videoFrame];
> }
>
> @end
> @@ -262,17 +325,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>   didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>          fromConnection:(AVCaptureConnection *)connection
> {
> -    lock_frames(_context);
> -
> -    if (_context->current_audio_frame != nil) {
> -        CFRelease(_context->current_audio_frame);
> -    }
> -
> -    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
> -
> -    unlock_frames(_context);
> -
> -    ++_context->audio_frames_captured;
> +    [_context->audio_frames enqueue:audioFrame];
> }
>
> @end
> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>     [ctx->capture_session release];
>     [ctx->video_output    release];
>     [ctx->audio_output    release];
> +    [ctx->video_frames    release];
> +    [ctx->audio_frames    release];
>     [ctx->avf_delegate    release];
>     [ctx->avf_audio_delegate release];
>
>     ctx->capture_session = NULL;
>     ctx->video_output    = NULL;
>     ctx->audio_output    = NULL;
> +    ctx->video_frames    = NULL;
> +    ctx->audio_frames    = NULL;
>     ctx->avf_delegate    = NULL;
>     ctx->avf_audio_delegate = NULL;
>
> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>       AudioConverterDispose(ctx->audio_converter);
>       ctx->audio_converter = NULL;
>     }
> -
> -    pthread_mutex_destroy(&ctx->frame_lock);
> -
> -    if (ctx->current_frame) {
> -        CFRelease(ctx->current_frame);
> -    }
> }
>
> static void parse_device_name(AVFormatContext *s)
> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>     }
>
>     // Take stream info from the first frame.
> -    while (ctx->frames_captured < 1) {
> +    while ([ctx->video_frames count] < 1) {
>         CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>     }
>
> -    lock_frames(ctx);
> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>
>     ctx->video_stream_index = stream->index;
>
>     avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>
>     if (image_buffer) {
>         image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>         stream->codecpar->format     = ctx->pixel_format;
>     }
>
> -    CFRelease(ctx->current_frame);
> -    ctx->current_frame = nil;
> -
> -    unlock_frames(ctx);
> +    CFRelease(frame);
>
>     return 0;
> }
> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>     }
>
>     // Take stream info from the first frame.
> -    while (ctx->audio_frames_captured < 1) {
> +    while ([ctx->audio_frames count] < 1) {
>         CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>     }
>
> -    lock_frames(ctx);
> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>
>     ctx->audio_stream_index = stream->index;
>
>     avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>
> -    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>     const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>
>     if (!input_format) {
> -        unlock_frames(ctx);
> +        CFRelease(frame);
>         av_log(s, AV_LOG_ERROR, "audio format not available\n");
>         return 1;
>     }
>
>     if (input_format->mFormatID != kAudioFormatLinearPCM) {
> -        unlock_frames(ctx);
> +        CFRelease(frame);
>         av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
>         return 1;
>     }
> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>     if (must_convert) {
>         OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
>         if (ret != noErr) {
> -            unlock_frames(ctx);
> +            CFRelease(frame);
>             av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
>             return 1;
>         }
>     }
>
> -    CFRelease(ctx->current_audio_frame);
> -    ctx->current_audio_frame = nil;
> -
> -    unlock_frames(ctx);
> +    CFRelease(frame);
>
>     return 0;
> }
> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>
>     ctx->num_video_devices = [devices count] + [devices_muxed count];
>
> -    pthread_mutex_init(&ctx->frame_lock, NULL);
> -
> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>     CGGetActiveDisplayList(0, NULL, &num_screens);
> #endif
> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>
>     // Initialize capture session
>     ctx->capture_session = [[AVCaptureSession alloc] init];
> +    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
> +    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>
>     if (video_device && add_video_device(s, video_device)) {
>         goto fail;
> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>     AVFContext* ctx = (AVFContext*)s->priv_data;
>
>     do {
> -        CVImageBufferRef image_buffer;
> -        CMBlockBufferRef block_buffer;
> -        lock_frames(ctx);
> -
> -        if (ctx->current_frame != nil) {
> +        if (1 <= [ctx->video_frames count]) {
>             int status;
>             int length = 0;
> -
> -            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
> -            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
> +            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
> +            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);
>
>             if (image_buffer != nil) {
>                 length = (int)CVPixelBufferGetDataSize(image_buffer);
>             } else if (block_buffer != nil) {
>                 length = (int)CMBlockBufferGetDataLength(block_buffer);
>             } else  {
> -                unlock_frames(ctx);
> +                CFRelease(video_frame);
>                 return AVERROR(EINVAL);
>             }
>
>             if (av_new_packet(pkt, length) < 0) {
> -                unlock_frames(ctx);
> +                CFRelease(video_frame);
>                 return AVERROR(EIO);
>             }
>
>             CMItemCount count;
>             CMSampleTimingInfo timing_info;
>
> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
>                 AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>                 pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>             }
> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>                     status = AVERROR(EIO);
>                 }
>              }
> -            CFRelease(ctx->current_frame);
> -            ctx->current_frame = nil;
> +            CFRelease(video_frame);
>
>             if (status < 0) {
> -                unlock_frames(ctx);
>                 return status;
>             }
> -        } else if (ctx->current_audio_frame != nil) {
> -            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
> +        } else if (1 <= [ctx->audio_frames count]) {
> +            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>
>             size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>             int buffer_size = input_size / ctx->audio_buffers;
> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>             UInt32 size = sizeof(output_size);
>             ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>             if (ret != noErr) {
> -                unlock_frames(ctx);
> +                CFRelease(audio_frame);
>                 return AVERROR(EIO);
>             }
>
>             if (av_new_packet(pkt, output_size) < 0) {
> -                unlock_frames(ctx);
> +                CFRelease(audio_frame);
>                 return AVERROR(EIO);
>             }
>
> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>
>                     if (ret != kCMBlockBufferNoErr) {
>                         av_free(input_buffer);
> -                        unlock_frames(ctx);
> +                        CFRelease(audio_frame);
>                         return AVERROR(EIO);
>                     }
>                 }
> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>                 av_free(input_buffer);
>
>                 if (ret != noErr) {
> -                    unlock_frames(ctx);
> +                    CFRelease(audio_frame);
>                     return AVERROR(EIO);
>                 }
>
> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>             } else {
>                  ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>                  if (ret != kCMBlockBufferNoErr) {
> -                     unlock_frames(ctx);
> +                     CFRelease(audio_frame);
>                      return AVERROR(EIO);
>                  }
>             }
> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>             CMItemCount count;
>             CMSampleTimingInfo timing_info;
>
> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>                 AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>                 pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>             }
> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>             pkt->stream_index  = ctx->audio_stream_index;
>             pkt->flags        |= AV_PKT_FLAG_KEY;
>
> -            CFRelease(ctx->current_audio_frame);
> -            ctx->current_audio_frame = nil;
> -
> -            unlock_frames(ctx);
> +            CFRelease(audio_frame);
>         } else {
>             pkt->data = NULL;
> -            unlock_frames(ctx);
>             if (ctx->observed_quit) {
>                 return AVERROR_EOF;
>             } else {
>                 return AVERROR(EAGAIN);
>             }
>         }
> -
> -        unlock_frames(ctx);
>     } while (!pkt->data);
>
>     return 0;
> -- 
> 2.30.1 (Apple Git-130)
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Romain Beauxis Dec. 13, 2021, 8:29 p.m. UTC | #2
> On Dec 13, 2021, at 12:56 PM, Marvin Scholz <epirat07@gmail.com> wrote:
> 
> 
> 
> On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
> 
>> This is the second patch in a series of three that clean up and enhance the
>> AVFoundation implementation in libavdevice.
>>
>> This patch fixes the concurrency model. AVFoundation delivers captured frames
>> on its own producer thread, while FFmpeg consumes them on a separate thread.
>>
>> The existing implementation stores only the most recently delivered frame and
>> uses a mutex to guard concurrent access. However, this means incoming frames
>> can be dropped while the FFmpeg thread is accessing the latest frame, even
>> when that thread would otherwise catch up and process frames fast enough.
>>
>> This patch changes the implementation to use a buffer queue with a maximum
>> length and encapsulated thread safety. This greatly simplifies the calling
>> code and lets the consuming thread process all frames concurrently with the
>> producing thread, while the length cap prevents unbounded memory growth.
> 
> Couldn't this just use CMSimpleQueue https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
> or CMBufferQueue?

I’m happy to switch to it if you think it is a better primitive; it does seem more directly suited to the task at hand.

> The implementation of the queue in this patch does not seem right, see review below.
> 
>> 
>> Signed-off-by: Romain Beauxis <toots@rastageeks.org>
>> ---
>> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
>> 1 file changed, 127 insertions(+), 93 deletions(-)
>> 
>> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
>> index 79c9207cfa..95414fd16a 100644
>> --- a/libavdevice/avfoundation.m
>> +++ b/libavdevice/avfoundation.m
>> @@ -26,7 +26,6 @@
>> */
>> 
>> #import <AVFoundation/AVFoundation.h>
>> -#include <pthread.h>
>> 
>> #include "libavutil/channel_layout.h"
>> #include "libavutil/pixdesc.h"
>> @@ -80,13 +79,97 @@
>>    { AV_PIX_FMT_NONE, 0 }
>> };
>> 
>> +#define MAX_QUEUED_OBJECTS 10
>> +
>> +@interface AvdeviceAvfoundationBuffer : NSObject
>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
>> +- (CMSampleBufferRef) getCMSampleBuffer;
>> +@end
>> +
>> +@implementation AvdeviceAvfoundationBuffer {
>> +    CMSampleBufferRef sampleBuffer;
>> +}
>> +
>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
>> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
>> +}
>> +
>> +- (id) init:(CMSampleBufferRef)buffer {
>> +    sampleBuffer = buffer;
>> +    return self;
>> +}
>> +
>> +- (CMSampleBufferRef) getCMSampleBuffer {
>> +    return sampleBuffer;
>> +}
>> +@end
>> +
>> +@interface AvdeviceAvfoundationBufferQueue : NSObject
>> +- (CMSampleBufferRef) dequeue;
>> +- (NSUInteger) count;
>> +- (void) enqueue:(CMSampleBufferRef)obj;
>> +@end
>> +
>> +@implementation AvdeviceAvfoundationBufferQueue {
>> +    NSLock *mutex;
>> +    NSMutableArray *queue;
>> +}
>> +
>> +- (id) init {
>> +    mutex = [[[NSLock alloc] init] retain];
>> +    queue = [[[NSMutableArray alloc] init] retain];
>> +    return self;
>> +}
>> +
>> +- (oneway void) release {
>> +    NSEnumerator *enumerator = [queue objectEnumerator];
>> +    AvdeviceAvfoundationBuffer *buffer;
>> +
>> +    while (buffer = [enumerator nextObject]) {
>> +        CFRelease([buffer getCMSampleBuffer]);
>> +    }
>> +
>> +    [mutex release];
>> +    [queue release];
>> +}
> 
> Shouldn't this be done in dealloc instead of release?
> Especially as retain is not overridden, so this seems
> like it could lead to over-releasing resources.

I’m fairly new to Objective-C’s memory model; I’ll double-check those.

> 
>> +
>> +- (NSUInteger) count {
>> +    [mutex lock];
>> +    NSUInteger c = [queue count];
>> +    [mutex unlock];
>> +    return c;
>> +}
> 
> This does not look right, the count can change after it is returned
> and the caller does not hold a lock to prevent this.

For a generic queue, it is indeed. Here, however, each thread changes the count in a constrained way:
* One thread only increases the frame count, by pushing new frames (apart from dropping the oldest frames once the queue exceeds its maximum length, which never empties it).
* One thread only decreases the frame count, by pulling existing frames.

The frame count is read only by the consuming thread, and only to check whether frames are available. Since no other thread can bring the count down to zero concurrently, a non-zero count stays non-zero until that same thread dequeues, so the logic is sound.

That being said, having written this out, it seems this could be simplified into a boolean check that returns true when frames are available, along with a code comment restating the above. I’ll do that in the next iteration of the patch set.
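The single-consumer argument can be checked with a small C model; the names are hypothetical and the capacity of 4 is chosen only to keep the example short. The producer drops the oldest frame only when the queue is full, so it never empties a non-empty queue, and a true result from fq_has_items stays true until the single consumer pops, even if the producer runs in between.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical model of the frame queue; ints stand in for sample buffers. */
#define CAPACITY 4

typedef struct {
    pthread_mutex_t lock;
    int items[CAPACITY];
    size_t head;
    size_t count;
} frame_queue;

static void fq_init(frame_queue *q) {
    pthread_mutex_init(&q->lock, NULL);
    q->head = 0;
    q->count = 0;
}

/* Producer: when full, drop the oldest frame first. A push therefore never
 * turns a non-empty queue into an empty one. */
static void fq_push(frame_queue *q, int v) {
    pthread_mutex_lock(&q->lock);
    if (q->count == CAPACITY) {
        q->head = (q->head + 1) % CAPACITY;
        q->count--;
    }
    q->items[(q->head + q->count) % CAPACITY] = v;
    q->count++;
    pthread_mutex_unlock(&q->lock);
}

/* True if at least one frame is queued. Only safe with exactly one
 * consumer: since the producer never removes the last frame, "true" stays
 * true until that consumer pops. With several consumers this fails. */
static bool fq_has_items(frame_queue *q) {
    pthread_mutex_lock(&q->lock);
    bool has = q->count > 0;
    pthread_mutex_unlock(&q->lock);
    return has;
}

/* Consumer: pop the oldest frame; call only after fq_has_items() is true. */
static int fq_pop(frame_queue *q) {
    pthread_mutex_lock(&q->lock);
    assert(q->count > 0);
    int v = q->items[q->head];
    q->head = (q->head + 1) % CAPACITY;
    q->count--;
    pthread_mutex_unlock(&q->lock);
    return v;
}
```

With more than one consumer this guarantee no longer holds, which is why the invariant belongs in a comment right next to the check.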

>>            }
>> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>                    status = AVERROR(EIO);
>>                }
>>             }
>> -            CFRelease(ctx->current_frame);
>> -            ctx->current_frame = nil;
>> +            CFRelease(video_frame);
>> 
>>            if (status < 0) {
>> -                unlock_frames(ctx);
>>                return status;
>>            }
>> -        } else if (ctx->current_audio_frame != nil) {
>> -            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
>> +        } else if (1 <= [ctx->audio_frames count]) {
>> +            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>> 
>>            size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>>            int buffer_size = input_size / ctx->audio_buffers;
>> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>            UInt32 size = sizeof(output_size);
>>            ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>>            if (ret != noErr) {
>> -                unlock_frames(ctx);
>> +                CFRelease(audio_frame);
>>                return AVERROR(EIO);
>>            }
>> 
>>            if (av_new_packet(pkt, output_size) < 0) {
>> -                unlock_frames(ctx);
>> +                CFRelease(audio_frame);
>>                return AVERROR(EIO);
>>            }
>> 
>> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>> 
>>                    if (ret != kCMBlockBufferNoErr) {
>>                        av_free(input_buffer);
>> -                        unlock_frames(ctx);
>> +                        CFRelease(audio_frame);
>>                        return AVERROR(EIO);
>>                    }
>>                }
>> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>                av_free(input_buffer);
>> 
>>                if (ret != noErr) {
>> -                    unlock_frames(ctx);
>> +                    CFRelease(audio_frame);
>>                    return AVERROR(EIO);
>>                }
>> 
>> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>            } else {
>>                 ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>>                 if (ret != kCMBlockBufferNoErr) {
>> -                     unlock_frames(ctx);
>> +                     CFRelease(audio_frame);
>>                     return AVERROR(EIO);
>>                 }
>>            }
>> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>            CMItemCount count;
>>            CMSampleTimingInfo timing_info;
>> 
>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>>                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>            }
>> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>            pkt->stream_index  = ctx->audio_stream_index;
>>            pkt->flags        |= AV_PKT_FLAG_KEY;
>> 
>> -            CFRelease(ctx->current_audio_frame);
>> -            ctx->current_audio_frame = nil;
>> -
>> -            unlock_frames(ctx);
>> +            CFRelease(audio_frame);
>>        } else {
>>            pkt->data = NULL;
>> -            unlock_frames(ctx);
>>            if (ctx->observed_quit) {
>>                return AVERROR_EOF;
>>            } else {
>>                return AVERROR(EAGAIN);
>>            }
>>        }
>> -
>> -        unlock_frames(ctx);
>>    } while (!pkt->data);
>> 
>>    return 0;
>> -- 
>> 2.30.1 (Apple Git-130)
>> 
>> _______________________________________________
>> ffmpeg-devel mailing list
>> ffmpeg-devel@ffmpeg.org
>> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>> 
>> To unsubscribe, visit link above, or email
>> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
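A note on the consumer side of the patched avf_read_packet: the reference returned by dequeue belongs to the caller, which is why every early return after the dequeue gains its own CFRelease. A common C alternative, used in many places in FFmpeg, is the single-exit cleanup style: every failure jumps to one label that releases the frame and returns. The sketch below shows that pattern under stated assumptions. `Frame`, `frame_release` and `read_packet_sketch` are hypothetical stand-ins for a retained CMSampleBufferRef, CFRelease and the real read function. The error codes are plain negative errno values, which is what FFmpeg's AVERROR(e) expands to on POSIX platforms.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a retained CMSampleBufferRef: after a dequeue
 * the consumer holds one reference and must drop it exactly once. */
typedef struct Frame {
    int    refcount;
    size_t size;   /* payload size; 0 models "no image or block buffer" */
} Frame;

/* Stand-in for CFRelease(); the assert catches a double release. */
static void frame_release(Frame *f)
{
    assert(f->refcount > 0);
    f->refcount--;
}

/* Hypothetical consumer step. Returns 0 on success or a negative errno
 * value. Every path after the frame has been taken goes through `end:`,
 * so the reference is released exactly once, whichever check fails. */
static int read_packet_sketch(Frame *frame, unsigned char *pkt, size_t pkt_cap)
{
    int ret = 0;

    if (!frame)
        return -EAGAIN;             /* nothing queued; caller retries later */

    if (frame->size == 0) {
        ret = -EINVAL;              /* neither image nor block buffer */
        goto end;
    }
    if (frame->size > pkt_cap) {
        ret = -EIO;                 /* models a failed packet allocation */
        goto end;
    }
    memset(pkt, 0xAB, frame->size); /* stands in for copying the payload */

end:
    frame_release(frame);
    return ret;
}
```

Because the release sits in one place, a failure check added later cannot silently leak the frame.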
Marvin Scholz Dec. 13, 2021, 9:12 p.m. UTC | #3
On 13 Dec 2021, at 21:29, Romain Beauxis wrote:

>> On Dec 13, 2021, at 12:56 PM, Marvin Scholz <epirat07@gmail.com> wrote:
>>
>>
>>
>> On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
>>
>>> This is the second patch of a series of 3 that clean up and enhance the
>>> avfoundation implementation for libavdevice.
>>>
>>> This patch fixes the concurrency model. AVFoundation runs its own producing thread
>>> to send produced frames and ffmpeg runs its own thread to consume them.
>>>
>>> The existing implementation stores the last transmitted frame and uses a mutex
>>> to avoid concurrent access. However, this leads to situations where upcoming frames
>>> can be dropped if the ffmpeg thread is accessing the latest frame. This happens
>>> even when the thread would otherwise catch up and process frames fast enough.
>>>
>>> This patch changes this implementation to use a buffer queue with a max queue length
>>> and encapsulated thread-safety. This greatly simplifies the logic of the calling code
>>> and gives the consuming thread a chance to process all frames concurrently with the producing
>>> thread while avoiding memory leaks.
>>
>> Couldn't this just use CMSimpleQueue https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
>> or CMBufferQueue?
>
> I’m happy to switch to this one, which seems more directly related to the task at hand if you think it is a better primitive.
>

I did not check in detail, but if either of the existing implementations
referred to above does what you need here, I would prefer that it be used
instead of writing your own implementation.
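If a fixed-capacity primitive such as CMSimpleQueue were used, the drop-oldest behaviour of the posted enqueue would have to be layered on top by the producer. The following portable C sketch illustrates that layering; it is not the CoreMedia API. `FixedQueue`, `fq_enqueue`, `fq_dequeue` and `enqueue_drop_oldest` are hypothetical names, and the sketch assumes the underlying queue, like CMSimpleQueue, reports an error instead of growing when full. It also differs from the posted `-enqueue:` in two respects: the evicted item is released, whereas `[queue removeObjectAtIndex:0]` drops the entry without a matching CFRelease, and the queue never exceeds its capacity, whereas `while (MAX_QUEUED_OBJECTS < [queue count])` lets the array reach MAX_QUEUED_OBJECTS + 1 entries.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>

#define QUEUE_CAPACITY 4   /* hypothetical; the patch uses MAX_QUEUED_OBJECTS 10 */

typedef void (*release_fn)(void *item);

/* Fixed-capacity FIFO guarded by a mutex; it never grows. */
typedef struct {
    void           *items[QUEUE_CAPACITY];
    size_t          head;   /* index of the oldest item */
    size_t          count;
    pthread_mutex_t lock;
} FixedQueue;

static void fq_init(FixedQueue *q)
{
    q->head  = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
}

/* Returns 0 on success, -1 if the queue is full (nothing is stored). */
static int fq_enqueue(FixedQueue *q, void *item)
{
    int ret = -1;
    assert(item != NULL);          /* NULL is reserved for "empty" */
    pthread_mutex_lock(&q->lock);
    if (q->count < QUEUE_CAPACITY) {
        q->items[(q->head + q->count) % QUEUE_CAPACITY] = item;
        q->count++;
        ret = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}

/* Returns the oldest item, or NULL if the queue is empty. */
static void *fq_dequeue(FixedQueue *q)
{
    void *item = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        item    = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_CAPACITY;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}

/* Producer-side drop-oldest policy: while the queue is full, evict the
 * oldest item and release it (the queue no longer references it), then
 * retry. The queue never holds more than QUEUE_CAPACITY items. */
static void enqueue_drop_oldest(FixedQueue *q, void *item, release_fn release)
{
    while (fq_enqueue(q, item) < 0) {
        void *oldest = fq_dequeue(q);
        if (oldest)
            release(oldest);
    }
}

/* Example release callback for the usage below: items are int refcounts. */
static void release_int_ref(void *item)
{
    assert(*(int *)item > 0);
    (*(int *)item)--;
}
```

Since each enqueue and dequeue is atomic under the mutex, a consumer dequeuing at the same moment can at worst cause the producer to evict one frame more than strictly necessary; it cannot corrupt the queue or release an item twice.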

>> The implementation of the queue in this patch does not seem right; see the review below.
>>
>>>
>>> Signed-off-by: Romain Beauxis <toots@rastageeks.org>
>>> ---
>>> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
>>> 1 file changed, 127 insertions(+), 93 deletions(-)
>>>
>>> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
>>> index 79c9207cfa..95414fd16a 100644
>>> --- a/libavdevice/avfoundation.m
>>> +++ b/libavdevice/avfoundation.m
>>> @@ -26,7 +26,6 @@
>>> */
>>>
>>> #import <AVFoundation/AVFoundation.h>
>>> -#include <pthread.h>
>>>
>>> #include "libavutil/channel_layout.h"
>>> #include "libavutil/pixdesc.h"
>>> @@ -80,13 +79,97 @@
>>>    { AV_PIX_FMT_NONE, 0 }
>>> };
>>>
>>> +#define MAX_QUEUED_OBJECTS 10
>>> +
>>> +@interface AvdeviceAvfoundationBuffer : NSObject
>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
>>> +- (CMSampleBufferRef) getCMSampleBuffer;
>>> +@end
>>> +
>>> +@implementation AvdeviceAvfoundationBuffer {
>>> +    CMSampleBufferRef sampleBuffer;
>>> +}
>>> +
>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
>>> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
>>> +}
>>> +
>>> +- (id) init:(CMSampleBufferRef)buffer {
>>> +    sampleBuffer = buffer;
>>> +    return self;
>>> +}
>>> +
>>> +- (CMSampleBufferRef) getCMSampleBuffer {
>>> +    return sampleBuffer;
>>> +}
>>> +@end
>>> +
>>> +@interface AvdeviceAvfoundationBufferQueue : NSObject
>>> +- (CMSampleBufferRef) dequeue;
>>> +- (NSUInteger) count;
>>> +- (void) enqueue:(CMSampleBufferRef)obj;
>>> +@end
>>> +
>>> +@implementation AvdeviceAvfoundationBufferQueue {
>>> +    NSLock *mutex;
>>> +    NSMutableArray *queue;
>>> +}
>>> +
>>> +- (id) init {
>>> +    mutex = [[[NSLock alloc] init] retain];
>>> +    queue = [[[NSMutableArray alloc] init] retain];
>>> +    return self;
>>> +}
>>> +
>>> +- (oneway void) release {
>>> +    NSEnumerator *enumerator = [queue objectEnumerator];
>>> +    AvdeviceAvfoundationBuffer *buffer;
>>> +
>>> +    while (buffer = [enumerator nextObject]) {
>>> +        CFRelease([buffer getCMSampleBuffer]);
>>> +    }
>>> +
>>> +    [mutex release];
>>> +    [queue release];
>>> +}
>>
>> Shouldn't this be done in dealloc instead of release?
>> Especially as retain is not overridden, this seems
>> like it could lead to over-releasing resources.
>
> I’m fairly new to Objective-C’s memory model; I’ll double-check those.
>
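Marvin's point about `-dealloc` can be illustrated outside Objective-C: teardown belongs in the code that runs once, when the last reference goes away, not in the code that merely drops one reference. The C sketch below models that with hypothetical names (`RefQueue`, `refqueue_retain`, `refqueue_release`, `refqueue_dealloc`): releasing only decrements a count, and the items still queued are released exactly once, when the count reaches zero. Under manual reference counting, which the quoted code uses, the Objective-C equivalent would be to move the CFRelease loop and the `[mutex release]`/`[queue release]` calls into `-dealloc`, end it with `[super dealloc]`, and leave `-release` to NSObject.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

#define MAX_ITEMS 10   /* hypothetical bound, mirroring MAX_QUEUED_OBJECTS */

typedef void (*release_fn)(void *item);

typedef struct {
    int             refcount;      /* owners of the queue object itself */
    void           *items[MAX_ITEMS];
    size_t          count;
    release_fn      release_item;  /* e.g. CFRelease for sample buffers */
    pthread_mutex_t lock;
} RefQueue;

static RefQueue *refqueue_create(release_fn release_item)
{
    RefQueue *q = calloc(1, sizeof(*q));
    if (!q)
        return NULL;
    q->refcount     = 1;
    q->release_item = release_item;
    pthread_mutex_init(&q->lock, NULL);
    return q;
}

/* Returns 0 on success, -1 if the queue is full. */
static int refqueue_push(RefQueue *q, void *item)
{
    int ret = -1;
    pthread_mutex_lock(&q->lock);
    if (q->count < MAX_ITEMS) {
        q->items[q->count++] = item;
        ret = 0;
    }
    pthread_mutex_unlock(&q->lock);
    return ret;
}

/* Analogue of -retain: one more owner. */
static RefQueue *refqueue_retain(RefQueue *q)
{
    pthread_mutex_lock(&q->lock);
    q->refcount++;
    pthread_mutex_unlock(&q->lock);
    return q;
}

/* Analogue of -dealloc: runs once, after the last owner is gone, and
 * releases every item still held before freeing the queue. */
static void refqueue_dealloc(RefQueue *q)
{
    for (size_t i = 0; i < q->count; i++)
        q->release_item(q->items[i]);
    pthread_mutex_destroy(&q->lock);
    free(q);
}

/* Analogue of -release: drops one reference; teardown only happens when
 * the count reaches zero, so extra retains cannot cause double releases. */
static void refqueue_release(RefQueue *q)
{
    pthread_mutex_lock(&q->lock);
    int remaining = --q->refcount;
    pthread_mutex_unlock(&q->lock);
    if (remaining == 0)
        refqueue_dealloc(q);
}

/* Example release callback for the usage below: items are int refcounts. */
static void release_int_ref(void *item)
{
    assert(*(int *)item > 0);      /* fails on a double release */
    (*(int *)item)--;
}
```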
>>
>>> +
>>> +- (NSUInteger) count {
>>> +    [mutex lock];
>>> +    NSUInteger c = [queue count];
>>> +    [mutex unlock];
>>> +    return c;
>>> +}
>>
>> This does not look right: the count can change after it is returned,
>> and the caller does not hold a lock to prevent this.
>
> For a generic queue, that is indeed a problem. However, here it is used in a monotonic fashion only:
> * One thread only increases the frame count by pushing new ones
> * One thread only decreases the frame count by pulling existing ones
>

OK, but then this should at least be clarified in a comment, so that it is
clear how the queue is intended to be used and someone working on this in
the future does not break its assumptions and introduce bugs.
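As an alternative to documenting a single-consumer assumption for a separate count or Boolean check, the check can be folded into dequeue itself, so that testing for emptiness and removing the head happen under one lock acquisition and no assumption about the number of threads is needed. A C sketch under these assumptions: `Queue`, `queue_push`, `try_dequeue` and `next_frame` are hypothetical names, NULL items are disallowed so that NULL can mean "empty", and `next_frame` only mirrors the video-then-audio order of the posted avf_read_packet.

```c
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>

/* Unbounded FIFO of non-NULL pointers; NULL is reserved for "empty". */
typedef struct Node {
    void        *item;
    struct Node *next;
} Node;

typedef struct {
    Node           *head;   /* oldest */
    Node           *tail;   /* newest */
    pthread_mutex_t lock;
} Queue;

static void queue_init(Queue *q)
{
    q->head = q->tail = NULL;
    pthread_mutex_init(&q->lock, NULL);
}

static int queue_push(Queue *q, void *item)
{
    Node *n;
    assert(item != NULL);
    n = malloc(sizeof(*n));
    if (!n)
        return -ENOMEM;
    n->item = item;
    n->next = NULL;
    pthread_mutex_lock(&q->lock);
    if (q->tail)
        q->tail->next = n;
    else
        q->head = n;
    q->tail = n;
    pthread_mutex_unlock(&q->lock);
    return 0;
}

/* The emptiness test and the removal happen under one lock acquisition,
 * so no other thread can empty the queue in between. Returns NULL when
 * the queue is empty. */
static void *try_dequeue(Queue *q)
{
    void *item = NULL;
    Node *n;
    pthread_mutex_lock(&q->lock);
    n = q->head;
    if (n) {
        q->head = n->next;
        if (!q->head)
            q->tail = NULL;
        item = n->item;
    }
    pthread_mutex_unlock(&q->lock);
    free(n);                       /* free(NULL) is a no-op */
    return item;
}

/* Frees the remaining nodes only; in real use the items would have to be
 * released first (e.g. with CFRelease), which is left to the caller here. */
static void queue_destroy(Queue *q)
{
    while (try_dequeue(q))
        ;
    pthread_mutex_destroy(&q->lock);
}

/* Hypothetical consumer step: video first, then audio, otherwise -EAGAIN.
 * No separate count check is needed before dequeuing. */
static int next_frame(Queue *video, Queue *audio, void **frame, int *is_video)
{
    if ((*frame = try_dequeue(video)) != NULL) {
        *is_video = 1;
        return 0;
    }
    if ((*frame = try_dequeue(audio)) != NULL) {
        *is_video = 0;
        return 0;
    }
    return -EAGAIN;
}
```

The posted `-dequeue` already returns nil when the queue is empty, so the same shape would only require avf_read_packet to test its result instead of calling `-count` first.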

> The frame count is only used by the consuming thread, and only to check whether
> frames are available. Therefore the logic is always sound, that is, as long as
> the frame count is never decreased concurrently by another thread.
>
> That being said, having written this, it seems this could be simplified
> into a Boolean check that returns true when frames are available, along
> with a code comment restating the above. I’ll do that in the next
> iteration of the patch set.
>
>>
>>> +
>>> +- (CMSampleBufferRef) dequeue {
>>> +    [mutex lock];
>>> +
>>> +    if ([queue count] < 1) {
>>> +      [mutex unlock];
>>> +      return nil;
>>> +    }
>>> +
>>> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
>>> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
>>> +    [queue removeObjectAtIndex:0];
>>> +    [mutex unlock];
>>> +
>>> +    return sampleBuffer;
>>> +}
>>> +
>>> +- (void) enqueue:(CMSampleBufferRef)buffer {
>>> +    [mutex lock];
>>> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
>>> +      [queue removeObjectAtIndex:0];
>>> +    }
>>> +    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
>>> +    [mutex unlock];
>>> +}
>>> +@end
>>> +
>>> typedef struct
>>> {
>>>    AVClass*        class;
>>>
>>> -    int             frames_captured;
>>> -    int             audio_frames_captured;
>>> -    pthread_mutex_t frame_lock;
>>>    id              avf_delegate;
>>>    id              avf_audio_delegate;
>>>
>>> @@ -121,8 +204,8 @@
>>>    AVCaptureSession         *capture_session;
>>>    AVCaptureVideoDataOutput *video_output;
>>>    AVCaptureAudioDataOutput *audio_output;
>>> -    CMSampleBufferRef         current_frame;
>>> -    CMSampleBufferRef         current_audio_frame;
>>> +    AvdeviceAvfoundationBufferQueue *audio_frames;
>>> +    AvdeviceAvfoundationBufferQueue *video_frames;
>>>
>>>    AVCaptureDevice          *observed_device;
>>> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>> @@ -131,16 +214,6 @@
>>>    int                      observed_quit;
>>> } AVFContext;
>>>
>>> -static void lock_frames(AVFContext* ctx)
>>> -{
>>> -    pthread_mutex_lock(&ctx->frame_lock);
>>> -}
>>> -
>>> -static void unlock_frames(AVFContext* ctx)
>>> -{
>>> -    pthread_mutex_unlock(&ctx->frame_lock);
>>> -}
>>> -
>>> /** FrameReciever class - delegate for AVCaptureSession
>>> */
>>> @interface AVFFrameReceiver : NSObject
>>> @@ -218,17 +291,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>  didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>>>         fromConnection:(AVCaptureConnection *)connection
>>> {
>>> -    lock_frames(_context);
>>> -
>>> -    if (_context->current_frame != nil) {
>>> -        CFRelease(_context->current_frame);
>>> -    }
>>> -
>>> -    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
>>> -
>>> -    unlock_frames(_context);
>>> -
>>> -    ++_context->frames_captured;
>>> +    [_context->video_frames enqueue:videoFrame];
>>> }
>>>
>>> @end
>>> @@ -262,17 +325,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>  didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>>>         fromConnection:(AVCaptureConnection *)connection
>>> {
>>> -    lock_frames(_context);
>>> -
>>> -    if (_context->current_audio_frame != nil) {
>>> -        CFRelease(_context->current_audio_frame);
>>> -    }
>>> -
>>> -    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
>>> -
>>> -    unlock_frames(_context);
>>> -
>>> -    ++_context->audio_frames_captured;
>>> +    [_context->audio_frames enqueue:audioFrame];
>>> }
>>>
>>> @end
>>> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>>>    [ctx->capture_session release];
>>>    [ctx->video_output    release];
>>>    [ctx->audio_output    release];
>>> +    [ctx->video_frames    release];
>>> +    [ctx->audio_frames    release];
>>>    [ctx->avf_delegate    release];
>>>    [ctx->avf_audio_delegate release];
>>>
>>>    ctx->capture_session = NULL;
>>>    ctx->video_output    = NULL;
>>>    ctx->audio_output    = NULL;
>>> +    ctx->video_frames    = NULL;
>>> +    ctx->audio_frames    = NULL;
>>>    ctx->avf_delegate    = NULL;
>>>    ctx->avf_audio_delegate = NULL;
>>>
>>> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>>>      AudioConverterDispose(ctx->audio_converter);
>>>      ctx->audio_converter = NULL;
>>>    }
>>> -
>>> -    pthread_mutex_destroy(&ctx->frame_lock);
>>> -
>>> -    if (ctx->current_frame) {
>>> -        CFRelease(ctx->current_frame);
>>> -    }
>>> }
>>>
>>> static void parse_device_name(AVFormatContext *s)
>>> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>>>    }
>>>
>>>    // Take stream info from the first frame.
>>> -    while (ctx->frames_captured < 1) {
>>> +    while ([ctx->video_frames count] < 1) {
>>>        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>    }
>>>
>>> -    lock_frames(ctx);
>>> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>>>
>>>    ctx->video_stream_index = stream->index;
>>>
>>>    avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>
>>> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
>>> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>>>
>>>    if (image_buffer) {
>>>        image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
>>> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>>>        stream->codecpar->format     = ctx->pixel_format;
>>>    }
>>>
>>> -    CFRelease(ctx->current_frame);
>>> -    ctx->current_frame = nil;
>>> -
>>> -    unlock_frames(ctx);
>>> +    CFRelease(frame);
>>>
>>>    return 0;
>>> }
>>> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>>>    }
>>>
>>>    // Take stream info from the first frame.
>>> -    while (ctx->audio_frames_captured < 1) {
>>> +    while ([ctx->audio_frames count] < 1) {
>>>        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>    }
>>>
>>> -    lock_frames(ctx);
>>> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>>>
>>>    ctx->audio_stream_index = stream->index;
>>>
>>>    avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>
>>> -    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
>>> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>>>    const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>>>
>>>    if (!input_format) {
>>> -        unlock_frames(ctx);
>>> +        CFRelease(frame);
>>>        av_log(s, AV_LOG_ERROR, "audio format not available\n");
>>>        return 1;
>>>    }
>>>
>>>    if (input_format->mFormatID != kAudioFormatLinearPCM) {
>>> -        unlock_frames(ctx);
>>> +        CFRelease(frame);
>>>        av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
>>>        return 1;
>>>    }
>>> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>>>    if (must_convert) {
>>>        OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
>>>        if (ret != noErr) {
>>> -            unlock_frames(ctx);
>>> +            CFRelease(frame);
>>>            av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
>>>            return 1;
>>>        }
>>>    }
>>>
>>> -    CFRelease(ctx->current_audio_frame);
>>> -    ctx->current_audio_frame = nil;
>>> -
>>> -    unlock_frames(ctx);
>>> +    CFRelease(frame);
>>>
>>>    return 0;
>>> }
>>> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>>>
>>>    ctx->num_video_devices = [devices count] + [devices_muxed count];
>>>
>>> -    pthread_mutex_init(&ctx->frame_lock, NULL);
>>> -
>>> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>>    CGGetActiveDisplayList(0, NULL, &num_screens);
>>> #endif
>>> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>>>
>>>    // Initialize capture session
>>>    ctx->capture_session = [[AVCaptureSession alloc] init];
>>> +    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>> +    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>
>>>    if (video_device && add_video_device(s, video_device)) {
>>>        goto fail;
>>> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>    AVFContext* ctx = (AVFContext*)s->priv_data;
>>>
>>>    do {
>>> -        CVImageBufferRef image_buffer;
>>> -        CMBlockBufferRef block_buffer;
>>> -        lock_frames(ctx);
>>> -
>>> -        if (ctx->current_frame != nil) {
>>> +        if (1 <= [ctx->video_frames count]) {
>>>            int status;
>>>            int length = 0;
>>> -
>>> -            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>> -            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>> +            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
>>> +            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);
>>>
>>>            if (image_buffer != nil) {
>>>                length = (int)CVPixelBufferGetDataSize(image_buffer);
>>>            } else if (block_buffer != nil) {
>>>                length = (int)CMBlockBufferGetDataLength(block_buffer);
>>>            } else  {
>>> -                unlock_frames(ctx);
>>> +                CFRelease(video_frame);
>>>                return AVERROR(EINVAL);
>>>            }
>>>
>>>            if (av_new_packet(pkt, length) < 0) {
>>> -                unlock_frames(ctx);
>>> +                CFRelease(video_frame);
>>>                return AVERROR(EIO);
>>>            }
>>>
>>>            CMItemCount count;
>>>            CMSampleTimingInfo timing_info;
>>>
>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
>>>                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>            }
>>> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>                    status = AVERROR(EIO);
>>>                }
>>>             }
>>> -            CFRelease(ctx->current_frame);
>>> -            ctx->current_frame = nil;
>>> +            CFRelease(video_frame);
>>>
>>>            if (status < 0) {
>>> -                unlock_frames(ctx);
>>>                return status;
>>>            }
>>> -        } else if (ctx->current_audio_frame != nil) {
>>> -            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
>>> +        } else if (1 <= [ctx->audio_frames count]) {
>>> +            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>>>
>>>            size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>>>            int buffer_size = input_size / ctx->audio_buffers;
>>> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>            UInt32 size = sizeof(output_size);
>>>            ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>>>            if (ret != noErr) {
>>> -                unlock_frames(ctx);
>>> +                CFRelease(audio_frame);
>>>                return AVERROR(EIO);
>>>            }
>>>
>>>            if (av_new_packet(pkt, output_size) < 0) {
>>> -                unlock_frames(ctx);
>>> +                CFRelease(audio_frame);
>>>                return AVERROR(EIO);
>>>            }
>>>
>>> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>
>>>                    if (ret != kCMBlockBufferNoErr) {
>>>                        av_free(input_buffer);
>>> -                        unlock_frames(ctx);
>>> +                        CFRelease(audio_frame);
>>>                        return AVERROR(EIO);
>>>                    }
>>>                }
>>> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>                av_free(input_buffer);
>>>
>>>                if (ret != noErr) {
>>> -                    unlock_frames(ctx);
>>> +                    CFRelease(audio_frame);
>>>                    return AVERROR(EIO);
>>>                }
>>>
>>> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>            } else {
>>>                 ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>>>                 if (ret != kCMBlockBufferNoErr) {
>>> -                     unlock_frames(ctx);
>>> +                     CFRelease(audio_frame);
>>>                     return AVERROR(EIO);
>>>                 }
>>>            }
>>> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>            CMItemCount count;
>>>            CMSampleTimingInfo timing_info;
>>>
>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>>>                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>            }
>>> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>            pkt->stream_index  = ctx->audio_stream_index;
>>>            pkt->flags        |= AV_PKT_FLAG_KEY;
>>>
>>> -            CFRelease(ctx->current_audio_frame);
>>> -            ctx->current_audio_frame = nil;
>>> -
>>> -            unlock_frames(ctx);
>>> +            CFRelease(audio_frame);
>>>        } else {
>>>            pkt->data = NULL;
>>> -            unlock_frames(ctx);
>>>            if (ctx->observed_quit) {
>>>                return AVERROR_EOF;
>>>            } else {
>>>                return AVERROR(EAGAIN);
>>>            }
>>>        }
>>> -
>>> -        unlock_frames(ctx);
>>>    } while (!pkt->data);
>>>
>>>    return 0;
>>> -- 
>>> 2.30.1 (Apple Git-130)
>>>
Romain Beauxis Dec. 15, 2021, 12:22 a.m. UTC | #4
Just sent an updated patch here: http://ffmpeg.org/pipermail/ffmpeg-devel/2021-December/289684.html

> On Dec 13, 2021, at 3:12 PM, Marvin Scholz <epirat07@gmail.com> wrote:
> 
> 
> 
> On 13 Dec 2021, at 21:29, Romain Beauxis wrote:
> 
>>> On Dec 13, 2021, at 12:56 PM, Marvin Scholz <epirat07@gmail.com> wrote:
>>> 
>>> 
>>> 
>>> On 13 Dec 2021, at 17:39, Romain Beauxis wrote:
>>> 
>>>> This is the second patch of a series of 3 that clean up and enhance the
>>>> avfoundation implementation for libavdevice.
>>>>
>>>> This patch fixes the concurrency model. AVFoundation runs its own producing thread
>>>> to send produced frames and ffmpeg runs its own thread to consume them.
>>>>
>>>> The existing implementation stores the last transmitted frame and uses a mutex
>>>> to avoid concurrent access. However, this leads to situations where upcoming frames
>>>> can be dropped if the ffmpeg thread is accessing the latest frame. This happens
>>>> even when the thread would otherwise catch up and process frames fast enough.
>>>>
>>>> This patch changes this implementation to use a buffer queue with a max queue length
>>>> and encapsulated thread-safety. This greatly simplifies the logic of the calling code
>>>> and gives the consuming thread a chance to process all frames concurrently with the producing
>>>> thread while avoiding memory leaks.
>>> 
>>> Couldn't this just use CMSimpleQueue https://developer.apple.com/documentation/coremedia/cmsimplequeue?language=objc
>>> or CMBufferQueue?
>> 
>> I’m happy to switch to this one, which seems more directly related to the task at hand if you think it is a better primitive.
>> 
> 
> I did not check in detail, but if either of the existing implementations referred to above does what you need here,
> I would prefer that it be used instead of writing your own implementation.
> 
>>> The implementation of the queue in this patch does not seem right; see the review below.
>>> 
>>>> 
>>>> Signed-off-by: Romain Beauxis <toots@rastageeks.org>
>>>> ---
>>>> libavdevice/avfoundation.m | 220 +++++++++++++++++++++----------------
>>>> 1 file changed, 127 insertions(+), 93 deletions(-)
>>>> 
>>>> diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
>>>> index 79c9207cfa..95414fd16a 100644
>>>> --- a/libavdevice/avfoundation.m
>>>> +++ b/libavdevice/avfoundation.m
>>>> @@ -26,7 +26,6 @@
>>>> */
>>>> 
>>>> #import <AVFoundation/AVFoundation.h>
>>>> -#include <pthread.h>
>>>> 
>>>> #include "libavutil/channel_layout.h"
>>>> #include "libavutil/pixdesc.h"
>>>> @@ -80,13 +79,97 @@
>>>>   { AV_PIX_FMT_NONE, 0 }
>>>> };
>>>> 
>>>> +#define MAX_QUEUED_OBJECTS 10
>>>> +
>>>> +@interface AvdeviceAvfoundationBuffer : NSObject
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
>>>> +- (CMSampleBufferRef) getCMSampleBuffer;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBuffer {
>>>> +    CMSampleBufferRef sampleBuffer;
>>>> +}
>>>> +
>>>> ++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
>>>> +    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
>>>> +}
>>>> +
>>>> +- (id) init:(CMSampleBufferRef)buffer {
>>>> +    sampleBuffer = buffer;
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (CMSampleBufferRef) getCMSampleBuffer {
>>>> +    return sampleBuffer;
>>>> +}
>>>> +@end
>>>> +
>>>> +@interface AvdeviceAvfoundationBufferQueue : NSObject
>>>> +- (CMSampleBufferRef) dequeue;
>>>> +- (NSUInteger) count;
>>>> +- (void) enqueue:(CMSampleBufferRef)obj;
>>>> +@end
>>>> +
>>>> +@implementation AvdeviceAvfoundationBufferQueue {
>>>> +    NSLock *mutex;
>>>> +    NSMutableArray *queue;
>>>> +}
>>>> +
>>>> +- (id) init {
>>>> +    mutex = [[[NSLock alloc] init] retain];
>>>> +    queue = [[[NSMutableArray alloc] init] retain];
>>>> +    return self;
>>>> +}
>>>> +
>>>> +- (oneway void) release {
>>>> +    NSEnumerator *enumerator = [queue objectEnumerator];
>>>> +    AvdeviceAvfoundationBuffer *buffer;
>>>> +
>>>> +    while (buffer = [enumerator nextObject]) {
>>>> +        CFRelease([buffer getCMSampleBuffer]);
>>>> +    }
>>>> +
>>>> +    [mutex release];
>>>> +    [queue release];
>>>> +}
>>> 
>>> Shouldn't this be done in dealloc instead of release?
>>> Especially as retain is not overridden, this seems
>>> like it could lead to over-releasing resources.
>> 
>> I’m fairly new to Objective-C’s memory model; I’ll double-check those.
>> 
>>> 
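The distinction the reviewer draws can be modelled in plain C. Under manual retain/release, `-release` only decrements the retain count, and the runtime sends `-dealloc` once that count reaches zero; owned resources are normally torn down in `-dealloc`, which finishes with `[super dealloc]`. The sketch below is a hypothetical C analogy, not the Objective-C runtime: `RefCounted`, `queue_alloc`, `obj_retain`, `obj_release` and `queue_dealloc` are illustrative names. It shows cleanup running exactly once, when the last reference goes away, however many owners called retain.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical C model of manual retain/release; not the Objective-C runtime. */
typedef struct RefCounted RefCounted;

struct RefCounted {
    int retain_count;
    int *resources;                    /* stands in for the NSLock / NSMutableArray */
    void (*dealloc)(RefCounted *self);
};

/* Plays the role of -dealloc: frees owned state exactly once. */
static void queue_dealloc(RefCounted *self)
{
    free(self->resources);
    free(self);
}

/* Plays the role of [[Queue alloc] init]: the caller receives one reference. */
static RefCounted *queue_alloc(void)
{
    RefCounted *obj = calloc(1, sizeof *obj);
    if (obj == NULL)
        return NULL;
    obj->retain_count = 1;
    obj->resources = malloc(16 * sizeof *obj->resources);
    obj->dealloc = queue_dealloc;
    return obj;
}

/* Plays the role of -retain: one more owner. */
static RefCounted *obj_retain(RefCounted *obj)
{
    obj->retain_count++;
    return obj;
}

/* Plays the role of -release: decrement, and only dealloc at zero.
   Returns 1 if the object was destroyed, 0 otherwise. */
static int obj_release(RefCounted *obj)
{
    assert(obj->retain_count > 0);
    if (--obj->retain_count == 0) {
        obj->dealloc(obj);
        return 1;
    }
    return 0;
}
```

By contrast, an overridden `release` that frees the lock and array on every call, without forwarding to `[super release]`, tears the state down as soon as any one owner releases it and never actually frees the object itself, which is how the over-release the reviewer mentions can arise.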
>>>> +
>>>> +- (NSUInteger) count {
>>>> +    [mutex lock];
>>>> +    NSUInteger c = [queue count];
>>>> +    [mutex unlock];
>>>> +    return c;
>>>> +}
>>> 
>>> This does not look right: the count can change after it is returned,
>>> and the caller does not hold a lock to prevent this.
>> 
>> For a generic queue, indeed. However, here it is used in a monotonic fashion only:
>> * One thread only increases the frame count by pushing new ones
>> * One thread only decreases the frame count by pulling existing ones
>> 
> 
> OK, but then this should at least be documented in a comment, so that it is clear how the queue is
> intended to be used and nobody working on it in the future breaks its assumptions and introduces
> bugs.
> 
>> The frame count is only used by the consuming thread, and only to check whether frames are available. Therefore the logic is sound as long as the frame count never decreases concurrently.
>> 
>> That being said, having written this out, it seems that this could be simplified into a Boolean check that returns true when frames are available, along with a code comment restating the above. I’ll do that in the next iteration of the patch set.
>> 
>>> 
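The argument above is the single-producer/single-consumer (SPSC) invariant. One way to make it structural rather than a convention is a ring buffer in which the producer only ever advances `tail` and the consumer only ever advances `head`, so a `true` from the consumer-side "has items" check stays true until the consumer itself pops. This is a hypothetical C11 sketch (`SpscQueue` and the `spsc_*` functions are illustrative names), and it swaps in a different overflow policy: because the producer may not touch `head`, it rejects the incoming item when full instead of evicting the oldest one as the patch's `enqueue` does. Note also that in the patch the producer does remove frames when the queue is full, so the count is not strictly monotonic there; what the consumer relies on is that eviction happens under the lock and never empties the queue.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Power of two, so index wraparound stays consistent when the counters overflow. */
#define SPSC_CAPACITY 16
_Static_assert((SPSC_CAPACITY & (SPSC_CAPACITY - 1)) == 0, "capacity must be a power of two");

typedef struct SpscQueue {
    void *slots[SPSC_CAPACITY];
    atomic_size_t head; /* advanced only by the consumer */
    atomic_size_t tail; /* advanced only by the producer */
} SpscQueue;

static void spsc_init(SpscQueue *q)
{
    atomic_init(&q->head, 0);
    atomic_init(&q->tail, 0);
}

/* Producer only. Returns false when full; the caller keeps ownership of item. */
static bool spsc_push(SpscQueue *q, void *item)
{
    size_t tail = atomic_load_explicit(&q->tail, memory_order_relaxed);
    size_t head = atomic_load_explicit(&q->head, memory_order_acquire);
    if (tail - head == SPSC_CAPACITY)
        return false;
    q->slots[tail & (SPSC_CAPACITY - 1)] = item;
    /* Publish the slot before making it visible to the consumer. */
    atomic_store_explicit(&q->tail, tail + 1, memory_order_release);
    return true;
}

/* Consumer only. A true result cannot be invalidated by the producer,
   because the producer never advances head. */
static bool spsc_has_items(SpscQueue *q)
{
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    return tail != head;
}

/* Consumer only. Returns the oldest item, or NULL when empty. */
static void *spsc_pop(SpscQueue *q)
{
    size_t head = atomic_load_explicit(&q->head, memory_order_relaxed);
    size_t tail = atomic_load_explicit(&q->tail, memory_order_acquire);
    if (head == tail)
        return NULL;
    void *item = q->slots[head & (SPSC_CAPACITY - 1)];
    /* Hand the slot back to the producer only after reading it. */
    atomic_store_explicit(&q->head, head + 1, memory_order_release);
    return item;
}
```

Returning `NULL` from `spsc_pop` on an empty queue also lets the consumer drop the separate count or has-items check entirely: it simply tries to pop and falls through to `EAGAIN` when nothing comes back.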
>>>> +
>>>> +- (CMSampleBufferRef) dequeue {
>>>> +    [mutex lock];
>>>> +
>>>> +    if ([queue count] < 1) {
>>>> +      [mutex unlock];
>>>> +      return nil;
>>>> +    }
>>>> +
>>>> +    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
>>>> +    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
>>>> +    [queue removeObjectAtIndex:0];
>>>> +    [mutex unlock];
>>>> +
>>>> +    return sampleBuffer;
>>>> +}
>>>> +
>>>> +- (void) enqueue:(CMSampleBufferRef)buffer {
>>>> +    [mutex lock];
>>>> +    while (MAX_QUEUED_OBJECTS < [queue count]) {
>>>> +      [queue removeObjectAtIndex:0];
>>>> +    }
>>>> +    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
>>>> +    [mutex unlock];
>>>> +}
>>>> +@end
>>>> +
>>>> typedef struct
>>>> {
>>>>   AVClass*        class;
>>>> 
>>>> -    int             frames_captured;
>>>> -    int             audio_frames_captured;
>>>> -    pthread_mutex_t frame_lock;
>>>>   id              avf_delegate;
>>>>   id              avf_audio_delegate;
>>>> 
>>>> @@ -121,8 +204,8 @@
>>>>   AVCaptureSession         *capture_session;
>>>>   AVCaptureVideoDataOutput *video_output;
>>>>   AVCaptureAudioDataOutput *audio_output;
>>>> -    CMSampleBufferRef         current_frame;
>>>> -    CMSampleBufferRef         current_audio_frame;
>>>> +    AvdeviceAvfoundationBufferQueue *audio_frames;
>>>> +    AvdeviceAvfoundationBufferQueue *video_frames;
>>>> 
>>>>   AVCaptureDevice          *observed_device;
>>>> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>>> @@ -131,16 +214,6 @@
>>>>   int                      observed_quit;
>>>> } AVFContext;
>>>> 
>>>> -static void lock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_lock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>> -static void unlock_frames(AVFContext* ctx)
>>>> -{
>>>> -    pthread_mutex_unlock(&ctx->frame_lock);
>>>> -}
>>>> -
>>>> /** FrameReciever class - delegate for AVCaptureSession
>>>> */
>>>> @interface AVFFrameReceiver : NSObject
>>>> @@ -218,17 +291,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>> didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
>>>>        fromConnection:(AVCaptureConnection *)connection
>>>> {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_frame != nil) {
>>>> -        CFRelease(_context->current_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->frames_captured;
>>>> +    [_context->video_frames enqueue:videoFrame];
>>>> }
>>>> 
>>>> @end
>>>> @@ -262,17 +325,7 @@ - (void)  captureOutput:(AVCaptureOutput *)captureOutput
>>>> didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
>>>>        fromConnection:(AVCaptureConnection *)connection
>>>> {
>>>> -    lock_frames(_context);
>>>> -
>>>> -    if (_context->current_audio_frame != nil) {
>>>> -        CFRelease(_context->current_audio_frame);
>>>> -    }
>>>> -
>>>> -    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
>>>> -
>>>> -    unlock_frames(_context);
>>>> -
>>>> -    ++_context->audio_frames_captured;
>>>> +    [_context->audio_frames enqueue:audioFrame];
>>>> }
>>>> 
>>>> @end
>>>> @@ -284,12 +337,16 @@ static void destroy_context(AVFContext* ctx)
>>>>   [ctx->capture_session release];
>>>>   [ctx->video_output    release];
>>>>   [ctx->audio_output    release];
>>>> +    [ctx->video_frames    release];
>>>> +    [ctx->audio_frames    release];
>>>>   [ctx->avf_delegate    release];
>>>>   [ctx->avf_audio_delegate release];
>>>> 
>>>>   ctx->capture_session = NULL;
>>>>   ctx->video_output    = NULL;
>>>>   ctx->audio_output    = NULL;
>>>> +    ctx->video_frames    = NULL;
>>>> +    ctx->audio_frames    = NULL;
>>>>   ctx->avf_delegate    = NULL;
>>>>   ctx->avf_audio_delegate = NULL;
>>>> 
>>>> @@ -297,12 +354,6 @@ static void destroy_context(AVFContext* ctx)
>>>>     AudioConverterDispose(ctx->audio_converter);
>>>>     ctx->audio_converter = NULL;
>>>>   }
>>>> -
>>>> -    pthread_mutex_destroy(&ctx->frame_lock);
>>>> -
>>>> -    if (ctx->current_frame) {
>>>> -        CFRelease(ctx->current_frame);
>>>> -    }
>>>> }
>>>> 
>>>> static void parse_device_name(AVFormatContext *s)
>>>> @@ -630,18 +681,18 @@ static int get_video_config(AVFormatContext *s)
>>>>   }
>>>> 
>>>>   // Take stream info from the first frame.
>>>> -    while (ctx->frames_captured < 1) {
>>>> +    while ([ctx->video_frames count] < 1) {
>>>>       CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>   }
>>>> 
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->video_frames dequeue];
>>>> 
>>>>   ctx->video_stream_index = stream->index;
>>>> 
>>>>   avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>> 
>>>> -    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +    image_buffer = CMSampleBufferGetImageBuffer(frame);
>>>> +    block_buffer = CMSampleBufferGetDataBuffer(frame);
>>>> 
>>>>   if (image_buffer) {
>>>>       image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
>>>> @@ -657,10 +708,7 @@ static int get_video_config(AVFormatContext *s)
>>>>       stream->codecpar->format     = ctx->pixel_format;
>>>>   }
>>>> 
>>>> -    CFRelease(ctx->current_frame);
>>>> -    ctx->current_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>> 
>>>>   return 0;
>>>> }
>>>> @@ -680,27 +728,27 @@ static int get_audio_config(AVFormatContext *s)
>>>>   }
>>>> 
>>>>   // Take stream info from the first frame.
>>>> -    while (ctx->audio_frames_captured < 1) {
>>>> +    while ([ctx->audio_frames count] < 1) {
>>>>       CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
>>>>   }
>>>> 
>>>> -    lock_frames(ctx);
>>>> +    CMSampleBufferRef frame = [ctx->audio_frames dequeue];
>>>> 
>>>>   ctx->audio_stream_index = stream->index;
>>>> 
>>>>   avpriv_set_pts_info(stream, 64, 1, avf_time_base);
>>>> 
>>>> -    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
>>>> +    format_desc = CMSampleBufferGetFormatDescription(frame);
>>>>   const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);
>>>> 
>>>>   if (!input_format) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>       av_log(s, AV_LOG_ERROR, "audio format not available\n");
>>>>       return 1;
>>>>   }
>>>> 
>>>>   if (input_format->mFormatID != kAudioFormatLinearPCM) {
>>>> -        unlock_frames(ctx);
>>>> +        CFRelease(frame);
>>>>       av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
>>>>       return 1;
>>>>   }
>>>> @@ -778,16 +826,13 @@ static int get_audio_config(AVFormatContext *s)
>>>>   if (must_convert) {
>>>>       OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
>>>>       if (ret != noErr) {
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(frame);
>>>>           av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
>>>>           return 1;
>>>>       }
>>>>   }
>>>> 
>>>> -    CFRelease(ctx->current_audio_frame);
>>>> -    ctx->current_audio_frame = nil;
>>>> -
>>>> -    unlock_frames(ctx);
>>>> +    CFRelease(frame);
>>>> 
>>>>   return 0;
>>>> }
>>>> @@ -805,8 +850,6 @@ static int avf_read_header(AVFormatContext *s)
>>>> 
>>>>   ctx->num_video_devices = [devices count] + [devices_muxed count];
>>>> 
>>>> -    pthread_mutex_init(&ctx->frame_lock, NULL);
>>>> -
>>>> #if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
>>>>   CGGetActiveDisplayList(0, NULL, &num_screens);
>>>> #endif
>>>> @@ -1006,6 +1049,8 @@ static int avf_read_header(AVFormatContext *s)
>>>> 
>>>>   // Initialize capture session
>>>>   ctx->capture_session = [[AVCaptureSession alloc] init];
>>>> +    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>> +    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
>>>> 
>>>>   if (video_device && add_video_device(s, video_device)) {
>>>>       goto fail;
>>>> @@ -1088,35 +1133,31 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>   AVFContext* ctx = (AVFContext*)s->priv_data;
>>>> 
>>>>   do {
>>>> -        CVImageBufferRef image_buffer;
>>>> -        CMBlockBufferRef block_buffer;
>>>> -        lock_frames(ctx);
>>>> -
>>>> -        if (ctx->current_frame != nil) {
>>>> +        if (1 <= [ctx->video_frames count]) {
>>>>           int status;
>>>>           int length = 0;
>>>> -
>>>> -            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
>>>> -            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
>>>> +            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
>>>> +            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
>>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);
>>>> 
>>>>           if (image_buffer != nil) {
>>>>               length = (int)CVPixelBufferGetDataSize(image_buffer);
>>>>           } else if (block_buffer != nil) {
>>>>               length = (int)CMBlockBufferGetDataLength(block_buffer);
>>>>           } else  {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>               return AVERROR(EINVAL);
>>>>           }
>>>> 
>>>>           if (av_new_packet(pkt, length) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(video_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>> 
>>>>           CMItemCount count;
>>>>           CMSampleTimingInfo timing_info;
>>>> 
>>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
>>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
>>>>               AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>               pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>           }
>>>> @@ -1133,15 +1174,14 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>                   status = AVERROR(EIO);
>>>>               }
>>>>            }
>>>> -            CFRelease(ctx->current_frame);
>>>> -            ctx->current_frame = nil;
>>>> +            CFRelease(video_frame);
>>>> 
>>>>           if (status < 0) {
>>>> -                unlock_frames(ctx);
>>>>               return status;
>>>>           }
>>>> -        } else if (ctx->current_audio_frame != nil) {
>>>> -            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
>>>> +        } else if (1 <= [ctx->audio_frames count]) {
>>>> +            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
>>>> +            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);
>>>> 
>>>>           size_t input_size = CMBlockBufferGetDataLength(block_buffer);
>>>>           int buffer_size = input_size / ctx->audio_buffers;
>>>> @@ -1151,12 +1191,12 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           UInt32 size = sizeof(output_size);
>>>>           ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
>>>>           if (ret != noErr) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>> 
>>>>           if (av_new_packet(pkt, output_size) < 0) {
>>>> -                unlock_frames(ctx);
>>>> +                CFRelease(audio_frame);
>>>>               return AVERROR(EIO);
>>>>           }
>>>> 
>>>> @@ -1173,7 +1213,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>> 
>>>>                   if (ret != kCMBlockBufferNoErr) {
>>>>                       av_free(input_buffer);
>>>> -                        unlock_frames(ctx);
>>>> +                        CFRelease(audio_frame);
>>>>                       return AVERROR(EIO);
>>>>                   }
>>>>               }
>>>> @@ -1191,7 +1231,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>               av_free(input_buffer);
>>>> 
>>>>               if (ret != noErr) {
>>>> -                    unlock_frames(ctx);
>>>> +                    CFRelease(audio_frame);
>>>>                   return AVERROR(EIO);
>>>>               }
>>>> 
>>>> @@ -1199,7 +1239,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           } else {
>>>>                ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
>>>>                if (ret != kCMBlockBufferNoErr) {
>>>> -                     unlock_frames(ctx);
>>>> +                     CFRelease(audio_frame);
>>>>                    return AVERROR(EIO);
>>>>                }
>>>>           }
>>>> @@ -1207,7 +1247,7 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           CMItemCount count;
>>>>           CMSampleTimingInfo timing_info;
>>>> 
>>>> -            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
>>>> +            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
>>>>               AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
>>>>               pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
>>>>           }
>>>> @@ -1215,21 +1255,15 @@ static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
>>>>           pkt->stream_index  = ctx->audio_stream_index;
>>>>           pkt->flags        |= AV_PKT_FLAG_KEY;
>>>> 
>>>> -            CFRelease(ctx->current_audio_frame);
>>>> -            ctx->current_audio_frame = nil;
>>>> -
>>>> -            unlock_frames(ctx);
>>>> +            CFRelease(audio_frame);
>>>>       } else {
>>>>           pkt->data = NULL;
>>>> -            unlock_frames(ctx);
>>>>           if (ctx->observed_quit) {
>>>>               return AVERROR_EOF;
>>>>           } else {
>>>>               return AVERROR(EAGAIN);
>>>>           }
>>>>       }
>>>> -
>>>> -        unlock_frames(ctx);
>>>>   } while (!pkt->data);
>>>> 
>>>>   return 0;
>>>> -- 
>>>> 2.30.1 (Apple Git-130)
>>>> 
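Reading the quoted `enqueue`, frames evicted by `removeObjectAtIndex:0` never receive the `CFRelease` that balances the `CFRetain` taken when they were queued. Also, because the loop only evicts while the count exceeds `MAX_QUEUED_OBJECTS`, the queue can hold `MAX_QUEUED_OBJECTS + 1` frames after the add. The following is a hypothetical C sketch of a bounded drop-oldest FIFO with explicit ownership. `BoundedQueue`, the `queue_*` functions and the `release` callback (a stand-in for `CFRelease`) are illustrative names, not FFmpeg APIs. The queue owns every item it holds, releases any item it evicts, transfers ownership to the caller on pop, and releases whatever is left on destroy; it also counts dropped items, which could be useful for logging.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdlib.h>

#define MAX_QUEUED_OBJECTS 10

/* Hypothetical stand-in for CFRelease on a queued sample buffer. */
typedef void (*release_fn)(void *item);

typedef struct BoundedQueue {
    pthread_mutex_t lock;
    void *items[MAX_QUEUED_OBJECTS]; /* ring buffer storage */
    size_t head;                     /* index of the oldest item */
    size_t count;                    /* items currently stored */
    size_t dropped;                  /* items evicted because the queue was full */
    release_fn release;
} BoundedQueue;

static int queue_init(BoundedQueue *q, release_fn release)
{
    q->head = q->count = q->dropped = 0;
    q->release = release;
    return pthread_mutex_init(&q->lock, NULL);
}

/* Producer side: takes ownership of item. When full, the oldest item is
   evicted and released, so count never exceeds MAX_QUEUED_OBJECTS. */
static void queue_push(BoundedQueue *q, void *item)
{
    void *evicted = NULL;

    assert(item != NULL); /* NULL is reserved for "queue empty" */
    pthread_mutex_lock(&q->lock);
    if (q->count == MAX_QUEUED_OBJECTS) {
        evicted = q->items[q->head];
        q->head = (q->head + 1) % MAX_QUEUED_OBJECTS;
        q->count--;
        q->dropped++;
    }
    q->items[(q->head + q->count) % MAX_QUEUED_OBJECTS] = item;
    q->count++;
    pthread_mutex_unlock(&q->lock);

    if (evicted)
        q->release(evicted); /* release outside the critical section */
}

/* Consumer side: returns the oldest item and transfers ownership to the
   caller, who must release it; returns NULL if the queue is empty. */
static void *queue_pop(BoundedQueue *q)
{
    void *item = NULL;

    pthread_mutex_lock(&q->lock);
    if (q->count > 0) {
        item = q->items[q->head];
        q->head = (q->head + 1) % MAX_QUEUED_OBJECTS;
        q->count--;
    }
    pthread_mutex_unlock(&q->lock);
    return item;
}

/* Releases every item still queued, then destroys the lock. Only call this
   once the producer has stopped (e.g. after the capture session ends). */
static void queue_destroy(BoundedQueue *q)
{
    void *item;

    while ((item = queue_pop(q)) != NULL)
        q->release(item);
    pthread_mutex_destroy(&q->lock);
}
```

For example, pushing twelve items into this queue leaves ten stored, reports two drops, and the first pop returns the third item pushed. In the Objective-C code, the equivalent change would be to `CFRelease` the evicted sample buffer inside `enqueue` and to evict while the count is greater than or equal to the limit.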
>>>> _______________________________________________
>>>> ffmpeg-devel mailing list
>>>> ffmpeg-devel@ffmpeg.org
>>>> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>>>> 
>>>> To unsubscribe, visit link above, or email
>>>> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

Patch

diff --git a/libavdevice/avfoundation.m b/libavdevice/avfoundation.m
index 79c9207cfa..95414fd16a 100644
--- a/libavdevice/avfoundation.m
+++ b/libavdevice/avfoundation.m
@@ -26,7 +26,6 @@ 
 */

#import <AVFoundation/AVFoundation.h>
-#include <pthread.h>

#include "libavutil/channel_layout.h"
#include "libavutil/pixdesc.h"
@@ -80,13 +79,97 @@ 
    { AV_PIX_FMT_NONE, 0 }
};

+#define MAX_QUEUED_OBJECTS 10
+
+@interface AvdeviceAvfoundationBuffer : NSObject
++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer;
+- (CMSampleBufferRef) getCMSampleBuffer;
+@end
+
+@implementation AvdeviceAvfoundationBuffer {
+    CMSampleBufferRef sampleBuffer;
+}
+
++ (AvdeviceAvfoundationBuffer *) fromCMSampleBufferRef:(CMSampleBufferRef)sampleBuffer {
+    return [[AvdeviceAvfoundationBuffer alloc] init:sampleBuffer];
+}
+
+- (id) init:(CMSampleBufferRef)buffer {
+    sampleBuffer = buffer;
+    return self;
+}
+
+- (CMSampleBufferRef) getCMSampleBuffer {
+    return sampleBuffer;
+}
+@end
+
+@interface AvdeviceAvfoundationBufferQueue : NSObject
+- (CMSampleBufferRef) dequeue;
+- (NSUInteger) count;
+- (void) enqueue:(CMSampleBufferRef)obj;
+@end
+
+@implementation AvdeviceAvfoundationBufferQueue {
+    NSLock *mutex;
+    NSMutableArray *queue;
+}
+
+- (id) init {
+    mutex = [[[NSLock alloc] init] retain];
+    queue = [[[NSMutableArray alloc] init] retain];
+    return self;
+}
+
+- (oneway void) release {
+    NSEnumerator *enumerator = [queue objectEnumerator];
+    AvdeviceAvfoundationBuffer *buffer;
+
+    while (buffer = [enumerator nextObject]) {
+        CFRelease([buffer getCMSampleBuffer]);
+    }
+
+    [mutex release];
+    [queue release];
+}
+
+- (NSUInteger) count {
+    [mutex lock];
+    NSUInteger c = [queue count];
+    [mutex unlock];
+    return c;
+}
+
+- (CMSampleBufferRef) dequeue {
+    [mutex lock];
+
+    if ([queue count] < 1) {
+      [mutex unlock];
+      return nil;
+    }
+
+    AvdeviceAvfoundationBuffer *buffer = [queue objectAtIndex:0];
+    CMSampleBufferRef sampleBuffer = [buffer getCMSampleBuffer];
+    [queue removeObjectAtIndex:0];
+    [mutex unlock];
+
+    return sampleBuffer;
+}
+
+- (void) enqueue:(CMSampleBufferRef)buffer {
+    [mutex lock];
+    while (MAX_QUEUED_OBJECTS < [queue count]) {
+      [queue removeObjectAtIndex:0];
+    }
+    [queue addObject:[AvdeviceAvfoundationBuffer fromCMSampleBufferRef:(CMSampleBufferRef)CFRetain(buffer)]];
+    [mutex unlock];
+}
+@end
+
typedef struct
{
    AVClass*        class;

-    int             frames_captured;
-    int             audio_frames_captured;
-    pthread_mutex_t frame_lock;
    id              avf_delegate;
    id              avf_audio_delegate;

@@ -121,8 +204,8 @@ 
    AVCaptureSession         *capture_session;
    AVCaptureVideoDataOutput *video_output;
    AVCaptureAudioDataOutput *audio_output;
-    CMSampleBufferRef         current_frame;
-    CMSampleBufferRef         current_audio_frame;
+    AvdeviceAvfoundationBufferQueue *audio_frames;
+    AvdeviceAvfoundationBufferQueue *video_frames;

    AVCaptureDevice          *observed_device;
#if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
@@ -131,16 +214,6 @@ 
    int                      observed_quit;
} AVFContext;

-static void lock_frames(AVFContext* ctx)
-{
-    pthread_mutex_lock(&ctx->frame_lock);
-}
-
-static void unlock_frames(AVFContext* ctx)
-{
-    pthread_mutex_unlock(&ctx->frame_lock);
-}
-
/** FrameReciever class - delegate for AVCaptureSession
 */
@interface AVFFrameReceiver : NSObject
@@ -218,17 +291,7 @@  - (void)  captureOutput:(AVCaptureOutput *)captureOutput
  didOutputSampleBuffer:(CMSampleBufferRef)videoFrame
         fromConnection:(AVCaptureConnection *)connection
{
-    lock_frames(_context);
-
-    if (_context->current_frame != nil) {
-        CFRelease(_context->current_frame);
-    }
-
-    _context->current_frame = (CMSampleBufferRef)CFRetain(videoFrame);
-
-    unlock_frames(_context);
-
-    ++_context->frames_captured;
+    [_context->video_frames enqueue:videoFrame];
}

@end
@@ -262,17 +325,7 @@  - (void)  captureOutput:(AVCaptureOutput *)captureOutput
  didOutputSampleBuffer:(CMSampleBufferRef)audioFrame
         fromConnection:(AVCaptureConnection *)connection
{
-    lock_frames(_context);
-
-    if (_context->current_audio_frame != nil) {
-        CFRelease(_context->current_audio_frame);
-    }
-
-    _context->current_audio_frame = (CMSampleBufferRef)CFRetain(audioFrame);
-
-    unlock_frames(_context);
-
-    ++_context->audio_frames_captured;
+    [_context->audio_frames enqueue:audioFrame];
}

@end
@@ -284,12 +337,16 @@  static void destroy_context(AVFContext* ctx)
    [ctx->capture_session release];
    [ctx->video_output    release];
    [ctx->audio_output    release];
+    [ctx->video_frames    release];
+    [ctx->audio_frames    release];
    [ctx->avf_delegate    release];
    [ctx->avf_audio_delegate release];

    ctx->capture_session = NULL;
    ctx->video_output    = NULL;
    ctx->audio_output    = NULL;
+    ctx->video_frames    = NULL;
+    ctx->audio_frames    = NULL;
    ctx->avf_delegate    = NULL;
    ctx->avf_audio_delegate = NULL;

@@ -297,12 +354,6 @@  static void destroy_context(AVFContext* ctx)
      AudioConverterDispose(ctx->audio_converter);
      ctx->audio_converter = NULL;
    }
-
-    pthread_mutex_destroy(&ctx->frame_lock);
-
-    if (ctx->current_frame) {
-        CFRelease(ctx->current_frame);
-    }
}

static void parse_device_name(AVFormatContext *s)
@@ -630,18 +681,18 @@  static int get_video_config(AVFormatContext *s)
    }

    // Take stream info from the first frame.
-    while (ctx->frames_captured < 1) {
+    while ([ctx->video_frames count] < 1) {
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
    }

-    lock_frames(ctx);
+    CMSampleBufferRef frame = [ctx->video_frames dequeue];

    ctx->video_stream_index = stream->index;

    avpriv_set_pts_info(stream, 64, 1, avf_time_base);

-    image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
-    block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
+    image_buffer = CMSampleBufferGetImageBuffer(frame);
+    block_buffer = CMSampleBufferGetDataBuffer(frame);

    if (image_buffer) {
        image_buffer_size = CVImageBufferGetEncodedSize(image_buffer);
@@ -657,10 +708,7 @@  static int get_video_config(AVFormatContext *s)
        stream->codecpar->format     = ctx->pixel_format;
    }

-    CFRelease(ctx->current_frame);
-    ctx->current_frame = nil;
-
-    unlock_frames(ctx);
+    CFRelease(frame);

    return 0;
}
@@ -680,27 +728,27 @@  static int get_audio_config(AVFormatContext *s)
    }

    // Take stream info from the first frame.
-    while (ctx->audio_frames_captured < 1) {
+    while ([ctx->audio_frames count] < 1) {
        CFRunLoopRunInMode(kCFRunLoopDefaultMode, 0.1, YES);
    }

-    lock_frames(ctx);
+    CMSampleBufferRef frame = [ctx->audio_frames dequeue];

    ctx->audio_stream_index = stream->index;

    avpriv_set_pts_info(stream, 64, 1, avf_time_base);

-    format_desc = CMSampleBufferGetFormatDescription(ctx->current_audio_frame);
+    format_desc = CMSampleBufferGetFormatDescription(frame);
    const AudioStreamBasicDescription *input_format = CMAudioFormatDescriptionGetStreamBasicDescription(format_desc);

    if (!input_format) {
-        unlock_frames(ctx);
+        CFRelease(frame);
        av_log(s, AV_LOG_ERROR, "audio format not available\n");
        return 1;
    }

    if (input_format->mFormatID != kAudioFormatLinearPCM) {
-        unlock_frames(ctx);
+        CFRelease(frame);
        av_log(s, AV_LOG_ERROR, "only PCM audio format are supported at the moment\n");
        return 1;
    }
@@ -778,16 +826,13 @@  static int get_audio_config(AVFormatContext *s)
    if (must_convert) {
        OSStatus ret = AudioConverterNew(input_format, &output_format, &ctx->audio_converter);
        if (ret != noErr) {
-            unlock_frames(ctx);
+            CFRelease(frame);
            av_log(s, AV_LOG_ERROR, "Error while allocating audio converter\n");
            return 1;
        }
    }

-    CFRelease(ctx->current_audio_frame);
-    ctx->current_audio_frame = nil;
-
-    unlock_frames(ctx);
+    CFRelease(frame);

    return 0;
}
@@ -805,8 +850,6 @@  static int avf_read_header(AVFormatContext *s)

    ctx->num_video_devices = [devices count] + [devices_muxed count];

-    pthread_mutex_init(&ctx->frame_lock, NULL);
-
#if !TARGET_OS_IPHONE && __MAC_OS_X_VERSION_MIN_REQUIRED >= 1070
    CGGetActiveDisplayList(0, NULL, &num_screens);
#endif
@@ -1006,6 +1049,8 @@  static int avf_read_header(AVFormatContext *s)

    // Initialize capture session
    ctx->capture_session = [[AVCaptureSession alloc] init];
+    ctx->video_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];
+    ctx->audio_frames    = [[AvdeviceAvfoundationBufferQueue alloc] init];

    if (video_device && add_video_device(s, video_device)) {
        goto fail;
@@ -1088,35 +1133,31 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
    AVFContext* ctx = (AVFContext*)s->priv_data;

    do {
-        CVImageBufferRef image_buffer;
-        CMBlockBufferRef block_buffer;
-        lock_frames(ctx);
-
-        if (ctx->current_frame != nil) {
+        if (1 <= [ctx->video_frames count]) {
            int status;
            int length = 0;
-
-            image_buffer = CMSampleBufferGetImageBuffer(ctx->current_frame);
-            block_buffer = CMSampleBufferGetDataBuffer(ctx->current_frame);
+            CMSampleBufferRef video_frame = [ctx->video_frames dequeue];
+            CVImageBufferRef image_buffer = CMSampleBufferGetImageBuffer(video_frame);;
+            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(video_frame);

            if (image_buffer != nil) {
                length = (int)CVPixelBufferGetDataSize(image_buffer);
            } else if (block_buffer != nil) {
                length = (int)CMBlockBufferGetDataLength(block_buffer);
            } else  {
-                unlock_frames(ctx);
+                CFRelease(video_frame);
                return AVERROR(EINVAL);
            }

            if (av_new_packet(pkt, length) < 0) {
-                unlock_frames(ctx);
+                CFRelease(video_frame);
                return AVERROR(EIO);
            }

            CMItemCount count;
            CMSampleTimingInfo timing_info;

-            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_frame, 1, &timing_info, &count) == noErr) {
+            if (CMSampleBufferGetOutputSampleTimingInfoArray(video_frame, 1, &timing_info, &count) == noErr) {
                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
            }
@@ -1133,15 +1174,14 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
                    status = AVERROR(EIO);
                }
             }
-            CFRelease(ctx->current_frame);
-            ctx->current_frame = nil;
+            CFRelease(video_frame);

            if (status < 0) {
-                unlock_frames(ctx);
                return status;
            }
-        } else if (ctx->current_audio_frame != nil) {
-            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(ctx->current_audio_frame);
+        } else if (1 <= [ctx->audio_frames count]) {
+            CMSampleBufferRef audio_frame = [ctx->audio_frames dequeue];
+            CMBlockBufferRef block_buffer = CMSampleBufferGetDataBuffer(audio_frame);

            size_t input_size = CMBlockBufferGetDataLength(block_buffer);
            int buffer_size = input_size / ctx->audio_buffers;
@@ -1151,12 +1191,12 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            UInt32 size = sizeof(output_size);
            ret = AudioConverterGetProperty(ctx->audio_converter, kAudioConverterPropertyCalculateOutputBufferSize, &size, &output_size);
            if (ret != noErr) {
-                unlock_frames(ctx);
+                CFRelease(audio_frame);
                return AVERROR(EIO);
            }

            if (av_new_packet(pkt, output_size) < 0) {
-                unlock_frames(ctx);
+                CFRelease(audio_frame);
                return AVERROR(EIO);
            }

@@ -1173,7 +1213,7 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)

                    if (ret != kCMBlockBufferNoErr) {
                        av_free(input_buffer);
-                        unlock_frames(ctx);
+                        CFRelease(audio_frame);
                        return AVERROR(EIO);
                    }
                }
@@ -1191,7 +1231,7 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
                av_free(input_buffer);

                if (ret != noErr) {
-                    unlock_frames(ctx);
+                    CFRelease(audio_frame);
                    return AVERROR(EIO);
                }

@@ -1199,7 +1239,7 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            } else {
                 ret = CMBlockBufferCopyDataBytes(block_buffer, 0, pkt->size, pkt->data);
                 if (ret != kCMBlockBufferNoErr) {
-                     unlock_frames(ctx);
+                     CFRelease(audio_frame);
                     return AVERROR(EIO);
                 }
            }
@@ -1207,7 +1247,7 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            CMItemCount count;
            CMSampleTimingInfo timing_info;

-            if (CMSampleBufferGetOutputSampleTimingInfoArray(ctx->current_audio_frame, 1, &timing_info, &count) == noErr) {
+            if (CMSampleBufferGetOutputSampleTimingInfoArray(audio_frame, 1, &timing_info, &count) == noErr) {
                AVRational timebase_q = av_make_q(1, timing_info.presentationTimeStamp.timescale);
                pkt->pts = pkt->dts = av_rescale_q(timing_info.presentationTimeStamp.value, timebase_q, avf_time_base_q);
            }
@@ -1215,21 +1255,15 @@  static int avf_read_packet(AVFormatContext *s, AVPacket *pkt)
            pkt->stream_index  = ctx->audio_stream_index;
            pkt->flags        |= AV_PKT_FLAG_KEY;

-            CFRelease(ctx->current_audio_frame);
-            ctx->current_audio_frame = nil;
-
-            unlock_frames(ctx);
+            CFRelease(audio_frame);
        } else {
            pkt->data = NULL;
-            unlock_frames(ctx);
            if (ctx->observed_quit) {
                return AVERROR_EOF;
            } else {
                return AVERROR(EAGAIN);
            }
        }
-
-        unlock_frames(ctx);
    } while (!pkt->data);

    return 0;