diff --git a/src/xrt/auxiliary/CMakeLists.txt b/src/xrt/auxiliary/CMakeLists.txt
index 3cb125c0c..d550b227e 100644
--- a/src/xrt/auxiliary/CMakeLists.txt
+++ b/src/xrt/auxiliary/CMakeLists.txt
@@ -214,6 +214,10 @@ add_library(
 	util/u_verify.h
 	util/u_process.c
 	util/u_process.h
+	util/u_worker.c
+	util/u_worker.cpp
+	util/u_worker.h
+	util/u_worker.hpp
 	"${CMAKE_CURRENT_BINARY_DIR}/u_git_tag.c"
 )
diff --git a/src/xrt/auxiliary/meson.build b/src/xrt/auxiliary/meson.build
index f6525520b..bcdced939 100644
--- a/src/xrt/auxiliary/meson.build
+++ b/src/xrt/auxiliary/meson.build
@@ -87,6 +87,10 @@ lib_aux_util = static_library(
 		'util/u_verify.h',
 		'util/u_process.c',
 		'util/u_process.h',
+		'util/u_worker.c',
+		'util/u_worker.cpp',
+		'util/u_worker.h',
+		'util/u_worker.hpp',
 	) + [
 		u_git_tag_c,
 	],
diff --git a/src/xrt/auxiliary/util/u_worker.c b/src/xrt/auxiliary/util/u_worker.c
new file mode 100644
index 000000000..440037f5d
--- /dev/null
+++ b/src/xrt/auxiliary/util/u_worker.c
@@ -0,0 +1,534 @@
+// Copyright 2022, Collabora, Ltd.
+// SPDX-License-Identifier: BSL-1.0
+/*!
+ * @file
+ * @brief  Simple worker pool.
+ * @author Jakob Bornecrantz
+ *
+ * @ingroup aux_util
+ */
+
+#include "os/os_threading.h"
+
+#include "util/u_misc.h"
+#include "util/u_logging.h"
+#include "util/u_worker.h"
+#include "util/u_trace_marker.h"
+
+#include <assert.h>
+
+
+#define MAX_TASK_COUNT (64)
+#define MAX_THREAD_COUNT (16)
+
+struct group;
+struct pool;
+
+struct task
+{
+	//! Group this task was submitted from.
+	struct group *g;
+
+	//! Function.
+	u_worker_group_func_t func;
+
+	//! Function data.
+	void *data;
+};
+
+struct thread
+{
+	//! Pool this thread belongs to.
+	struct pool *p;
+
+	//! Native thread.
+	struct os_thread thread;
+};
+
+struct pool
+{
+	struct u_worker_thread_pool base;
+
+	//! Big contentious mutex.
+	struct os_mutex mutex;
+
+	//! Array of tasks.
+	struct task tasks[MAX_TASK_COUNT];
+
+	//! Number of tasks in array.
+	size_t tasks_in_array_count;
+
+	struct
+	{
+		size_t count;
+		struct os_cond cond;
+	} available; //!< For worker threads.
+
+	//! Given at creation.
+	uint32_t initial_worker_limit;
+
+	//! Current number of workers allowed to work; waiting increases this.
+	uint32_t worker_limit;
+
+	//! Number of threads working on tasks.
+	size_t working_count;
+
+	//! Number of created threads.
+	size_t thread_count;
+
+	//! The worker threads.
+	struct thread threads[MAX_THREAD_COUNT];
+
+	//! Is the pool up and running?
+	bool running;
+};
+
+struct group
+{
+	//! Base struct has to come first.
+	struct u_worker_group base;
+
+	//! Pointer to pool of threads.
+	struct u_worker_thread_pool *uwtp;
+
+	//! Number of tasks that are pending or being worked on in this group.
+	size_t current_submitted_tasks_count;
+
+	//! Number of threads that have been released or newly entered wait.
+	size_t released_count;
+
+	struct
+	{
+		size_t count;
+		struct os_cond cond;
+	} waiting; //!< For wait_all.
+};
+
+
+/*
+ *
+ * Helper functions.
+ *
+ */
+
+static inline struct group *
+group(struct u_worker_group *uwp)
+{
+	return (struct group *)uwp;
+}
+
+static inline struct pool *
+pool(struct u_worker_thread_pool *uwtp)
+{
+	return (struct pool *)uwtp;
+}
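For orientation before the internals below: a minimal, hypothetical usage sketch of the C API this patch adds (the task function, counts, and `example` wrapper are illustration only, and error handling is omitted):

```cpp
// Hypothetical example, not part of the patch: exercises the new C API.
#include "util/u_worker.h"

#include <stdio.h>

static void
say_hello(void *data)
{
	printf("hello from task %d\n", *(int *)data);
}

void
example(void)
{
	// Two workers may run at once; four threads exist so waiters can "donate".
	struct u_worker_thread_pool *uwtp = u_worker_thread_pool_create(2, 4);
	struct u_worker_group *uwg = u_worker_group_create(uwtp);

	int args[8];
	for (int i = 0; i < 8; i++) {
		args[i] = i;
		u_worker_group_push(uwg, say_hello, &args[i]);
	}

	// Blocks, donating this thread to the pool while tasks run.
	u_worker_group_wait_all(uwg);

	// The standard Monado reference functions handle destruction.
	u_worker_group_reference(&uwg, NULL);
	u_worker_thread_pool_reference(&uwtp, NULL);
}
```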
+
+
+/*
+ *
+ * Internal pool functions.
+ *
+ */
+
+static void
+locked_pool_pop_task(struct pool *p, struct task *out_task)
+{
+	assert(p->tasks_in_array_count > 0);
+
+	for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
+		if (p->tasks[i].func == NULL) {
+			continue;
+		}
+
+		*out_task = p->tasks[i];
+		p->tasks[i] = (struct task){NULL, NULL, NULL};
+		p->tasks_in_array_count--;
+		return;
+	}
+
+	assert(false);
+}
+
+static void
+locked_pool_push_task(struct pool *p, struct group *g, u_worker_group_func_t func, void *data)
+{
+	assert(p->tasks_in_array_count < MAX_TASK_COUNT);
+
+	for (size_t i = 0; i < MAX_TASK_COUNT; i++) {
+		if (p->tasks[i].func != NULL) {
+			continue;
+		}
+
+		p->tasks[i] = (struct task){g, func, data};
+		p->tasks_in_array_count++;
+		g->current_submitted_tasks_count++;
+		return;
+	}
+
+	assert(false);
+}
+
+static void
+locked_pool_wake_worker_if_allowed(struct pool *p)
+{
+	// No tasks in array, don't wake any thread.
+	if (p->tasks_in_array_count == 0) {
+		return;
+	}
+
+	// The number of working threads is at the limit.
+	if (p->working_count >= p->worker_limit) {
+		return;
+	}
+
+	// No waiting thread.
+	if (p->available.count == 0) {
+		//! @todo Is this an error?
+		return;
+	}
+
+	os_cond_signal(&p->available.cond);
+}
+
+
+/*
+ *
+ * Thread group functions.
+ *
+ */
+
+static bool
+locked_group_should_enter_wait_loop(struct pool *p, struct group *g)
+{
+	if (g->current_submitted_tasks_count == 0) {
+		return false;
+	}
+
+	// Enter the loop as a released thread.
+	g->released_count++;
+
+	return true;
+}
+
+static bool
+locked_group_should_wait(struct pool *p, struct group *g)
+{
+	/*
+	 * There are several cases that need to be covered by this function.
+	 *
+	 * A thread is entering the wait_all function for the first time and
+	 * work is outstanding: increase the worker limit and wait on the
+	 * condition variable.
+	 *
+	 * Similar to the above: we were woken up, more work is outstanding
+	 * on the group and we had been released. Remove one released count,
+	 * up the worker limit, then wait on the condition variable.
+	 *
+	 * A thread (or more) has been woken up and no new tasks have been
+	 * submitted: break out of the loop and decrement the released count.
+	 *
+	 * As above, but we were one of several threads woken up while only
+	 * one thread had been released and that released count had already
+	 * been taken: do nothing and wait again.
+	 */
+
+	// Tasks available.
+	if (g->current_submitted_tasks_count > 0) {
+
+		// We have been released or newly entered the loop.
+		if (g->released_count > 0) {
+			g->released_count--;
+			p->worker_limit++;
+
+			// Wake a worker with the new worker limit.
+			locked_pool_wake_worker_if_allowed(p);
+		}
+
+		return true;
+	}
+
+	// No tasks, and we have been released, party!
+	if (g->released_count > 0) {
+		g->released_count--;
+		return false;
+	}
+
+	// We were woken up, but nothing had been released, loop again.
+	return true;
+}
+
+static void
+locked_group_wake_waiter_if_allowed(struct pool *p, struct group *g)
+{
+	// Are there still outstanding tasks?
+	if (g->current_submitted_tasks_count > 0) {
+		return;
+	}
+
+	// Is there a thread waiting or not?
+	if (g->waiting.count == 0) {
+		return;
+	}
+
+	// Wake one waiting thread.
+	os_cond_signal(&g->waiting.cond);
+
+	assert(p->worker_limit > p->initial_worker_limit);
+
+	// Remove one waiting thread.
+	p->worker_limit--;
+
+	// We have released one thread.
+	g->released_count++;
+}
+
+static void
+locked_group_wait(struct pool *p, struct group *g)
+{
+	// Update tracking.
+	g->waiting.count++;
+
+	// The wait also unlocks the mutex.
+	os_cond_wait(&g->waiting.cond, &p->mutex);
+
+	// Update tracking.
+	g->waiting.count--;
+}
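The `worker_limit` bookkeeping above is easiest to follow with a concrete trace. This is a hypothetical walk-through, not code from the patch, assuming a pool created with `u_worker_thread_pool_create(1, 2)`:

```cpp
// Hypothetical walk-through of the worker_limit accounting.
//
//   worker_limit == 1           // one worker may run
//   push(A); push(B);           // two tasks pending in the group
//   wait_all():
//     released_count++          // locked_group_should_enter_wait_loop()
//     tasks > 0, released > 0   // locked_group_should_wait()
//       released_count--;
//       worker_limit++;         // now 2: the waiter "donated" itself
//     sleep on waiting.cond     // locked_group_wait()
//   last task completes:        // locked_group_wake_waiter_if_allowed()
//     signal waiting.cond;
//     worker_limit--;           // back to the initial limit of 1
//     released_count++          // lets the waiter exit the loop
```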
+
+
+/*
+ *
+ * Thread internal functions.
+ *
+ */
+
+static bool
+locked_thread_allowed_to_work(struct pool *p)
+{
+	// No work for you!
+	if (p->tasks_in_array_count == 0) {
+		return false;
+	}
+
+	// Reached the limit.
+	if (p->working_count >= p->worker_limit) {
+		return false;
+	}
+
+	return true;
+}
+
+static void
+locked_thread_wait_for_work(struct pool *p)
+{
+	// Update tracking.
+	p->available.count++;
+
+	// The wait also unlocks the mutex.
+	os_cond_wait(&p->available.cond, &p->mutex);
+
+	// Update tracking.
+	p->available.count--;
+}
+
+static void *
+run_func(void *ptr)
+{
+	struct thread *t = (struct thread *)ptr;
+	struct pool *p = t->p;
+
+	os_mutex_lock(&p->mutex);
+
+	while (p->running) {
+
+		if (!locked_thread_allowed_to_work(p)) {
+			locked_thread_wait_for_work(p);
+
+			// Check running first when woken up.
+			continue;
+		}
+
+		// Pop a task from the pool.
+		struct task task = {NULL, NULL, NULL};
+		locked_pool_pop_task(p, &task);
+
+		// We now count as working, needed for the wake below.
+		p->working_count++;
+
+		// Signal another thread if conditions are met.
+		locked_pool_wake_worker_if_allowed(p);
+
+		// Do the actual work here.
+		os_mutex_unlock(&p->mutex);
+		task.func(task.data);
+		os_mutex_lock(&p->mutex);
+
+		// No longer working.
+		p->working_count--;
+
+		// Only now decrement the task count on the owning group.
+		task.g->current_submitted_tasks_count--;
+
+		// Wake up any waiter.
+		locked_group_wake_waiter_if_allowed(p, task.g);
+	}
+
+	// Make sure all threads are woken up.
+	os_cond_signal(&p->available.cond);
+
+	os_mutex_unlock(&p->mutex);
+
+	return NULL;
+}
+
+
+/*
+ *
+ * 'Exported' thread pool functions.
+ *
+ */
+
+struct u_worker_thread_pool *
+u_worker_thread_pool_create(uint32_t starting_worker_count, uint32_t thread_count)
+{
+	XRT_TRACE_MARKER();
+
+	assert(starting_worker_count < thread_count);
+	if (starting_worker_count >= thread_count) {
+		return NULL;
+	}
+
+	assert(thread_count <= MAX_THREAD_COUNT);
+	if (thread_count > MAX_THREAD_COUNT) {
+		return NULL;
+	}
+
+	struct pool *p = U_TYPED_CALLOC(struct pool);
+	p->base.reference.count = 1;
+	p->initial_worker_limit = starting_worker_count;
+	p->worker_limit = starting_worker_count;
+	p->thread_count = thread_count;
+	p->running = true;
+
+	os_mutex_init(&p->mutex);
+	os_cond_init(&p->available.cond);
+
+	for (size_t i = 0; i < thread_count; i++) {
+		p->threads[i].p = p;
+		os_thread_init(&p->threads[i].thread);
+		os_thread_start(&p->threads[i].thread, run_func, &p->threads[i]);
+	}
+
+	return (struct u_worker_thread_pool *)p;
+}
+
+void
+u_worker_thread_pool_destroy(struct u_worker_thread_pool *uwtp)
+{
+	XRT_TRACE_MARKER();
+
+	struct pool *p = pool(uwtp);
+
+	os_mutex_lock(&p->mutex);
+	p->running = false;
+	os_cond_signal(&p->available.cond);
+	os_mutex_unlock(&p->mutex);
+
+	// Wait for all threads.
+	for (size_t i = 0; i < p->thread_count; i++) {
+		os_thread_join(&p->threads[i].thread);
+		os_thread_destroy(&p->threads[i].thread);
+	}
+
+	os_mutex_destroy(&p->mutex);
+	os_cond_destroy(&p->available.cond);
+
+	free(p);
+}
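`run_func()` re-checks its predicates after every wake because condition variables can wake spuriously, and because shutdown reuses the same condition: each exiting thread chain-signals the next sleeper. A generic sketch of that same pattern using raw pthreads rather than Monado's `os_*` wrappers (illustration only; names are hypothetical):

```cpp
// Generic sketch of the wait pattern run_func() relies on.
#include <pthread.h>

struct state
{
	pthread_mutex_t mutex;
	pthread_cond_t cond;
	bool running;
	int work_available;
};

void *
loop(void *ptr)
{
	struct state *s = (struct state *)ptr;

	pthread_mutex_lock(&s->mutex);
	while (s->running) {
		if (s->work_available == 0) {
			// Atomically unlocks, sleeps, relocks. Wakes may be
			// spurious, so the predicate is checked in a loop.
			pthread_cond_wait(&s->cond, &s->mutex);
			continue; // Also re-checks running after a wake.
		}
		s->work_available--;
		pthread_mutex_unlock(&s->mutex);
		/* ... do the work without holding the lock ... */
		pthread_mutex_lock(&s->mutex);
	}
	// Chain-wake the next sleeper so shutdown reaches every thread.
	pthread_cond_signal(&s->cond);
	pthread_mutex_unlock(&s->mutex);
	return NULL;
}
```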
+
+
+/*
+ *
+ * 'Exported' group functions.
+ *
+ */
+
+struct u_worker_group *
+u_worker_group_create(struct u_worker_thread_pool *uwtp)
+{
+	XRT_TRACE_MARKER();
+
+	struct group *g = U_TYPED_CALLOC(struct group);
+	g->base.reference.count = 1;
+	u_worker_thread_pool_reference(&g->uwtp, uwtp);
+
+	os_cond_init(&g->waiting.cond);
+
+	return (struct u_worker_group *)g;
+}
+
+void
+u_worker_group_push(struct u_worker_group *uwp, u_worker_group_func_t f, void *data)
+{
+	XRT_TRACE_MARKER();
+
+	struct group *g = group(uwp);
+	struct pool *p = pool(g->uwtp);
+
+	os_mutex_lock(&p->mutex);
+
+	while (p->tasks_in_array_count >= MAX_TASK_COUNT) {
+		os_mutex_unlock(&p->mutex);
+
+		//! @todo Don't wait for all, wait for one.
+		u_worker_group_wait_all(uwp);
+
+		os_mutex_lock(&p->mutex);
+	}
+
+	locked_pool_push_task(p, g, f, data);
+
+	// If there are worker threads available, wake one up.
+	if (p->available.count > 0) {
+		os_cond_signal(&p->available.cond);
+	}
+
+	os_mutex_unlock(&p->mutex);
+}
+
+void
+u_worker_group_wait_all(struct u_worker_group *uwp)
+{
+	XRT_TRACE_MARKER();
+
+	struct group *g = group(uwp);
+	struct pool *p = pool(g->uwtp);
+
+	os_mutex_lock(&p->mutex);
+
+	// Can we early out?
+	if (!locked_group_should_enter_wait_loop(p, g)) {
+		os_mutex_unlock(&p->mutex);
+		return;
+	}
+
+	// Wait here until all work has been started and completed.
+	while (locked_group_should_wait(p, g)) {
+		// Do the wait.
+		locked_group_wait(p, g);
+	}
+
+	os_mutex_unlock(&p->mutex);
+}
+
+void
+u_worker_group_destroy(struct u_worker_group *uwp)
+{
+	XRT_TRACE_MARKER();
+
+	struct group *g = group(uwp);
+	assert(g->base.reference.count == 0);
+
+	u_worker_group_wait_all(uwp);
+
+	u_worker_thread_pool_reference(&g->uwtp, NULL);
+
+	os_cond_destroy(&g->waiting.cond);
+
+	free(uwp);
+}
diff --git a/src/xrt/auxiliary/util/u_worker.cpp b/src/xrt/auxiliary/util/u_worker.cpp
new file mode 100644
index 000000000..329d3f008
--- /dev/null
+++ b/src/xrt/auxiliary/util/u_worker.cpp
@@ -0,0 +1,20 @@
+// Copyright 2022, Collabora, Ltd.
+// SPDX-License-Identifier: BSL-1.0
+/*!
+ * @file
+ * @brief  C++ wrappers for workers.
+ * @author Jakob Bornecrantz
+ *
+ * @ingroup aux_util
+ */
+
+#include "util/u_worker.hpp"
+
+
+void
+xrt::auxiliary::util::TaskCollection::cCallback(void *data_ptr)
+{
+	auto &f = *static_cast<Functor *>(data_ptr);
+	f();
+	f = nullptr;
+}
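`cCallback()` is the usual trampoline for calling C++ callables through a C interface: the C API only carries a function pointer plus a `void *`, so the address of a stored `std::function` is passed as the data pointer and cast back on the worker thread. A reduced, hypothetical sketch of the same pattern (names are illustration only):

```cpp
// Reduced sketch of the trampoline used by TaskCollection::cCallback().
#include "util/u_worker.h"

#include <functional>

using Functor = std::function<void()>;

// Matches u_worker_group_func_t: the void * is really a Functor *.
void
trampoline(void *data_ptr)
{
	auto &f = *static_cast<Functor *>(data_ptr);
	f();         // Run the C++ callable on the worker thread.
	f = nullptr; // Release captured state eagerly, as cCallback() does.
}

void
example(u_worker_group *group, Functor &stored)
{
	// The Functor must outlive the task, which is why TaskCollection
	// keeps its functors in a fixed-size member array.
	stored = [] { /* work */ };
	u_worker_group_push(group, &trampoline, &stored);
}
```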
diff --git a/src/xrt/auxiliary/util/u_worker.h b/src/xrt/auxiliary/util/u_worker.h
new file mode 100644
index 000000000..16e06de0a
--- /dev/null
+++ b/src/xrt/auxiliary/util/u_worker.h
@@ -0,0 +1,172 @@
+// Copyright 2022, Collabora, Ltd.
+// SPDX-License-Identifier: BSL-1.0
+/*!
+ * @file
+ * @brief  Worker and threading pool.
+ * @author Jakob Bornecrantz
+ *
+ * @ingroup aux_util
+ */
+
+#pragma once
+
+#include "xrt/xrt_defines.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+/*
+ *
+ * Worker thread pool.
+ *
+ */
+
+/*!
+ * A worker thread pool that can be shared between multiple worker groups.
+ *
+ * @ingroup aux_util
+ */
+struct u_worker_thread_pool
+{
+	struct xrt_reference reference;
+};
+
+/*!
+ * Creates a new thread pool to be used by a worker group.
+ *
+ * @param starting_worker_count How many worker threads can be active at the
+ *                              same time without any "donated" threads.
+ * @param thread_count          The number of threads to be created in total,
+ *                              this is the maximum number of threads that can
+ *                              be in flight at the same time.
+ *
+ * @ingroup aux_util
+ */
+struct u_worker_thread_pool *
+u_worker_thread_pool_create(uint32_t starting_worker_count, uint32_t thread_count);
+
+/*!
+ * Internal function, only called when the reference count reaches zero.
+ *
+ * @ingroup aux_util
+ */
+void
+u_worker_thread_pool_destroy(struct u_worker_thread_pool *uwtp);
+
+/*!
+ * Standard Monado reference function.
+ *
+ * @ingroup aux_util
+ */
+static inline void
+u_worker_thread_pool_reference(struct u_worker_thread_pool **dst, struct u_worker_thread_pool *src)
+{
+	struct u_worker_thread_pool *old_dst = *dst;
+
+	if (old_dst == src) {
+		return;
+	}
+
+	if (src) {
+		xrt_reference_inc(&src->reference);
+	}
+
+	*dst = src;
+
+	if (old_dst) {
+		if (xrt_reference_dec(&old_dst->reference)) {
+			u_worker_thread_pool_destroy(old_dst);
+		}
+	}
+}
+
+
+/*
+ *
+ * Worker group.
+ *
+ */
+
+/*!
+ * A worker group that you submit tasks to. Multiple groups can share one
+ * thread pool. A group can also "donate" a thread to the pool by waiting.
+ *
+ * @ingroup aux_util
+ */
+struct u_worker_group
+{
+	struct xrt_reference reference;
+};
+
+/*!
+ * Function typedef for tasks.
+ *
+ * @ingroup aux_util
+ */
+typedef void (*u_worker_group_func_t)(void *);
+
+/*!
+ * Create a new worker group.
+ *
+ * @ingroup aux_util
+ */
+struct u_worker_group *
+u_worker_group_create(struct u_worker_thread_pool *uwtp);
+
+/*!
+ * Push a new task to the worker group.
+ *
+ * @ingroup aux_util
+ */
+void
+u_worker_group_push(struct u_worker_group *uwg, u_worker_group_func_t f, void *data);
+
+/*!
+ * Wait for all pushed tasks to be completed; "donates" this thread to the
+ * shared thread pool.
+ *
+ * @ingroup aux_util
+ */
+void
+u_worker_group_wait_all(struct u_worker_group *uwg);
+
+/*!
+ * Destroy a worker group.
+ *
+ * @ingroup aux_util
+ */
+void
+u_worker_group_destroy(struct u_worker_group *uwg);
+
+/*!
+ * Standard Monado reference function.
+ *
+ * @ingroup aux_util
+ */
+static inline void
+u_worker_group_reference(struct u_worker_group **dst, struct u_worker_group *src)
+{
+	struct u_worker_group *old_dst = *dst;
+
+	if (old_dst == src) {
+		return;
+	}
+
+	if (src) {
+		xrt_reference_inc(&src->reference);
+	}
+
+	*dst = src;
+
+	if (old_dst) {
+		if (xrt_reference_dec(&old_dst->reference)) {
+			u_worker_group_destroy(old_dst);
+		}
+	}
+}
+
+
+#ifdef __cplusplus
+}
+#endif
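The reference helpers follow Monado's usual pattern: one function covers acquire, release, and reassignment, and the destroy function runs only when the last reference is dropped. A hypothetical sketch of how callers use it:

```cpp
// Hypothetical sketch of the single-function reference pattern above.
#include "util/u_worker.h"

void
example(struct u_worker_thread_pool *created)
{
	struct u_worker_thread_pool *mine = NULL;

	// Acquire: bumps the count on created and stores it in mine.
	u_worker_thread_pool_reference(&mine, created);

	// Release: decrements the count; u_worker_thread_pool_destroy()
	// runs automatically if this was the last reference.
	u_worker_thread_pool_reference(&mine, NULL);
}
```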
+ */ + explicit SharedThreadPool(uint32_t starting_worker_count, uint32_t thread_count) + { + mPool = u_worker_thread_pool_create(starting_worker_count, thread_count); + } + + ~SharedThreadPool() + { + u_worker_thread_pool_reference(&mPool, nullptr); + } + + SharedThreadPool & + operator=(const SharedThreadPool &other) + { + if (this == &other) { + return *this; + } + + u_worker_thread_pool_reference(&mPool, other.mPool); + return *this; + } + + friend SharedThreadGroup; + + // No default contstructor. + SharedThreadPool() = delete; + // No move. + SharedThreadPool(SharedThreadPool &&) = delete; + // No move assign. + SharedThreadPool & + operator=(SharedThreadPool &&) = delete; +}; + +/*! + * Wrapper around @ref u_worker_group, use @ref TaskCollection to dispatch work. + * + * @ingroup aux_util + */ +class SharedThreadGroup +{ +private: + u_worker_group *mGroup = nullptr; + + +public: + explicit SharedThreadGroup(SharedThreadPool const &stp) + { + mGroup = u_worker_group_create(stp.mPool); + } + + ~SharedThreadGroup() + { + u_worker_group_reference(&mGroup, nullptr); + } + + friend TaskCollection; + + // No default constructor. + SharedThreadGroup() = delete; + // Do not move or copy the shared thread group. + SharedThreadGroup(SharedThreadGroup const &) = delete; + SharedThreadGroup(SharedThreadGroup &&) = delete; + SharedThreadGroup & + operator=(SharedThreadGroup const &) = delete; + SharedThreadGroup & + operator=(SharedThreadGroup &&) = delete; +}; + +/*! + * Class to let users fall into a pit of success by + * being designed as a one shot dispatcher instance. + * + * @ingroup aux_util + */ +class TaskCollection +{ +public: + typedef std::function Functor; + + +private: + static constexpr size_t kSize = 16; + + Functor mFunctors[kSize] = {}; + u_worker_group *mGroup = nullptr; + + +public: + /*! + * Give all Functors when constructed, some what partially + * avoids use after leaving scope issues of function delegates. + */ + TaskCollection(SharedThreadGroup const &stc, std::vector funcs) + { + assert(funcs.size() <= kSize); + + u_worker_group_reference(&mGroup, stc.mGroup); + + for (size_t i = 0; i < kSize && i < funcs.size(); i++) { + mFunctors[i] = funcs[i]; + u_worker_group_push(mGroup, &cCallback, &mFunctors[i]); + } + } + + ~TaskCollection() + { + // Also unreferences the group. + waitAll(); + } + + /*! + * Waits for all given tasks to complete, also frees the group. + */ + void + waitAll() + { + if (mGroup == nullptr) { + return; + } + u_worker_group_wait_all(mGroup); + u_worker_group_reference(&mGroup, nullptr); + } + + + // Do not move or copy the task collection. + TaskCollection(TaskCollection const &) = delete; + TaskCollection(TaskCollection &&) = delete; + TaskCollection & + operator=(TaskCollection const &) = delete; + TaskCollection & + operator=(TaskCollection &&) = delete; + + +private: + static void + cCallback(void *data_ptr); +}; + +} // namespace xrt::auxiliary::util