diff --git a/src/xrt/drivers/remote/r_hub.c b/src/xrt/drivers/remote/r_hub.c index 6e9a20965..db5870c23 100644 --- a/src/xrt/drivers/remote/r_hub.c +++ b/src/xrt/drivers/remote/r_hub.c @@ -104,32 +104,46 @@ setup_accept_fd(struct r_hub *r) return 0; } +static bool +wait_for_read_and_to_continue(struct r_hub *r, int socket) +{ + struct timeval timeout = {.tv_sec = 1}; + fd_set set; + int ret = 0; + + // To be more roboust + if (socket < 0) { + return false; + } + + while (os_thread_helper_is_running(&r->oth) && ret == 0) { + FD_ZERO(&set); + FD_SET(socket, &set); + + ret = select(socket + 1, &set, NULL, NULL, &timeout); + } + + if (ret < 0) { + R_ERROR(r, "select: %i", ret); + return false; + } else if (ret > 0) { + return true; + } else { + return false; + } +} + int do_accept(struct r_hub *r) { struct sockaddr_in addr = {0}; - struct timeval timeout = {.tv_sec = 1}; - fd_set set; - int ret; + int ret = 0; int conn_fd; - // Shutting down. - if (r->accept_fd < 0) { + if (!wait_for_read_and_to_continue(r, r->accept_fd)) { return -1; } - do { - FD_ZERO(&set); - FD_SET(r->accept_fd, &set); - - ret = select(r->accept_fd + 1, &set, NULL, NULL, &timeout); - } while (ret == 0); - - if (ret < 0) { - R_ERROR(r, "select: %i", ret); - return ret; - } - socklen_t addr_length = (socklen_t)sizeof(addr); ret = accept(r->accept_fd, (struct sockaddr *)&addr, &addr_length); if (ret < 0) { @@ -154,6 +168,36 @@ do_accept(struct r_hub *r) return 0; } +static int +read_one(struct r_hub *r, struct r_remote_data *data) +{ + struct r_remote_connection *rc = &r->rc; + + const size_t size = sizeof(*data); + size_t current = 0; + + while (current < size) { + void *ptr = (uint8_t *)data + current; + + if (!wait_for_read_and_to_continue(r, rc->fd)) { + return -1; + } + + ssize_t ret = read(rc->fd, ptr, size - current); + if (ret < 0) { + RC_ERROR(rc, "read: %zi", ret); + return ret; + } else if (ret > 0) { + current += (size_t)ret; + } else { + R_INFO(r, "Disconnected!"); + return -1; + } + } + + return 0; +} + void * run_thread(void *ptr) { @@ -166,7 +210,7 @@ run_thread(void *ptr) return NULL; } - while (r->accept_fd >= 0) { + while (os_thread_helper_is_running(&r->oth)) { R_INFO(r, "Listening on port '%i'.", r->port); ret = do_accept(r); @@ -181,7 +225,7 @@ run_thread(void *ptr) while (true) { struct r_remote_data data; - ret = r_remote_connection_read_one(&r->rc, &data); + ret = read_one(r, &data); if (ret < 0) { break; } @@ -200,19 +244,17 @@ r_hub_destroy(struct xrt_system_devices *xsysd) { struct r_hub *r = (struct r_hub *)xsysd; - /* - * Destroy all of the devices first. - */ + R_DEBUG(r, "Destroying"); + // Stop the thread first. + os_thread_helper_stop_and_wait(&r->oth); + + // Destroy all of the devices now. for (uint32_t i = 0; i < ARRAY_SIZE(r->base.xdevs); i++) { xrt_device_destroy(&r->base.xdevs[i]); } - - /* - * Harshly pull the plug on the sockets to wakeup the thread. - */ - + // Should be safe to destroy the sockets now. if (r->accept_fd >= 0) { close(r->accept_fd); r->accept_fd = -1; @@ -223,13 +265,6 @@ r_hub_destroy(struct xrt_system_devices *xsysd) r->rc.fd = -1; } - - /* - * Should be safe to stop the thread now. - */ - - os_thread_helper_stop_and_wait(&r->oth); - free(r); }