282 lines
9.3 KiB
Python
282 lines
9.3 KiB
Python
from __future__ import annotations
|
|
|
|
import codecs
|
|
import queue
|
|
import threading
|
|
from typing import Iterator, List, Optional, cast
|
|
|
|
from ..frames import Frame, Opcode
|
|
from ..typing import Data
|
|
|
|
|
|
# Public API of this module: only the Assembler class is exported.
__all__ = ["Assembler"]
|
|
|
|
# Class (not an instance) for incremental UTF-8 decoding. An incremental
# decoder keeps an incomplete multi-byte sequence buffered between calls, so
# text split across frame boundaries can be decoded one frame at a time.
UTF8Decoder = codecs.getincrementaldecoder(encoding="utf-8")
|
|
|
|
|
|
class Assembler:
    """
    Assemble messages from frames.

    Library code adds frames with :meth:`put` from a writing thread, while
    user code fetches complete messages with :meth:`get`, or streams them
    frame by frame with :meth:`get_iter`, from a reading thread. :meth:`close`
    ends the stream of frames and unblocks any pending call.

    """

    def __init__(self) -> None:
        # Serialize reads and writes -- except for reads via synchronization
        # primitives provided by the threading and queue modules.
        self.mutex = threading.Lock()

        # We create a latch with two events to ensure proper interleaving of
        # writing and reading messages.
        # put() sets this event to tell get() that a message can be fetched.
        self.message_complete = threading.Event()
        # get() sets this event to let put() know that the message was fetched.
        self.message_fetched = threading.Event()

        # This flag prevents concurrent calls to get() by user code.
        self.get_in_progress = False
        # This flag prevents concurrent calls to put() by library code.
        self.put_in_progress = False

        # Decoder for text frames, None for binary frames.
        self.decoder: Optional[codecs.IncrementalDecoder] = None

        # Buffer of frames belonging to the same message.
        self.chunks: List[Data] = []

        # When switching from "buffering" to "streaming", we use a thread-safe
        # queue for transferring frames from the writing thread (library code)
        # to the reading thread (user code). We're buffering when chunks_queue
        # is None and streaming when it's a SimpleQueue. None is a sentinel
        # value marking the end of the stream, superseding message_complete.

        # Stream data from frames belonging to the same message.
        # Remove quotes around type when dropping Python < 3.9.
        self.chunks_queue: Optional["queue.SimpleQueue[Optional[Data]]"] = None

        # This flag marks the end of the stream.
        self.closed = False

    def get(self, timeout: Optional[float] = None) -> Data:
        """
        Read the next message.

        :meth:`get` returns a single :class:`str` or :class:`bytes`.

        If the message is fragmented, :meth:`get` waits until the last frame is
        received, then it reassembles the message and returns it. To receive
        messages frame by frame, use :meth:`get_iter` instead.

        Args:
            timeout: If a timeout is provided and elapses before a complete
                message is received, :meth:`get` raises :exc:`TimeoutError`.
                :obj:`None` waits indefinitely.

        Returns:
            The reassembled message: :class:`str` for a text message,
            :class:`bytes` for a binary message.

        Raises:
            EOFError: If the stream of frames has ended.
            RuntimeError: If two threads run :meth:`get` or :meth:`get_iter`
                concurrently.
            TimeoutError: If ``timeout`` elapses before a complete message is
                received.

        """
        with self.mutex:
            if self.closed:
                raise EOFError("stream of frames ended")

            if self.get_in_progress:
                raise RuntimeError("get or get_iter is already running")

            self.get_in_progress = True

        # If the message_complete event isn't set yet, release the lock to
        # allow put() to run and eventually set it.
        # Locking with get_in_progress ensures only one thread can get here.
        completed = self.message_complete.wait(timeout)

        with self.mutex:
            self.get_in_progress = False

            # Waiting for a complete message timed out. This can only happen
            # when timeout is not None, so formatting it below is safe.
            if not completed:
                raise TimeoutError(f"timed out in {timeout:.1f}s")

            # get() was unblocked by close() rather than put().
            if self.closed:
                raise EOFError("stream of frames ended")

            assert self.message_complete.is_set()
            self.message_complete.clear()

            # put() keeps a decoder only for text messages, so its presence
            # tells whether the chunks are str or bytes.
            joiner: Data = b"" if self.decoder is None else ""
            # mypy cannot figure out that chunks have the proper type.
            message: Data = joiner.join(self.chunks)  # type: ignore

            # Release the put() call waiting for this message to be fetched.
            assert not self.message_fetched.is_set()
            self.message_fetched.set()

            self.chunks = []
            assert self.chunks_queue is None

            return message

    def get_iter(self) -> Iterator[Data]:
        """
        Stream the next message.

        Iterating the return value of :meth:`get_iter` yields a :class:`str` or
        :class:`bytes` for each frame in the message.

        The iterator must be fully consumed before calling :meth:`get_iter` or
        :meth:`get` again. Else, :exc:`RuntimeError` is raised.

        This method only makes sense for fragmented messages. If messages aren't
        fragmented, use :meth:`get` instead.

        Since this is a generator, nothing runs when :meth:`get_iter` is
        called: the checks below, and the exceptions they raise, only happen
        when iteration starts.

        Raises:
            EOFError: If the stream of frames has ended.
            RuntimeError: If two threads run :meth:`get` or :meth:`get_iter`
                concurrently.

        """
        with self.mutex:
            if self.closed:
                raise EOFError("stream of frames ended")

            if self.get_in_progress:
                raise RuntimeError("get or get_iter is already running")

            # Switch from "buffering" to "streaming": frames received so far
            # are yielded from this local copy, later ones arrive via the queue.
            chunks = self.chunks
            self.chunks = []
            self.chunks_queue = cast(
                # Remove quotes around type when dropping Python < 3.9.
                "queue.SimpleQueue[Optional[Data]]",
                queue.SimpleQueue(),
            )

            # Sending None in chunks_queue supersedes setting message_complete
            # when switching to "streaming". If message is already complete
            # when the switch happens, put() didn't send None, so we have to.
            if self.message_complete.is_set():
                self.chunks_queue.put(None)

            self.get_in_progress = True

        # Locking with get_in_progress ensures only one thread can get here.
        yield from chunks
        while True:
            # Blocks outside the lock until put() or close() sends a chunk or
            # the None end-of-message sentinel.
            chunk = self.chunks_queue.get()
            if chunk is None:
                break
            yield chunk

        with self.mutex:
            self.get_in_progress = False

            assert self.message_complete.is_set()
            self.message_complete.clear()

            # get_iter() was unblocked by close() rather than put().
            if self.closed:
                raise EOFError("stream of frames ended")

            # Release the put() call waiting for this message to be fetched.
            assert not self.message_fetched.is_set()
            self.message_fetched.set()

            # Switch back to "buffering" for the next message.
            assert self.chunks == []
            self.chunks_queue = None

    def put(self, frame: Frame) -> None:
        """
        Add ``frame`` to the next message.

        When ``frame`` is the final frame in a message, :meth:`put` waits until
        the message is fetched, either by calling :meth:`get` or by fully
        consuming the return value of :meth:`get_iter`.

        :meth:`put` assumes that the stream of frames respects the protocol. If
        it doesn't, the behavior is undefined.

        Args:
            frame: Frame to add. Only text, binary, and continuation frames
                are assembled; any other frame (control frames) is ignored.

        Raises:
            EOFError: If the stream of frames has ended.
            RuntimeError: If two threads run :meth:`put` concurrently.
            UnicodeDecodeError: If a text message contains invalid UTF-8
                (the decoder is created with ``errors="strict"``).

        """
        with self.mutex:
            if self.closed:
                raise EOFError("stream of frames ended")

            if self.put_in_progress:
                raise RuntimeError("put is already running")

            # The first frame of a message selects text or binary decoding;
            # continuation frames reuse whatever the first frame selected.
            if frame.opcode is Opcode.TEXT:
                self.decoder = UTF8Decoder(errors="strict")
            elif frame.opcode is Opcode.BINARY:
                self.decoder = None
            elif frame.opcode is Opcode.CONT:
                pass
            else:
                # Ignore control frames.
                return

            data: Data
            if self.decoder is not None:
                # final=frame.fin flushes the decoder on the last frame, so an
                # incomplete UTF-8 sequence at the end of a message raises.
                data = self.decoder.decode(frame.data, frame.fin)
            else:
                data = frame.data

            if self.chunks_queue is None:
                self.chunks.append(data)
            else:
                self.chunks_queue.put(data)

            if not frame.fin:
                return

            # Message is complete. Wait until it's fetched to return.

            assert not self.message_complete.is_set()
            self.message_complete.set()

            # In "streaming" mode, the None sentinel tells get_iter() that
            # the message is complete.
            if self.chunks_queue is not None:
                self.chunks_queue.put(None)

            assert not self.message_fetched.is_set()

            self.put_in_progress = True

        # Release the lock to allow get() to run and eventually set the event.
        self.message_fetched.wait()

        with self.mutex:
            self.put_in_progress = False

            assert self.message_fetched.is_set()
            self.message_fetched.clear()

            # put() was unblocked by close() rather than get() or get_iter().
            if self.closed:
                raise EOFError("stream of frames ended")

            self.decoder = None

    def close(self) -> None:
        """
        End the stream of frames.

        Calling :meth:`close` concurrently with :meth:`get`, :meth:`get_iter`,
        or :meth:`put` is safe. They will raise :exc:`EOFError`.

        Calling :meth:`close` more than once has no further effect.

        """
        with self.mutex:
            if self.closed:
                return

            self.closed = True

            # Unblock get or get_iter.
            if self.get_in_progress:
                self.message_complete.set()
                if self.chunks_queue is not None:
                    self.chunks_queue.put(None)

            # Unblock put().
            if self.put_in_progress:
                self.message_fetched.set()