narupa.utilities.request_queues module
Provides a dictionary of queues.
class narupa.utilities.request_queues.DictOfQueues(queue_max_size=0)
Bases: object
Dictionary of request queues. This class is used by Narupa servers to provide a thread-safe way to publish data to multiple clients using queues.
    # A thread working on its own queue
    many_queues = DictOfQueues()
    request_id = 0
    with many_queues.one_queue(request_id) as queue:
        # do stuff
        pass

    # A thread populating all the queues
    many_queues = DictOfQueues(queue_max_size=100)
    for queue in many_queues.iter_queues():
        queue.put('something')
Adding or removing a key from the dictionary must be done while holding DictOfQueues.lock. The recommended way to register and unregister a queue is to use the one_queue() context manager.
iter_queues() → Generator[queue.Queue, None, None]
Iterate over the queues.
The method places a lock on the dictionary so no queue can be added or removed while iterating.
iter_queues_items() → Generator[Tuple[Hashable, queue.Queue], None, None]
Iterate over the queues and their keys.
The method places a lock on the dictionary so no queue can be added or removed while iterating.
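The locking behaviour described above can be illustrated with a small standalone sketch. It does not use narupa: the module-level `lock` and `queues` names and this `iter_queues_items` function are hypothetical stand-ins, and the use of `threading.Lock` is an assumption. The point is only that a generator which yields from inside a `with lock:` block keeps the lock held for as long as the iteration is in progress.

```python
import threading
from queue import Queue

# Hypothetical stand-ins for the dictionary and its lock (not narupa code).
lock = threading.Lock()
queues = {"a": Queue(), "b": Queue()}


def iter_queues_items():
    """Yield (key, queue) pairs while holding the lock."""
    with lock:
        yield from queues.items()


observed = []
for key, queue in iter_queues_items():
    # The generator is suspended inside the `with` block, so the lock is held.
    observed.append((key, lock.locked()))

# The generator is exhausted, so the `with` block has exited.
locked_after = lock.locked()
```

One consequence of this pattern is that slow work inside the loop body delays any thread that wants to register or unregister a queue.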
one_queue(request_id, queue_class=<class 'queue.Queue'>)
Works with a queue.
This method is a context manager that creates and registers the queue, provides the queue to the calling scope, and un-registers the queue when exiting the context.
Parameters:
- request_id – The key for the queue. This key has to be unique; if a queue is already registered with that key, then a ValueError is raised.
- queue_class – The class to instantiate for that queue. By default, a Queue is instantiated.
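The register, provide, and unregister life cycle described for one_queue() can be sketched as follows. This is a minimal illustration under stated assumptions, not the narupa implementation: the class name `QueueRegistrySketch`, its `queues` attribute, and the choice of `threading.Lock` are all hypothetical. Only the `one_queue`, `iter_queues`, `lock`, and `queue_max_size` names and the ValueError on a duplicate key come from the documentation above.

```python
import threading
from contextlib import contextmanager
from queue import Queue


class QueueRegistrySketch:
    """Hypothetical stand-in for DictOfQueues (not the narupa code)."""

    def __init__(self, queue_max_size=0):
        self.queue_max_size = queue_max_size
        self.lock = threading.Lock()  # assumption: a plain lock
        self.queues = {}

    @contextmanager
    def one_queue(self, request_id, queue_class=Queue):
        queue = queue_class(maxsize=self.queue_max_size)
        # Register the queue while holding the lock.
        with self.lock:
            if request_id in self.queues:
                raise ValueError(f"Key {request_id!r} is already registered.")
            self.queues[request_id] = queue
        try:
            yield queue
        finally:
            # Unregister the queue, even if the body raised.
            with self.lock:
                del self.queues[request_id]

    def iter_queues(self):
        with self.lock:
            yield from self.queues.values()


registry = QueueRegistrySketch(queue_max_size=10)
with registry.one_queue("client-a") as queue:
    # A publisher pushes the same item to every registered queue.
    for registered in registry.iter_queues():
        registered.put("frame")
    received = queue.get_nowait()

    # Registering the same key a second time is rejected.
    try:
        with registry.one_queue("client-a"):
            pass
    except ValueError:
        duplicate_rejected = True
    else:
        duplicate_rejected = False

keys_after_exit = list(registry.queues)
```

Wrapping the `yield` in `try`/`finally` is what guarantees the key is removed when the calling scope exits with an exception.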
class narupa.utilities.request_queues.GetFrameResponseAggregatingQueue(maxsize=None)
Bases: narupa.utilities.request_queues.SingleItemQueue
SingleItemQueue specifically for GetFrameResponse items. Put items will be aggregated with any existing item so that there is at most one item in the queue at any time.
put(item: narupa.protocol.trajectory.get_frame_pb2.GetFrameResponse, **kwargs)
Store a value, replacing the previous one if any.
This method is thread-safe and is meant to be a drop-in replacement for Queue.put().
Parameters:
- item – The value to store.
- kwargs – Unused arguments for compatibility with Queue.put().
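The idea of aggregating a new item with the one already waiting can be sketched without protobuf. In this illustration plain dictionaries stand in for GetFrameResponse messages and `dict.update` stands in for the merge step. Both are assumptions, as are the class name `AggregatingSlotSketch` and its `get_nowait` method, and how narupa actually merges two responses is not described here.

```python
import threading
from queue import Empty


class AggregatingSlotSketch:
    """Hypothetical single-slot store that merges successive items."""

    def __init__(self):
        self._lock = threading.Lock()
        self._item = None
        self._has_item = False

    def put(self, item, **kwargs):
        # kwargs are accepted and ignored, mirroring the documented signature.
        with self._lock:
            if self._has_item:
                # Merge the new item into a copy of the pending one.
                merged = dict(self._item)
                merged.update(item)
                self._item = merged
            else:
                self._item = dict(item)
                self._has_item = True

    def get_nowait(self):
        with self._lock:
            if not self._has_item:
                raise Empty
            item, self._item, self._has_item = self._item, None, False
            return item


slot = AggregatingSlotSketch()
slot.put({"positions": [0.0, 1.0]})
slot.put({"energy": -3.5}, block=False)
aggregated = slot.get_nowait()
```

A consumer that reads more slowly than the producer writes therefore receives one combined item rather than a backlog.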
class narupa.utilities.request_queues.SingleItemQueue(maxsize=None)
Bases: object
Mimics the basic interface of a Queue but only stores one item.
get(block=True, timeout=None)
Get the stored value, and remove it from storage.
If there is no value to get, then the method raises an Empty exception.
This method is thread-safe and is meant to be a drop-in replacement for Queue.get().
Parameters:
- block – Whether to wait until a value is available.
- timeout – Timeout for waiting until a value is available.
Returns: The stored value.
put(item, **kwargs)
Store a value, replacing the previous one if any.
This method is thread-safe and is meant to be a drop-in replacement for Queue.put().
Parameters:
- item – The value to store.
- kwargs – Unused arguments for compatibility with Queue.put().
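A single-item store with a Queue-like get() and put() can be sketched with a `threading.Condition`. The class name `SingleSlotSketch` and the `_MISSING` sentinel are hypothetical, and the way `block` and `timeout` are handled here is one plausible reading of the parameters above, not a description of the narupa code.

```python
import threading
from queue import Empty

_MISSING = object()  # sentinel, so that None can be stored as a value


class SingleSlotSketch:
    """Hypothetical stand-in for SingleItemQueue (not the narupa code)."""

    def __init__(self, maxsize=None):
        # maxsize is accepted for signature compatibility and ignored.
        self._condition = threading.Condition()
        self._item = _MISSING

    def put(self, item, **kwargs):
        with self._condition:
            self._item = item  # overwrite any pending value
            self._condition.notify()

    def get(self, block=True, timeout=None):
        with self._condition:
            if block:
                # Wait until a value is present or the timeout expires.
                has_item = self._condition.wait_for(
                    lambda: self._item is not _MISSING, timeout=timeout
                )
            else:
                has_item = self._item is not _MISSING
            if not has_item:
                raise Empty
            item, self._item = self._item, _MISSING
            return item


slot = SingleSlotSketch()
slot.put("frame 1")
slot.put("frame 2", timeout=1)  # replaces "frame 1"; timeout is ignored
latest = slot.get()

try:
    slot.get(block=False)
except Empty:
    empty_after_get = True
else:
    empty_after_get = False
```

Because put() never blocks and always overwrites, a slow consumer only ever sees the most recent value.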