From a4ec778f02bbbdf0b7263ed80d28d95b49db9130 Mon Sep 17 00:00:00 2001 From: Vic Lee Date: Mon, 1 Aug 2011 12:43:53 +0800 Subject: [PATCH] libfreerdp-core: add non-blocking read. --- libfreerdp-core/activation.c | 2 - libfreerdp-core/connection.c | 1 + libfreerdp-core/freerdp.c | 5 ++- libfreerdp-core/rdp.c | 50 +++++++++++++++++++--- libfreerdp-core/rdp.h | 3 ++ libfreerdp-core/transport.c | 80 ++++++++++++++++++++++++++++++++++++ libfreerdp-core/transport.h | 2 + 7 files changed, 134 insertions(+), 9 deletions(-) diff --git a/libfreerdp-core/activation.c b/libfreerdp-core/activation.c index 95cf9d9da..5121dff7f 100644 --- a/libfreerdp-core/activation.c +++ b/libfreerdp-core/activation.c @@ -170,7 +170,6 @@ void rdp_send_client_font_list_pdu(rdpRdp* rdp, uint16 flags) void rdp_recv_server_font_map_pdu(rdpRdp* rdp, STREAM* s, rdpSettings* settings) { rdp->activated = True; - rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, False); } void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s) @@ -184,6 +183,5 @@ void rdp_recv_deactivate_all(rdpRdp* rdp, STREAM* s) stream_seek(s, lengthSourceDescriptor); /* sourceDescriptor (should be 0x00) */ rdp->activated = False; - rdp->transport->tcp->set_blocking_mode(rdp->transport->tcp, True); } diff --git a/libfreerdp-core/connection.c b/libfreerdp-core/connection.c index 5b027af9e..e8934077f 100644 --- a/libfreerdp-core/connection.c +++ b/libfreerdp-core/connection.c @@ -95,6 +95,7 @@ boolean rdp_client_connect(rdpRdp* rdp) rdp->licensed = True; rdp_client_activate(rdp); + rdp_set_blocking_mode(rdp, False); return True; } diff --git a/libfreerdp-core/freerdp.c b/libfreerdp-core/freerdp.c index ce75572dc..d46c9be7b 100644 --- a/libfreerdp-core/freerdp.c +++ b/libfreerdp-core/freerdp.c @@ -54,10 +54,13 @@ boolean freerdp_get_fds(freerdp* instance, void** rfds, int* rcount, void** wfds boolean freerdp_check_fds(freerdp* instance) { rdpRdp* rdp; + int status; rdp = (rdpRdp*) instance->rdp; - rdp_recv(rdp); + status = rdp_check_fds(rdp); + if (status < 0) + return False; return True; } diff --git a/libfreerdp-core/rdp.c b/libfreerdp-core/rdp.c index 3729ef9a0..4948f964e 100644 --- a/libfreerdp-core/rdp.c +++ b/libfreerdp-core/rdp.c @@ -344,13 +344,13 @@ void rdp_read_data_pdu(rdpRdp* rdp, STREAM* s) } /** - * Receive an RDP packet.\n + * Process an RDP packet.\n * @param rdp RDP module + * @param s stream */ -void rdp_recv(rdpRdp* rdp) +void rdp_process_pdu(rdpRdp* rdp, STREAM* s) { - STREAM* s; int length; uint16 pduType; uint16 pduLength; @@ -359,9 +359,6 @@ void rdp_recv(rdpRdp* rdp) uint16 sec_flags; enum DomainMCSPDU MCSPDU; - s = transport_recv_stream_init(rdp->transport, 4096); - transport_read(rdp->transport, s); - MCSPDU = DomainMCSPDU_SendDataIndication; mcs_read_domain_mcspdu_header(s, &MCSPDU, &length); @@ -421,6 +418,47 @@ void rdp_recv(rdpRdp* rdp) } } +/** + * Receive an RDP packet.\n + * @param rdp RDP module + */ + +void rdp_recv(rdpRdp* rdp) +{ + STREAM* s; + + s = transport_recv_stream_init(rdp->transport, 4096); + transport_read(rdp->transport, s); + + rdp_process_pdu(rdp, s); +} + +static int rdp_recv_callback(rdpTransport* transport, STREAM* s, void* extra) +{ + rdpRdp* rdp = (rdpRdp*) extra; + + rdp_process_pdu(rdp, s); + + return 1; +} + +/** + * Set non-blocking mode information. + * @param rdp RDP module + * @param blocking blocking mode + */ +void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking) +{ + rdp->transport->recv_callback = rdp_recv_callback; + rdp->transport->recv_extra = rdp; + transport_set_blocking_mode(rdp->transport, blocking); +} + +int rdp_check_fds(rdpRdp* rdp) +{ + return transport_check_fds(rdp->transport); +} + /** * Instantiate new RDP module. * @return new RDP module diff --git a/libfreerdp-core/rdp.h b/libfreerdp-core/rdp.h index 162c40ad7..52b0b38a7 100644 --- a/libfreerdp-core/rdp.h +++ b/libfreerdp-core/rdp.h @@ -236,6 +236,9 @@ void rdp_send_data_pdu(rdpRdp* rdp, STREAM* s, uint16 type, uint16 channel_id); void rdp_send(rdpRdp* rdp, STREAM* s); void rdp_recv(rdpRdp* rdp); +void rdp_set_blocking_mode(rdpRdp* rdp, boolean blocking); +int rdp_check_fds(rdpRdp* rdp); + rdpRdp* rdp_new(); void rdp_free(rdpRdp* rdp); diff --git a/libfreerdp-core/transport.c b/libfreerdp-core/transport.c index 45cebd77c..2a03a4b7d 100644 --- a/libfreerdp-core/transport.c +++ b/libfreerdp-core/transport.c @@ -135,6 +135,21 @@ int transport_read(rdpTransport* transport, STREAM* s) return status; } +static int transport_read_nonblocking(rdpTransport* transport) +{ + int status; + + stream_check_size(transport->recv_buffer, 4096); + status = transport_read(transport, transport->recv_buffer); + + if (status <= 0) + return status; + + stream_seek(transport->recv_buffer, status); + + return status; +} + int transport_write(rdpTransport* transport, STREAM* s) { int status = -1; @@ -149,6 +164,63 @@ int transport_write(rdpTransport* transport, STREAM* s) int transport_check_fds(rdpTransport* transport) { + int pos; + int status; + uint8 header; + uint16 length; + STREAM* received; + + status = transport_read_nonblocking(transport); + if (status <= 0) + return status; + + while ((pos = stream_get_pos(transport->recv_buffer)) > 0) + { + /* Ensure the TPKT or Fast Path header is available. */ + if (pos <= 4) + return 0; + + stream_set_pos(transport->recv_buffer, 0); + stream_peek_uint8(transport->recv_buffer, header); + if (header == 0x03) /* TPKT */ + length = tpkt_read_header(transport->recv_buffer); + else /* TODO: Fast Path */ + length = 0; + + if (length == 0) + { + printf("transport_check_fds: protocol error, not a TPKT header (%d).\n", header); + return -1; + } + + if (pos < length) + { + stream_set_pos(transport->recv_buffer, pos); + return 0; /* Packet is not yet completely received. */ + } + + /* + * A complete packet has been received. In case there are trailing data + * for the next packet, we copy it to the new receive buffer. + */ + received = transport->recv_buffer; + transport->recv_buffer = stream_new(BUFFER_SIZE); + + if (pos > length) + { + stream_set_pos(received, length); + stream_check_size(transport->recv_buffer, pos - length); + stream_copy(transport->recv_buffer, received, pos - length); + } + + stream_set_pos(received, 0); + status = transport->recv_callback(transport, received, transport->recv_extra); + stream_free(received); + + if (status < 0) + return status; + } + return 0; } @@ -158,6 +230,12 @@ void transport_init(rdpTransport* transport) transport->state = TRANSPORT_STATE_NEGO; } +boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking) +{ + transport->blocking = blocking; + return transport->tcp->set_blocking_mode(transport->tcp, blocking); +} + rdpTransport* transport_new(rdpSettings* settings) { rdpTransport* transport; @@ -179,6 +257,8 @@ rdpTransport* transport_new(rdpSettings* settings) /* buffers for blocking read/write */ transport->recv_stream = stream_new(BUFFER_SIZE); transport->send_stream = stream_new(BUFFER_SIZE); + + transport->blocking = True; } return transport; diff --git a/libfreerdp-core/transport.h b/libfreerdp-core/transport.h index 23c97cb7b..0f591abcd 100644 --- a/libfreerdp-core/transport.h +++ b/libfreerdp-core/transport.h @@ -63,6 +63,7 @@ struct rdp_transport void* recv_extra; STREAM* recv_buffer; TransportRecv recv_callback; + boolean blocking; }; STREAM* transport_recv_stream_init(rdpTransport* transport, int size); @@ -75,6 +76,7 @@ boolean transport_connect_nla(rdpTransport* transport); int transport_read(rdpTransport* transport, STREAM* s); int transport_write(rdpTransport* transport, STREAM* s); int transport_check_fds(rdpTransport* transport); +boolean transport_set_blocking_mode(rdpTransport* transport, boolean blocking); rdpTransport* transport_new(rdpSettings* settings); void transport_free(rdpTransport* transport);