d/remote: Properly shut down the run_thread

This commit is contained in:
Jakob Bornecrantz 2022-05-28 16:49:06 +01:00
parent 2b35699d85
commit 46305b77db

View file

@ -104,32 +104,46 @@ setup_accept_fd(struct r_hub *r)
return 0; return 0;
} }
static bool
wait_for_read_and_to_continue(struct r_hub *r, int socket)
{
struct timeval timeout = {.tv_sec = 1};
fd_set set;
int ret = 0;
// To be more roboust
if (socket < 0) {
return false;
}
while (os_thread_helper_is_running(&r->oth) && ret == 0) {
FD_ZERO(&set);
FD_SET(socket, &set);
ret = select(socket + 1, &set, NULL, NULL, &timeout);
}
if (ret < 0) {
R_ERROR(r, "select: %i", ret);
return false;
} else if (ret > 0) {
return true;
} else {
return false;
}
}
int int
do_accept(struct r_hub *r) do_accept(struct r_hub *r)
{ {
struct sockaddr_in addr = {0}; struct sockaddr_in addr = {0};
struct timeval timeout = {.tv_sec = 1}; int ret = 0;
fd_set set;
int ret;
int conn_fd; int conn_fd;
// Shutting down. if (!wait_for_read_and_to_continue(r, r->accept_fd)) {
if (r->accept_fd < 0) {
return -1; return -1;
} }
do {
FD_ZERO(&set);
FD_SET(r->accept_fd, &set);
ret = select(r->accept_fd + 1, &set, NULL, NULL, &timeout);
} while (ret == 0);
if (ret < 0) {
R_ERROR(r, "select: %i", ret);
return ret;
}
socklen_t addr_length = (socklen_t)sizeof(addr); socklen_t addr_length = (socklen_t)sizeof(addr);
ret = accept(r->accept_fd, (struct sockaddr *)&addr, &addr_length); ret = accept(r->accept_fd, (struct sockaddr *)&addr, &addr_length);
if (ret < 0) { if (ret < 0) {
@ -154,6 +168,36 @@ do_accept(struct r_hub *r)
return 0; return 0;
} }
static int
read_one(struct r_hub *r, struct r_remote_data *data)
{
struct r_remote_connection *rc = &r->rc;
const size_t size = sizeof(*data);
size_t current = 0;
while (current < size) {
void *ptr = (uint8_t *)data + current;
if (!wait_for_read_and_to_continue(r, rc->fd)) {
return -1;
}
ssize_t ret = read(rc->fd, ptr, size - current);
if (ret < 0) {
RC_ERROR(rc, "read: %zi", ret);
return ret;
} else if (ret > 0) {
current += (size_t)ret;
} else {
R_INFO(r, "Disconnected!");
return -1;
}
}
return 0;
}
void * void *
run_thread(void *ptr) run_thread(void *ptr)
{ {
@ -166,7 +210,7 @@ run_thread(void *ptr)
return NULL; return NULL;
} }
while (r->accept_fd >= 0) { while (os_thread_helper_is_running(&r->oth)) {
R_INFO(r, "Listening on port '%i'.", r->port); R_INFO(r, "Listening on port '%i'.", r->port);
ret = do_accept(r); ret = do_accept(r);
@ -181,7 +225,7 @@ run_thread(void *ptr)
while (true) { while (true) {
struct r_remote_data data; struct r_remote_data data;
ret = r_remote_connection_read_one(&r->rc, &data); ret = read_one(r, &data);
if (ret < 0) { if (ret < 0) {
break; break;
} }
@ -200,19 +244,17 @@ r_hub_destroy(struct xrt_system_devices *xsysd)
{ {
struct r_hub *r = (struct r_hub *)xsysd; struct r_hub *r = (struct r_hub *)xsysd;
/* R_DEBUG(r, "Destroying");
* Destroy all of the devices first.
*/
// Stop the thread first.
os_thread_helper_stop_and_wait(&r->oth);
// Destroy all of the devices now.
for (uint32_t i = 0; i < ARRAY_SIZE(r->base.xdevs); i++) { for (uint32_t i = 0; i < ARRAY_SIZE(r->base.xdevs); i++) {
xrt_device_destroy(&r->base.xdevs[i]); xrt_device_destroy(&r->base.xdevs[i]);
} }
// Should be safe to destroy the sockets now.
/*
* Harshly pull the plug on the sockets to wakeup the thread.
*/
if (r->accept_fd >= 0) { if (r->accept_fd >= 0) {
close(r->accept_fd); close(r->accept_fd);
r->accept_fd = -1; r->accept_fd = -1;
@ -223,13 +265,6 @@ r_hub_destroy(struct xrt_system_devices *xsysd)
r->rc.fd = -1; r->rc.fd = -1;
} }
/*
* Should be safe to stop the thread now.
*/
os_thread_helper_stop_and_wait(&r->oth);
free(r); free(r);
} }