narupa.utilities.request_queues module

Provides a dictionary of queues.

class narupa.utilities.request_queues.DictOfQueues(queue_max_size=0)

Bases: object

Dictionary of request queues. This class is used by Narupa servers to provide a thread-safe way to publish data to multiple clients using queues.

# A thread working on its own queue
many_queues = DictOfQueues()
request_id = 0
with many_queues.one_queue(request_id) as queue:
    # do stuff

# A thread populating all the queues
many_queues = DictOfQueues(queue_max_size=100)
for queue in many_queue.iter_queue():
    queue.put('something')

Adding or removing a key from the dictionary must be done while holding DictOfQueues.lock. The recommended way to register and unregister a queue is to use the one_queue() context manager.

iter_queues() → Generator[queue.Queue, None, None]

Iterate over the queues.

The method places a lock on the dictionary so no queue can be added or removed while iterating.

iter_queues_items() → Generator[Tuple[Hashable, queue.Queue], None, None]

Iterate over the queues and their keys.

The method places a lock on the dictionary so no queue can be added or removed while iterating.

one_queue(request_id, queue_class=<class 'queue.Queue'>)

Works with a queue.

This method is a context manager that creates and registers the queue, provides the queue to the calling scope, and un-registers the queue when exiting the context.

Parameters:
  • request_id – The key for the queue. This key has to be unique, if a queue is already registered with that key, then a ValueError is raised.
  • queue_class – The class to instantiate for that queue. By default, a Queue is instantiated.
class narupa.utilities.request_queues.GetFrameResponseAggregatingQueue(maxsize=None)

Bases: narupa.utilities.request_queues.SingleItemQueue

SingleItemQueue specifically for GetFrameResponse items. Put items will be aggregated with any existing item so that there is at most one item in the queue at any time.

put(item: narupa.protocol.trajectory.get_frame_pb2.GetFrameResponse, **kwargs)

Store a value, replace the previous one if any.

This method is thread-safe and is meant to be a drop in replacement to Queue.put().

Parameters:
  • item – The value to store.
  • kwargs – Unused arguments for compatibility with Queue.put().
class narupa.utilities.request_queues.SingleItemQueue(maxsize=None)

Bases: object

Mimics the basic interface of a Queue but only stores one item.

get(block=True, timeout=None)

Get the stored value, and remove it from storage.

If there is no value to get, then the method raises an Empty exception.

This method is thread-safe and is meant to be a drop in replacement to Queue.get().

Parameters:
  • block – Whether to wait until a value is available.
  • timeout – Timeout for waiting until a value is available.
Returns:

The stored value.

put(item, **kwargs)

Store a value, replace the previous one if any.

This method is thread-safe and is meant to be a drop in replacement to Queue.put().

Parameters:
  • item – The value to store.
  • kwargs – Unused arguments for compatibility with Queue.put().