semaphore: Use condvars with separate signaled flag to prevent races (#1615)

* Revert "semaphore: Use binary_semaphore instead of condvar"

This reverts commit 85dc57b868.

* semaphore: Use separate signaled flag to prevent races

* mutex: Few misc fixes

* condvar: Few misc fixes

* signals: Add thread name to unhandled signal message.
This commit is contained in:
squidbus 2024-11-30 09:19:07 -08:00 committed by GitHub
parent 899a41823a
commit 1d14a9a3d0
5 changed files with 40 additions and 25 deletions

View file

@ -122,7 +122,7 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
SleepqUnlock(this); SleepqUnlock(this);
//_thr_cancel_enter2(curthread, 0); //_thr_cancel_enter2(curthread, 0);
int error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT; error = curthread->Sleep(abstime, usec) ? 0 : POSIX_ETIMEDOUT;
//_thr_cancel_leave(curthread, 0); //_thr_cancel_leave(curthread, 0);
SleepqLock(this); SleepqLock(this);
@ -145,7 +145,10 @@ int PthreadCond::Wait(PthreadMutexT* mutex, const OrbisKernelTimespec* abstime,
} }
SleepqUnlock(this); SleepqUnlock(this);
curthread->mutex_obj = nullptr; curthread->mutex_obj = nullptr;
mp->CvLock(recurse); int error2 = mp->CvLock(recurse);
if (error == 0) {
error = error2;
}
return error; return error;
} }

View file

@ -121,6 +121,7 @@ int PthreadMutex::SelfTryLock() {
switch (Type()) { switch (Type()) {
case PthreadMutexType::ErrorCheck: case PthreadMutexType::ErrorCheck:
case PthreadMutexType::Normal: case PthreadMutexType::Normal:
case PthreadMutexType::AdaptiveNp:
return POSIX_EBUSY; return POSIX_EBUSY;
case PthreadMutexType::Recursive: { case PthreadMutexType::Recursive: {
/* Increment the lock count: */ /* Increment the lock count: */
@ -224,7 +225,7 @@ int PthreadMutex::Lock(const OrbisKernelTimespec* abstime, u64 usec) {
[[unlikely]] { [[unlikely]] {
ret = POSIX_EINVAL; ret = POSIX_EINVAL;
} else { } else {
if (THR_RELTIME) { if (abstime == THR_RELTIME) {
ret = m_lock.try_lock_for(std::chrono::microseconds(usec)) ? 0 : POSIX_ETIMEDOUT; ret = m_lock.try_lock_for(std::chrono::microseconds(usec)) ? 0 : POSIX_ETIMEDOUT;
} else { } else {
ret = m_lock.try_lock_until(abstime->TimePoint()) ? 0 : POSIX_ETIMEDOUT; ret = m_lock.try_lock_until(abstime->TimePoint()) ? 0 : POSIX_ETIMEDOUT;
@ -336,7 +337,7 @@ int PS4_SYSV_ABI posix_pthread_mutex_isowned_np(PthreadMutexT* mutex) {
return m->m_owner == g_curthread; return m->m_owner == g_curthread;
} }
bool PthreadMutex::IsOwned(Pthread* curthread) const { int PthreadMutex::IsOwned(Pthread* curthread) const {
if (this <= THR_MUTEX_DESTROYED) [[unlikely]] { if (this <= THR_MUTEX_DESTROYED) [[unlikely]] {
if (this == THR_MUTEX_DESTROYED) { if (this == THR_MUTEX_DESTROYED) {
return POSIX_EINVAL; return POSIX_EINVAL;

View file

@ -79,7 +79,7 @@ struct PthreadMutex {
return Unlock(); return Unlock();
} }
bool IsOwned(Pthread* curthread) const; int IsOwned(Pthread* curthread) const;
}; };
using PthreadMutexT = PthreadMutex*; using PthreadMutexT = PthreadMutex*;

View file

@ -49,9 +49,7 @@ public:
const auto it = AddWaiter(&waiter); const auto it = AddWaiter(&waiter);
// Perform the wait. // Perform the wait.
lk.unlock(); const s32 result = waiter.Wait(lk, timeout);
const s32 result = waiter.Wait(timeout);
lk.lock();
if (result == SCE_KERNEL_ERROR_ETIMEDOUT) { if (result == SCE_KERNEL_ERROR_ETIMEDOUT) {
wait_list.erase(it); wait_list.erase(it);
} }
@ -74,7 +72,8 @@ public:
} }
it = wait_list.erase(it); it = wait_list.erase(it);
token_count -= waiter->need_count; token_count -= waiter->need_count;
waiter->sema.release(); waiter->was_signaled = true;
waiter->cv.notify_one();
} }
return true; return true;
@ -87,7 +86,7 @@ public:
} }
for (auto* waiter : wait_list) { for (auto* waiter : wait_list) {
waiter->was_cancled = true; waiter->was_cancled = true;
waiter->sema.release(); waiter->cv.notify_one();
} }
wait_list.clear(); wait_list.clear();
token_count = set_count < 0 ? init_count : set_count; token_count = set_count < 0 ? init_count : set_count;
@ -98,20 +97,21 @@ public:
std::scoped_lock lk{mutex}; std::scoped_lock lk{mutex};
for (auto* waiter : wait_list) { for (auto* waiter : wait_list) {
waiter->was_deleted = true; waiter->was_deleted = true;
waiter->sema.release(); waiter->cv.notify_one();
} }
wait_list.clear(); wait_list.clear();
} }
public: public:
struct WaitingThread { struct WaitingThread {
std::binary_semaphore sema; std::condition_variable cv;
u32 priority; u32 priority;
s32 need_count; s32 need_count;
bool was_signaled{};
bool was_deleted{}; bool was_deleted{};
bool was_cancled{}; bool was_cancled{};
explicit WaitingThread(s32 need_count, bool is_fifo) : sema{0}, need_count{need_count} { explicit WaitingThread(s32 need_count, bool is_fifo) : need_count{need_count} {
// Retrieve calling thread priority for sorting into waiting threads list. // Retrieve calling thread priority for sorting into waiting threads list.
if (!is_fifo) { if (!is_fifo) {
priority = g_curthread->attr.prio; priority = g_curthread->attr.prio;
@ -131,24 +131,25 @@ public:
return SCE_OK; return SCE_OK;
} }
int Wait(u32* timeout) { int Wait(std::unique_lock<std::mutex>& lk, u32* timeout) {
if (!timeout) { if (!timeout) {
// Wait indefinitely until we are woken up. // Wait indefinitely until we are woken up.
sema.acquire(); cv.wait(lk);
return GetResult(false); return GetResult(false);
} }
// Wait until timeout runs out, recording how much remaining time there was. // Wait until timeout runs out, recording how much remaining time there was.
const auto start = std::chrono::high_resolution_clock::now(); const auto start = std::chrono::high_resolution_clock::now();
const auto sema_timeout = !sema.try_acquire_for(std::chrono::microseconds(*timeout)); const auto signaled = cv.wait_for(lk, std::chrono::microseconds(*timeout),
[this] { return was_signaled; });
const auto end = std::chrono::high_resolution_clock::now(); const auto end = std::chrono::high_resolution_clock::now();
const auto time = const auto time =
std::chrono::duration_cast<std::chrono::microseconds>(end - start).count(); std::chrono::duration_cast<std::chrono::microseconds>(end - start).count();
if (sema_timeout) { if (signaled) {
*timeout = 0;
} else {
*timeout -= time; *timeout -= time;
} else {
*timeout = 0;
} }
return GetResult(sema_timeout); return GetResult(!signaled);
} }
}; };

View file

@ -41,6 +41,14 @@ static LONG WINAPI SignalHandler(EXCEPTION_POINTERS* pExp) noexcept {
#else #else
static std::string GetThreadName() {
char name[256];
if (pthread_getname_np(pthread_self(), name, sizeof(name)) != 0) {
return "<unknown name>";
}
return std::string{name};
}
static std::string DisassembleInstruction(void* code_address) { static std::string DisassembleInstruction(void* code_address) {
char buffer[256] = "<unable to decode>"; char buffer[256] = "<unable to decode>";
@ -71,16 +79,18 @@ static void SignalHandler(int sig, siginfo_t* info, void* raw_context) {
case SIGBUS: { case SIGBUS: {
const bool is_write = Common::IsWriteError(raw_context); const bool is_write = Common::IsWriteError(raw_context);
if (!signals->DispatchAccessViolation(raw_context, info->si_addr)) { if (!signals->DispatchAccessViolation(raw_context, info->si_addr)) {
UNREACHABLE_MSG("Unhandled access violation at code address {}: {} address {}", UNREACHABLE_MSG(
fmt::ptr(code_address), is_write ? "Write to" : "Read from", "Unhandled access violation in thread '{}' at code address {}: {} address {}",
fmt::ptr(info->si_addr)); GetThreadName(), fmt::ptr(code_address), is_write ? "Write to" : "Read from",
fmt::ptr(info->si_addr));
} }
break; break;
} }
case SIGILL: case SIGILL:
if (!signals->DispatchIllegalInstruction(raw_context)) { if (!signals->DispatchIllegalInstruction(raw_context)) {
UNREACHABLE_MSG("Unhandled illegal instruction at code address {}: {}", UNREACHABLE_MSG("Unhandled illegal instruction in thread '{}' at code address {}: {}",
fmt::ptr(code_address), DisassembleInstruction(code_address)); GetThreadName(), fmt::ptr(code_address),
DisassembleInstruction(code_address));
} }
break; break;
case SIGUSR1: { // Sleep thread until signal is received case SIGUSR1: { // Sleep thread until signal is received