ipc/server: Factor out start_server_thread function.

This commit is contained in:
Lubosz Sarnecki 2020-11-25 14:02:19 +01:00
parent db95b34e6c
commit 3202922ce7

View file

@ -445,6 +445,63 @@ init_epoll(struct ipc_server *s)
return 0;
}
static void
start_client_listener_thread(struct ipc_server *vs, int fd)
{
volatile struct ipc_client_state *ics = NULL;
int32_t cs_index = -1;
os_mutex_lock(&vs->global_state_lock);
// find the next free thread in our array (server_thread_index is -1)
// and have it handle this connection
for (uint32_t i = 0; i < IPC_MAX_CLIENTS; i++) {
volatile struct ipc_client_state *_cs = &vs->threads[i].ics;
if (_cs->server_thread_index < 0) {
ics = _cs;
cs_index = i;
break;
}
}
if (ics == NULL) {
close(fd);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
U_LOG_E("Max client count reached!");
return;
}
struct ipc_thread *it = &vs->threads[cs_index];
if (it->state != IPC_THREAD_READY && it->state != IPC_THREAD_STOPPING) {
// we should not get here
close(fd);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
U_LOG_E("Client state management error!");
return;
}
if (it->state != IPC_THREAD_READY) {
os_thread_join(&it->thread);
os_thread_destroy(&it->thread);
it->state = IPC_THREAD_READY;
}
it->state = IPC_THREAD_STARTING;
ics->imc.socket_fd = fd;
ics->server = vs;
ics->server_thread_index = cs_index;
ics->io_active = true;
os_thread_start(&it->thread, ipc_server_client_thread, (void *)ics);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
}
static int
init_all(struct ipc_server *s)
{
@ -544,62 +601,7 @@ handle_listen(struct ipc_server *vs)
U_LOG_E("accept '%i'", ret);
vs->running = false;
}
volatile struct ipc_client_state *ics = NULL;
int32_t cs_index = -1;
// The return is the new fd;
int fd = ret;
os_mutex_lock(&vs->global_state_lock);
// find the next free thread in our array (server_thread_index is -1)
// and have it handle this connection
for (uint32_t i = 0; i < IPC_MAX_CLIENTS; i++) {
volatile struct ipc_client_state *_cs = &vs->threads[i].ics;
if (_cs->server_thread_index < 0) {
ics = _cs;
cs_index = i;
break;
}
}
if (ics == NULL) {
close(fd);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
U_LOG_E("Max client count reached!");
return;
}
struct ipc_thread *it = &vs->threads[cs_index];
if (it->state != IPC_THREAD_READY && it->state != IPC_THREAD_STOPPING) {
// we should not get here
close(fd);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
U_LOG_E("Client state management error!");
return;
}
if (it->state != IPC_THREAD_READY) {
os_thread_join(&it->thread);
os_thread_destroy(&it->thread);
it->state = IPC_THREAD_READY;
}
it->state = IPC_THREAD_STARTING;
ics->imc.socket_fd = fd;
ics->server = vs;
ics->server_thread_index = cs_index;
ics->io_active = true;
os_thread_start(&it->thread, ipc_server_client_thread, (void *)ics);
// Unlock when we are done.
os_mutex_unlock(&vs->global_state_lock);
start_client_listener_thread(vs, ret);
}
#define NUM_POLL_EVENTS 8