diff --git a/src/xrt/auxiliary/util/u_sink.h b/src/xrt/auxiliary/util/u_sink.h index ade578c78..073fb049b 100644 --- a/src/xrt/auxiliary/util/u_sink.h +++ b/src/xrt/auxiliary/util/u_sink.h @@ -96,6 +96,7 @@ u_sink_deinterleaver_create(struct xrt_frame_context *xfctx, */ bool u_sink_queue_create(struct xrt_frame_context *xfctx, + uint64_t max_size, struct xrt_frame_sink *downstream, struct xrt_frame_sink **out_xfs); diff --git a/src/xrt/auxiliary/util/u_sink_queue.c b/src/xrt/auxiliary/util/u_sink_queue.c index 11151b06d..a51d8f7d9 100644 --- a/src/xrt/auxiliary/util/u_sink_queue.c +++ b/src/xrt/auxiliary/util/u_sink_queue.c @@ -14,6 +14,11 @@ #include #include +struct u_sink_queue_elem +{ + struct xrt_frame *frame; + struct u_sink_queue_elem *next; +}; /*! * An @ref xrt_frame_sink queue, any frames received will be pushed to the @@ -33,23 +38,92 @@ struct u_sink_queue //! The consumer of the frames that are queued. struct xrt_frame_sink *consumer; - //! The current queued frame. - struct xrt_frame *frame; + //! Front of the queue (oldest frame, first to be consumed) + struct u_sink_queue_elem *front; + + //! Back of the queue (newest frame, back->next is always null) + struct u_sink_queue_elem *back; + + //! Number of currently enqueued frames + uint64_t size; + + //! Max amount of frames before dropping new ones. 0 means unbounded. + uint64_t max_size; pthread_t thread; pthread_mutex_t mutex; pthread_cond_t cond; - struct - { - uint64_t current; - uint64_t last; - } seq; - //! Should we keep running. bool running; }; +//! Call with q->mutex locked. +static bool +queue_is_empty(struct u_sink_queue *q) +{ + return q->size == 0; +} + +//! Call with q->mutex locked. +static bool +queue_is_full(struct u_sink_queue *q) +{ + bool is_unbounded = q->max_size == 0; + return q->size >= q->max_size && !is_unbounded; +} + +//! Pops the oldest frame, reference counting unchanged. +//! Call with q->mutex locked. +static struct xrt_frame * +queue_pop(struct u_sink_queue *q) +{ + assert(!queue_is_empty(q)); + struct xrt_frame *frame = q->front->frame; + struct u_sink_queue_elem *old_front = q->front; + q->front = q->front->next; + free(old_front); + q->size--; + if (q->front == NULL) { + assert(queue_is_empty(q)); + q->back = NULL; + } + return frame; +} + +//! Tries to push a frame and increases its reference count. +//! Call with q->mutex locked. +static bool +queue_try_refpush(struct u_sink_queue *q, struct xrt_frame *xf) +{ + if (!queue_is_full(q)) { + struct u_sink_queue_elem *elem = U_TYPED_CALLOC(struct u_sink_queue_elem); + xrt_frame_reference(&elem->frame, xf); + elem->next = NULL; + if (q->back == NULL) { // First frame + q->front = elem; + } else { // Next frame + q->back->next = elem; + } + q->back = elem; + q->size++; + return true; + } + return false; +} + +//! Clears the queue and unreferences all of its frames. +//! Call with q->mutex locked. +static void +queue_refclear(struct u_sink_queue *q) +{ + while (!queue_is_empty(q)) { + assert((q->size > 1) ^ (q->front == q->back)); + struct xrt_frame *xf = queue_pop(q); + xrt_frame_reference(&xf, NULL); + } +} + static void * queue_mainloop(void *ptr) { @@ -63,7 +137,7 @@ queue_mainloop(void *ptr) while (q->running) { // No new frame, wait. - if (q->seq.last >= q->seq.current) { + if (queue_is_empty(q)) { pthread_cond_wait(&q->cond, &q->mutex); } @@ -73,23 +147,20 @@ queue_mainloop(void *ptr) } // Just in case. - if (q->seq.last >= q->seq.current || q->frame == NULL) { + if (queue_is_empty(q)) { continue; } SINK_TRACE_IDENT(queue_frame); - // We have a new frame, send it out. - q->seq.last = q->seq.current; - /* + * Dequeue frame. * We need to take a reference on the current frame, this is to * keep it alive during the call to the consumer should it be * replaced. But we no longer need to hold onto the frame on the - * queue so we just move the pointer. + * queue so we just dequeue it. */ - frame = q->frame; - q->frame = NULL; + frame = queue_pop(q); /* * Unlock the mutex when we do the work, so a new frame can be @@ -126,8 +197,7 @@ queue_frame(struct xrt_frame_sink *xfs, struct xrt_frame *xf) // Only schedule new frames if we are running. if (q->running) { - q->seq.current++; - xrt_frame_reference(&q->frame, xf); + queue_try_refpush(q, xf); } // Wake up the thread. @@ -149,7 +219,7 @@ queue_break_apart(struct xrt_frame_node *node) q->running = false; // Release any frame waiting for submission. - xrt_frame_reference(&q->frame, NULL); + queue_refclear(q); // Wake up the thread. pthread_cond_signal(&q->cond); @@ -180,7 +250,10 @@ queue_destroy(struct xrt_frame_node *node) */ bool -u_sink_queue_create(struct xrt_frame_context *xfctx, struct xrt_frame_sink *downstream, struct xrt_frame_sink **out_xfs) +u_sink_queue_create(struct xrt_frame_context *xfctx, + uint64_t max_size, + struct xrt_frame_sink *downstream, + struct xrt_frame_sink **out_xfs) { struct u_sink_queue *q = U_TYPED_CALLOC(struct u_sink_queue); int ret = 0; @@ -191,6 +264,9 @@ u_sink_queue_create(struct xrt_frame_context *xfctx, struct xrt_frame_sink *down q->consumer = downstream; q->running = true; + q->size = 0; + q->max_size = max_size; + ret = pthread_mutex_init(&q->mutex, NULL); if (ret != 0) { free(q); diff --git a/src/xrt/drivers/ht/ht_driver.cpp b/src/xrt/drivers/ht/ht_driver.cpp index 26a08ede4..46af07cbd 100644 --- a/src/xrt/drivers/ht/ht_driver.cpp +++ b/src/xrt/drivers/ht/ht_driver.cpp @@ -714,7 +714,7 @@ ht_device_create(struct xrt_prober *xp, struct t_stereo_camera_calibration *cali // This puts u_sink_create_to_r8g8b8_or_l8 on its own thread, so that nothing gets backed up if it runs slower // than the native camera framerate. - u_sink_queue_create(&htd->camera.xfctx, tmp, &tmp); + u_sink_queue_create(&htd->camera.xfctx, 1, tmp, &tmp); // Converts images (we'd expect YUV422 or MJPEG) to R8G8B8. Can take a long time, especially on unoptimized // builds. If it's really slow, triple-check that you built Monado with optimizations! @@ -722,7 +722,7 @@ ht_device_create(struct xrt_prober *xp, struct t_stereo_camera_calibration *cali // Puts the hand tracking code on its own thread, so that nothing upstream of it gets backed up if the hand // tracking code runs slower than the upstream framerate. - u_sink_queue_create(&htd->camera.xfctx, tmp, &tmp); + u_sink_queue_create(&htd->camera.xfctx, 1, tmp, &tmp); xrt_fs_mode *modes; uint32_t count; diff --git a/src/xrt/state_trackers/gui/gui_scene_calibrate.c b/src/xrt/state_trackers/gui/gui_scene_calibrate.c index 38a13c321..f0479ba00 100644 --- a/src/xrt/state_trackers/gui/gui_scene_calibrate.c +++ b/src/xrt/state_trackers/gui/gui_scene_calibrate.c @@ -320,16 +320,16 @@ scene_render_select(struct gui_scene *scene, struct gui_program *p) p->texs[p->num_texs++] = gui_ogl_sink_create("Calibration", cs->xfctx, &rgb); u_sink_create_to_r8g8b8_or_l8(cs->xfctx, rgb, &rgb); - u_sink_queue_create(cs->xfctx, rgb, &rgb); + u_sink_queue_create(cs->xfctx, 1, rgb, &rgb); p->texs[p->num_texs++] = gui_ogl_sink_create("Raw", cs->xfctx, &raw); u_sink_create_to_r8g8b8_or_l8(cs->xfctx, raw, &raw); - u_sink_queue_create(cs->xfctx, raw, &raw); + u_sink_queue_create(cs->xfctx, 1, raw, &raw); t_calibration_stereo_create(cs->xfctx, &cs->params, &cs->status, rgb, &cali); u_sink_split_create(cs->xfctx, raw, cali, &cali); u_sink_deinterleaver_create(cs->xfctx, cali, &cali); - u_sink_queue_create(cs->xfctx, cali, &cali); + u_sink_queue_create(cs->xfctx, 1, cali, &cali); // Just after the camera create a quirk stream. struct u_sink_quirk_params qp; diff --git a/src/xrt/state_trackers/gui/gui_window_record.c b/src/xrt/state_trackers/gui/gui_window_record.c index f54a5e38b..e4649fd23 100644 --- a/src/xrt/state_trackers/gui/gui_window_record.c +++ b/src/xrt/state_trackers/gui/gui_window_record.c @@ -123,7 +123,7 @@ create_pipeline(struct gui_record_window *rw) if (do_convert) { u_sink_create_to_r8g8b8_or_l8(&rw->gst.xfctx, tmp, &tmp); } - u_sink_queue_create(&rw->gst.xfctx, tmp, &tmp); + u_sink_queue_create(&rw->gst.xfctx, 1, tmp, &tmp); os_mutex_lock(&rw->gst.mutex); rw->gst.gs = gs; @@ -273,7 +273,7 @@ gui_window_record_init(struct gui_record_window *rw) struct xrt_frame_sink *tmp = NULL; rw->texture.ogl = gui_ogl_sink_create("View", &rw->texture.xfctx, &tmp); u_sink_create_to_r8g8b8_or_l8(&rw->texture.xfctx, tmp, &tmp); - u_sink_queue_create(&rw->texture.xfctx, tmp, &rw->texture.sink); + u_sink_queue_create(&rw->texture.xfctx, 1, tmp, &rw->texture.sink); return true; } diff --git a/src/xrt/state_trackers/prober/p_tracking.c b/src/xrt/state_trackers/prober/p_tracking.c index bed819a66..49175452f 100644 --- a/src/xrt/state_trackers/prober/p_tracking.c +++ b/src/xrt/state_trackers/prober/p_tracking.c @@ -200,7 +200,7 @@ p_factory_ensure_frameserver(struct p_factory *fact) u_sink_create_to_yuv_or_yuyv(&fact->xfctx, xsink, &xsink); // Put a queue before it to multi-thread the filter. - u_sink_queue_create(&fact->xfctx, xsink, &xsink); + u_sink_queue_create(&fact->xfctx, 1, xsink, &xsink); struct xrt_frame_sink *ht_sink = NULL; t_hand_create(&fact->xfctx, fact->data, &fact->xth, &ht_sink);