libfreerdp-core: add asynchronous send queue

This commit is contained in:
Marc-André Moreau
2012-11-28 13:38:01 -05:00
parent d0792ea4d1
commit 3936a19914
6 changed files with 74 additions and 32 deletions

View File

@@ -29,6 +29,7 @@
#include <winpr/crt.h>
#include <winpr/tchar.h>
#include <winpr/synch.h>
#include <winpr/dsparse.h>
#include <openssl/rand.h>
@@ -457,7 +458,6 @@ int rpc_recv_pdu(rdpRpc* rpc)
int rpc_tsg_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum)
{
int status;
BYTE* buffer;
UINT32 offset;
rdpNtlm* ntlm;
@@ -549,12 +549,9 @@ int rpc_tsg_write(rdpRpc* rpc, BYTE* data, int length, UINT16 opnum)
offset += Buffers[1].cbBuffer;
rpc_send_enqueue_pdu(rpc, buffer, request_pdu->frag_length);
status = rpc_send_dequeue_pdu(rpc);
free(buffer);
if (status < 0)
return -1;
WaitForSingleObject(rpc->client->PduSentEvent, INFINITE);
ResetEvent(rpc->client->PduSentEvent);
return length;
}
@@ -715,6 +712,8 @@ rdpRpc* rpc_new(rdpTransport* transport)
rpc->VirtualConnectionCookieTable = rpc_virtual_connection_cookie_table_new(rpc);
rpc->call_id = 1;
rpc_client_start(rpc);
}
return rpc;

View File

@@ -705,11 +705,8 @@ struct rpc_client
HANDLE Thread;
HANDLE StopEvent;
HANDLE SendEvent;
HANDLE PduSentEvent;
HANDLE SendSemaphore;
HANDLE ReceiveEvent;
HANDLE ReceiveSemaphore;
};
typedef struct rpc_client RpcClient;

View File

@@ -40,6 +40,7 @@ int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length)
PduEntry->Length = length;
InterlockedPushEntrySList(rpc->SendQueue, &(PduEntry->ItemEntry));
ReleaseSemaphore(rpc->client->SendSemaphore, 1, NULL);
return 0;
}
@@ -51,6 +52,9 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
PduEntry = (RPC_PDU_ENTRY*) InterlockedPopEntrySList(rpc->SendQueue);
if (!PduEntry)
return 0;
status = rpc_in_write(rpc, PduEntry->Buffer, PduEntry->Length);
/*
@@ -62,8 +66,11 @@ int rpc_send_dequeue_pdu(rdpRpc* rpc)
rpc->VirtualConnection->DefaultInChannel->BytesSent += status;
rpc->VirtualConnection->DefaultInChannel->SenderAvailableWindow -= status;
free(PduEntry->Buffer);
_aligned_free(PduEntry);
SetEvent(rpc->client->PduSentEvent);
return status;
}
@@ -71,30 +78,23 @@ static void* rpc_client_thread(void* arg)
{
rdpRpc* rpc;
DWORD status;
HANDLE events[3];
DWORD nCount;
HANDLE events[2];
rpc = (rdpRpc*) arg;
events[0] = rpc->client->StopEvent;
events[1] = rpc->client->SendEvent;
events[2] = rpc->client->ReceiveEvent;
nCount = 0;
events[nCount++] = rpc->client->StopEvent;
events[nCount++] = rpc->client->SendSemaphore;
while (1)
{
status = WaitForMultipleObjects(3, events, FALSE, INFINITE);
status = WaitForMultipleObjects(nCount, events, FALSE, INFINITE);
if (WaitForSingleObject(rpc->client->StopEvent, 0) == WAIT_OBJECT_0)
break;
if (WaitForSingleObject(rpc->client->SendEvent, 0) == WAIT_OBJECT_0)
{
rpc_send_dequeue_pdu(rpc);
}
if (WaitForSingleObject(rpc->client->ReceiveEvent, 0) == WAIT_OBJECT_0)
{
}
rpc_send_dequeue_pdu(rpc);
}
return NULL;
@@ -108,9 +108,9 @@ int rpc_client_start(rdpRpc* rpc)
(LPTHREAD_START_ROUTINE) rpc_client_thread,
rpc, CREATE_SUSPENDED, NULL);
rpc->client->SendEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
rpc->client->ReceiveEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
rpc->client->StopEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
rpc->client->SendSemaphore = CreateSemaphore(NULL, 0, 64, NULL);
rpc->client->PduSentEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
ResumeThread(rpc->client->Thread);

View File

@@ -27,4 +27,6 @@
int rpc_send_enqueue_pdu(rdpRpc* rpc, BYTE* buffer, UINT32 length);
int rpc_send_dequeue_pdu(rdpRpc* rpc);
int rpc_client_start(rdpRpc* rpc);
#endif /* FREERDP_CORE_RPC_CLIENT_H */

View File

@@ -132,6 +132,26 @@ const RPC_FAULT_CODE RPC_FAULT_CODES[] =
DEFINE_RPC_FAULT_CODE(RPC_X_ENUM_VALUE_OUT_OF_RANGE)
DEFINE_RPC_FAULT_CODE(RPC_X_BYTE_COUNT_TOO_SMALL)
DEFINE_RPC_FAULT_CODE(RPC_X_BAD_STUB_DATA)
DEFINE_RPC_FAULT_CODE(RPC_S_NO_INTERFACES)
DEFINE_RPC_FAULT_CODE(RPC_S_CALL_CANCELLED)
DEFINE_RPC_FAULT_CODE(RPC_S_BINDING_INCOMPLETE)
DEFINE_RPC_FAULT_CODE(RPC_S_COMM_FAILURE)
DEFINE_RPC_FAULT_CODE(RPC_S_UNSUPPORTED_AUTHN_LEVEL)
DEFINE_RPC_FAULT_CODE(RPC_S_NO_PRINC_NAME)
DEFINE_RPC_FAULT_CODE(RPC_S_NOT_RPC_ERROR)
DEFINE_RPC_FAULT_CODE(RPC_S_UUID_LOCAL_ONLY)
DEFINE_RPC_FAULT_CODE(RPC_S_SEC_PKG_ERROR)
DEFINE_RPC_FAULT_CODE(RPC_S_NOT_CANCELLED)
DEFINE_RPC_FAULT_CODE(RPC_X_INVALID_ES_ACTION)
DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_ES_VERSION)
DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_STUB_VERSION)
DEFINE_RPC_FAULT_CODE(RPC_X_INVALID_PIPE_OBJECT)
DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_PIPE_ORDER)
DEFINE_RPC_FAULT_CODE(RPC_X_WRONG_PIPE_VERSION)
DEFINE_RPC_FAULT_CODE(RPC_S_COOKIE_AUTH_FAILED)
DEFINE_RPC_FAULT_CODE(RPC_S_GROUP_MEMBER_NOT_FOUND)
DEFINE_RPC_FAULT_CODE(EPT_S_CANT_CREATE)
DEFINE_RPC_FAULT_CODE(RPC_S_INVALID_OBJECT)
{ 0, NULL }
};

View File

@@ -153,7 +153,6 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
fd_set fds;
ULONG Type;
PVOID Object;
WINPR_EVENT* event;
struct timeval timeout;
maxfd = 0;
@@ -168,11 +167,22 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
if (!winpr_Handle_GetInfo(lpHandles[index], &Type, &Object))
return WAIT_FAILED;
if (Type != HANDLE_TYPE_EVENT)
if (Type == HANDLE_TYPE_EVENT)
{
fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
}
else if (Type == HANDLE_TYPE_SEMAPHORE)
{
#ifdef WINPR_PIPE_SEMAPHORE
fd = ((WINPR_SEMAPHORE*) Object)->pipe_fd[0];
#else
return WAIT_FAILED;
event = (WINPR_EVENT*) Object;
fd = event->pipe_fd[0];
#endif
}
else
{
return WAIT_FAILED;
}
FD_SET(fd, &fds);
@@ -197,10 +207,24 @@ DWORD WaitForMultipleObjects(DWORD nCount, const HANDLE* lpHandles, BOOL bWaitAl
for (index = 0; index < nCount; index++)
{
winpr_Handle_GetInfo(lpHandles[index], &Type, &Object);
fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
if (Type == HANDLE_TYPE_EVENT)
fd = ((WINPR_EVENT*) Object)->pipe_fd[0];
else if (Type == HANDLE_TYPE_SEMAPHORE)
fd = ((WINPR_SEMAPHORE*) Object)->pipe_fd[0];
if (FD_ISSET(fd, &fds))
{
if (Type == HANDLE_TYPE_SEMAPHORE)
{
int length = read(fd, &length, 1);
if (length != 1)
return WAIT_FAILED;
}
return (WAIT_OBJECT_0 + index);
}
}
return WAIT_FAILED;