narupa.state.state_service module

Module providing an implementation of the StateServicer.

class narupa.state.state_service.StateService

Bases: narupa.protocol.state.state_service_pb2_grpc.StateServicer

Implementation of the State service, for tracking and making changes to a shared key/value store.

SubscribeStateUpdates(request: narupa.protocol.state.state_service_pb2.SubscribeStateUpdatesRequest, context) → Iterable[narupa.protocol.state.state_service_pb2.StateUpdate]

Provides a stream of updates to a shared key/value store.

UpdateLocks(request: narupa.protocol.state.state_service_pb2.UpdateLocksRequest, context) → narupa.protocol.state.state_service_pb2.UpdateLocksResponse

Attempts to acquire and release locks on keys in the shared key/value store. If any of the locks cannot be acquired, none of them will be.

UpdateState(request: narupa.protocol.state.state_service_pb2.UpdateStateRequest, context) → narupa.protocol.state.state_service_pb2.UpdateStateResponse

Attempts an atomic update of the shared key/value store. If any key cannot be updated, no change will be made.

close()

copy_state() → Dict[str, Union[None, str, int, float, bool, Iterable[Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]], Mapping[str, Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]]]]

Return a deep copy of the current state.

get_change_buffer() → AbstractContextManager[narupa.utilities.change_buffers.DictionaryChangeBuffer]

Return a DictionaryChangeBuffer that tracks changes to this service’s state.

lock_state() → AbstractContextManager[Dict[str, Union[None, str, int, float, bool, Iterable[Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]], Mapping[str, Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]]]]]

Context manager for reading the current state while delaying any changes to it.
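The relationship between lock_state() and copy_state() can be illustrated with a minimal sketch. It does not use narupa itself: ToyStateStore and its set() method are hypothetical stand-ins, and the use of a re-entrant lock is an assumption about how changes might be delayed while the state is being read.

```python
import copy
import threading
from contextlib import contextmanager


class ToyStateStore:
    """Hypothetical stand-in for the state held by the service."""

    def __init__(self):
        self._lock = threading.RLock()
        self._state = {}

    @contextmanager
    def lock_state(self):
        # Hold the lock for the whole with-block, so writers have to wait.
        with self._lock:
            yield self._state

    def copy_state(self):
        # A deep copy is safe to modify without affecting the shared state.
        with self.lock_state() as state:
            return copy.deepcopy(state)

    def set(self, key, value):
        # Hypothetical writer; blocks while another thread is inside lock_state().
        with self._lock:
            self._state[key] = value


store = ToyStateStore()
store.set("a", {"x": 1})

with store.lock_state() as state:
    key_count = len(state)  # read-only access while changes are delayed

snapshot = store.copy_state()
snapshot["a"]["x"] = 2  # only the copy changes
```

In this sketch, the with-block is the place for short reads of the live state, while the deep copy suits code that needs to keep or modify the data afterwards.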

update_locks(access_token: Union[None, str, int, float, bool, Iterable[Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]], Mapping[str, Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]]], acquire: Optional[Dict[str, float]] = None, release: Optional[Set[str]] = None)

Attempts to acquire and release locks on keys in the shared key/value store. If any of the locks cannot be acquired, none of them will be. Requested lock releases are carried out regardless.

Raises: ResourceLockedError – if the access token cannot acquire all requested keys.
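The all-or-nothing behaviour described for update_locks can be sketched as follows. This is an illustration under stated assumptions rather than the narupa implementation: ToyLocks, its holder() method and the local ResourceLockedError class are hypothetical, the lock values are assumed to be durations in seconds, and processing releases before acquisitions is an assumption.

```python
import time


class ResourceLockedError(Exception):
    """Local stand-in for the error named in the documentation."""


class ToyLocks:
    """Hypothetical lock table mapping key -> (access token, expiry time)."""

    def __init__(self):
        self._locks = {}

    def holder(self, key):
        # Return the token holding an unexpired lock on the key, if any.
        entry = self._locks.get(key)
        if entry is None:
            return None
        token, expiry = entry
        return token if expiry > time.monotonic() else None

    def update_locks(self, access_token, acquire=None, release=None):
        acquire = acquire or {}
        release = release or set()
        # Releases are carried out regardless of whether acquisition succeeds.
        for key in release:
            if self.holder(key) == access_token:
                del self._locks[key]
        # Check every requested key before taking any of them.
        blocked = [key for key in acquire
                   if self.holder(key) not in (None, access_token)]
        if blocked:
            raise ResourceLockedError(f"keys held by another token: {blocked}")
        now = time.monotonic()
        for key, duration in acquire.items():
            self._locks[key] = (access_token, now + duration)


locks = ToyLocks()
locks.update_locks("alice", acquire={"a": 60.0})

try:
    # "a" is held by alice, so bob gets neither "a" nor "b".
    locks.update_locks("bob", acquire={"a": 60.0, "b": 60.0})
    bob_succeeded = True
except ResourceLockedError:
    bob_succeeded = False
```

Because the check runs over all requested keys before any lock is stored, a failed request leaves the lock table exactly as it was, apart from the releases.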
update_state(access_token: Union[None, str, int, float, bool, Iterable[Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]], Mapping[str, Union[None, str, int, float, bool, Iterable[Any], Mapping[str, Any]]]], change: narupa.utilities.change_buffers.DictionaryChange)

Attempts an atomic update of the shared key/value store. If any key cannot be updated, no change will be made.

Raises:
  • ResourceLockedError – if the access token cannot acquire all keys for updating.
  • TypeError – if the update values cannot be serialized for transmission.
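A minimal sketch of an atomic update, assuming a plain dictionary for the state and a hypothetical locked_by mapping from key to the token holding it. The function atomic_update and the local ResourceLockedError class are illustrative only and are not part of narupa.

```python
class ResourceLockedError(Exception):
    """Local stand-in for the error named in the documentation."""


def atomic_update(state, locked_by, access_token, updates, removals):
    """Apply all updates and removals, or none of them (hypothetical helper)."""
    touched = set(updates) | set(removals)
    # A key is blocked if some other token currently holds it.
    blocked = sorted(key for key in touched
                     if locked_by.get(key, access_token) != access_token)
    if blocked:
        # Nothing has been modified at this point, so the state is untouched.
        raise ResourceLockedError(f"cannot update locked keys: {blocked}")
    state.update(updates)
    for key in removals:
        state.pop(key, None)


state = {"a": 1, "b": 2}
locked_by = {"b": "alice"}

try:
    # "b" is locked by alice, so the change to "a" must not be applied either.
    atomic_update(state, locked_by, "bob", updates={"a": 10}, removals={"b"})
except ResourceLockedError:
    pass
state_after_failure = dict(state)

atomic_update(state, locked_by, "alice", updates={"a": 10}, removals={"b"})
```

Validating every key before the first write is what makes the update atomic in this sketch.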
narupa.state.state_service.dictionary_change_to_state_update(change: narupa.utilities.change_buffers.DictionaryChange) → narupa.protocol.state.state_service_pb2.StateUpdate

Convert a DictionaryChange to a protobuf StateUpdate.

Parameters: change – a DictionaryChange which specifies key changes and key removals to make to a dictionary.
Returns: an equivalent protobuf StateUpdate representing the key removals as key changes to a protobuf null value.
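The encoding of removals as null values can be illustrated without protobuf. In this sketch a plain dictionary stands in for the StateUpdate, None stands in for the protobuf null value, and change_to_update is a hypothetical name.

```python
def change_to_update(updates, removals):
    """Merge updates and removals into one mapping (hypothetical helper)."""
    changed_keys = dict(updates)
    for key in removals:
        # A removal is encoded as a change of the key to null.
        changed_keys[key] = None
    return changed_keys


encoded = change_to_update(updates={"a": 1, "b": "text"}, removals={"c"})
```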
narupa.state.state_service.locks_update_to_acquire_release(update: narupa.protocol.state.state_service_pb2.UpdateLocksRequest) → Tuple[Dict[str, float], Set[str]]

Convert a gRPC UpdateLocksRequest to a tuple of lock times to acquire and a set of locked keys to release.

narupa.state.state_service.state_update_to_dictionary_change(update: narupa.protocol.state.state_service_pb2.StateUpdate) → narupa.utilities.change_buffers.DictionaryChange

Convert a protobuf StateUpdate to a DictionaryChange.

Parameters: update – a protobuf StateUpdate which encodes key removals as keys with a protobuf null value.
Returns: an equivalent DictionaryChange representing the key changes and key removals of the StateUpdate.
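Decoding works in the opposite direction: keys with a null value become removals and the rest become updates. The sketch below assumes a plain dictionary in place of the StateUpdate and returns a tuple in place of a DictionaryChange; update_to_change is a hypothetical name.

```python
def update_to_change(changed_keys):
    """Split a mapping into (updates, removals) (hypothetical helper)."""
    updates = {key: value for key, value in changed_keys.items()
               if value is not None}
    # Keys carrying a null value are treated as removals.
    removals = {key for key, value in changed_keys.items() if value is None}
    return updates, removals


updates, removals = update_to_change({"a": 1, "b": "text", "c": None})
```

One consequence of this encoding, at least in the sketch, is that a key cannot be stored with a null value at the top level, since null is reserved to mean removal.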
narupa.state.state_service.validate_dict_is_serializable(dictionary)
Raises: TypeError – if the given dictionary cannot be converted to a protobuf struct.
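The kind of check involved can be sketched with plain type tests. This is an assumption-laden illustration: the documented function converts to a protobuf struct, whereas the hypothetical check_serializable below only approximates that by accepting null, strings, numbers, booleans, lists, tuples and mappings with string keys.

```python
from collections.abc import Mapping


def check_serializable(value):
    """Raise TypeError if value falls outside the accepted types (hypothetical)."""
    if value is None or isinstance(value, (str, int, float, bool)):
        return
    if isinstance(value, Mapping):
        for key, item in value.items():
            if not isinstance(key, str):
                raise TypeError(f"mapping key {key!r} is not a string")
            check_serializable(item)
        return
    if isinstance(value, (list, tuple)):
        for item in value:
            check_serializable(item)
        return
    raise TypeError(f"value of type {type(value).__name__} is not serializable")


check_serializable({"a": [1, 2.5, {"b": None}], "c": True})  # passes silently

try:
    check_serializable({"a": object()})
    rejected = False
except TypeError:
    rejected = True
```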