mirror of
https://gitlab.freedesktop.org/monado/monado.git
synced 2025-01-24 15:41:55 +00:00
299 lines
6.2 KiB
C
299 lines
6.2 KiB
C
// Copyright 2019-2021, Collabora, Ltd.
|
|
// SPDX-License-Identifier: BSL-1.0
|
|
/*!
|
|
* @file
|
|
* @brief An @ref xrt_frame_sink queue.
|
|
* @author Jakob Bornecrantz <jakob@collabora.com>
|
|
* @ingroup aux_util
|
|
*/
|
|
|
|
#include "util/u_misc.h"
|
|
#include "util/u_sink.h"
|
|
#include "util/u_trace_marker.h"
|
|
|
|
#include <stdio.h>
|
|
#include <pthread.h>
|
|
|
|
struct u_sink_queue_elem
|
|
{
|
|
struct xrt_frame *frame;
|
|
struct u_sink_queue_elem *next;
|
|
};
|
|
|
|
/*!
|
|
* An @ref xrt_frame_sink queue, any frames received will be pushed to the
|
|
* downstream consumer on the queue thread. Will drop frames should multiple
|
|
* frames be queued up.
|
|
*
|
|
* @implements xrt_frame_sink
|
|
* @implements xrt_frame_node
|
|
*/
|
|
struct u_sink_queue
|
|
{
|
|
//! Base sink.
|
|
struct xrt_frame_sink base;
|
|
//! For tracking on the frame context.
|
|
struct xrt_frame_node node;
|
|
|
|
//! The consumer of the frames that are queued.
|
|
struct xrt_frame_sink *consumer;
|
|
|
|
//! Front of the queue (oldest frame, first to be consumed)
|
|
struct u_sink_queue_elem *front;
|
|
|
|
//! Back of the queue (newest frame, back->next is always null)
|
|
struct u_sink_queue_elem *back;
|
|
|
|
//! Number of currently enqueued frames
|
|
uint64_t size;
|
|
|
|
//! Max amount of frames before dropping new ones. 0 means unbounded.
|
|
uint64_t max_size;
|
|
|
|
pthread_t thread;
|
|
pthread_mutex_t mutex;
|
|
|
|
//! So we can wake the mainloop up
|
|
pthread_cond_t cond;
|
|
|
|
//! Should we keep running.
|
|
bool running;
|
|
};
|
|
|
|
//! Call with q->mutex locked.
|
|
static bool
|
|
queue_is_empty(struct u_sink_queue *q)
|
|
{
|
|
return q->size == 0;
|
|
}
|
|
|
|
//! Call with q->mutex locked.
|
|
static bool
|
|
queue_is_full(struct u_sink_queue *q)
|
|
{
|
|
bool is_unbounded = q->max_size == 0;
|
|
return q->size >= q->max_size && !is_unbounded;
|
|
}
|
|
|
|
//! Pops the oldest frame, reference counting unchanged.
|
|
//! Call with q->mutex locked.
|
|
static struct xrt_frame *
|
|
queue_pop(struct u_sink_queue *q)
|
|
{
|
|
assert(!queue_is_empty(q));
|
|
struct xrt_frame *frame = q->front->frame;
|
|
struct u_sink_queue_elem *old_front = q->front;
|
|
q->front = q->front->next;
|
|
free(old_front);
|
|
q->size--;
|
|
if (q->front == NULL) {
|
|
assert(queue_is_empty(q));
|
|
q->back = NULL;
|
|
}
|
|
return frame;
|
|
}
|
|
|
|
//! Tries to push a frame and increases its reference count.
|
|
//! Call with q->mutex locked.
|
|
static bool
|
|
queue_try_refpush(struct u_sink_queue *q, struct xrt_frame *xf)
|
|
{
|
|
if (queue_is_full(q)) {
|
|
return false;
|
|
}
|
|
struct u_sink_queue_elem *elem = U_TYPED_CALLOC(struct u_sink_queue_elem);
|
|
xrt_frame_reference(&elem->frame, xf);
|
|
elem->next = NULL;
|
|
if (q->back == NULL) { // First frame
|
|
q->front = elem;
|
|
} else { // Next frame
|
|
q->back->next = elem;
|
|
}
|
|
q->back = elem;
|
|
q->size++;
|
|
return true;
|
|
}
|
|
|
|
//! Clears the queue and unreferences all of its frames.
|
|
//! Call with q->mutex locked.
|
|
static void
|
|
queue_refclear(struct u_sink_queue *q)
|
|
{
|
|
while (!queue_is_empty(q)) {
|
|
assert((q->size > 1) ^ (q->front == q->back));
|
|
struct xrt_frame *xf = queue_pop(q);
|
|
xrt_frame_reference(&xf, NULL);
|
|
}
|
|
}
|
|
|
|
static void *
|
|
queue_mainloop(void *ptr)
|
|
{
|
|
SINK_TRACE_MARKER();
|
|
|
|
struct u_sink_queue *q = (struct u_sink_queue *)ptr;
|
|
struct xrt_frame *frame = NULL;
|
|
|
|
pthread_mutex_lock(&q->mutex);
|
|
|
|
while (q->running) {
|
|
|
|
// No new frame, wait.
|
|
if (queue_is_empty(q)) {
|
|
pthread_cond_wait(&q->cond, &q->mutex);
|
|
}
|
|
|
|
// In this case, queue_break_apart woke us up to turn us off.
|
|
if (!q->running) {
|
|
break;
|
|
}
|
|
|
|
// Just in case.
|
|
if (queue_is_empty(q)) {
|
|
continue;
|
|
}
|
|
|
|
SINK_TRACE_IDENT(queue_frame);
|
|
|
|
/*
|
|
* Dequeue frame.
|
|
* We need to take a reference on the current frame, this is to
|
|
* keep it alive during the call to the consumer should it be
|
|
* replaced. But we no longer need to hold onto the frame on the
|
|
* queue so we just dequeue it.
|
|
*/
|
|
frame = queue_pop(q);
|
|
|
|
/*
|
|
* Unlock the mutex when we do the work, so a new frame can be
|
|
* queued.
|
|
*/
|
|
pthread_mutex_unlock(&q->mutex);
|
|
|
|
// Send to the consumer that does the work.
|
|
q->consumer->push_frame(q->consumer, frame);
|
|
|
|
/*
|
|
* Drop our reference we don't need it anymore, or it's held by
|
|
* the consumer.
|
|
*/
|
|
xrt_frame_reference(&frame, NULL);
|
|
|
|
// Have to lock it again.
|
|
pthread_mutex_lock(&q->mutex);
|
|
}
|
|
|
|
pthread_mutex_unlock(&q->mutex);
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void
|
|
queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf)
|
|
{
|
|
SINK_TRACE_MARKER();
|
|
|
|
struct u_sink_queue *q = (struct u_sink_queue *)xfs;
|
|
|
|
pthread_mutex_lock(&q->mutex);
|
|
|
|
// Only schedule new frames if we are running.
|
|
if (q->running) {
|
|
queue_try_refpush(q, xf);
|
|
}
|
|
|
|
// Wake up the thread.
|
|
pthread_cond_signal(&q->cond);
|
|
|
|
pthread_mutex_unlock(&q->mutex);
|
|
}
|
|
|
|
static void
|
|
queue_break_apart(struct xrt_frame_node *node)
|
|
{
|
|
struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
|
|
void *retval = NULL;
|
|
|
|
// The fields are protected.
|
|
pthread_mutex_lock(&q->mutex);
|
|
|
|
// Stop the thread and inhibit any new frames to be added to the queue.
|
|
q->running = false;
|
|
|
|
// Release any frame waiting for submission.
|
|
queue_refclear(q);
|
|
|
|
// Wake up the thread.
|
|
pthread_cond_signal(&q->cond);
|
|
|
|
// No longer need to protect fields.
|
|
pthread_mutex_unlock(&q->mutex);
|
|
|
|
// Wait for thread to finish.
|
|
pthread_join(q->thread, &retval);
|
|
}
|
|
|
|
static void
|
|
queue_destroy(struct xrt_frame_node *node)
|
|
{
|
|
struct u_sink_queue *q = container_of(node, struct u_sink_queue, node);
|
|
|
|
// Destroy resources.
|
|
pthread_mutex_destroy(&q->mutex);
|
|
pthread_cond_destroy(&q->cond);
|
|
free(q);
|
|
}
|
|
|
|
|
|
/*
|
|
*
|
|
* Exported functions.
|
|
*
|
|
*/
|
|
|
|
bool
|
|
u_sink_queue_create(struct xrt_frame_context *xfctx,
|
|
uint64_t max_size,
|
|
struct xrt_frame_sink *downstream,
|
|
struct xrt_frame_sink **out_xfs)
|
|
{
|
|
struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue);
|
|
int ret = 0;
|
|
|
|
q->base.push_frame = queue_frame;
|
|
q->node.break_apart = queue_break_apart;
|
|
q->node.destroy = queue_destroy;
|
|
q->consumer = downstream;
|
|
q->running = true;
|
|
|
|
q->size = 0;
|
|
q->max_size = max_size;
|
|
|
|
ret = pthread_mutex_init(&q->mutex, NULL);
|
|
if (ret != 0) {
|
|
free(q);
|
|
return false;
|
|
}
|
|
|
|
ret = pthread_cond_init(&q->cond, NULL);
|
|
if (ret) {
|
|
pthread_mutex_destroy(&q->mutex);
|
|
free(q);
|
|
return false;
|
|
}
|
|
|
|
ret = pthread_create(&q->thread, NULL, queue_mainloop, q);
|
|
if (ret != 0) {
|
|
pthread_cond_destroy(&q->cond);
|
|
pthread_mutex_destroy(&q->mutex);
|
|
free(q);
|
|
return false;
|
|
}
|
|
|
|
xrt_frame_context_add(xfctx, &q->node);
|
|
|
|
*out_xfs = &q->base;
|
|
|
|
return true;
|
|
}
|