u/sink: Add u_sink_simple_queue

This commit is contained in:
Moses Turner 2022-03-18 19:18:32 -05:00 committed by Mateo de Mayo
parent 662b72e2f6
commit 09da6e09dc
4 changed files with 234 additions and 0 deletions

View file

@ -196,6 +196,7 @@ add_library(
util/u_sink_converter.c util/u_sink_converter.c
util/u_sink_deinterleaver.c util/u_sink_deinterleaver.c
util/u_sink_queue.c util/u_sink_queue.c
util/u_sink_simple_queue.c
util/u_sink_quirk.c util/u_sink_quirk.c
util/u_sink_split.c util/u_sink_split.c
util/u_sink_stereo_sbs_to_slam_sbs.c util/u_sink_stereo_sbs_to_slam_sbs.c

View file

@ -69,6 +69,7 @@ lib_aux_util = static_library(
'util/u_sink_converter.c', 'util/u_sink_converter.c',
'util/u_sink_deinterleaver.c', 'util/u_sink_deinterleaver.c',
'util/u_sink_queue.c', 'util/u_sink_queue.c',
'util/u_sink_simple_queue.c',
'util/u_sink_quirk.c', 'util/u_sink_quirk.c',
'util/u_sink_split.c', 'util/u_sink_split.c',
'util/u_sink_stereo_sbs_to_slam_sbs.c', 'util/u_sink_stereo_sbs_to_slam_sbs.c',

View file

@ -110,6 +110,16 @@ u_sink_queue_create(struct xrt_frame_context *xfctx,
struct xrt_frame_sink *downstream, struct xrt_frame_sink *downstream,
struct xrt_frame_sink **out_xfs); struct xrt_frame_sink **out_xfs);
/*!
* @public @memberof xrt_frame_sink
* @see xrt_frame_context
*/
bool
u_sink_simple_queue_create(struct xrt_frame_context *xfctx,
struct xrt_frame_sink *downstream,
struct xrt_frame_sink **out_xfs);
/*! /*!
* @public @memberof xrt_frame_sink * @public @memberof xrt_frame_sink
* @see xrt_frame_context * @see xrt_frame_context

View file

@ -0,0 +1,222 @@
// Copyright 2019-2021, Collabora, Ltd.
// SPDX-License-Identifier: BSL-1.0
/*!
* @file
* @brief An @ref xrt_frame_sink queue.
* @author Jakob Bornecrantz <jakob@collabora.com>
* @ingroup aux_util
*/
#include "util/u_misc.h"
#include "util/u_sink.h"
#include "util/u_trace_marker.h"
#include <stdio.h>
#include <pthread.h>
/*!
* An @ref xrt_frame_sink queue, any frames received will be pushed to the
* downstream consumer on the queue thread. Will drop frames should multiple
* frames be queued up.
*
* @implements xrt_frame_sink
* @implements xrt_frame_node
*/
struct u_sink_queue
{
//! Base sink.
struct xrt_frame_sink base;
//! For tracking on the frame context.
struct xrt_frame_node node;
//! The consumer of the frames that are queued.
struct xrt_frame_sink *consumer;
//! The current queued frame.
struct xrt_frame *frame;
pthread_t thread;
pthread_mutex_t mutex;
pthread_cond_t cond;
struct
{
uint64_t current;
uint64_t last;
} seq;
//! Should we keep running.
bool running;
};
static void *
queue_mainloop(void *ptr)
{
SINK_TRACE_MARKER();
struct u_sink_queue *q = (struct u_sink_queue *)ptr;
struct xrt_frame *frame = NULL;
pthread_mutex_lock(&q->mutex);
while (q->running) {
// No new frame, wait.
if (q->seq.last >= q->seq.current) {
pthread_cond_wait(&q->cond, &q->mutex);
}
// Where we woken up to turn off.
if (!q->running) {
break;
}
// Just in case.
if (q->seq.last >= q->seq.current || q->frame == NULL) {
continue;
}
SINK_TRACE_IDENT(queue_frame);
// We have a new frame, send it out.
q->seq.last = q->seq.current;
/*
* We need to take a reference on the current frame, this is to
* keep it alive during the call to the consumer should it be
* replaced. But we no longer need to hold onto the frame on the
* queue so we just move the pointer.
*/
frame = q->frame;
q->frame = NULL;
/*
* Unlock the mutex when we do the work, so a new frame can be
* queued.
*/
pthread_mutex_unlock(&q->mutex);
// Send to the consumer that does the work.
q->consumer->push_frame(q->consumer, frame);
/*
* Drop our reference we don't need it anymore, or it's held by
* the consumer.
*/
xrt_frame_reference(&frame, NULL);
// Have to lock it again.
pthread_mutex_lock(&q->mutex);
}
pthread_mutex_unlock(&q->mutex);
return NULL;
}
static void
queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf)
{
SINK_TRACE_MARKER();
struct u_sink_queue *q = (struct u_sink_queue *)xfs;
pthread_mutex_lock(&q->mutex);
// Only schedule new frames if we are running.
if (q->running) {
q->seq.current++;
xrt_frame_reference(&q->frame, xf);
}
// Wake up the thread.
pthread_cond_signal(&q->cond);
pthread_mutex_unlock(&q->mutex);
}
static void
queue_break_apart(struct xrt_frame_node *node)
{
struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
void *retval = NULL;
// The fields are protected.
pthread_mutex_lock(&q->mutex);
// Stop the thread and inhibit any new frames to be added to the queue.
q->running = false;
// Release any frame waiting for submission.
xrt_frame_reference(&q->frame, NULL);
// Wake up the thread.
pthread_cond_signal(&q->cond);
// No longer need to protect fields.
pthread_mutex_unlock(&q->mutex);
// Wait for thread to finish.
pthread_join(q->thread, &retval);
}
static void
queue_destroy(struct xrt_frame_node *node)
{
struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
// Destroy resources.
pthread_mutex_destroy(&q->mutex);
pthread_cond_destroy(&q->cond);
free(q);
}
/*
*
* Exported functions.
*
*/
bool
u_sink_simple_queue_create(struct xrt_frame_context *xfctx,
struct xrt_frame_sink *downstream,
struct xrt_frame_sink **out_xfs)
{
struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue);
int ret = 0;
q->base.push_frame = queue_frame;
q->node.break_apart = queue_break_apart;
q->node.destroy = queue_destroy;
q->consumer = downstream;
q->running = true;
ret = pthread_mutex_init(&q->mutex, NULL);
if (ret != 0) {
free(q);
return false;
}
ret = pthread_cond_init(&q->cond, NULL);
if (ret) {
pthread_mutex_destroy(&q->mutex);
free(q);
return false;
}
ret = pthread_create(&q->thread, NULL, queue_mainloop, q);
if (ret != 0) {
pthread_cond_destroy(&q->cond);
pthread_mutex_destroy(&q->mutex);
free(q);
return false;
}
xrt_frame_context_add(xfctx, &q->node);
*out_xfs = &q->base;
return true;
}