ipc/server, t/server_lib: Implement mainloop code for Android, using a pipe.

This commit is contained in:
Ryan Pavlik 2021-02-12 09:23:59 -06:00
parent 28c84af396
commit 01fbbc4ed5
7 changed files with 401 additions and 36 deletions

View file

@ -114,6 +114,9 @@ if(ANDROID)
PROPERTIES
CXX_STANDARD 17
CXX_STANDARD_REQUIRED ON)
target_sources(ipc_server PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/server/ipc_server_mainloop_android.c
)
target_link_libraries(ipc_server PUBLIC
${ANDROID_LIBRARY}
PRIVATE

View file

@ -19,6 +19,8 @@ import androidx.annotation.NonNull;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
/**
* Java implementation of the IMonado IPC interface.
* <p>
@ -39,10 +41,38 @@ public class MonadoImpl extends IMonado.Stub {
System.loadLibrary("monado-service");
}
private final Thread compositorThread = new Thread(
this::threadEntry,
"CompositorThread");
private boolean started = false;
private void launchThreadIfNeeded() {
synchronized (compositorThread) {
if (!started) {
compositorThread.start();
nativeWaitForServerStartup();
started = true;
}
}
}
@Override
public void connect(@NotNull ParcelFileDescriptor parcelFileDescriptor) {
Log.i(TAG, "connect");
nativeAddClient(parcelFileDescriptor.detachFd());
/// @todo launch this thread earlier/elsewhere
launchThreadIfNeeded();
int fd = parcelFileDescriptor.getFd();
Log.i(TAG, "connect: given fd " + fd);
if (nativeAddClient(fd) != 0) {
Log.e(TAG, "Failed to transfer client fd ownership!");
try {
parcelFileDescriptor.close();
} catch (IOException e) {
// do nothing, probably already closed.
}
} else {
Log.i(TAG, "connect: fd ownership transferred");
parcelFileDescriptor.detachFd();
}
}
@Override
@ -55,6 +85,24 @@ public class MonadoImpl extends IMonado.Stub {
nativeAppSurface(surface);
}
private void threadEntry() {
Log.i(TAG, "threadEntry");
nativeThreadEntry();
Log.i(TAG, "native thread has exited");
}
/**
* Native thread entry point.
*/
@SuppressWarnings("JavaJniMissingFunction")
private native void nativeThreadEntry();
/**
* Native method that waits until the server reports that it is, in fact, started up.
*/
@SuppressWarnings("JavaJniMissingFunction")
private native void nativeWaitForServerStartup();
/**
* Native handling of receiving a surface: should convert it to an ANativeWindow then do stuff
* with it.
@ -63,7 +111,7 @@ public class MonadoImpl extends IMonado.Stub {
* See `src/xrt/targets/service-lib/service_target.cpp` for the implementation.
*
* @param surface The surface to pass to native code
* @todo figure out a good way to make the MonadoImpl pointer a client ID
* @todo figure out a good way to have a client ID
*/
@SuppressWarnings("JavaJniMissingFunction")
private native void nativeAppSurface(@NonNull Surface surface);
@ -80,8 +128,9 @@ public class MonadoImpl extends IMonado.Stub {
* See `src/xrt/targets/service-lib/service_target.cpp` for the implementation.
*
* @param fd The incoming file descriptor: ownership is transferred to native code here.
* @todo figure out a good way to make the MonadoImpl pointer a client ID
* @return 0 on success, anything else means the fd wasn't sent and ownership not transferred.
* @todo figure out a good way to have a client ID
*/
@SuppressWarnings("JavaJniMissingFunction")
private native void nativeAddClient(int fd);
private native int nativeAddClient(int fd);
}

View file

@ -159,7 +159,24 @@ struct ipc_device
struct ipc_server_mainloop
{
#if defined(XRT_OS_ANDROID)
int _unused;
//! For waiting on various events in the main thread.
int epoll_fd;
//! File descriptor for the read end of our pipe for submitting new clients
int pipe_read;
//! File descriptor for the write end of our pipe for submitting new clients
int pipe_write;
//! The last client fd we accepted, to unblock the right client.
int last_accepted_fd;
//! Mutex for accepting clients
pthread_mutex_t accept_mutex;
//! Condition variable for accepting clients
pthread_cond_t accept_cond;
#elif defined(XRT_OS_LINUX)
//! Socket that we accept connections on.
@ -258,14 +275,19 @@ int
ipc_server_main(int argc, char **argv);
#endif
#ifdef XRT_OS_ANDROID
/*!
* Android entry point to the IPC server process.
* Main entrypoint to the server process.
*
* @param ps Pointer to populate with the server struct.
* @param startup_complete_callback Function to call upon completing startup and populating *ps, but before entering the
* mainloop.
* @param data user data to pass to your callback.
*
* @ingroup ipc_server
*/
#ifdef XRT_OS_ANDROID
int
ipc_server_main_android(int fd);
ipc_server_main_android(struct ipc_server **ps, void (*startup_complete_callback)(void *data), void *data);
#endif
/*!

View file

@ -0,0 +1,205 @@
// Copyright 2020-2021, Collabora, Ltd.
// SPDX-License-Identifier: BSL-1.0
/*!
* @file
* @brief Server mainloop details on Android.
* @author Ryan Pavlik <ryan.pavlik@collabora.com>
* @author Pete Black <pblack@collabora.com>
* @author Jakob Bornecrantz <jakob@collabora.com>
* @ingroup ipc_server
*/
#include "xrt/xrt_config_have.h"
#include "xrt/xrt_config_os.h"
#include "os/os_time.h"
#include "util/u_var.h"
#include "util/u_misc.h"
#include "util/u_debug.h"
#include "server/ipc_server.h"
#include "server/ipc_server_mainloop_android.h"
#include <stdlib.h>
#include <unistd.h>
#include <inttypes.h>
#include <stdbool.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#define SHUTTING_DOWN (-1)
/*
*
* Static functions.
*
*/
static int
init_pipe(struct ipc_server_mainloop *ml)
{
int pipefd[2];
int ret = pipe(pipefd);
if (ret < 0) {
U_LOG_E("pipe2() failed '%i'", ret);
return ret;
}
ml->pipe_read = pipefd[0];
ml->pipe_write = pipefd[1];
return 0;
}
static int
init_epoll(struct ipc_server_mainloop *ml)
{
int ret = epoll_create1(EPOLL_CLOEXEC);
if (ret < 0) {
return ret;
}
pthread_mutex_init(&ml->accept_mutex, NULL);
pthread_cond_init(&ml->accept_cond, NULL);
ml->epoll_fd = ret;
struct epoll_event ev = {0};
ev.events = EPOLLIN;
ev.data.fd = ml->pipe_read;
ret = epoll_ctl(ml->epoll_fd, EPOLL_CTL_ADD, ml->pipe_read, &ev);
if (ret < 0) {
U_LOG_E("epoll_ctl(pipe_read) failed '%i'", ret);
return ret;
}
return 0;
}
static void
handle_listen(struct ipc_server *vs, struct ipc_server_mainloop *ml)
{
int newfd = 0;
if (read(ml->pipe_read, &newfd, sizeof(newfd)) == sizeof(newfd)) {
ipc_server_start_client_listener_thread(vs, newfd);
// Release the thread that gave us this fd.
pthread_mutex_lock(&ml->accept_mutex);
ml->last_accepted_fd = newfd;
pthread_cond_broadcast(&ml->accept_cond);
pthread_mutex_unlock(&ml->accept_mutex);
} else {
U_LOG_E("error on pipe read");
ipc_server_handle_failure(vs);
return;
}
}
#define NUM_POLL_EVENTS 8
#define NO_SLEEP 0
/*
*
* Exported functions
*
*/
void
ipc_server_mainloop_poll(struct ipc_server *vs, struct ipc_server_mainloop *ml)
{
int epoll_fd = ml->epoll_fd;
struct epoll_event events[NUM_POLL_EVENTS] = {0};
// No sleeping, returns immediately.
int ret = epoll_wait(epoll_fd, events, NUM_POLL_EVENTS, NO_SLEEP);
if (ret < 0) {
U_LOG_E("epoll_wait failed with '%i'.", ret);
ipc_server_handle_failure(vs);
return;
}
for (int i = 0; i < ret; i++) {
// Somebody new at the door.
if (events[i].data.fd == ml->pipe_read) {
handle_listen(vs, ml);
}
}
}
int
ipc_server_mainloop_init(struct ipc_server_mainloop *ml)
{
int ret = init_pipe(ml);
if (ret < 0) {
ipc_server_mainloop_deinit(ml);
return ret;
}
ret = init_epoll(ml);
if (ret < 0) {
ipc_server_mainloop_deinit(ml);
return ret;
}
return 0;
}
void
ipc_server_mainloop_deinit(struct ipc_server_mainloop *ml)
{
if (ml == NULL) {
return;
}
if (ml->pipe_read > 0) {
// Close pipe on exit
close(ml->pipe_read);
ml->pipe_read = -1;
}
//! @todo close pipe_write or epoll_fd?
// Tell everybody we're done and they should go away.
pthread_mutex_lock(&ml->accept_mutex);
while (ml->last_accepted_fd != 0) {
// Don't accidentally intervene in somebody else's message,
// wait until there's no unblocks pending.
pthread_cond_wait(&ml->accept_cond, &ml->accept_mutex);
}
ml->last_accepted_fd = SHUTTING_DOWN;
pthread_cond_broadcast(&ml->accept_cond);
pthread_mutex_unlock(&ml->accept_mutex);
}
int
ipc_server_mainloop_add_fd(struct ipc_server *vs, struct ipc_server_mainloop *ml, int newfd)
{
// Take the lock here, so we don't accidentally miss our fd being accepted.
pthread_mutex_lock(&ml->accept_mutex);
// Write our fd number: the other side of the pipe is in the same process, so passing just the number is OK.
int ret = write(ml->pipe_write, &newfd, sizeof(newfd));
if (ret < 0) {
U_LOG_E("write to pipe failed with '%i'.", ret);
pthread_mutex_unlock(&ml->accept_mutex);
return ret;
}
// Normal looping on the condition variable's condition.
while (ml->last_accepted_fd != newfd && ml->last_accepted_fd != SHUTTING_DOWN) {
ret = pthread_cond_wait(&ml->accept_cond, &ml->accept_mutex);
if (ret < 0) {
U_LOG_E("pthread_cond_wait failed with '%i'.", ret);
pthread_mutex_unlock(&ml->accept_mutex);
return ret;
}
}
// OK, we have now been accepted. Zero out the last accepted fd to avoid confusing any other thread.
ml->last_accepted_fd = 0;
pthread_mutex_unlock(&ml->accept_mutex);
return 0;
}

View file

@ -0,0 +1,27 @@
// Copyright 2020-2021, Collabora, Ltd.
// SPDX-License-Identifier: BSL-1.0
/*!
* @file
* @brief Additional server entry points needed for Android.
* @author Ryan Pavlik <ryan.pavlik@collabora.com>
* @ingroup ipc_server
*/
#pragma once
#include "ipc_server.h"
#ifdef __cplusplus
extern "C" {
#endif
/*!
* Pass an fd for a new client to the mainloop.
*/
int
ipc_server_mainloop_add_fd(struct ipc_server *vs, struct ipc_server_mainloop *ml, int newfd);
#ifdef __cplusplus
}
#endif

View file

@ -88,25 +88,6 @@ teardown_idev(struct ipc_device *idev)
* Static functions.
*
*/
#ifdef XRT_OS_ANDROID
// Stub
void
ipc_server_mainloop_deinit(struct ipc_server_mainloop *ml)
{}
// Stub
int
ipc_server_mainloop_init(struct ipc_server_mainloop *ml)
{
return 0;
}
// Stub
void
ipc_server_mainloop_poll(struct ipc_server *vs, struct ipc_server_mainloop *ml)
{}
#endif
static void
teardown_all(struct ipc_server *s)
{
@ -1110,10 +1091,10 @@ ipc_server_main(int argc, char **argv)
#ifdef XRT_OS_ANDROID
int
ipc_server_main_android(int fd)
ipc_server_main_android(struct ipc_server **ps, void (*startup_complete_callback)(void *data), void *data)
{
struct ipc_server *s = U_TYPED_CALLOC(struct ipc_server);
U_LOG_D("Created IPC server on fd '%d'!", fd);
U_LOG_D("Created IPC server!");
int ret = init_all(s);
if (ret < 0) {
@ -1122,7 +1103,10 @@ ipc_server_main_android(int fd)
}
init_server_state(s);
ipc_server_start_client_listener_thread(s, fd);
*ps = s;
startup_complete_callback(data);
ret = main_loop(s);
teardown_all(s);
@ -1132,4 +1116,4 @@ ipc_server_main_android(int fd)
return ret;
}
#endif
#endif // XRT_OS_ANDROID

View file

@ -15,26 +15,101 @@
#include "wrap/android.view.h"
#include "server/ipc_server.h"
#include "server/ipc_server_mainloop_android.h"
#include "util/u_logging.h"
#include <android/native_window.h>
#include <android/native_window_jni.h>
#include "android/android_globals.h"
#include <thread>
using wrap::android::view::Surface;
namespace {
struct Singleton
{
public:
static Singleton &
instance()
{
static Singleton singleton{};
return singleton;
}
void
waitForStartupComplete()
{
std::unique_lock<std::mutex> lock{running_mutex};
running_cond.wait(lock, [&]() { return this->startup_complete; });
}
//! static trampoline for the startup complete callback
static void
signalStartupComplete()
{
instance().signalStartupCompleteNonstatic();
}
private:
void
signalStartupCompleteNonstatic()
{
std::unique_lock<std::mutex> lock{running_mutex};
startup_complete = true;
running_cond.notify_all();
}
Singleton() {}
//! Mutex for starting thread
std::mutex running_mutex;
//! Condition variable for starting thread
std::condition_variable running_cond;
bool startup_complete = false;
};
} // namespace
static struct ipc_server *server = NULL;
static void
signalStartupCompleteTrampoline(void *data)
{
static_cast<Singleton *>(data)->signalStartupComplete();
}
extern "C" void
Java_org_freedesktop_monado_ipc_MonadoImpl_nativeThreadEntry(JNIEnv *env, jobject thiz)
{
jni::init(env);
jni::Object monadoImpl(thiz);
U_LOG_D("service: Called nativeThreadEntry");
auto &singleton = Singleton::instance();
ipc_server_main_android(&server, signalStartupCompleteTrampoline, &singleton);
}
extern "C" JNIEXPORT void JNICALL
Java_org_freedesktop_monado_ipc_MonadoImpl_nativeWaitForServerStartup(JNIEnv *env, jobject thiz)
{
Singleton::instance().waitForStartupComplete();
}
extern "C" JNIEXPORT jint JNICALL
Java_org_freedesktop_monado_ipc_MonadoImpl_nativeAddClient(JNIEnv *env, jobject thiz, int fd)
{
jni::init(env);
jni::Object monadoImpl(thiz);
U_LOG_D("service: Called nativeAddClient with fd %d", fd);
// "entry point" of the native code
ipc_server_main_android(fd);
if (server == nullptr) {
// Should not happen.
U_LOG_E("service: nativeAddClient called before service started up!");
return -1;
}
// We try pushing the fd number to the server. If and only if we get a 0 return, has the server taken ownership.
return ipc_server_mainloop_add_fd(server, &server->ml, fd);
}
extern "C" void
extern "C" JNIEXPORT void JNICALL
Java_org_freedesktop_monado_ipc_MonadoImpl_nativeAppSurface(JNIEnv *env, jobject thiz, jobject surface)
{
jni::init(env);
@ -43,5 +118,5 @@ Java_org_freedesktop_monado_ipc_MonadoImpl_nativeAppSurface(JNIEnv *env, jobject
ANativeWindow *nativeWindow = ANativeWindow_fromSurface(env, surface);
android_globals_store_window((struct _ANativeWindow *)nativeWindow);
U_LOG_D("Stored ANativeWindow: %p", nativeWindow);
U_LOG_D("Stored ANativeWindow: %p", (void *)nativeWindow);
}