diff --git a/winpr/include/winpr/thread.h b/winpr/include/winpr/thread.h index ce840ba49..4e603b3dc 100644 --- a/winpr/include/winpr/thread.h +++ b/winpr/include/winpr/thread.h @@ -213,6 +213,9 @@ extern "C" WINPR_API HANDLE _GetCurrentThread(void); WINPR_API DWORD GetCurrentThreadId(void); + typedef void (*PAPCFUNC)(ULONG_PTR Parameter); + WINPR_API DWORD QueueUserAPC(PAPCFUNC pfnAPC, HANDLE hThread, ULONG_PTR dwData); + WINPR_API DWORD ResumeThread(HANDLE hThread); WINPR_API DWORD SuspendThread(HANDLE hThread); WINPR_API BOOL SwitchToThread(void); diff --git a/winpr/libwinpr/synch/event.c b/winpr/libwinpr/synch/event.c index 96164ff54..f5b3946a4 100644 --- a/winpr/libwinpr/synch/event.c +++ b/winpr/libwinpr/synch/event.c @@ -41,14 +41,112 @@ #include #endif +#include #include #include "../handle/handle.h" #include "../pipe/pipe.h" #include "../log.h" +#include "event.h" #define TAG WINPR_TAG("synch.event") +#ifdef HAVE_SYS_EVENTFD_H +#if !defined(WITH_EVENTFD_READ_WRITE) +static int eventfd_read(int fd, eventfd_t* value) +{ + return (read(fd, value, sizeof(*value)) == sizeof(*value)) ? 0 : -1; +} + +static int eventfd_write(int fd, eventfd_t value) +{ + return (write(fd, &value, sizeof(value)) == sizeof(value)) ? 0 : -1; +} +#endif +#endif + +BOOL winpr_event_init(WINPR_EVENT_IMPL* event) +{ +#ifdef HAVE_SYS_EVENTFD_H + event->fds[1] = -1; + event->fds[0] = eventfd(0, EFD_NONBLOCK); + + return event->fds[0] >= 0; +#else + int flags; + + if (pipe(event->fds) < 0) + return FALSE; + + flags = fcntl(event->fds[0], F_GETFL); + if (flags < 0) + goto out_error; + + if (fcntl(event->fds[0], F_SETFL, flags | O_NONBLOCK) < 0) + goto out_error; + + return TRUE; + +out_error: + winpr_event_uninit(&event); + return FALSE; +#endif +} + +void winpr_event_init_from_fd(WINPR_EVENT_IMPL* event, int fd) +{ + event->fds[0] = fd; +#ifndef HAVE_SYS_EVENTFD_H + event->fds[1] = fd; +#endif +} + +BOOL winpr_event_set(WINPR_EVENT_IMPL* event) +{ + int ret; + do + { +#ifdef HAVE_SYS_EVENTFD_H + eventfd_t value = 1; + ret = eventfd_write(event->fds[0], value); +#else + ret = write(event->fds[1], "-", 1); +#endif + } while (ret < 0 && errno == EINTR); + + return ret >= 0; +} + +BOOL winpr_event_reset(WINPR_EVENT_IMPL* event) +{ + int ret; + do + { + do + { +#ifdef HAVE_SYS_EVENTFD_H + eventfd_t value = 1; + ret = eventfd_read(event->fds[0], &value); +#else + char value; + ret = read(event->fds[1], &value, 1); +#endif + } while (ret < 0 && errno == EINTR); + } while (ret >= 0); + + return (errno == EAGAIN); +} + +void winpr_event_uninit(WINPR_EVENT_IMPL* event) +{ + if (event->fds[0] != -1) + close(event->fds[0]); +#ifndef HAVE_SYS_EVENTFD_H + if (event->fds[1] != -1) + close(event->fds[1]); +#endif +} + static BOOL EventCloseHandle(HANDLE handle); static BOOL EventIsHandled(HANDLE handle) @@ -71,7 +169,7 @@ static int EventGetFd(HANDLE handle) if (!EventIsHandled(handle)) return -1; - return event->pipe_fd[0]; + return event->impl.fds[0]; } static BOOL EventCloseHandle_(WINPR_EVENT* event) @@ -80,19 +178,7 @@ static BOOL EventCloseHandle_(WINPR_EVENT* event) return FALSE; if (!event->bAttached) - { - if (event->pipe_fd[0] != -1) - { - close(event->pipe_fd[0]); - event->pipe_fd[0] = -1; - } - - if (event->pipe_fd[1] != -1) - { - close(event->pipe_fd[1]); - event->pipe_fd[1] = -1; - } - } + winpr_event_uninit(&event->impl); free(event->name); free(event); @@ -161,21 +247,9 @@ HANDLE CreateEventA(LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL bManualReset, if (!event->bManualReset) WLog_ERR(TAG, "auto-reset events not yet implemented"); - event->pipe_fd[0] = -1; - event->pipe_fd[1] = -1; -#ifdef HAVE_SYS_EVENTFD_H - event->pipe_fd[0] = eventfd(0, EFD_NONBLOCK); - - if (event->pipe_fd[0] < 0) + if (!winpr_event_init(&event->impl)) goto fail; -#else - - if (pipe(event->pipe_fd) < 0) - goto fail; - -#endif - if (bInitialState) { if (!SetEvent(event)) @@ -246,25 +320,10 @@ HANDLE OpenEventA(DWORD dwDesiredAccess, BOOL bInheritHandle, LPCSTR lpName) return NULL; } -#ifdef HAVE_SYS_EVENTFD_H -#if !defined(WITH_EVENTFD_READ_WRITE) -static int eventfd_read(int fd, eventfd_t* value) -{ - return (read(fd, value, sizeof(*value)) == sizeof(*value)) ? 0 : -1; -} - -static int eventfd_write(int fd, eventfd_t value) -{ - return (write(fd, &value, sizeof(value)) == sizeof(value)) ? 0 : -1; -} -#endif -#endif - BOOL SetEvent(HANDLE hEvent) { ULONG Type; WINPR_HANDLE* Object; - int length; BOOL status; WINPR_EVENT* event; status = FALSE; @@ -273,30 +332,7 @@ BOOL SetEvent(HANDLE hEvent) { event = (WINPR_EVENT*)Object; -#ifdef HAVE_SYS_EVENTFD_H - eventfd_t val = 1; - - do - { - length = eventfd_write(event->pipe_fd[0], val); - } while ((length < 0) && (errno == EINTR)); - - status = (length == 0) ? TRUE : FALSE; -#else - - if (WaitForSingleObject(hEvent, 0) != WAIT_OBJECT_0) - { - length = write(event->pipe_fd[1], "-", 1); - - if (length == 1) - status = TRUE; - } - else - { - status = TRUE; - } - -#endif + status = winpr_event_set(&event->impl); } return status; @@ -306,8 +342,6 @@ BOOL ResetEvent(HANDLE hEvent) { ULONG Type; WINPR_HANDLE* Object; - int length; - BOOL status = TRUE; WINPR_EVENT* event; if (!winpr_Handle_GetInfo(hEvent, &Type, &Object)) @@ -315,23 +349,7 @@ BOOL ResetEvent(HANDLE hEvent) event = (WINPR_EVENT*)Object; - while (status && WaitForSingleObject(hEvent, 0) == WAIT_OBJECT_0) - { - do - { -#ifdef HAVE_SYS_EVENTFD_H - eventfd_t value; - length = eventfd_read(event->pipe_fd[0], &value); -#else - length = read(event->pipe_fd[0], &length, 1); -#endif - } while ((length < 0) && (errno == EINTR)); - - if (length < 0) - status = FALSE; - } - - return status; + return winpr_event_reset(&event->impl); } #endif @@ -348,8 +366,7 @@ HANDLE CreateFileDescriptorEventW(LPSECURITY_ATTRIBUTES lpEventAttributes, BOOL { event->bAttached = TRUE; event->bManualReset = bManualReset; - event->pipe_fd[0] = FileDescriptor; - event->pipe_fd[1] = -1; + winpr_event_init_from_fd(&event->impl, FileDescriptor); event->ops = &ops; WINPR_HANDLE_SET_TYPE_AND_MODE(event, HANDLE_TYPE_EVENT, mode); handle = (HANDLE)event; @@ -416,12 +433,12 @@ int SetEventFileDescriptor(HANDLE hEvent, int FileDescriptor, ULONG mode) event = (WINPR_EVENT*)Object; - if (!event->bAttached && event->pipe_fd[0] >= 0 && event->pipe_fd[0] != FileDescriptor) - close(event->pipe_fd[0]); + if (!event->bAttached && event->impl.fds[0] >= 0 && event->impl.fds[0] != FileDescriptor) + close(event->impl.fds[0]); event->bAttached = TRUE; event->Mode = mode; - event->pipe_fd[0] = FileDescriptor; + event->impl.fds[0] = FileDescriptor; return 0; #else return -1; diff --git a/winpr/libwinpr/synch/event.h b/winpr/libwinpr/synch/event.h new file mode 100644 index 000000000..cce77a97b --- /dev/null +++ b/winpr/libwinpr/synch/event.h @@ -0,0 +1,56 @@ +/** + * FreeRDP: A Remote Desktop Protocol Implementation + * event implementation + * + * Copyright 2021 David Fort + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef WINPR_LIBWINPR_SYNCH_EVENT_H_ +#define WINPR_LIBWINPR_SYNCH_EVENT_H_ + +#include "../handle/handle.h" + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#ifdef HAVE_SYS_EVENTFD_H +#include +#endif + +struct winpr_event_impl +{ + int fds[2]; +}; + +typedef struct winpr_event_impl WINPR_EVENT_IMPL; + +struct winpr_event +{ + WINPR_HANDLE_DEF(); + + WINPR_EVENT_IMPL impl; + BOOL bAttached; + BOOL bManualReset; + char* name; +}; +typedef struct winpr_event WINPR_EVENT; + +BOOL winpr_event_init(WINPR_EVENT_IMPL* event); +void winpr_event_init_from_fd(WINPR_EVENT_IMPL* event, int fd); +BOOL winpr_event_set(WINPR_EVENT_IMPL* event); +BOOL winpr_event_reset(WINPR_EVENT_IMPL* event); +void winpr_event_uninit(WINPR_EVENT_IMPL* event); + +#endif /* WINPR_LIBWINPR_SYNCH_EVENT_H_ */ diff --git a/winpr/libwinpr/synch/pollset.c b/winpr/libwinpr/synch/pollset.c index 2e28484a4..e496dd9c3 100644 --- a/winpr/libwinpr/synch/pollset.c +++ b/winpr/libwinpr/synch/pollset.c @@ -1,7 +1,9 @@ +#ifndef _WIN32 #include #include "pollset.h" #include +#include #include "../log.h" #define TAG WINPR_TAG("sync.pollset") @@ -41,7 +43,9 @@ BOOL pollset_init(WINPR_POLL_SET* set, size_t nhandles) if (!set->fdIndex) return FALSE; + FD_ZERO(&set->rset_base); FD_ZERO(&set->rset); + FD_ZERO(&set->wset_base); FD_ZERO(&set->wset); set->maxFd = 0; set->nread = set->nwrite = 0; @@ -65,8 +69,8 @@ void pollset_uninit(WINPR_POLL_SET* set) void pollset_reset(WINPR_POLL_SET* set) { #ifndef HAVE_POLL_H - FD_ZERO(&set->rset); - FD_ZERO(&set->wset); + FD_ZERO(&set->rset_base); + FD_ZERO(&set->wset_base); set->maxFd = 0; set->nread = set->nwrite = 0; #endif @@ -88,13 +92,13 @@ BOOL pollset_add(WINPR_POLL_SET* set, int fd, ULONG mode) FdIndex* fdIndex = &set->fdIndex[set->fillIndex]; if (mode & WINPR_FD_READ) { - FD_SET(fd, &set->rset); + FD_SET(fd, &set->rset_base); set->nread++; } if (mode & WINPR_FD_WRITE) { - FD_SET(fd, &set->wset); + FD_SET(fd, &set->wset_base); set->nwrite++; } @@ -110,35 +114,85 @@ BOOL pollset_add(WINPR_POLL_SET* set, int fd, ULONG mode) int pollset_poll(WINPR_POLL_SET* set, DWORD dwMilliseconds) { - int ret; -#ifdef HAVE_POLL_H - do - { - ret = poll(set->pollset, set->fillIndex, dwMilliseconds); - } while (ret < 0 && errno == EINTR); -#else - struct timeval staticTimeout; - struct timeval* timeout; + int ret = 0; + UINT64 dueTime, now; - if (dwMilliseconds == INFINITE || dwMilliseconds == 0) - { - timeout = NULL; - } + now = GetTickCount64(); + if (dwMilliseconds == INFINITE) + dueTime = 0xFFFFFFFFFFFFFFFF; else - { - timeout = &staticTimeout; - timeout->tv_sec = dwMilliseconds / 1000; - timeout->tv_usec = (dwMilliseconds % 1000) * 1000; - } + dueTime = now + dwMilliseconds; + +#ifdef HAVE_POLL_H + int timeout; do { - ret = select(set->maxFd + 1, set->nread ? &set->rset : NULL, - set->nwrite ? &set->wset : NULL, NULL, timeout); - } while (ret < 0 && errno == EINTR); + if (dwMilliseconds == INFINITE) + timeout = -1; + else + timeout = (int)(dueTime - now); + + ret = poll(set->pollset, set->fillIndex, timeout); + if (ret >= 0) + return ret; + + if (errno != EINTR) + return -1; + + now = GetTickCount64(); + } while (now < dueTime); + +#else + do + { + struct timeval staticTimeout; + struct timeval* timeout; + + fd_set* rset = NULL; + fd_set* wset = NULL; + + if (dwMilliseconds == INFINITE) + { + timeout = NULL; + } + else + { + long waitTime = (long)(dueTime - now); + + timeout = &staticTimeout; + timeout->tv_sec = waitTime / 1000; + timeout->tv_usec = (waitTime % 1000) * 1000; + } + + if (set->nread) + { + rset = &set->rset; + memcpy(rset, &set->rset_base, sizeof(*rset)); + } + + if (set->nwrite) + { + wset = &set->wset; + memcpy(wset, &set->wset_base, sizeof(*wset)); + } + + ret = select(set->maxFd + 1, rset, wset, NULL, timeout); + if (ret >= 0) + return ret; + + if (errno != EINTR) + return -1; + + now = GetTickCount64(); + + } while (now < dueTime); + + FD_ZERO(&set->rset); + FD_ZERO(&set->wset); #endif - return ret; + return 0; /* timeout */ } BOOL pollset_isSignaled(WINPR_POLL_SET* set, size_t idx) @@ -166,3 +220,4 @@ BOOL pollset_isSignaled(WINPR_POLL_SET* set, size_t idx) return FALSE; #endif } +#endif diff --git a/winpr/libwinpr/synch/pollset.h b/winpr/libwinpr/synch/pollset.h index a12c43486..584355424 100644 --- a/winpr/libwinpr/synch/pollset.h +++ b/winpr/libwinpr/synch/pollset.h @@ -26,6 +26,8 @@ #include "config.h" #endif +#ifndef _WIN32 + #ifdef HAVE_POLL_H #include #else @@ -46,7 +48,9 @@ struct winpr_poll_set BOOL isStatic; #else FdIndex* fdIndex; + fd_set rset_base; fd_set rset; + fd_set wset_base; fd_set wset; int nread, nwrite; int maxFd; @@ -64,4 +68,6 @@ BOOL pollset_add(WINPR_POLL_SET* set, int fd, ULONG mode); int pollset_poll(WINPR_POLL_SET* set, DWORD dwMilliseconds); BOOL pollset_isSignaled(WINPR_POLL_SET* set, size_t idx); +#endif + #endif /* WINPR_LIBWINPR_SYNCH_POLLSET_H_ */ diff --git a/winpr/libwinpr/synch/sleep.c b/winpr/libwinpr/synch/sleep.c index 23c819521..05e7c53c5 100644 --- a/winpr/libwinpr/synch/sleep.c +++ b/winpr/libwinpr/synch/sleep.c @@ -26,6 +26,9 @@ #include #include "../log.h" +#include "../thread/apc.h" +#include "../thread/thread.h" +#include "../synch/pollset.h" #define TAG WINPR_TAG("synch.sleep") @@ -47,11 +50,64 @@ VOID Sleep(DWORD dwMilliseconds) DWORD SleepEx(DWORD dwMilliseconds, BOOL bAlertable) { - /* TODO: Implement bAlertable support */ - if (bAlertable) - WLog_WARN(TAG, "%s does not support bAlertable", __FUNCTION__); - Sleep(dwMilliseconds); - return 0; + WINPR_THREAD* thread = winpr_GetCurrentThread(); + WINPR_POLL_SET pollset; + int status; + DWORD ret = WAIT_FAILED; + BOOL autoSignalled; + + if (!thread) + { + WLog_ERR(TAG, "unable to retrieve currentThread"); + return WAIT_FAILED; + } + + /* treat re-entrancy if a completion is calling us */ + if (thread->apc.treatingCompletions) + bAlertable = FALSE; + + if (!bAlertable || !thread->apc.length) + { + usleep(dwMilliseconds * 1000); + return 0; + } + + if (!pollset_init(&pollset, thread->apc.length)) + { + WLog_ERR(TAG, "unable to initialize pollset"); + return WAIT_FAILED; + } + + if (!apc_collectFds(thread, &pollset, &autoSignalled)) + { + WLog_ERR(TAG, "unable to APC file descriptors"); + goto out; + } + + if (!autoSignalled) + { + /* we poll and wait only if no APC member is ready */ + status = pollset_poll(&pollset, dwMilliseconds); + if (status < 0) + { + WLog_ERR(TAG, "polling of apc fds failed"); + goto out; + } + } + + if (apc_executeCompletions(thread, &pollset, 0)) + { + ret = WAIT_IO_COMPLETION; + } + else + { + /* according to the spec return value is 0 see + * https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-sleepex*/ + ret = 0; + } +out: + pollset_uninit(&pollset); + return ret; } #endif diff --git a/winpr/libwinpr/synch/synch.h b/winpr/libwinpr/synch/synch.h index 5bb64ea93..9d0b2e799 100644 --- a/winpr/libwinpr/synch/synch.h +++ b/winpr/libwinpr/synch/synch.h @@ -29,6 +29,8 @@ #include #include "../handle/handle.h" +#include "../thread/apc.h" +#include "event.h" #ifndef _WIN32 @@ -65,26 +67,22 @@ struct winpr_semaphore }; typedef struct winpr_semaphore WINPR_SEMAPHORE; -struct winpr_event -{ - WINPR_HANDLE_DEF(); - - int pipe_fd[2]; - BOOL bAttached; - BOOL bManualReset; - char* name; -}; -typedef struct winpr_event WINPR_EVENT; - #ifdef HAVE_SYS_TIMERFD_H #include #include #include #include -#endif +#define TIMER_IMPL_TIMERFD -#if defined(__APPLE__) +#elif defined(WITH_POSIX_TIMER) +#include +#define TIMER_IMPL_POSIX + +#elif defined(__APPLE__) +#define TIMER_IMPL_DISPATCH #include +#else +#error missing timer implementation #endif struct winpr_timer @@ -98,17 +96,25 @@ struct winpr_timer PTIMERAPCROUTINE pfnCompletionRoutine; LPVOID lpArgToCompletionRoutine; -#ifdef WITH_POSIX_TIMER +#ifdef TIMER_IMPL_TIMERFD + struct itimerspec timeout; +#endif + +#ifdef TIMER_IMPL_POSIX + WINPR_EVENT_IMPL event; timer_t tid; struct itimerspec timeout; #endif -#if defined(__APPLE__) + +#ifdef TIMER_IMPL_DISPATCH + WINPR_EVENT_IMPL event; dispatch_queue_t queue; dispatch_source_t source; - int pipe[2]; BOOL running; #endif char* name; + + WINPR_APC_ITEM apcItem; }; typedef struct winpr_timer WINPR_TIMER; diff --git a/winpr/libwinpr/synch/test/CMakeLists.txt b/winpr/libwinpr/synch/test/CMakeLists.txt index 5d74ea4e7..645d631eb 100644 --- a/winpr/libwinpr/synch/test/CMakeLists.txt +++ b/winpr/libwinpr/synch/test/CMakeLists.txt @@ -15,7 +15,8 @@ set(${MODULE_PREFIX}_TESTS TestSynchMultipleThreads.c TestSynchTimerQueue.c TestSynchWaitableTimer.c - TestSynchWaitableTimerAPC.c) + TestSynchWaitableTimerAPC.c + TestSynchAPC.c) create_test_sourcelist(${MODULE_PREFIX}_SRCS ${${MODULE_PREFIX}_DRIVER} diff --git a/winpr/libwinpr/synch/test/TestSynchAPC.c b/winpr/libwinpr/synch/test/TestSynchAPC.c new file mode 100644 index 000000000..ff372d61f --- /dev/null +++ b/winpr/libwinpr/synch/test/TestSynchAPC.c @@ -0,0 +1,174 @@ +/** + * FreeRDP: A Remote Desktop Protocol Implementation + * TestSyncAPC + * + * Copyright 2021 David Fort + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include + +typedef struct +{ + BOOL error; + BOOL called; +} UserApcArg; + +void CALLBACK userApc(ULONG_PTR arg) +{ + UserApcArg* userArg = (UserApcArg*)arg; + userArg->called = TRUE; +} + +static DWORD WINAPI uncleanThread(LPVOID lpThreadParameter) +{ + /* this thread post an APC that will never get executed */ + UserApcArg* userArg = (UserApcArg*)lpThreadParameter; + if (!QueueUserAPC((PAPCFUNC)userApc, _GetCurrentThread(), (ULONG_PTR)lpThreadParameter)) + { + userArg->error = TRUE; + return 1; + } + + return 0; +} + +static DWORD WINAPI cleanThread(LPVOID lpThreadParameter) +{ + Sleep(500); + + SleepEx(500, TRUE); + return 0; +} + +typedef struct +{ + HANDLE timer1; + DWORD timer1Calls; + HANDLE timer2; + DWORD timer2Calls; + BOOL endTest; +} UncleanCloseData; + +static VOID CALLBACK Timer1APCProc(LPVOID lpArg, DWORD dwTimerLowValue, DWORD dwTimerHighValue) +{ + UncleanCloseData* data = (UncleanCloseData*)lpArg; + data->timer1Calls++; + CloseHandle(data->timer2); + data->endTest = TRUE; +} + +static VOID CALLBACK Timer2APCProc(LPVOID lpArg, DWORD dwTimerLowValue, DWORD dwTimerHighValue) +{ + UncleanCloseData* data = (UncleanCloseData*)lpArg; + data->timer2Calls++; +} + +static DWORD /*WINAPI*/ closeHandleTest(LPVOID lpThreadParameter) +{ + LARGE_INTEGER dueTime; + UncleanCloseData* data = (UncleanCloseData*)lpThreadParameter; + data->endTest = FALSE; + + dueTime.QuadPart = -500; + if (!SetWaitableTimer(data->timer1, &dueTime, 0, Timer1APCProc, lpThreadParameter, FALSE)) + return 1; + + dueTime.QuadPart = -900; + if (!SetWaitableTimer(data->timer2, &dueTime, 0, Timer2APCProc, lpThreadParameter, FALSE)) + return 1; + + while (!data->endTest) + { + SleepEx(100, TRUE); + } + return 0; +} + +int TestSynchAPC(int argc, char* argv[]) +{ + HANDLE thread = NULL; + UserApcArg userApcArg; + UncleanCloseData uncleanCloseData; + + userApcArg.error = FALSE; + userApcArg.called = FALSE; + + WINPR_UNUSED(argc); + WINPR_UNUSED(argv); + + /* first post an APC and check it is executed during a SleepEx */ + if (!QueueUserAPC((PAPCFUNC)userApc, _GetCurrentThread(), (ULONG_PTR)&userApcArg)) + return 1; + + if (SleepEx(100, FALSE) != 0) + return 2; + + if (SleepEx(100, TRUE) != WAIT_IO_COMPLETION) + return 3; + + if (!userApcArg.called) + return 4; + + userApcArg.called = FALSE; + + /* test that the APC is cleaned up even when not called */ + thread = CreateThread(NULL, 0, uncleanThread, &userApcArg, 0, NULL); + if (!thread) + return 10; + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + + if (userApcArg.called || userApcArg.error) + return 11; + + /* test a remote APC queuing */ + thread = CreateThread(NULL, 0, cleanThread, &userApcArg, 0, NULL); + if (!thread) + return 20; + + if (!QueueUserAPC((PAPCFUNC)userApc, thread, (ULONG_PTR)&userApcArg)) + return 21; + + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + + if (!userApcArg.called) + return 22; + +#if 0 + /* test cleanup of timer completions */ + memset(&uncleanCloseData, 0, sizeof(uncleanCloseData)); + uncleanCloseData.timer1 = CreateWaitableTimerA(NULL, FALSE, NULL); + if (!uncleanCloseData.timer1) + return 31; + + uncleanCloseData.timer2 = CreateWaitableTimerA(NULL, FALSE, NULL); + if (!uncleanCloseData.timer2) + return 32; + + thread = CreateThread(NULL, 0, closeHandleTest, &uncleanCloseData, 0, NULL); + if (!thread) + return 33; + + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + + if (uncleanCloseData.timer1Calls != 1 || uncleanCloseData.timer2Calls != 0) + return 34; + CloseHandle(uncleanCloseData.timer1); +#endif + return 0; +} diff --git a/winpr/libwinpr/synch/test/TestSynchMultipleThreads.c b/winpr/libwinpr/synch/test/TestSynchMultipleThreads.c index 0fa567d74..dda6e9b35 100644 --- a/winpr/libwinpr/synch/test/TestSynchMultipleThreads.c +++ b/winpr/libwinpr/synch/test/TestSynchMultipleThreads.c @@ -7,9 +7,7 @@ static DWORD WINAPI test_thread(LPVOID arg) { - long timeout = rand(); - timeout %= 1000; - timeout += 100; + long timeout = 100 + (rand() % 1000); Sleep(timeout); ExitThread(0); return 0; @@ -54,14 +52,16 @@ int TestSynchMultipleThreads(int argc, char* argv[]) #define THREADS 24 DWORD rc = 0, ev, i; HANDLE threads[THREADS]; + DWORD ret; /* WaitForAll, timeout */ if (start_threads(THREADS, threads)) return 1; - if (WaitForMultipleObjects(THREADS, threads, TRUE, 50) != WAIT_TIMEOUT) + ret = WaitForMultipleObjects(THREADS, threads, TRUE, 50); + if (ret != WAIT_TIMEOUT) { - printf("WaitForMultipleObjects bWaitAll, timeout 50 failed\n"); + printf("WaitForMultipleObjects bWaitAll, timeout 50 failed, ret=%d\n", ret); rc = 2; } @@ -82,7 +82,6 @@ int TestSynchMultipleThreads(int argc, char* argv[]) return 5; ev = WaitForMultipleObjects(THREADS, threads, FALSE, INFINITE); - if (ev > (WAIT_OBJECT_0 + THREADS)) { printf("WaitForMultipleObjects INFINITE failed\n"); @@ -105,9 +104,10 @@ int TestSynchMultipleThreads(int argc, char* argv[]) if (start_threads(THREADS, threads)) return 9; - if (WaitForMultipleObjects(THREADS, threads, FALSE, 50) != WAIT_TIMEOUT) + ret = WaitForMultipleObjects(THREADS, threads, FALSE, 50); + if (ret != WAIT_TIMEOUT) { - printf("WaitForMultipleObjects timeout 50 failed\n"); + printf("WaitForMultipleObjects timeout 50 failed, ret=%d\n", ret); rc = 10; } @@ -129,9 +129,10 @@ int TestSynchMultipleThreads(int argc, char* argv[]) for (i = 0; i < THREADS; i++) { - if (WaitForMultipleObjects(THREADS, threads, FALSE, 0) != WAIT_TIMEOUT) + ret = WaitForMultipleObjects(THREADS, threads, FALSE, 0); + if (ret != WAIT_TIMEOUT) { - printf("WaitForMultipleObjects timeout 50 failed\n"); + printf("WaitForMultipleObjects timeout 50 failed, ret=%d\n", ret); rc = 15; } } diff --git a/winpr/libwinpr/synch/test/TestSynchWaitableTimerAPC.c b/winpr/libwinpr/synch/test/TestSynchWaitableTimerAPC.c index c9af7aab4..8b6e746b3 100644 --- a/winpr/libwinpr/synch/test/TestSynchWaitableTimerAPC.c +++ b/winpr/libwinpr/synch/test/TestSynchWaitableTimerAPC.c @@ -35,6 +35,7 @@ static VOID CALLBACK TimerAPCProc(LPVOID lpArg, DWORD dwTimerLowValue, DWORD dwT int TestSynchWaitableTimerAPC(int argc, char* argv[]) { int status = -1; + DWORD rc; HANDLE hTimer = NULL; BOOL bSuccess; LARGE_INTEGER due; @@ -50,40 +51,24 @@ int TestSynchWaitableTimerAPC(int argc, char* argv[]) } hTimer = CreateWaitableTimer(NULL, FALSE, NULL); - if (!hTimer) goto cleanup; - due.QuadPart = -15000000LL; /* 1.5 seconds */ + due.QuadPart = -1000 * 1000LL; /* 1 seconds */ apcData.StartTime = GetTickCount(); - bSuccess = SetWaitableTimer(hTimer, &due, 2000, TimerAPCProc, &apcData, FALSE); + bSuccess = SetWaitableTimer(hTimer, &due, 100, TimerAPCProc, &apcData, FALSE); if (!bSuccess) goto cleanup; - /** - * See Remarks at - * https://msdn.microsoft.com/en-us/library/windows/desktop/ms686786(v=vs.85).aspx The - * SetWaitableTimer completion routine is executed by the thread that activates the timer using - * SetWaitableTimer. However, the thread must be in an ALERTABLE state. - */ - - /** - * Note: On WIN32 we need to use WaitForSingleObjectEx with parameter bAlertable = TRUE - * However, WinPR currently (May 2016) does not have a working WaitForSingleObjectEx - *implementation but its non-WIN32 WaitForSingleObject implementations seem to be alertable by - *WinPR's timer implementations. - **/ + /* nothing shall happen after 1.2 second, because thread is not in alertable state */ + rc = WaitForSingleObject(g_Event, 1200); + if (rc != WAIT_TIMEOUT) + goto cleanup; for (;;) { - DWORD rc; -#ifdef _WIN32 rc = WaitForSingleObjectEx(g_Event, INFINITE, TRUE); -#else - rc = WaitForSingleObject(g_Event, INFINITE); -#endif - if (rc == WAIT_OBJECT_0) break; diff --git a/winpr/libwinpr/synch/timer.c b/winpr/libwinpr/synch/timer.c index b1e2f8b73..bb2fb5444 100644 --- a/winpr/libwinpr/synch/timer.c +++ b/winpr/libwinpr/synch/timer.c @@ -3,6 +3,7 @@ * Synchronization Functions * * Copyright 2012 Marc-Andre Moreau + * Copyright 2021 David Fort * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,11 +35,13 @@ #include #endif +#include "event.h" #include "synch.h" #ifndef _WIN32 #include "../handle/handle.h" +#include "../thread/thread.h" #include "../log.h" #define TAG WINPR_TAG("synch.timer") @@ -80,11 +83,15 @@ static DWORD TimerCleanupHandle(HANDLE handle) if (timer->bManualReset) return WAIT_OBJECT_0; - length = read(timer->fd, (void*)&expirations, sizeof(UINT64)); +#ifdef TIMER_IMPL_TIMERFD + do + { + length = read(timer->fd, (void*)&expirations, sizeof(UINT64)); + } while (length < 0 && errno == EINTR); if (length != 8) { - if (length == -1) + if (length < 0) { switch (errno) { @@ -105,10 +112,31 @@ static DWORD TimerCleanupHandle(HANDLE handle) return WAIT_FAILED; } +#else + if (!winpr_event_reset(&timer->event)) + { + WLog_ERR(TAG, "timer reset() failure"); + return WAIT_FAILED; + } +#endif return WAIT_OBJECT_0; } +typedef struct +{ + WINPR_APC_ITEM apcItem; + WINPR_TIMER* timer; +} TimerDeleter; + +static void TimerPostDelete_APC(LPVOID arg) +{ + TimerDeleter* deleter = (TimerDeleter*)arg; + free(deleter->timer); + deleter->apcItem.markedForFree = TRUE; + deleter->apcItem.markedForRemove = TRUE; +} + BOOL TimerCloseHandle(HANDLE handle) { WINPR_TIMER* timer; @@ -117,96 +145,97 @@ BOOL TimerCloseHandle(HANDLE handle) if (!TimerIsHandled(handle)) return FALSE; - if (!timer->lpArgToCompletionRoutine) - { -#ifdef HAVE_SYS_TIMERFD_H - - if (timer->fd != -1) - close(timer->fd); - +#ifdef TIMER_IMPL_TIMERFD + if (timer->fd != -1) + close(timer->fd); #endif - } - else - { -#ifdef WITH_POSIX_TIMER - timer_delete(timer->tid); -#endif - } -#if defined(__APPLE__) +#ifdef TIMER_IMPL_POSIX + timer_delete(timer->tid); +#endif + +#ifdef TIMER_IMPL_DISPATCH dispatch_release(timer->queue); dispatch_release(timer->source); - - if (timer->pipe[0] != -1) - close(timer->pipe[0]); - - if (timer->pipe[1] != -1) - close(timer->pipe[1]); - #endif + +#if defined(TIMER_IMPL_POSIX) || defined(TIMER_IMPL_DISPATCH) + winpr_event_uninit(&timer->event); +#endif + free(timer->name); + if (timer->apcItem.linked) + { + TimerDeleter* deleter; + WINPR_APC_ITEM* apcItem; + + switch (apc_remove(&timer->apcItem)) + { + case APC_REMOVE_OK: + break; + case APC_REMOVE_DELAY_FREE: + { + WINPR_THREAD* thread = winpr_GetCurrentThread(); + if (!thread) + return FALSE; + + deleter = calloc(1, sizeof(*deleter)); + if (!deleter) + { + WLog_ERR(TAG, "unable to allocate a timer deleter"); + return TRUE; + } + + deleter->timer = timer; + apcItem = &deleter->apcItem; + apcItem->type = APC_TYPE_HANDLE_FREE; + apcItem->alwaysSignaled = TRUE; + apcItem->completion = TimerPostDelete_APC; + apcItem->completionArgs = deleter; + apc_register(thread, apcItem); + return TRUE; + } + case APC_REMOVE_ERROR: + default: + WLog_ERR(TAG, "unable to remove timer from APC list"); + break; + } + } + free(timer); return TRUE; } -#ifdef WITH_POSIX_TIMER +#ifdef TIMER_IMPL_POSIX -static BOOL g_WaitableTimerSignalHandlerInstalled = FALSE; - -static void WaitableTimerHandler(void* arg) -{ - WINPR_TIMER* timer = (WINPR_TIMER*)arg; - - if (!timer) - return; - - if (timer->pfnCompletionRoutine) - { - timer->pfnCompletionRoutine(timer->lpArgToCompletionRoutine, 0, 0); - - if (timer->lPeriod) - { - timer->timeout.it_interval.tv_sec = (timer->lPeriod / 1000); /* seconds */ - timer->timeout.it_interval.tv_nsec = - ((timer->lPeriod % 1000) * 1000000); /* nanoseconds */ - - if ((timer_settime(timer->tid, 0, &(timer->timeout), NULL)) != 0) - { - WLog_ERR(TAG, "timer_settime"); - } - } - } -} static void WaitableTimerSignalHandler(int signum, siginfo_t* siginfo, void* arg) { WINPR_TIMER* timer = siginfo->si_value.sival_ptr; + UINT64 data = 1; WINPR_UNUSED(arg); if (!timer || (signum != SIGALRM)) return; - WaitableTimerHandler(timer); + if (!winpr_event_set(&timer->event)) + WLog_ERR(TAG, "error when notifying event"); } -static int InstallWaitableTimerSignalHandler(void) +static INIT_ONCE TimerSignalHandler_InitOnce = INIT_ONCE_STATIC_INIT; + +static BOOL InstallTimerSignalHandler(PINIT_ONCE InitOnce, PVOID Parameter, PVOID* Context) { - if (!g_WaitableTimerSignalHandlerInstalled) - { - struct sigaction action; - sigemptyset(&action.sa_mask); - sigaddset(&action.sa_mask, SIGALRM); - action.sa_flags = SA_RESTART | SA_SIGINFO; - action.sa_sigaction = WaitableTimerSignalHandler; - sigaction(SIGALRM, &action, NULL); - g_WaitableTimerSignalHandlerInstalled = TRUE; - } - - return 0; + struct sigaction action; + sigemptyset(&action.sa_mask); + sigaddset(&action.sa_mask, SIGALRM); + action.sa_flags = SA_RESTART | SA_SIGINFO; + action.sa_sigaction = WaitableTimerSignalHandler; + sigaction(SIGALRM, &action, NULL); + return TRUE; } - #endif -#if defined(__APPLE__) +#ifdef TIMER_IMPL_DISPATCH static void WaitableTimerHandler(void* arg) { UINT64 data = 1; @@ -215,10 +244,7 @@ static void WaitableTimerHandler(void* arg) if (!timer) return; - if (timer->pfnCompletionRoutine) - timer->pfnCompletionRoutine(timer->lpArgToCompletionRoutine, 0, 0); - - if (write(timer->pipe[1], &data, sizeof(data)) != sizeof(data)) + if (!winpr_event_set(&timer->event)) WLog_ERR(TAG, "failed to write to pipe"); if (timer->lPeriod == 0) @@ -235,47 +261,45 @@ static int InitializeWaitableTimer(WINPR_TIMER* timer) { int result = 0; - if (!timer->lpArgToCompletionRoutine) +#ifdef TIMER_IMPL_TIMERFD + timer->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (timer->fd <= 0) + return -1; +#elif defined(TIMER_IMPL_POSIX) + struct sigevent sigev; + InitOnceExecuteOnce(&TimerSignalHandler_InitOnce, InstallTimerSignalHandler, NULL, NULL); + ZeroMemory(&sigev, sizeof(struct sigevent)); + sigev.sigev_notify = SIGEV_SIGNAL; + sigev.sigev_signo = SIGALRM; + sigev.sigev_value.sival_ptr = (void*)timer; + + if ((timer_create(CLOCK_MONOTONIC, &sigev, &(timer->tid))) != 0) { -#ifdef HAVE_SYS_TIMERFD_H - timer->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); - - if (timer->fd <= 0) - return -1; - -#elif defined(__APPLE__) -#else - WLog_ERR(TAG, "%s: os specific implementation is missing", __FUNCTION__); - result = -1; -#endif + WLog_ERR(TAG, "timer_create"); + return -1; } - else - { -#ifdef WITH_POSIX_TIMER - struct sigevent sigev; - InstallWaitableTimerSignalHandler(); - ZeroMemory(&sigev, sizeof(struct sigevent)); - sigev.sigev_notify = SIGEV_SIGNAL; - sigev.sigev_signo = SIGALRM; - sigev.sigev_value.sival_ptr = (void*)timer; - - if ((timer_create(CLOCK_MONOTONIC, &sigev, &(timer->tid))) != 0) - { - WLog_ERR(TAG, "timer_create"); - return -1; - } - -#elif defined(__APPLE__) -#else - WLog_ERR(TAG, "%s: os specific implementation is missing", __FUNCTION__); - result = -1; +#elif !defined(TIMER_IMPL_DISPATCH) + WLog_ERR(TAG, "%s: os specific implementation is missing", __FUNCTION__); + result = -1; #endif - } timer->bInit = TRUE; return result; } +static BOOL timer_drain_fd(int fd) +{ + UINT64 expr; + int ret; + + do + { + ret = read(fd, &expr, sizeof(expr)); + } while (ret < 0 && errno == EINTR); + + return ret >= 0; +} + static HANDLE_OPS ops = { TimerIsHandled, TimerCloseHandle, TimerGetFd, TimerCleanupHandle, NULL, NULL, @@ -317,11 +341,12 @@ HANDLE CreateWaitableTimerA(LPSECURITY_ATTRIBUTES lpTimerAttributes, BOOL bManua timer->name = strdup(lpTimerName); timer->ops = &ops; -#if defined(__APPLE__) - - if (pipe(timer->pipe) != 0) +#if defined(TIMER_IMPL_DISPATCH) || defined(TIMER_IMPL_POSIX) + if (!winpr_event_init(&timer->event)) goto fail; +#endif +#if defined(TIMER_IMPL_DISPATCH) timer->queue = dispatch_queue_create(TAG, DISPATCH_QUEUE_SERIAL); if (!timer->queue) @@ -334,16 +359,12 @@ HANDLE CreateWaitableTimerA(LPSECURITY_ATTRIBUTES lpTimerAttributes, BOOL bManua dispatch_set_context(timer->source, timer); dispatch_source_set_event_handler_f(timer->source, WaitableTimerHandler); - timer->fd = timer->pipe[0]; - - if (fcntl(timer->fd, F_SETFL, O_NONBLOCK) < 0) - goto fail; - #endif } return handle; -#if defined(__APPLE__) + +#if defined(TIMER_IMPL_DISPATCH) || defined(TIMER_IMPL_POSIX) fail: TimerCloseHandle(handle); return NULL; @@ -394,6 +415,36 @@ HANDLE CreateWaitableTimerExW(LPSECURITY_ATTRIBUTES lpTimerAttributes, LPCWSTR l return handle; } +static void timerAPC(LPVOID arg) +{ + WINPR_TIMER* timer = (WINPR_TIMER*)arg; + + if (!timer->lPeriod) + { + /* this is a one time shot timer with a completion, let's remove us from + the APC list */ + switch (apc_remove(&timer->apcItem)) + { + case APC_REMOVE_OK: + case APC_REMOVE_DELAY_FREE: + break; + case APC_REMOVE_ERROR: + default: + WLog_ERR(TAG, "error removing the APC routine"); + } + } + + if (timer->pfnCompletionRoutine) + timer->pfnCompletionRoutine(timer->lpArgToCompletionRoutine, 0, 0); + +#ifdef TIMER_IMPL_TIMERFD + while (timer_drain_fd(timer->fd)) + ; +#else + winpr_event_reset(&timer->event); +#endif +} + BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPeriod, PTIMERAPCROUTINE pfnCompletionRoutine, LPVOID lpArgToCompletionRoutine, BOOL fResume) @@ -401,13 +452,9 @@ BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPerio ULONG Type; WINPR_HANDLE* Object; WINPR_TIMER* timer; -#if defined(WITH_POSIX_TIMER) || defined(__APPLE__) LONGLONG seconds = 0; LONGLONG nanoseconds = 0; -#ifdef HAVE_SYS_TIMERFD_H int status = 0; -#endif /* HAVE_SYS_TIMERFD_H */ -#endif /* WITH_POSIX_TIMER */ if (!winpr_Handle_GetInfo(hTimer, &Type, &Object)) return FALSE; @@ -438,7 +485,7 @@ BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPerio return FALSE; } -#ifdef WITH_POSIX_TIMER +#if defined(TIMER_IMPL_TIMERFD) || defined(TIMER_IMPL_POSIX) ZeroMemory(&(timer->timeout), sizeof(struct itimerspec)); if (lpDueTime->QuadPart < 0) @@ -475,30 +522,24 @@ BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPerio timer->timeout.it_value.tv_nsec = timer->timeout.it_interval.tv_nsec; /* nanoseconds */ } - if (!timer->pfnCompletionRoutine) +#ifdef TIMER_IMPL_TIMERFD + status = timerfd_settime(timer->fd, 0, &(timer->timeout), NULL); + if (status) { -#ifdef HAVE_SYS_TIMERFD_H - status = timerfd_settime(timer->fd, 0, &(timer->timeout), NULL); - - if (status) - { - WLog_ERR(TAG, "timerfd_settime failure: %d", status); - return FALSE; - } - + WLog_ERR(TAG, "timerfd_settime failure: %d", status); + return FALSE; + } +#else + status = timer_settime(timer->tid, 0, &(timer->timeout), NULL); + if (status != 0) + { + WLog_ERR(TAG, "timer_settime failure"); + return FALSE; + } +#endif #endif - } - else - { - if ((timer_settime(timer->tid, 0, &(timer->timeout), NULL)) != 0) - { - WLog_ERR(TAG, "timer_settime"); - return FALSE; - } - } - -#elif defined(__APPLE__) +#ifdef TIMER_IMPL_DISPATCH if (lpDueTime->QuadPart < 0) { LONGLONG due = lpDueTime->QuadPart * (-1); @@ -516,12 +557,9 @@ BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPerio return FALSE; } + if (!winpr_event_reset(&timer->event)) { - /* Clean out old data from FD */ - BYTE buffer[32]; - - while (read(timer->fd, buffer, sizeof(buffer)) > 0) - ; + WLog_ERR(TAG, "error when resetting timer event"); } { @@ -538,8 +576,36 @@ BOOL SetWaitableTimer(HANDLE hTimer, const LARGE_INTEGER* lpDueTime, LONG lPerio dispatch_resume(timer->source); timer->running = TRUE; } - #endif + + if (pfnCompletionRoutine) + { + WINPR_APC_ITEM* apcItem = &timer->apcItem; + + /* install our APC routine that will call the completion */ + apcItem->type = APC_TYPE_TIMER; + apcItem->alwaysSignaled = FALSE; + apcItem->pollFd = timer->fd; + apcItem->pollMode = WINPR_FD_READ; + apcItem->completion = timerAPC; + apcItem->completionArgs = timer; + + if (!apcItem->linked) + { + WINPR_THREAD* thread = winpr_GetCurrentThread(); + if (!thread) + return FALSE; + + apc_register(thread, apcItem); + } + } + else + { + if (timer->apcItem.linked) + { + apc_remove(&timer->apcItem); + } + } return TRUE; } diff --git a/winpr/libwinpr/synch/wait.c b/winpr/libwinpr/synch/wait.c index b10705134..901d03fc1 100644 --- a/winpr/libwinpr/synch/wait.c +++ b/winpr/libwinpr/synch/wait.c @@ -26,20 +26,13 @@ #include #endif -#ifdef HAVE_POLL_H -#include -#else -#ifndef _WIN32 -#include -#endif -#endif - #include #include #include #include #include +#include #include "synch.h" #include "pollset.h" @@ -122,6 +115,11 @@ int _mach_safe_clock_gettime(int clk_id, struct timespec* t) #endif +/* Drop in replacement for pthread_mutex_timedlock + */ +#if !defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) +#include + static long long ts_difftime(const struct timespec* o, const struct timespec* n) { long long oldValue = o->tv_sec * 1000000000LL + o->tv_nsec; @@ -129,11 +127,6 @@ static long long ts_difftime(const struct timespec* o, const struct timespec* n) return newValue - oldValue; } -/* Drop in replacement for pthread_mutex_timedlock - */ -#if !defined(HAVE_PTHREAD_MUTEX_TIMEDLOCK) -#include - static int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec* timeout) { struct timespec timenow; @@ -162,7 +155,6 @@ static int pthread_mutex_timedlock(pthread_mutex_t* mutex, const struct timespec } #endif - static void ts_add_ms(struct timespec* ts, DWORD dwMilliseconds) { ts->tv_sec += dwMilliseconds / 1000L; @@ -171,11 +163,11 @@ static void ts_add_ms(struct timespec* ts, DWORD dwMilliseconds) ts->tv_nsec = ts->tv_nsec % 1000000000L; } - -DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) +DWORD WaitForSingleObjectEx(HANDLE hHandle, DWORD dwMilliseconds, BOOL bAlertable) { ULONG Type; WINPR_HANDLE* Object; + WINPR_POLL_SET pollset; if (!winpr_Handle_GetInfo(hHandle, &Type, &Object)) { @@ -225,9 +217,30 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) else { int status; - WINPR_POLL_SET pollset; - int fd = winpr_Handle_getFd(Object); + WINPR_THREAD* thread; + BOOL isSet = FALSE; + size_t extraFds = 0; + DWORD ret; + BOOL autoSignaled = FALSE; + if (bAlertable) + { + thread = (WINPR_THREAD*)_GetCurrentThread(); + if (!thread) + { + WLog_ERR(TAG, "failed to retrieve currentThread"); + return WAIT_FAILED; + } + + /* treat reentrancy, we can't switch to alertable state when we're already + treating completions */ + if (thread->apc.treatingCompletions) + bAlertable = FALSE; + else + extraFds = thread->apc.length; + } + + int fd = winpr_Handle_getFd(Object); if (fd < 0) { WLog_ERR(TAG, "winpr_Handle_getFd did not return a fd!"); @@ -235,7 +248,7 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) return WAIT_FAILED; } - if (!pollset_init(&pollset, 1)) + if (!pollset_init(&pollset, 1 + extraFds)) { WLog_ERR(TAG, "unable to initialize pollset"); SetLastError(ERROR_INTERNAL_ERROR); @@ -244,59 +257,67 @@ DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) if (!pollset_add(&pollset, fd, Object->Mode)) { - pollset_uninit(&pollset); - return WAIT_FAILED; + WLog_ERR(TAG, "unable to add fd in pollset"); + goto out; } - status = pollset_poll(&pollset, dwMilliseconds); + if (bAlertable && !apc_collectFds(thread, &pollset, &autoSignaled)) + { + WLog_ERR(TAG, "unable to collect APC fds"); + goto out; + } + + if (!autoSignaled) + { + status = pollset_poll(&pollset, dwMilliseconds); + if (status < 0) + { + WLog_ERR(TAG, "waitOnFd() failure [%d] %s", errno, strerror(errno)); + goto out; + } + } + + ret = WAIT_TIMEOUT; + if (bAlertable && apc_executeCompletions(thread, &pollset, 1)) + ret = WAIT_IO_COMPLETION; + + isSet = pollset_isSignaled(&pollset, 0); pollset_uninit(&pollset); - if (status < 0) - { - WLog_ERR(TAG, "waitOnFd() failure [%d] %s", errno, strerror(errno)); - SetLastError(ERROR_INTERNAL_ERROR); - return WAIT_FAILED; - } - - if (status != 1) - return WAIT_TIMEOUT; + if (!isSet) + return ret; return winpr_Handle_cleanup(Object); } +out: + pollset_uninit(&pollset); SetLastError(ERROR_INTERNAL_ERROR); return WAIT_FAILED; } -DWORD WaitForSingleObjectEx(HANDLE hHandle, DWORD dwMilliseconds, BOOL bAlertable) +DWORD WaitForSingleObject(HANDLE hHandle, DWORD dwMilliseconds) { - if (bAlertable) - { - /* TODO: Implement */ - WLog_ERR(TAG, "%s: Not implemented: bAlertable", __FUNCTION__); - return WAIT_FAILED; - } - return WaitForSingleObject(hHandle, dwMilliseconds); + return WaitForSingleObjectEx(hHandle, dwMilliseconds, FALSE); } -DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAll, - DWORD dwMilliseconds) +DWORD WaitForMultipleObjectsEx(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAll, + DWORD dwMilliseconds, BOOL bAlertable) { - struct timespec starttime; - struct timespec timenow; - unsigned long long diff; DWORD signalled; DWORD polled; DWORD* poll_map = NULL; - BOOL* signalled_idx = NULL; + BOOL* signalled_handles = NULL; int fd = -1; DWORD index; int status; ULONG Type; - BOOL signal_handled = FALSE; WINPR_HANDLE* Object; + WINPR_THREAD* thread; WINPR_POLL_SET pollset; DWORD ret = WAIT_FAILED; + size_t extraFds = 0; + UINT64 now, dueTime; if (!nCount || (nCount > MAXIMUM_WAIT_OBJECTS)) { @@ -304,34 +325,55 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl return WAIT_FAILED; } - if (!pollset_init(&pollset, nCount)) + if (bAlertable) { - WLog_ERR(TAG, "unable to initialize pollset for nCount=%" PRIu32 "", nCount); + thread = winpr_GetCurrentThread(); + if (!thread) + return WAIT_FAILED; + + /* treat reentrancy, we can't switch to alertable state when we're already + treating completions */ + if (thread->apc.treatingCompletions) + bAlertable = FALSE; + else + extraFds = thread->apc.length; + } + + if (!pollset_init(&pollset, nCount + extraFds)) + { + WLog_ERR(TAG, "unable to initialize pollset for nCount=%" PRIu32 " extraCount=%" PRIu32 "", + nCount, extraFds); return WAIT_FAILED; } if (bWaitAll) { - signalled_idx = alloca(nCount * sizeof(BOOL)); - memset(signalled_idx, FALSE, nCount * sizeof(BOOL)); + signalled_handles = alloca(nCount * sizeof(BOOL)); + memset(signalled_handles, FALSE, nCount * sizeof(BOOL)); + poll_map = alloca(nCount * sizeof(DWORD)); memset(poll_map, 0, nCount * sizeof(DWORD)); } signalled = 0; - if (bWaitAll && (dwMilliseconds != INFINITE)) - clock_gettime(CLOCK_MONOTONIC, &starttime); + now = GetTickCount64(); + if (dwMilliseconds != INFINITE) + dueTime = now + dwMilliseconds; + else + dueTime = 0xFFFFFFFFFFFFFFFF; do { + BOOL autoSignaled = FALSE; polled = 0; + /* first collect file descriptors to poll */ for (index = 0; index < nCount; index++) { if (bWaitAll) { - if (signalled_idx[index]) + if (signalled_handles[index]) continue; poll_map[polled] = index; @@ -362,115 +404,127 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl polled++; } - status = pollset_poll(&pollset, dwMilliseconds); - if (status < 0) + /* treat file descriptors of the APC if needed */ + if (bAlertable && !apc_collectFds(thread, &pollset, &autoSignaled)) { -#ifdef HAVE_POLL_H - WLog_ERR(TAG, "poll() handle %d (%" PRIu32 ") failure [%d] %s", index, nCount, errno, - strerror(errno)); -#else - WLog_ERR(TAG, "select() handle %d (%" PRIu32 ") failure [%d] %s", index, nCount, errno, - strerror(errno)); -#endif - winpr_log_backtrace(TAG, WLOG_ERROR, 20); + WLog_ERR(TAG, "unable to register APC fds"); SetLastError(ERROR_INTERNAL_ERROR); goto out; } - if (status == 0) + /* poll file descriptors */ + status = 0; + if (!autoSignaled) { - ret = WAIT_TIMEOUT; + DWORD waitTime; + + if (dwMilliseconds == INFINITE) + waitTime = INFINITE; + else + waitTime = (DWORD)(dueTime - now); + + status = pollset_poll(&pollset, waitTime); + if (status < 0) + { +#ifdef HAVE_POLL_H + WLog_ERR(TAG, "poll() handle %d (%" PRIu32 ") failure [%d] %s", index, nCount, + errno, strerror(errno)); +#else + WLog_ERR(TAG, "select() handle %d (%" PRIu32 ") failure [%d] %s", index, nCount, + errno, strerror(errno)); +#endif + winpr_log_backtrace(TAG, WLOG_ERROR, 20); + SetLastError(ERROR_INTERNAL_ERROR); + goto out; + } + } + + /* give priority to the APC queue, to return WAIT_IO_COMPLETION */ + if (bAlertable && apc_executeCompletions(thread, &pollset, polled)) + { + ret = WAIT_IO_COMPLETION; goto out; } - if (bWaitAll && (dwMilliseconds != INFINITE)) + /* then treat pollset */ + if (status) { - clock_gettime(CLOCK_MONOTONIC, &timenow); - diff = ts_difftime(&timenow, &starttime); - - if (diff / 1000 > dwMilliseconds) + for (index = 0; index < polled; index++) { - ret = WAIT_TIMEOUT; - goto out; - } - - dwMilliseconds -= (diff / 1000); - } - - signal_handled = FALSE; - - for (index = 0; index < polled; index++) - { - DWORD idx; - BOOL signal_set = FALSE; - - if (bWaitAll) - idx = poll_map[index]; - else - idx = index; - - signal_set = pollset_isSignaled(&pollset, index); - - if (signal_set) - { - DWORD rc = winpr_Handle_cleanup(lpHandles[idx]); - if (rc != WAIT_OBJECT_0) - { - WLog_ERR(TAG, "error in cleanup function for handle at index=%d", idx); - ret = rc; - goto out; - } + DWORD handlesIndex; + BOOL signal_set = FALSE; if (bWaitAll) - { - signalled_idx[idx] = TRUE; + handlesIndex = poll_map[index]; + else + handlesIndex = index; - /* Continue checks from last position. */ - for (; signalled < nCount; signalled++) + signal_set = pollset_isSignaled(&pollset, index); + if (signal_set) + { + DWORD rc = winpr_Handle_cleanup(lpHandles[handlesIndex]); + if (rc != WAIT_OBJECT_0) { - if (!signalled_idx[signalled]) - break; + WLog_ERR(TAG, "error in cleanup function for handle at index=%d", + handlesIndex); + ret = rc; + goto out; + } + + if (bWaitAll) + { + signalled_handles[handlesIndex] = TRUE; + + /* Continue checks from last position. */ + for (; signalled < nCount; signalled++) + { + if (!signalled_handles[signalled]) + break; + } + } + else + { + ret = (WAIT_OBJECT_0 + handlesIndex); + goto out; + } + + if (signalled >= nCount) + { + ret = WAIT_OBJECT_0; + goto out; } } - - if (!bWaitAll) - { - ret = (WAIT_OBJECT_0 + index); - goto out; - } - - if (signalled >= nCount) - { - ret = WAIT_OBJECT_0; - goto out; - } - - signal_handled = TRUE; } } - pollset_reset(&pollset); - } while (bWaitAll || !signal_handled); + if (bAlertable && thread->apc.length > extraFds) + { + pollset_uninit(&pollset); + extraFds = thread->apc.length; + if (!pollset_init(&pollset, nCount + extraFds)) + { + WLog_ERR(TAG, "unable reallocate pollset"); + SetLastError(ERROR_INTERNAL_ERROR); + return WAIT_FAILED; + } + } + else + pollset_reset(&pollset); - WLog_ERR(TAG, "failed (unknown error)"); - SetLastError(ERROR_INTERNAL_ERROR); + now = GetTickCount64(); + } while (now < dueTime); + + ret = WAIT_TIMEOUT; out: pollset_uninit(&pollset); return ret; } -DWORD WaitForMultipleObjectsEx(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAll, - DWORD dwMilliseconds, BOOL bAlertable) +DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAll, + DWORD dwMilliseconds) { - if (bAlertable) - { - /* TODO: Implement */ - WLog_ERR(TAG, "%s: Not implemented: bAlertable", __FUNCTION__); - return WAIT_FAILED; - } - - return WaitForMultipleObjects(nCount, lpHandles, bWaitAll, dwMilliseconds); + return WaitForMultipleObjectsEx(nCount, lpHandles, bWaitAll, dwMilliseconds, FALSE); } DWORD SignalObjectAndWait(HANDLE hObjectToSignal, HANDLE hObjectToWaitOn, DWORD dwMilliseconds, diff --git a/winpr/libwinpr/thread/CMakeLists.txt b/winpr/libwinpr/thread/CMakeLists.txt index 1b78a74bf..bfc04ddda 100644 --- a/winpr/libwinpr/thread/CMakeLists.txt +++ b/winpr/libwinpr/thread/CMakeLists.txt @@ -16,6 +16,8 @@ # limitations under the License. winpr_module_add( + apc.h + apc.c argv.c process.c processor.c diff --git a/winpr/libwinpr/thread/apc.c b/winpr/libwinpr/thread/apc.c new file mode 100644 index 000000000..6b8ac5e91 --- /dev/null +++ b/winpr/libwinpr/thread/apc.c @@ -0,0 +1,244 @@ +/** + * FreeRDP: A Remote Desktop Protocol Implementation + * APC implementation + * + * Copyright 2021 David Fort + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _WIN32 + +#include "apc.h" +#include "thread.h" +#include "../log.h" +#include "../synch/pollset.h" + +#define TAG WINPR_TAG("apc") + +BOOL apc_init(APC_QUEUE* apc) +{ + pthread_mutexattr_t attr; + BOOL ret = FALSE; + + pthread_mutexattr_init(&attr); + if (pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE) != 0) + { + WLog_ERR(TAG, "failed to initialize mutex attributes to recursive"); + return FALSE; + } + + memset(apc, 0, sizeof(*apc)); + + if (pthread_mutex_init(&apc->mutex, &attr) != 0) + { + WLog_ERR(TAG, "failed to initialize main thread APC mutex"); + goto out; + } + + ret = TRUE; +out: + pthread_mutexattr_destroy(&attr); + return ret; +} + +BOOL apc_uninit(APC_QUEUE* apc) +{ + return pthread_mutex_destroy(&apc->mutex) == 0; +} + +void apc_register(WINPR_THREAD* thread, WINPR_APC_ITEM* addItem) +{ + WINPR_APC_ITEM** nextp; + APC_QUEUE* apc = &thread->apc; + + pthread_mutex_lock(&apc->mutex); + if (apc->tail) + { + nextp = &apc->tail->next; + addItem->last = apc->tail; + } + else + { + nextp = &apc->head; + } + + *nextp = addItem; + apc->tail = addItem; + apc->length++; + + addItem->markedForRemove = FALSE; + addItem->boundThread = GetCurrentThreadId(); + addItem->linked = TRUE; + pthread_mutex_unlock(&apc->mutex); +} + +static INLINE void apc_item_remove(APC_QUEUE* apc, WINPR_APC_ITEM* item) +{ + if (!item->last) + apc->head = item->next; + else + item->last->next = item->next; + + if (!item->next) + apc->tail = item->last; + else + item->next->last = item->last; + + apc->length--; +} + +APC_REMOVE_RESULT apc_remove(WINPR_APC_ITEM* item) +{ + WINPR_THREAD* thread = winpr_GetCurrentThread(); + APC_QUEUE* apc; + APC_REMOVE_RESULT ret = APC_REMOVE_OK; + + if (!item->linked) + return APC_REMOVE_OK; + + if (item->boundThread != GetCurrentThreadId()) + { + WLog_ERR(TAG, "removing an APC entry should be done in the creating thread"); + return APC_REMOVE_ERROR; + } + + if (!thread) + { + WLog_ERR(TAG, "unable to retrieve current thread"); + return APC_REMOVE_ERROR; + } + + apc = &thread->apc; + pthread_mutex_lock(&apc->mutex); + if (apc->treatingCompletions) + { + item->markedForRemove = TRUE; + ret = APC_REMOVE_DELAY_FREE; + goto out; + } + + apc_item_remove(apc, item); + +out: + pthread_mutex_unlock(&apc->mutex); + item->boundThread = 0xFFFFFFFF; + item->linked = FALSE; + return ret; +} + +BOOL apc_collectFds(WINPR_THREAD* thread, WINPR_POLL_SET* set, BOOL* haveAutoSignaled) +{ + WINPR_APC_ITEM* item; + BOOL ret = FALSE; + APC_QUEUE* apc = &thread->apc; + + *haveAutoSignaled = FALSE; + pthread_mutex_lock(&apc->mutex); + item = apc->head; + for (; item; item = item->next) + { + if (item->alwaysSignaled) + *haveAutoSignaled = TRUE; + else if (!pollset_add(set, item->pollFd, item->pollMode)) + goto out; + } + + ret = TRUE; +out: + pthread_mutex_unlock(&apc->mutex); + return ret; +} + +int apc_executeCompletions(WINPR_THREAD* thread, WINPR_POLL_SET* set, size_t idx) +{ + APC_QUEUE* apc = &thread->apc; + WINPR_APC_ITEM *item, *nextItem; + int ret = 0; + + pthread_mutex_lock(&apc->mutex); + apc->treatingCompletions = TRUE; + + /* first pass to compute signaled items */ + for (item = apc->head; item; item = item->next) + { + item->isSignaled = item->alwaysSignaled || pollset_isSignaled(set, idx); + if (!item->alwaysSignaled) + idx++; + } + + /* second pass: run completions */ + for (item = apc->head; item; item = nextItem) + { + if (item->isSignaled) + { + if (item->completion && !item->markedForRemove) + item->completion(item->completionArgs); + ret++; + } + + nextItem = item->next; + + if (item->markedForRemove) + { + apc_item_remove(apc, item); + + if (item->markedForFree) + free(item); + } + } + + /* third pass: to do final cleanup */ + for (item = apc->head; item; item = nextItem) + { + nextItem = item->next; + + if (item->markedForRemove) + { + apc_item_remove(apc, item); + if (item->markedForFree) + free(item); + } + } + + apc->treatingCompletions = FALSE; + pthread_mutex_unlock(&apc->mutex); + + return ret; +} + +void apc_cleanupThread(WINPR_THREAD* thread) +{ + WINPR_APC_ITEM* item; + WINPR_APC_ITEM* nextItem; + APC_QUEUE* apc = &thread->apc; + + pthread_mutex_lock(&apc->mutex); + item = apc->head; + for (; item; item = nextItem) + { + nextItem = item->next; + + if (item->type == APC_TYPE_HANDLE_FREE) + item->completion(item->completionArgs); + + item->last = item->next = NULL; + item->linked = FALSE; + if (item->markedForFree) + free(item); + } + + apc->head = apc->tail = NULL; + pthread_mutex_unlock(&apc->mutex); +} + +#endif diff --git a/winpr/libwinpr/thread/apc.h b/winpr/libwinpr/thread/apc.h new file mode 100644 index 000000000..c69920d5d --- /dev/null +++ b/winpr/libwinpr/thread/apc.h @@ -0,0 +1,85 @@ +/** + * WinPR: Windows Portable Runtime + * APC implementation + * + * Copyright 2021 David Fort + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef WINPR_APC_H +#define WINPR_APC_H + +#include +#include + +#ifndef _WIN32 + +#include + +typedef struct winpr_thread WINPR_THREAD; +typedef struct winpr_APC_item WINPR_APC_ITEM; +typedef struct winpr_poll_set WINPR_POLL_SET; + +typedef void (*apc_treatment)(LPVOID arg); + +typedef enum +{ + APC_TYPE_USER, + APC_TYPE_TIMER, + APC_TYPE_HANDLE_FREE +} ApcType; + +struct winpr_APC_item +{ + ApcType type; + int pollFd; + DWORD pollMode; + apc_treatment completion; + LPVOID completionArgs; + BOOL markedForFree; + + /* private fields used by the APC */ + BOOL alwaysSignaled; + BOOL isSignaled; + DWORD boundThread; + BOOL linked; + BOOL markedForRemove; + WINPR_APC_ITEM *last, *next; +}; + +typedef enum +{ + APC_REMOVE_OK, + APC_REMOVE_ERROR, + APC_REMOVE_DELAY_FREE +} APC_REMOVE_RESULT; + +typedef struct +{ + pthread_mutex_t mutex; + DWORD length; + WINPR_APC_ITEM *head, *tail; + BOOL treatingCompletions; +} APC_QUEUE; + +BOOL apc_init(APC_QUEUE* apc); +BOOL apc_uninit(APC_QUEUE* apc); +void apc_register(WINPR_THREAD* thread, WINPR_APC_ITEM* addItem); +APC_REMOVE_RESULT apc_remove(WINPR_APC_ITEM* item); +BOOL apc_collectFds(WINPR_THREAD* thread, WINPR_POLL_SET* set, BOOL* haveAutoSignaled); +int apc_executeCompletions(WINPR_THREAD* thread, WINPR_POLL_SET* set, size_t startIndex); +void apc_cleanupThread(WINPR_THREAD* thread); +#endif + +#endif /* WINPR_APC_H */ diff --git a/winpr/libwinpr/thread/thread.c b/winpr/libwinpr/thread/thread.c index 09daf84fd..e671b6490 100644 --- a/winpr/libwinpr/thread/thread.c +++ b/winpr/libwinpr/thread/thread.c @@ -4,6 +4,8 @@ * * Copyright 2012 Marc-Andre Moreau * Copyright 2015 Hewlett-Packard Development Company, L.P. + * Copyright 2021 David Fort + * * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,11 +91,13 @@ #include #include "thread.h" +#include "apc.h" #include "../handle/handle.h" #include "../log.h" #define TAG WINPR_TAG("thread") +static WINPR_THREAD mainThread; static wListDictionary* thread_list = NULL; static BOOL ThreadCloseHandle(HANDLE handle); @@ -119,7 +123,7 @@ static int ThreadGetFd(HANDLE handle) if (!ThreadIsHandled(handle)) return -1; - return pThread->pipe_fd[0]; + return pThread->event.fds[0]; } static DWORD ThreadCleanupHandle(HANDLE handle) @@ -224,58 +228,12 @@ static void dump_thread(WINPR_THREAD* thread) */ static BOOL set_event(WINPR_THREAD* thread) { - int length; - BOOL status = FALSE; -#ifdef HAVE_SYS_EVENTFD_H - eventfd_t val = 1; - - do - { - length = eventfd_write(thread->pipe_fd[0], val); - } while ((length < 0) && (errno == EINTR)); - - status = (length == 0) ? TRUE : FALSE; -#else - - if (WaitForSingleObject(thread, 0) != WAIT_OBJECT_0) - { - length = write(thread->pipe_fd[1], "-", 1); - - if (length == 1) - status = TRUE; - } - else - { - status = TRUE; - } - -#endif - return status; + return winpr_event_set(&thread->event); } static BOOL reset_event(WINPR_THREAD* thread) { - int length; - BOOL status = FALSE; -#ifdef HAVE_SYS_EVENTFD_H - eventfd_t value; - - do - { - length = eventfd_read(thread->pipe_fd[0], &value); - } while ((length < 0) && (errno == EINTR)); - - if ((length > 0) && (!status)) - status = TRUE; - -#else - length = read(thread->pipe_fd[0], &length, 1); - - if ((length == 1) && (!status)) - status = TRUE; - -#endif - return status; + return winpr_event_reset(&thread->event); } static BOOL thread_compare(const void* a, const void* b) @@ -286,6 +244,31 @@ static BOOL thread_compare(const void* a, const void* b) return rc; } +static INIT_ONCE threads_InitOnce = INIT_ONCE_STATIC_INIT; +static pthread_t mainThreadId; +static DWORD currentThreadTlsIndex = TLS_OUT_OF_INDEXES; + +BOOL initializeThreads(PINIT_ONCE InitOnce, PVOID Parameter, PVOID* Context) +{ + if (!apc_init(&mainThread.apc)) + { + WLog_ERR(TAG, "failed to initialize APC"); + goto out; + } + + mainThread.Type = HANDLE_TYPE_THREAD; + mainThreadId = pthread_self(); + + currentThreadTlsIndex = TlsAlloc(); + if (currentThreadTlsIndex == TLS_OUT_OF_INDEXES) + { + WLog_ERR(TAG, "Major bug, unable to allocate a TLS value for currentThread"); + } + +out: + return TRUE; +} + /* Thread launcher function responsible for registering * cleanup handlers and calling pthread_exit, if not done * in thread function. */ @@ -301,6 +284,12 @@ static void* thread_launcher(void* arg) goto exit; } + if (!TlsSetValue(currentThreadTlsIndex, thread)) + { + WLog_ERR(TAG, "thread %d, unable to set current thread value", pthread_self()); + goto exit; + } + if (!(fkt = thread->lpStartAddress)) { WLog_ERR(TAG, "Thread function argument is %p", (void*)fkt); @@ -329,6 +318,8 @@ exit: if (thread) { + apc_cleanupThread(thread); + if (!thread->exited) thread->dwExitCode = rc; @@ -404,38 +395,25 @@ HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize thread->create_stack = winpr_backtrace(20); dump_thread(thread); #endif - thread->pipe_fd[0] = -1; - thread->pipe_fd[1] = -1; -#ifdef HAVE_SYS_EVENTFD_H - thread->pipe_fd[0] = eventfd(0, EFD_NONBLOCK); - if (thread->pipe_fd[0] < 0) + if (!winpr_event_init(&thread->event)) { - WLog_ERR(TAG, "failed to create thread pipe fd 0"); - goto error_pipefd0; + WLog_ERR(TAG, "failed to create event"); + goto error_event; } -#else - - if (pipe(thread->pipe_fd) < 0) - { - WLog_ERR(TAG, "failed to create thread pipe"); - goto error_pipefd0; - } - - { - int flags = fcntl(thread->pipe_fd[0], F_GETFL); - fcntl(thread->pipe_fd[0], F_SETFL, flags | O_NONBLOCK); - } - -#endif - - if (pthread_mutex_init(&thread->mutex, 0) != 0) + if (pthread_mutex_init(&thread->mutex, NULL) != 0) { WLog_ERR(TAG, "failed to initialize thread mutex"); goto error_mutex; } + if (!apc_init(&thread->apc)) + { + WLog_ERR(TAG, "failed to initialize APC"); + goto error_APC; + } + if (pthread_mutex_init(&thread->threadIsReadyMutex, NULL) != 0) { WLog_ERR(TAG, "failed to initialize a mutex for a condition variable"); @@ -453,6 +431,7 @@ HANDLE CreateThread(LPSECURITY_ATTRIBUTES lpThreadAttributes, SIZE_T dwStackSize if (!thread_list) { + InitOnceExecuteOnce(&threads_InitOnce, initializeThreads, NULL, NULL); thread_list = ListDictionary_New(TRUE); if (!thread_list) @@ -481,16 +460,12 @@ error_thread_list: error_thread_ready: pthread_mutex_destroy(&thread->threadIsReadyMutex); error_thread_ready_mutex: + apc_uninit(&thread->apc); +error_APC: pthread_mutex_destroy(&thread->mutex); error_mutex: - - if (thread->pipe_fd[1] >= 0) - close(thread->pipe_fd[1]); - - if (thread->pipe_fd[0] >= 0) - close(thread->pipe_fd[0]); - -error_pipefd0: + winpr_event_uninit(&thread->event); +error_event: free(thread); return NULL; } @@ -499,28 +474,25 @@ void cleanup_handle(void* obj) { int rc; WINPR_THREAD* thread = (WINPR_THREAD*)obj; - rc = pthread_cond_destroy(&thread->threadIsReady); + if (!apc_uninit(&thread->apc)) + WLog_ERR(TAG, "failed to destroy APC"); + + rc = pthread_cond_destroy(&thread->threadIsReady); if (rc) - WLog_ERR(TAG, "failed to destroy a condition variable [%d] %s (%d)", rc, strerror(errno), + WLog_ERR(TAG, "failed to destroy thread->threadIsReady [%d] %s (%d)", rc, strerror(errno), errno); rc = pthread_mutex_destroy(&thread->threadIsReadyMutex); - if (rc) - WLog_ERR(TAG, "failed to destroy a condition variable mutex [%d] %s (%d)", rc, + WLog_ERR(TAG, "failed to destroy thread->threadIsReadyMutex [%d] %s (%d)", rc, strerror(errno), errno); rc = pthread_mutex_destroy(&thread->mutex); - if (rc) - WLog_ERR(TAG, "failed to destroy mutex [%d] %s (%d)", rc, strerror(errno), errno); + WLog_ERR(TAG, "failed to destroy thread->mutex [%d] %s (%d)", rc, strerror(errno), errno); - if (thread->pipe_fd[0] >= 0) - close(thread->pipe_fd[0]); - - if (thread->pipe_fd[1] >= 0) - close(thread->pipe_fd[1]); + winpr_event_uninit(&thread->event); if (thread_list && ListDictionary_Contains(thread_list, &thread->thread)) ListDictionary_Remove(thread_list, &thread->thread); @@ -645,31 +617,28 @@ BOOL GetExitCodeThread(HANDLE hThread, LPDWORD lpExitCode) return TRUE; } -HANDLE _GetCurrentThread(VOID) +WINPR_THREAD* winpr_GetCurrentThread(VOID) { - HANDLE hdl = NULL; - pthread_t tid = pthread_self(); + WINPR_THREAD* ret; - if (!thread_list) - { - WLog_ERR(TAG, "function called without existing thread list!"); -#if defined(WITH_DEBUG_THREADS) - DumpThreadHandles(); -#endif - } - else if (!ListDictionary_Contains(thread_list, &tid)) + InitOnceExecuteOnce(&threads_InitOnce, initializeThreads, NULL, NULL); + if (mainThreadId == pthread_self()) + return (HANDLE)&mainThread; + + ret = TlsGetValue(currentThreadTlsIndex); + if (!ret) { WLog_ERR(TAG, "function called, but no matching entry in thread list!"); #if defined(WITH_DEBUG_THREADS) DumpThreadHandles(); #endif } - else - { - hdl = ListDictionary_GetItemValue(thread_list, &tid); - } + return ret; +} - return hdl; +HANDLE _GetCurrentThread(VOID) +{ + return (HANDLE)winpr_GetCurrentThread(); } DWORD GetCurrentThreadId(VOID) @@ -681,6 +650,60 @@ DWORD GetCurrentThreadId(VOID) return (DWORD)tid & 0xffffffffUL; } +typedef struct +{ + WINPR_APC_ITEM apc; + PAPCFUNC completion; + ULONG_PTR completionArg; +} UserApcItem; + +void userAPC(LPVOID arg) +{ + UserApcItem* userApc = (UserApcItem*)arg; + + userApc->completion(userApc->completionArg); + + userApc->apc.markedForRemove = TRUE; +} + +DWORD QueueUserAPC(PAPCFUNC pfnAPC, HANDLE hThread, ULONG_PTR dwData) +{ + ULONG Type; + WINPR_HANDLE* Object; + WINPR_THREAD* thread; + WINPR_APC_ITEM* apc; + UserApcItem* apcItem; + + if (!pfnAPC) + return 1; + + if (!winpr_Handle_GetInfo(hThread, &Type, &Object) || Object->Type != HANDLE_TYPE_THREAD) + { + WLog_ERR(TAG, "hThread is not a thread"); + SetLastError(ERROR_INVALID_PARAMETER); + return (DWORD)0; + } + thread = (WINPR_THREAD*)Object; + + apcItem = calloc(1, sizeof(*apcItem)); + if (!apcItem) + { + SetLastError(ERROR_INVALID_PARAMETER); + return (DWORD)0; + } + + apc = &apcItem->apc; + apc->type = APC_TYPE_USER; + apc->markedForFree = TRUE; + apc->alwaysSignaled = TRUE; + apc->completion = userAPC; + apc->completionArgs = apc; + apcItem->completion = pfnAPC; + apcItem->completionArg = dwData; + apc_register(hThread, apc); + return 1; +} + DWORD ResumeThread(HANDLE hThread) { ULONG Type; diff --git a/winpr/libwinpr/thread/thread.h b/winpr/libwinpr/thread/thread.h index 97eb99695..f187e2630 100644 --- a/winpr/libwinpr/thread/thread.h +++ b/winpr/libwinpr/thread/thread.h @@ -4,6 +4,7 @@ * * Copyright 2012 Marc-Andre Moreau * Copyright 2015 Hewlett-Packard Development Company, L.P. + * Copyright 2021 David Fort * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -28,15 +29,18 @@ #include #include "../handle/handle.h" +#include "../synch/event.h" +#include "apc.h" typedef void* (*pthread_start_routine)(void*); +typedef struct winpr_APC_item WINPR_APC_ITEM; struct winpr_thread { WINPR_HANDLE_DEF(); BOOL started; - int pipe_fd[2]; + WINPR_EVENT_IMPL event; BOOL mainProcess; BOOL detached; BOOL joined; @@ -50,6 +54,7 @@ struct winpr_thread pthread_cond_t threadIsReady; LPTHREAD_START_ROUTINE lpStartAddress; LPSECURITY_ATTRIBUTES lpThreadAttributes; + APC_QUEUE apc; #if defined(WITH_DEBUG_THREADS) void* create_stack; void* exit_stack; @@ -57,6 +62,8 @@ struct winpr_thread }; typedef struct winpr_thread WINPR_THREAD; +WINPR_THREAD* winpr_GetCurrentThread(VOID); + struct winpr_process { WINPR_HANDLE_DEF();