Merge pull request #12322 from akallabeth/msg-queue-fix

[winpr,utils] ensure message queue capacity
This commit is contained in:
akallabeth
2026-02-18 15:58:55 +01:00
committed by GitHub
6 changed files with 536 additions and 126 deletions

View File

@@ -93,6 +93,14 @@ extern "C"
*/
WINPR_API size_t Queue_Count(wQueue* queue);
/** @brief Return the allocated elements in the queue
*
* @param queue A pointer to a queue, must not be \b NULL
*
* @return the number of objects allocated
*/
WINPR_API size_t Queue_Capacity(wQueue* queue);
/** @brief Mutex-Lock a queue
*
* @param queue A pointer to a queue, must not be \b NULL
@@ -181,6 +189,14 @@ extern "C"
WINPR_API void Queue_Free(wQueue* queue);
/** @brief Creates a new queue
*
* @param synchronized If \b TRUE all functions are thread safe, if \b FALSE no synchronization
* is done.
* @param capacity The initial capacity of the queue. If \b 0 or \b -1 default settings are
* applied.
* @param growthFactor allocation behaviour when the queue capacity should be increased. Larger
* values increase the allocation contingent. \b 0 or \b -1 apply default settings.
*
*
* @return A newly allocated queue or \b NULL in case of failure
*/
@@ -727,8 +743,23 @@ extern "C"
WINPR_API wObject* MessageQueue_Object(wMessageQueue* queue);
WINPR_API HANDLE MessageQueue_Event(wMessageQueue* queue);
WINPR_API BOOL MessageQueue_Wait(wMessageQueue* queue);
/** @brief return the currently used number of elements in the queue
*
* @param queue A pointer to the queue to query. Must not be \b NULL
*
* @return The number of elements in the queue
*/
WINPR_API size_t MessageQueue_Size(wMessageQueue* queue);
/** @brief return the currently allocated elements in the queue
*
* @param queue A pointer to the queue to query. Must not be \b NULL
*
* @return The number of currently allocated elements in the queue
*/
WINPR_API size_t MessageQueue_Capacity(wMessageQueue* queue);
WINPR_API BOOL MessageQueue_Dispatch(wMessageQueue* queue, const wMessage* message);
WINPR_API BOOL MessageQueue_Post(wMessageQueue* queue, void* context, UINT32 type, void* wParam,
void* lParam);

View File

@@ -77,6 +77,15 @@ size_t MessageQueue_Size(wMessageQueue* queue)
return ret;
}
size_t MessageQueue_Capacity(wMessageQueue* queue)
{
WINPR_ASSERT(queue);
EnterCriticalSection(&queue->lock);
const size_t ret = queue->capacity;
LeaveCriticalSection(&queue->lock);
return ret;
}
/**
* Methods
*/
@@ -94,29 +103,52 @@ BOOL MessageQueue_Wait(wMessageQueue* queue)
static BOOL MessageQueue_EnsureCapacity(wMessageQueue* queue, size_t count)
{
const size_t increment = 128;
WINPR_ASSERT(queue);
const size_t required = queue->size + count;
// check for overflow
if ((required < queue->size) || (required < count))
if ((required < queue->size) || (required < count) ||
(required > (SIZE_MAX - increment) / sizeof(wMessage)))
return FALSE;
if (required > queue->capacity)
{
const size_t old_capacity = queue->capacity;
const size_t new_capacity = required + increment;
wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * required);
wMessage* new_arr = (wMessage*)realloc(queue->array, sizeof(wMessage) * new_capacity);
if (!new_arr)
return FALSE;
queue->array = new_arr;
queue->capacity = required;
ZeroMemory(&(queue->array[old_capacity]), (required - old_capacity) * sizeof(wMessage));
queue->capacity = new_capacity;
ZeroMemory(&(queue->array[old_capacity]), (new_capacity - old_capacity) * sizeof(wMessage));
/* rearrange wrapped entries */
/* rearrange wrapped entries:
* fill up the newly available space and move tail
* back by the amount of elements that have been moved to the newly
* allocated space.
*/
if (queue->tail <= queue->head)
{
CopyMemory(&(queue->array[old_capacity]), queue->array, queue->tail * sizeof(wMessage));
queue->tail += old_capacity;
size_t tocopy = queue->tail;
size_t slots = new_capacity - old_capacity;
const size_t batch = (tocopy < slots) ? tocopy : slots;
CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(wMessage));
ZeroMemory(queue->array, batch * sizeof(wMessage));
/* Tail is decremented. if the whole thing is appended
* just move the existing tail by old_capacity */
if (tocopy < slots)
queue->tail += old_capacity;
else
{
const size_t movesize = (queue->tail - batch) * sizeof(wMessage);
memmove_s(queue->array, queue->tail * sizeof(wMessage), &queue->array[batch],
movesize);
ZeroMemory(&queue->array[batch], movesize);
queue->tail -= batch;
}
}
}

View File

@@ -35,7 +35,7 @@ struct s_wQueue
size_t head;
size_t tail;
size_t size;
void** array;
uintptr_t* array;
CRITICAL_SECTION lock;
HANDLE event;
@@ -45,6 +45,16 @@ struct s_wQueue
BYTE padding2[4];
};
static inline void* uptr2void(uintptr_t ptr)
{
return (void*)ptr;
}
static inline uintptr_t void2uptr(const void* ptr)
{
return (uintptr_t)ptr;
}
/**
* C equivalent of the C# Queue Class:
* http://msdn.microsoft.com/en-us/library/system.collections.queue.aspx
@@ -71,6 +81,19 @@ size_t Queue_Count(wQueue* queue)
return ret;
}
size_t Queue_Capacity(wQueue* queue)
{
WINPR_ASSERT(queue);
Queue_Lock(queue);
const size_t ret = queue->capacity;
Queue_Unlock(queue);
return ret;
}
/**
* Lock access to the ArrayList
*/
@@ -124,9 +147,12 @@ void Queue_Clear(wQueue* queue)
for (size_t index = queue->head; index != queue->tail; index = (index + 1) % queue->capacity)
{
if (queue->object.fnObjectFree)
queue->object.fnObjectFree(queue->array[index]);
{
void* obj = uptr2void(queue->array[index]);
queue->object.fnObjectFree(obj);
}
queue->array[index] = NULL;
queue->array[index] = 0;
}
queue->size = 0;
@@ -147,7 +173,8 @@ BOOL Queue_Contains(wQueue* queue, const void* obj)
for (size_t index = 0; index < queue->tail; index++)
{
if (queue->object.fnObjectEquals(queue->array[index], obj))
void* ptr = uptr2void(queue->array[index]);
if (queue->object.fnObjectEquals(ptr, obj))
{
found = TRUE;
break;
@@ -163,29 +190,52 @@ static BOOL Queue_EnsureCapacity(wQueue* queue, size_t count)
{
WINPR_ASSERT(queue);
if (queue->size + count >= queue->capacity)
if (queue->size + count > queue->capacity)
{
if (queue->growthFactor > SIZE_MAX / 32ull)
return FALSE;
if (queue->size > SIZE_MAX - count)
return FALSE;
const size_t increment = 32ull * queue->growthFactor;
const size_t old_capacity = queue->capacity;
size_t new_capacity = queue->capacity * queue->growthFactor;
void** newArray = NULL;
if (new_capacity < queue->size + count)
new_capacity = queue->size + count;
newArray = (void**)realloc((void*)queue->array, sizeof(void*) * new_capacity);
const size_t required = queue->size + count;
const size_t new_capacity = required + increment - required % increment;
if (new_capacity > SIZE_MAX / sizeof(BYTE*))
return FALSE;
uintptr_t* newArray = (uintptr_t*)realloc(queue->array, sizeof(uintptr_t) * new_capacity);
if (!newArray)
return FALSE;
queue->capacity = new_capacity;
queue->array = newArray;
ZeroMemory((void*)&(queue->array[old_capacity]),
(new_capacity - old_capacity) * sizeof(void*));
ZeroMemory(&(queue->array[old_capacity]),
(new_capacity - old_capacity) * sizeof(uintptr_t));
/* rearrange wrapped entries */
if (queue->tail <= queue->head)
{
CopyMemory((void*)&(queue->array[old_capacity]), (void*)queue->array,
queue->tail * sizeof(void*));
queue->tail += old_capacity;
const size_t tocopy = queue->tail;
const size_t slots = new_capacity - old_capacity;
const size_t batch = (tocopy < slots) ? tocopy : slots;
CopyMemory(&(queue->array[old_capacity]), queue->array, batch * sizeof(uintptr_t));
ZeroMemory(queue->array, batch * sizeof(uintptr_t));
/* Tail is decremented. if the whole thing is appended
* just move the existing tail by old_capacity */
if (tocopy < slots)
queue->tail += old_capacity;
else
{
const size_t movesize = (queue->tail - batch) * sizeof(uintptr_t);
memmove_s(queue->array, queue->tail * sizeof(uintptr_t), &queue->array[batch],
movesize);
ZeroMemory(&queue->array[batch], movesize);
queue->tail -= batch;
}
}
}
return TRUE;
@@ -205,17 +255,10 @@ BOOL Queue_Enqueue(wQueue* queue, const void* obj)
goto out;
if (queue->object.fnObjectNew)
queue->array[queue->tail] = queue->object.fnObjectNew(obj);
queue->array[queue->tail] = void2uptr(queue->object.fnObjectNew(obj));
else
{
union
{
const void* cv;
void* v;
} cnv;
cnv.cv = obj;
queue->array[queue->tail] = cnv.v;
}
queue->array[queue->tail] = void2uptr(obj);
queue->tail = (queue->tail + 1) % queue->capacity;
{
@@ -244,8 +287,8 @@ void* Queue_Dequeue(wQueue* queue)
if (queue->size > 0)
{
obj = queue->array[queue->head];
queue->array[queue->head] = NULL;
obj = uptr2void(queue->array[queue->head]);
queue->array[queue->head] = 0;
queue->head = (queue->head + 1) % queue->capacity;
queue->size--;
}
@@ -265,11 +308,10 @@ void* Queue_Dequeue(wQueue* queue)
void* Queue_Peek(wQueue* queue)
{
void* obj = NULL;
Queue_Lock(queue);
if (queue->size > 0)
obj = queue->array[queue->head];
obj = uptr2void(queue->array[queue->head]);
Queue_Unlock(queue);
@@ -299,9 +341,7 @@ static BOOL default_queue_equals(const void* obj1, const void* obj2)
wQueue* Queue_New(BOOL synchronized, SSIZE_T capacity, SSIZE_T growthFactor)
{
wObject* obj = NULL;
wQueue* queue = NULL;
queue = (wQueue*)calloc(1, sizeof(wQueue));
wQueue* queue = (wQueue*)calloc(1, sizeof(wQueue));
if (!queue)
return NULL;
@@ -325,9 +365,10 @@ wQueue* Queue_New(BOOL synchronized, SSIZE_T capacity, SSIZE_T growthFactor)
if (!queue->event)
goto fail;
obj = Queue_Object(queue);
obj->fnObjectEquals = default_queue_equals;
{
wObject* obj = Queue_Object(queue);
obj->fnObjectEquals = default_queue_equals;
}
return queue;
fail:
WINPR_PRAGMA_DIAG_PUSH
@@ -348,6 +389,6 @@ void Queue_Free(wQueue* queue)
DeleteCriticalSection(&queue->lock);
}
(void)CloseHandle(queue->event);
free((void*)queue->array);
free(queue->array);
free(queue);
}

View File

@@ -44,43 +44,37 @@
BOOL Stream_EnsureCapacity(wStream* s, size_t size)
{
WINPR_ASSERT(s);
if (s->capacity < size)
if (s->capacity >= size)
return TRUE;
const size_t increment = 128ull;
const size_t old_capacity = s->capacity;
const size_t new_capacity = size + increment - size % increment;
const size_t position = Stream_GetPosition(s);
BYTE* new_buf = NULL;
if (!s->isOwner)
{
BYTE* new_buf = NULL;
size_t old_capacity = s->capacity;
size_t new_capacity = old_capacity;
do
{
if (new_capacity > SIZE_MAX - 128ull)
return FALSE;
new_capacity += 128ull;
} while (new_capacity <= size);
const size_t position = Stream_GetPosition(s);
if (!s->isOwner)
{
new_buf = (BYTE*)malloc(new_capacity);
CopyMemory(new_buf, s->buffer, s->capacity);
s->isOwner = TRUE;
}
else
{
new_buf = (BYTE*)realloc(s->buffer, new_capacity);
}
new_buf = (BYTE*)malloc(new_capacity);
if (!new_buf)
return FALSE;
s->buffer = new_buf;
s->capacity = new_capacity;
s->length = new_capacity;
ZeroMemory(&s->buffer[old_capacity], s->capacity - old_capacity);
Stream_SetPosition(s, position);
CopyMemory(new_buf, s->buffer, s->capacity);
s->isOwner = TRUE;
}
return TRUE;
else
{
new_buf = (BYTE*)realloc(s->buffer, new_capacity);
if (!new_buf)
return FALSE;
}
s->buffer = new_buf;
s->capacity = new_capacity;
s->length = new_capacity;
ZeroMemory(&s->buffer[old_capacity], s->capacity - old_capacity);
return Stream_SetPosition(s, position);
}
BOOL Stream_EnsureRemainingCapacity(wStream* s, size_t size)

View File

@@ -22,35 +22,174 @@ static DWORD WINAPI message_queue_consumer_thread(LPVOID arg)
return 0;
}
int TestMessageQueue(int argc, char* argv[])
static bool wrap_test(bool (*fkt)(wMessageQueue* queue))
{
HANDLE thread = NULL;
wMessageQueue* queue = NULL;
wMessageQueue* queue = MessageQueue_New(NULL);
if (!queue)
return false;
WINPR_UNUSED(argc);
WINPR_UNUSED(argv);
WINPR_ASSERT(fkt);
const bool rc = fkt(queue);
MessageQueue_Free(queue);
return rc;
}
if (!(queue = MessageQueue_New(NULL)))
static bool check(const wMessage* message, size_t pos)
{
if (!message)
return false;
if (message->context != (void*)13)
return false;
if (message->id != pos)
return false;
if (message->wParam != (void*)23)
return false;
if (message->lParam != (void*)42)
return false;
if (message->Free != NULL)
return false;
return true;
}
static bool append(wMessageQueue* queue, size_t pos)
{
const wMessage message = { .context = (void*)13,
.id = WINPR_ASSERTING_INT_CAST(DWORD, pos),
.wParam = (void*)23,
.lParam = (void*)42,
.Free = NULL };
return MessageQueue_Dispatch(queue, &message);
}
static bool fill_capcity(wMessageQueue* queue, size_t* pos)
{
WINPR_ASSERT(pos);
size_t cpos = *pos;
const size_t capacity = MessageQueue_Capacity(queue);
while (MessageQueue_Size(queue) < capacity)
{
printf("failed to create message queue\n");
return 1;
if (!append(queue, cpos++))
return false;
}
*pos = cpos;
return true;
}
static bool drain(wMessageQueue* queue, size_t expect)
{
wMessage message = { 0 };
if (MessageQueue_Get(queue, &message) < 0)
return false;
if (!check(&message, expect))
return false;
return true;
}
static bool drain_capcity(wMessageQueue* queue, size_t remain, size_t* pos)
{
WINPR_ASSERT(pos);
size_t cpos = *pos;
while (MessageQueue_Size(queue) > remain)
{
if (!drain(queue, cpos++))
return false;
}
*pos = cpos;
return true;
}
static bool test_growth_move(wMessageQueue* queue, bool big)
{
WINPR_ASSERT(queue);
const size_t cap = MessageQueue_Capacity(queue);
if (cap < 4)
return false;
size_t wpos = 0;
size_t rpos = 0;
if (!fill_capcity(queue, &wpos))
return false;
if (big)
{
if (!append(queue, wpos++))
return false;
}
if (!(thread = CreateThread(NULL, 0, message_queue_consumer_thread, (void*)queue, 0, NULL)))
if (!drain_capcity(queue, 3, &rpos))
return false;
if (!fill_capcity(queue, &wpos))
return false;
if (!append(queue, wpos++))
return false;
return drain_capcity(queue, 0, &rpos);
}
static bool test_growth_big_move(wMessageQueue* queue)
{
return test_growth_move(queue, true);
}
static bool test_growth_small_move(wMessageQueue* queue)
{
return test_growth_move(queue, false);
}
static bool test_operation_run(wMessageQueue* queue, HANDLE thread)
{
WINPR_ASSERT(queue);
WINPR_ASSERT(thread);
if (!MessageQueue_Post(queue, NULL, 123, NULL, NULL))
return false;
if (!MessageQueue_Post(queue, NULL, 456, NULL, NULL))
return false;
if (!MessageQueue_Post(queue, NULL, 789, NULL, NULL))
return false;
if (!MessageQueue_PostQuit(queue, 0))
return false;
const DWORD status = WaitForSingleObject(thread, INFINITE);
if (status != WAIT_OBJECT_0)
return false;
return true;
}
static bool test_operation(wMessageQueue* queue)
{
WINPR_ASSERT(queue);
HANDLE thread = CreateThread(NULL, 0, message_queue_consumer_thread, queue, 0, NULL);
if (!thread)
{
printf("failed to create thread\n");
MessageQueue_Free(queue);
return 1;
return false;
}
const bool rc = test_operation_run(queue, thread);
if (!CloseHandle(thread))
return false;
return rc;
}
if (!MessageQueue_Post(queue, NULL, 123, NULL, NULL) ||
!MessageQueue_Post(queue, NULL, 456, NULL, NULL) ||
!MessageQueue_Post(queue, NULL, 789, NULL, NULL) || !MessageQueue_PostQuit(queue, 0) ||
WaitForSingleObject(thread, INFINITE) != WAIT_OBJECT_0)
int TestMessageQueue(WINPR_ATTR_UNUSED int argc, WINPR_ATTR_UNUSED char* argv[])
{
if (!wrap_test(test_growth_big_move))
return -1;
MessageQueue_Free(queue);
(void)CloseHandle(thread);
if (!wrap_test(test_growth_small_move))
return -2;
if (!wrap_test(test_operation))
return -3;
return 0;
}

View File

@@ -3,56 +3,229 @@
#include <winpr/tchar.h>
#include <winpr/collections.h>
int TestQueue(int argc, char* argv[])
static bool wrap_test(bool (*fkt)(wQueue* queue))
{
size_t item = 0;
size_t count = 0;
wQueue* queue = NULL;
WINPR_UNUSED(argc);
WINPR_UNUSED(argv);
queue = Queue_New(TRUE, -1, -1);
wQueue* queue = Queue_New(TRUE, -1, -1);
if (!queue)
return -1;
return false;
WINPR_ASSERT(fkt);
const bool rc = fkt(queue);
Queue_Free(queue);
return rc;
}
static bool check(const void* ptr, size_t pos)
{
if (!ptr)
return false;
if (ptr != (void*)(pos + 23))
return false;
return true;
}
static bool append(wQueue* queue, size_t pos)
{
void* ptr = (void*)(pos + 23);
return Queue_Enqueue(queue, ptr);
}
static bool fill_capcity(wQueue* queue, size_t* pos)
{
WINPR_ASSERT(pos);
size_t cpos = *pos;
const size_t capacity = Queue_Capacity(queue);
while (Queue_Count(queue) < capacity)
{
if (!append(queue, cpos++))
return false;
}
*pos = cpos;
return true;
}
static bool drain(wQueue* queue, size_t expect)
{
void* ptr = Queue_Dequeue(queue);
return check(ptr, expect);
}
static bool drain_capcity(wQueue* queue, size_t remain, size_t* pos)
{
WINPR_ASSERT(pos);
size_t cpos = *pos;
while (Queue_Count(queue) > remain)
{
if (!drain(queue, cpos++))
return false;
}
*pos = cpos;
return true;
}
static bool test_growth_move(wQueue* queue, bool big)
{
WINPR_ASSERT(queue);
const size_t cap = Queue_Capacity(queue);
if (cap < 4)
return false;
size_t wpos = 0;
size_t rpos = 0;
if (!fill_capcity(queue, &wpos))
return false;
/* Ensure the (base) capacity is larger than the allocation step
* so a full copy of tail will not be possible */
if (big)
{
if (!append(queue, wpos++))
return false;
}
if (!drain_capcity(queue, 3, &rpos))
return false;
if (!fill_capcity(queue, &wpos))
return false;
if (!append(queue, wpos++))
return false;
return drain_capcity(queue, 0, &rpos);
}
static bool test_growth_big_move(wQueue* queue)
{
return test_growth_move(queue, true);
}
static bool test_growth_small_move(wQueue* queue)
{
return test_growth_move(queue, false);
}
static bool check_size(wQueue* queue, size_t expected)
{
WINPR_ASSERT(queue);
const size_t count = Queue_Count(queue);
printf("queue count: %" PRIuz "\n", count);
if (count != expected)
return false;
return true;
}
static bool enqueue(wQueue* queue, size_t val)
{
WINPR_ASSERT(queue);
void* ptr = (void*)(23 + val);
return Queue_Enqueue(queue, ptr);
}
static bool dequeue(wQueue* queue, size_t expected)
{
WINPR_ASSERT(queue);
const void* pexpect = (void*)(23 + expected);
void* ptr = Queue_Dequeue(queue);
if (pexpect != ptr)
return false;
return true;
}
static bool legacy_test(wQueue* queue)
{
WINPR_ASSERT(queue);
for (size_t index = 1; index <= 10; index++)
{
Queue_Enqueue(queue, (void*)index);
if (!enqueue(queue, index))
return false;
}
count = Queue_Count(queue);
printf("queue count: %" PRIuz "\n", count);
if (!check_size(queue, 10))
return false;
for (size_t index = 1; index <= 10; index++)
{
item = (size_t)Queue_Dequeue(queue);
if (item != index)
return -1;
if (!dequeue(queue, index))
return false;
}
count = Queue_Count(queue);
printf("queue count: %" PRIuz "\n", count);
if (!check_size(queue, 0))
return false;
Queue_Enqueue(queue, (void*)(size_t)1);
Queue_Enqueue(queue, (void*)(size_t)2);
Queue_Enqueue(queue, (void*)(size_t)3);
if (!enqueue(queue, 1))
return false;
if (!enqueue(queue, 2))
return false;
if (!enqueue(queue, 3))
return false;
Queue_Dequeue(queue);
Queue_Dequeue(queue);
if (!check_size(queue, 3))
return false;
Queue_Enqueue(queue, (void*)(size_t)4);
Queue_Enqueue(queue, (void*)(size_t)5);
Queue_Enqueue(queue, (void*)(size_t)6);
if (!dequeue(queue, 1))
return false;
if (!dequeue(queue, 2))
return false;
Queue_Dequeue(queue);
Queue_Dequeue(queue);
Queue_Dequeue(queue);
Queue_Dequeue(queue);
if (!check_size(queue, 1))
return false;
if (!enqueue(queue, 4))
return false;
if (!enqueue(queue, 5))
return false;
if (!enqueue(queue, 6))
return false;
if (!check_size(queue, 4))
return false;
if (!dequeue(queue, 3))
return false;
if (!dequeue(queue, 4))
return false;
if (!dequeue(queue, 5))
return false;
if (!dequeue(queue, 6))
return false;
if (!check_size(queue, 0))
return false;
Queue_Clear(queue);
Queue_Free(queue);
if (!check_size(queue, 0))
return false;
for (size_t x = 0; x < 32; x++)
{
void* ptr = (void*)(23 + x);
if (!Queue_Enqueue(queue, ptr))
return false;
}
if (!check_size(queue, 32))
return false;
Queue_Clear(queue);
if (!check_size(queue, 0))
return false;
return true;
}
int TestQueue(WINPR_ATTR_UNUSED int argc, WINPR_ATTR_UNUSED char* argv[])
{
if (!wrap_test(test_growth_big_move))
return -1;
if (!wrap_test(test_growth_small_move))
return -2;
if (!wrap_test(legacy_test))
return -3;
return 0;
}