u/sink: Allow u_sink_queue to queue more than one frame

Passing max_size=1 gives a similar behaviour as before, although instead
of discarding the last enqueued frame, it just ignores the new frame.
This commit is contained in:
Mateo de Mayo 2021-11-29 16:15:57 -03:00 committed by Jakob Bornecrantz
parent ba7f9d9578
commit e245f4e5a9
6 changed files with 105 additions and 28 deletions

View file

@ -96,6 +96,7 @@ u_sink_deinterleaver_create(struct xrt_frame_context *xfctx,
*/ */
bool bool
u_sink_queue_create(struct xrt_frame_context *xfctx, u_sink_queue_create(struct xrt_frame_context *xfctx,
uint64_t max_size,
struct xrt_frame_sink *downstream, struct xrt_frame_sink *downstream,
struct xrt_frame_sink **out_xfs); struct xrt_frame_sink **out_xfs);

View file

@ -14,6 +14,11 @@
#include <stdio.h> #include <stdio.h>
#include <pthread.h> #include <pthread.h>
struct u_sink_queue_elem
{
struct xrt_frame *frame;
struct u_sink_queue_elem *next;
};
/*! /*!
* An @ref xrt_frame_sink queue, any frames received will be pushed to the * An @ref xrt_frame_sink queue, any frames received will be pushed to the
@ -33,23 +38,92 @@ struct u_sink_queue
//! The consumer of the frames that are queued. //! The consumer of the frames that are queued.
struct xrt_frame_sink *consumer; struct xrt_frame_sink *consumer;
//! The current queued frame. //! Front of the queue (oldest frame, first to be consumed)
struct xrt_frame *frame; struct u_sink_queue_elem *front;
//! Back of the queue (newest frame, back->next is always null)
struct u_sink_queue_elem *back;
//! Number of currently enqueued frames
uint64_t size;
//! Max amount of frames before dropping new ones. 0 means unbounded.
uint64_t max_size;
pthread_t thread; pthread_t thread;
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cond; pthread_cond_t cond;
struct
{
uint64_t current;
uint64_t last;
} seq;
//! Should we keep running. //! Should we keep running.
bool running; bool running;
}; };
//! Call with q->mutex locked.
static bool
queue_is_empty(struct u_sink_queue *q)
{
return q->size == 0;
}
//! Call with q->mutex locked.
static bool
queue_is_full(struct u_sink_queue *q)
{
bool is_unbounded = q->max_size == 0;
return q->size >= q->max_size && !is_unbounded;
}
//! Pops the oldest frame, reference counting unchanged.
//! Call with q->mutex locked.
static struct xrt_frame *
queue_pop(struct u_sink_queue *q)
{
assert(!queue_is_empty(q));
struct xrt_frame *frame = q->front->frame;
struct u_sink_queue_elem *old_front = q->front;
q->front = q->front->next;
free(old_front);
q->size--;
if (q->front == NULL) {
assert(queue_is_empty(q));
q->back = NULL;
}
return frame;
}
//! Tries to push a frame and increases its reference count.
//! Call with q->mutex locked.
static bool
queue_try_refpush(struct u_sink_queue *q, struct xrt_frame *xf)
{
if (!queue_is_full(q)) {
struct u_sink_queue_elem *elem = U_TYPED_CALLOC(struct u_sink_queue_elem);
xrt_frame_reference(&elem->frame, xf);
elem->next = NULL;
if (q->back == NULL) { // First frame
q->front = elem;
} else { // Next frame
q->back->next = elem;
}
q->back = elem;
q->size++;
return true;
}
return false;
}
//! Clears the queue and unreferences all of its frames.
//! Call with q->mutex locked.
static void
queue_refclear(struct u_sink_queue *q)
{
while (!queue_is_empty(q)) {
assert((q->size > 1) ^ (q->front == q->back));
struct xrt_frame *xf = queue_pop(q);
xrt_frame_reference(&xf, NULL);
}
}
static void * static void *
queue_mainloop(void *ptr) queue_mainloop(void *ptr)
{ {
@ -63,7 +137,7 @@ queue_mainloop(void *ptr)
while (q->running) { while (q->running) {
// No new frame, wait. // No new frame, wait.
if (q->seq.last >= q->seq.current) { if (queue_is_empty(q)) {
pthread_cond_wait(&q->cond, &q->mutex); pthread_cond_wait(&q->cond, &q->mutex);
} }
@ -73,23 +147,20 @@ queue_mainloop(void *ptr)
} }
// Just in case. // Just in case.
if (q->seq.last >= q->seq.current || q->frame == NULL) { if (queue_is_empty(q)) {
continue; continue;
} }
SINK_TRACE_IDENT(queue_frame); SINK_TRACE_IDENT(queue_frame);
// We have a new frame, send it out.
q->seq.last = q->seq.current;
/* /*
* Dequeue frame.
* We need to take a reference on the current frame, this is to * We need to take a reference on the current frame, this is to
* keep it alive during the call to the consumer should it be * keep it alive during the call to the consumer should it be
* replaced. But we no longer need to hold onto the frame on the * replaced. But we no longer need to hold onto the frame on the
* queue so we just move the pointer. * queue so we just dequeue it.
*/ */
frame = q->frame; frame = queue_pop(q);
q->frame = NULL;
/* /*
* Unlock the mutex when we do the work, so a new frame can be * Unlock the mutex when we do the work, so a new frame can be
@ -126,8 +197,7 @@ queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf)
// Only schedule new frames if we are running. // Only schedule new frames if we are running.
if (q->running) { if (q->running) {
q->seq.current++; queue_try_refpush(q, xf);
xrt_frame_reference(&q->frame, xf);
} }
// Wake up the thread. // Wake up the thread.
@ -149,7 +219,7 @@ queue_break_apart(struct xrt_frame_node *node)
q->running = false; q->running = false;
// Release any frame waiting for submission. // Release any frame waiting for submission.
xrt_frame_reference(&q->frame, NULL); queue_refclear(q);
// Wake up the thread. // Wake up the thread.
pthread_cond_signal(&q->cond); pthread_cond_signal(&q->cond);
@ -180,7 +250,10 @@ queue_destroy(struct xrt_frame_node *node)
*/ */
bool bool
u_sink_queue_create(struct xrt_frame_context *xfctx, struct xrt_frame_sink *downstream, struct xrt_frame_sink **out_xfs) u_sink_queue_create(struct xrt_frame_context *xfctx,
uint64_t max_size,
struct xrt_frame_sink *downstream,
struct xrt_frame_sink **out_xfs)
{ {
struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue); struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue);
int ret = 0; int ret = 0;
@ -191,6 +264,9 @@ u_sink_queue_create(struct xrt_frame_context *xfctx, struct xrt_frame_sink *down
q->consumer = downstream; q->consumer = downstream;
q->running = true; q->running = true;
q->size = 0;
q->max_size = max_size;
ret = pthread_mutex_init(&q->mutex, NULL); ret = pthread_mutex_init(&q->mutex, NULL);
if (ret != 0) { if (ret != 0) {
free(q); free(q);

View file

@ -714,7 +714,7 @@ ht_device_create(struct xrt_prober *xp, struct t_stereo_camera_calibration *cali
// This puts u_sink_create_to_r8g8b8_or_l8 on its own thread, so that nothing gets backed up if it runs slower // This puts u_sink_create_to_r8g8b8_or_l8 on its own thread, so that nothing gets backed up if it runs slower
// than the native camera framerate. // than the native camera framerate.
u_sink_queue_create(&htd->camera.xfctx, tmp, &tmp); u_sink_queue_create(&htd->camera.xfctx, 1, tmp, &tmp);
// Converts images (we'd expect YUV422 or MJPEG) to R8G8B8. Can take a long time, especially on unoptimized // Converts images (we'd expect YUV422 or MJPEG) to R8G8B8. Can take a long time, especially on unoptimized
// builds. If it's really slow, triple-check that you built Monado with optimizations! // builds. If it's really slow, triple-check that you built Monado with optimizations!
@ -722,7 +722,7 @@ ht_device_create(struct xrt_prober *xp, struct t_stereo_camera_calibration *cali
// Puts the hand tracking code on its own thread, so that nothing upstream of it gets backed up if the hand // Puts the hand tracking code on its own thread, so that nothing upstream of it gets backed up if the hand
// tracking code runs slower than the upstream framerate. // tracking code runs slower than the upstream framerate.
u_sink_queue_create(&htd->camera.xfctx, tmp, &tmp); u_sink_queue_create(&htd->camera.xfctx, 1, tmp, &tmp);
xrt_fs_mode *modes; xrt_fs_mode *modes;
uint32_t count; uint32_t count;

View file

@ -320,16 +320,16 @@ scene_render_select(struct gui_scene *scene, struct gui_program *p)
p->texs[p->num_texs++] = gui_ogl_sink_create("Calibration", cs->xfctx, &rgb); p->texs[p->num_texs++] = gui_ogl_sink_create("Calibration", cs->xfctx, &rgb);
u_sink_create_to_r8g8b8_or_l8(cs->xfctx, rgb, &rgb); u_sink_create_to_r8g8b8_or_l8(cs->xfctx, rgb, &rgb);
u_sink_queue_create(cs->xfctx, rgb, &rgb); u_sink_queue_create(cs->xfctx, 1, rgb, &rgb);
p->texs[p->num_texs++] = gui_ogl_sink_create("Raw", cs->xfctx, &raw); p->texs[p->num_texs++] = gui_ogl_sink_create("Raw", cs->xfctx, &raw);
u_sink_create_to_r8g8b8_or_l8(cs->xfctx, raw, &raw); u_sink_create_to_r8g8b8_or_l8(cs->xfctx, raw, &raw);
u_sink_queue_create(cs->xfctx, raw, &raw); u_sink_queue_create(cs->xfctx, 1, raw, &raw);
t_calibration_stereo_create(cs->xfctx, &cs->params, &cs->status, rgb, &cali); t_calibration_stereo_create(cs->xfctx, &cs->params, &cs->status, rgb, &cali);
u_sink_split_create(cs->xfctx, raw, cali, &cali); u_sink_split_create(cs->xfctx, raw, cali, &cali);
u_sink_deinterleaver_create(cs->xfctx, cali, &cali); u_sink_deinterleaver_create(cs->xfctx, cali, &cali);
u_sink_queue_create(cs->xfctx, cali, &cali); u_sink_queue_create(cs->xfctx, 1, cali, &cali);
// Just after the camera create a quirk stream. // Just after the camera create a quirk stream.
struct u_sink_quirk_params qp; struct u_sink_quirk_params qp;

View file

@ -123,7 +123,7 @@ create_pipeline(struct gui_record_window *rw)
if (do_convert) { if (do_convert) {
u_sink_create_to_r8g8b8_or_l8(&rw->gst.xfctx, tmp, &tmp); u_sink_create_to_r8g8b8_or_l8(&rw->gst.xfctx, tmp, &tmp);
} }
u_sink_queue_create(&rw->gst.xfctx, tmp, &tmp); u_sink_queue_create(&rw->gst.xfctx, 1, tmp, &tmp);
os_mutex_lock(&rw->gst.mutex); os_mutex_lock(&rw->gst.mutex);
rw->gst.gs = gs; rw->gst.gs = gs;
@ -273,7 +273,7 @@ gui_window_record_init(struct gui_record_window *rw)
struct xrt_frame_sink *tmp = NULL; struct xrt_frame_sink *tmp = NULL;
rw->texture.ogl = gui_ogl_sink_create("View", &rw->texture.xfctx, &tmp); rw->texture.ogl = gui_ogl_sink_create("View", &rw->texture.xfctx, &tmp);
u_sink_create_to_r8g8b8_or_l8(&rw->texture.xfctx, tmp, &tmp); u_sink_create_to_r8g8b8_or_l8(&rw->texture.xfctx, tmp, &tmp);
u_sink_queue_create(&rw->texture.xfctx, tmp, &rw->texture.sink); u_sink_queue_create(&rw->texture.xfctx, 1, tmp, &rw->texture.sink);
return true; return true;
} }

View file

@ -200,7 +200,7 @@ p_factory_ensure_frameserver(struct p_factory *fact)
u_sink_create_to_yuv_or_yuyv(&fact->xfctx, xsink, &xsink); u_sink_create_to_yuv_or_yuyv(&fact->xfctx, xsink, &xsink);
// Put a queue before it to multi-thread the filter. // Put a queue before it to multi-thread the filter.
u_sink_queue_create(&fact->xfctx, xsink, &xsink); u_sink_queue_create(&fact->xfctx, 1, xsink, &xsink);
struct xrt_frame_sink *ht_sink = NULL; struct xrt_frame_sink *ht_sink = NULL;
t_hand_create(&fact->xfctx, fact->data, &fact->xth, &ht_sink); t_hand_create(&fact->xfctx, fact->data, &fact->xth, &ht_sink);