# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Lease clients"""
import collections
import enum
import logging
import threading
import time
from bosdyn.api import lease_pb2
from bosdyn.api.lease_pb2 import AcquireLeaseRequest, AcquireLeaseResponse
from bosdyn.api.lease_pb2 import Lease as LeaseProto
from bosdyn.api.lease_pb2 import (LeaseUseResult, ListLeasesRequest, RetainLeaseRequest,
ReturnLeaseRequest, ReturnLeaseResponse, TakeLeaseRequest,
TakeLeaseResponse)
from bosdyn.api.lease_service_pb2_grpc import LeaseServiceStub
from . import common
from .exceptions import Error as BaseError
from .exceptions import ResponseError, RpcError
_LOGGER = logging.getLogger(__name__)
# Response-error hierarchy for the lease service.
#
# NOTE: the docstrings below are runtime strings, not just documentation: the
# *_STATUS_TO_ERROR tables in this module pass ``SomeError.__doc__`` along as the
# error message, so the wording must not be edited casually.
class LeaseResponseError(ResponseError):
    """General class of errors for LeaseResponseError service."""


class InvalidLeaseError(LeaseResponseError):
    """The provided lease is invalid."""


class DisplacedLeaseError(LeaseResponseError):
    """Lease is older than the current lease."""


class InvalidResourceError(LeaseResponseError):
    """Resource is not known to the LeaseService."""


class NotAuthoritativeServiceError(LeaseResponseError):
    """LeaseService is not authoritative so Acquire should not work."""


class ResourceAlreadyClaimedError(LeaseResponseError):
    """Use TakeLease method to forcefully grab the already claimed lease."""


class RevokedLeaseError(LeaseResponseError):
    """Lease is stale because the lease-holder did not check in regularly enough."""


class UnmanagedResourceError(LeaseResponseError):
    """LeaseService does not manage this resource."""


class WrongEpochError(LeaseResponseError):
    """Lease is for the wrong epoch."""


class NotActiveLeaseError(LeaseResponseError):
    """Lease is not the active lease."""
class Error(Exception):
    """Base non-response error for lease module."""


class NoSuchLease(Error):
    """The requested lease does not exist.

    Args:
        resource: Name of the resource for which no lease was found.
    """

    def __init__(self, resource):
        # Forward the argument to Exception so ``args`` is populated. Without this,
        # copying or unpickling the exception re-invokes __init__ with no arguments
        # and fails with a TypeError.
        super(NoSuchLease, self).__init__(resource)
        self.resource = resource

    def __str__(self):
        return 'No lease for resource "{}"'.format(self.resource)


class LeaseNotOwnedByWallet(Error):
    """The lease is not owned by the wallet.

    Args:
        resource: Name of the resource whose lease is not owned.
        lease_state: The state recorded for that resource. Only its ``lease_status``
            attribute is read (for the message); objects without it are tolerated.
    """

    def __init__(self, resource, lease_state):
        # See NoSuchLease.__init__: populate ``args`` so the exception can be
        # copied/pickled.
        super(LeaseNotOwnedByWallet, self).__init__(resource, lease_state)
        self.resource = resource
        self.lease_state = lease_state

    def __str__(self):
        try:
            state = self.lease_state.lease_status
        except AttributeError:
            # lease_state may be None or some object without a status.
            state = '<unknown>'
        return 'Lease on "{}" has state ({})'.format(self.resource, state)
# Status -> (error class, message) tables, handed to common.error_factory through its
# ``status_to_error`` argument. Any status that is not listed falls back to
# (ResponseError, None) via the defaultdict factory. The messages are the docstrings
# of the corresponding error classes.
_ACQUIRE_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        AcquireLeaseResponse.STATUS_OK: (None, None),
        AcquireLeaseResponse.STATUS_RESOURCE_ALREADY_CLAIMED:
            (ResourceAlreadyClaimedError, ResourceAlreadyClaimedError.__doc__),
        AcquireLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        AcquireLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    },
)

_TAKE_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        TakeLeaseResponse.STATUS_OK: (None, None),
        TakeLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        TakeLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    },
)

_RETURN_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        ReturnLeaseResponse.STATUS_OK: (None, None),
        ReturnLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        ReturnLeaseResponse.STATUS_NOT_ACTIVE_LEASE:
            (NotActiveLeaseError, NotActiveLeaseError.__doc__),
        ReturnLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    },
)
class Lease(object):
    """Leases are used to coordinate access to shared resources on a Boston Dynamics robot.

    A service will grant access to the shared resource if the lease which accompanies a request is
    "more recent" than any previously seen leases. Recency is determined using a sequence of
    monotonically increasing numbers, similar to a Lamport logical clock.

    Args:
        lease_proto: bosdyn.api.Lease protobuf object.
        ignore_is_valid_check (bool): If True, skip the validity check so that a Lease can
            wrap a proto that is missing its resource or sequence.

    Raises:
        ValueError if lease_proto is not present or valid.
    """

    def __init__(self, lease_proto, ignore_is_valid_check=False):
        if not ignore_is_valid_check and not self.is_valid_proto(lease_proto):
            raise ValueError('invalid lease_proto: {}'.format(lease_proto))
        self.lease_proto = lease_proto

    class CompareResult(enum.Enum):
        """Enum for comparison results between two leases."""
        SAME = 1
        SUPER_LEASE = 2
        SUB_LEASE = 3
        OLDER = 4
        NEWER = 5
        DIFFERENT_RESOURCES = 6
        DIFFERENT_EPOCHS = 7

    def compare(self, other_lease, ignore_resources=False):
        """Compare two different lease objects.

        Args:
            other_lease: The lease to compare this lease with.
            ignore_resources (bool): If True, skip the resource comparison, so
                DIFFERENT_RESOURCES is never returned.

        Returns:
            * CompareResult.SAME if this lease is exactly the same as other_lease.
            * CompareResult.SUPER_LEASE if this lease is a "super-lease" of the other
              lease - in other words, the other lease is a sublease of this lease.
            * CompareResult.SUB_LEASE if this lease is a "sub-lease" of the other lease.
            * CompareResult.OLDER if this lease is older than other_lease.
              other_lease may be a sublease of this lease.
            * CompareResult.NEWER if this is lease is newer than other_lease. This
              lease may be a sublease of other_lease.
            * CompareResult.DIFFERENT_RESOURCES if this lease is for a different
              resource than other_lease. There is no way to compare recency/time of
              Leases for two different resources.
            * CompareResult.DIFFERENT_EPOCHS if this lease is for a different
              epoch than other_lease. There is no way to compare recency/time of
              Leases for two different epochs.
        """
        mine = self.lease_proto
        theirs = other_lease.lease_proto

        # Sequences are only valid for leases with the same resource and epoch.
        if not ignore_resources and mine.resource != theirs.resource:
            return self.CompareResult.DIFFERENT_RESOURCES
        if mine.epoch != theirs.epoch:
            return self.CompareResult.DIFFERENT_EPOCHS

        # If any sequence numbers are different within the common subset of sequence
        # lengths, then one Lease is newer than the other. zip() stops at the shorter
        # sequence, which is exactly that common subset.
        for sequence_num, other_sequence_num in zip(mine.sequence, theirs.sequence):
            if sequence_num < other_sequence_num:
                return self.CompareResult.OLDER
            if sequence_num > other_sequence_num:
                return self.CompareResult.NEWER

        # At this point, the sequence numbers are the same within the common subset. If
        # one Lease has more sequence numbers than the other Lease, it is a sublease of
        # that lease and considered newer.
        sequence_size = len(mine.sequence)
        other_sequence_size = len(theirs.sequence)
        if sequence_size < other_sequence_size:
            return self.CompareResult.SUPER_LEASE
        if sequence_size > other_sequence_size:
            return self.CompareResult.SUB_LEASE

        # Leases are the same.
        return self.CompareResult.SAME

    def create_newer(self):
        """Creates a new Lease which is newer than this Lease.

        The proto is copied and the last sequence number is incremented by one.

        Returns:
            A new Lease object where self.compare(returned_lease) would return OLDER.
        """
        incr_lease_proto = LeaseProto()
        incr_lease_proto.CopyFrom(self.lease_proto)
        incr_lease_proto.sequence[-1] = self.lease_proto.sequence[-1] + 1
        return Lease(incr_lease_proto)

    def create_sublease(self, client_name=None):
        """Creates a sublease of this lease.

        The proto is copied and a 0 is appended to its sequence.

        Args:
            client_name (string): Optional argument to pass a client name to be appended to
                the new lease's set of clients which have used it.

        Returns:
            A new Lease object where self.compare(returned_lease) would return SUB_LEASE.
        """
        sub_lease_proto = LeaseProto()
        sub_lease_proto.CopyFrom(self.lease_proto)
        sub_lease_proto.sequence.append(0)
        if client_name is not None:
            sub_lease_proto.client_names.append(client_name)
        return Lease(sub_lease_proto)

    def is_valid_lease(self):
        """Checks whether the proto wrapped by this Lease is valid (see is_valid_proto)."""
        return Lease.is_valid_proto(self.lease_proto)

    @staticmethod
    def is_valid_proto(lease_proto):
        """Checks whether this lease is valid.

        Args:
            lease_proto: The lease proto to check; may be None.

        Returns:
            bool indicating that this lease has a valid resource and sequence.
        """
        # Coerce explicitly: the bare and-chain would hand back None, '' or the sequence
        # container itself rather than the documented bool.
        return bool(lease_proto and lease_proto.resource and lease_proto.sequence)

    @staticmethod
    def compare_result_to_lease_use_result_status(compare_result, allow_super_leases):
        """Determines the comparable LeaseUseResult.Status enum value based on the CompareResult enum.

        Args:
            compare_result (Lease.CompareResult): Result of comparing an incoming lease
                against the active lease.
            allow_super_leases(boolean): If true, a super lease will still be considered as "ok"/
                newer when compared to the active lease.

        Raises:
            bosdyn.client.lease.Error: Raised if there is an unknown compare result enum value.

        Returns:
            The corresponding LeaseUseResult.Status enum value.
        """
        if compare_result == Lease.CompareResult.DIFFERENT_EPOCHS:
            return LeaseUseResult.STATUS_WRONG_EPOCH
        elif compare_result == Lease.CompareResult.DIFFERENT_RESOURCES:
            # if the incoming lease's resource doesn't match the active lease resource,
            # then mark it as unmanaged.
            return LeaseUseResult.STATUS_UNMANAGED
        elif compare_result == Lease.CompareResult.SUPER_LEASE:
            # In some cases we may want to allow a super lease, so check an optional boolean
            # to see if that is the case.
            if allow_super_leases:
                return LeaseUseResult.STATUS_OK
            # In the normal case, a super lease is considered older.
            return LeaseUseResult.STATUS_OLDER
        elif compare_result == Lease.CompareResult.OLDER:
            return LeaseUseResult.STATUS_OLDER
        elif (compare_result == Lease.CompareResult.SAME or
              compare_result == Lease.CompareResult.SUB_LEASE or
              compare_result == Lease.CompareResult.NEWER):
            return LeaseUseResult.STATUS_OK
        else:
            # Shouldn't hit here. The above set of checks should be exhaustive.
            raise Error("The comparison result of the leases is unknown/unaccounted for.")
class LeaseState(object):
    """State of lease ownership in the wallet.

    Args:
        lease_status(LeaseState.Status): The ownership status of the lease.
        lease_owner(lease_pb2.LeaseOwner): The name of the owner of the lease.
        lease (Lease): The original lease used to initialize the LeaseState, before
            applying any subleasing/incrementing.
        lease_current(Lease): The newest version of the lease (subleased, and
            incremented from the original lease).
        client_name(string): The name of the client using this lease.
    """

    class Status(enum.Enum):
        """Ownership status values for a LeaseState."""
        UNOWNED = 0
        REVOKED = 1
        SELF_OWNER = 2
        OTHER_OWNER = 3
        NOT_MANAGED = 4

    # Deprecated. Provided for backwards compatibility.
    STATUS_UNOWNED = Status.UNOWNED
    STATUS_REVOKED = Status.REVOKED
    STATUS_SELF_OWNER = Status.SELF_OWNER
    STATUS_OTHER_OWNER = Status.OTHER_OWNER
    STATUS_NOT_MANAGED = Status.NOT_MANAGED

    def __init__(self, lease_status, lease_owner=None, lease=None, lease_current=None,
                 client_name=None):
        self.lease_status = lease_status
        self.lease_owner = lease_owner
        self.lease_original = lease
        self.client_name = client_name
        if lease_current:
            self.lease_current = lease_current
        elif lease:
            # No explicit current lease: start from a sublease of the original, tagged
            # with this client's name.
            self.lease_current = self.lease_original.create_sublease(self.client_name)
        else:
            self.lease_current = None

    def create_newer(self):
        """Create newer version of the Lease.

        Returns:
            Instance of itself if lease_current was not passed, or a new LeaseState
            whose lease_current is one increment newer. The new state keeps this
            state's status, owner, original lease and client name.
        """
        if not self.lease_current:
            return self
        # Carry client_name over; otherwise the name is silently lost the first time
        # the lease is advanced.
        return LeaseState(self.lease_status, self.lease_owner, self.lease_original,
                          self.lease_current.create_newer(), client_name=self.client_name)

    def update_from_lease_use_result(self, lease_use_result):
        """Update internal instance of LeaseState from given lease.

        Args:
            lease_use_result: LeaseUseResult from the server.

        Returns:
            Updated internal instance of LeaseState: a new LeaseState when the result
            changes ownership (OTHER_OWNER, UNOWNED or REVOKED), otherwise self.
        """
        if lease_use_result.status == lease_use_result.STATUS_OLDER:
            if self.lease_current:
                # Skip validation in the constructor: the result may not carry a usable
                # latest_known_lease, and the explicit is_valid_lease() check below is
                # what decides whether it can be compared. (With validation on, the
                # constructor would raise ValueError and that check could never fail.)
                latest_known_lease = Lease(lease_use_result.latest_known_lease,
                                           ignore_is_valid_check=True)
                if latest_known_lease.is_valid_lease():
                    cmp = self.lease_current.compare(latest_known_lease)
                    if cmp is Lease.CompareResult.NEWER or cmp is Lease.CompareResult.SAME:
                        # The attempted lease was older, but the lease in the wallet has been updated
                        # in the meantime to something that is newer than what the robot has seen, so
                        # this OLDER result is no longer relevant.
                        return self
            # The lease from the lease wallet was an older lease.
            return LeaseState(LeaseState.Status.OTHER_OWNER, lease_owner=lease_use_result.owner)
        elif lease_use_result.status == lease_use_result.STATUS_WRONG_EPOCH:
            if self.lease_current:
                attempted_lease = Lease(lease_use_result.attempted_lease)
                # Only react if the rejected lease is the one currently in this state.
                if attempted_lease.compare(self.lease_current) is Lease.CompareResult.SAME:
                    return LeaseState(LeaseState.Status.UNOWNED)
        elif lease_use_result.status == lease_use_result.STATUS_REVOKED:
            if self.lease_current:
                attempted_lease = Lease(lease_use_result.attempted_lease)
                if attempted_lease.compare(self.lease_current) is Lease.CompareResult.SAME:
                    return LeaseState(LeaseState.Status.REVOKED)
        # The LeaseState is not modified
        return self
# Resource name used as the default ``resource`` argument by the wallet, client,
# request-processor and keep-alive classes in this module.
_RESOURCE_BODY = 'body'
class LeaseWallet(object):
    """Thread-safe storage of Leases.

    Holds one LeaseState per resource name; every access to the map goes through a
    single lock. Methods suffixed ``_locked`` expect the caller to hold that lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._lease_state_map = {}
        self.client_name = None

    def add(self, lease):
        """Add lease in the wallet.

        Args:
            lease: Lease to add in the wallet.
        """
        with self._lock:
            self._add_lease_locked(lease)

    def _add_lease_locked(self, lease, current=False):
        # Stores the lease as SELF_OWNER, replacing any state already held for the
        # same resource. ``current`` is accepted but not used.
        key = lease.lease_proto.resource
        state = LeaseState(LeaseState.Status.SELF_OWNER, lease=lease,
                           client_name=self.client_name)
        self._lease_state_map[key] = state

    def remove(self, lease):
        """Remove lease from the wallet.

        The entry is looked up by the lease's resource; a missing entry is ignored.

        Args:
            lease: Lease to remove from the wallet.
        """
        with self._lock:
            self._lease_state_map.pop(lease.lease_proto.resource, None)

    def advance(self, resource=_RESOURCE_BODY):
        """Advance the lease for a specific resource.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Advanced lease for the resource.

        Raises:
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        with self._lock:
            advanced = self._get_owned_lease_state_locked(resource).create_newer()
            self._lease_state_map[resource] = advanced
            return advanced.lease_current

    def get_lease(self, resource=_RESOURCE_BODY):
        """Get the lease for a specific resource.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease for the resource.

        Raises:
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        with self._lock:
            owned_state = self._get_owned_lease_state_locked(resource)
            return owned_state.lease_current

    def get_lease_state(self, resource=_RESOURCE_BODY):
        """Get the lease state for a specific resource.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The requested lease does not exist.
        """
        with self._lock:
            return self._get_lease_state_locked(resource)

    def _get_lease_state_locked(self, resource):
        """Look up the lease state for a resource; the caller must hold the lock.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The requested lease does not exist.
        """
        try:
            found = self._lease_state_map[resource]
        except KeyError:
            raise NoSuchLease(resource)
        return found

    def _get_owned_lease_state_locked(self, resource):
        """Look up the lease state for a resource and require that the wallet owns it.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The requested lease does not exist.
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        state = self._get_lease_state_locked(resource)
        if state.lease_status == LeaseState.Status.SELF_OWNER:
            return state
        raise LeaseNotOwnedByWallet(resource, state)

    def on_lease_use_result(self, lease_use_result, resource=None):
        """Update the lease state based on result of using the lease.

        Does nothing when the wallet holds no state for the resource.

        Args:
            lease_use_result: LeaseUseResult from the server.
            resource: Resource to update, e.g. 'body'. Default to None to use the resource specified
                      by the lease_use_result.
        """
        if not resource:
            resource = lease_use_result.attempted_lease.resource
        with self._lock:
            known_state = self._lease_state_map.get(resource)
            if known_state:
                self._lease_state_map[resource] = known_state.update_from_lease_use_result(
                    lease_use_result)

    def set_client_name(self, client_name):
        """Set the client name that will be issuing the leases."""
        with self._lock:
            self.client_name = client_name
class LeaseClient(common.BaseClient):
    """Client to the lease service.

    Args:
        lease_wallet: Lease wallet to use. When set, acquired/taken leases are added to
            it and returned leases are removed from it.
    """
    default_service_name = 'lease'
    service_type = 'bosdyn.api.LeaseService'

    def __init__(self, lease_wallet=None):
        super(LeaseClient, self).__init__(LeaseServiceStub)
        self.lease_wallet = lease_wallet

    def acquire(self, resource=_RESOURCE_BODY, **kwargs):
        """Acquire a lease for the given resource.

        Args:
            resource: Resource for the lease.

        Returns:
            Acquired Lease object.

        Raises:
            ResourceAlreadyClaimedError: Use TakeLease method to forcefully grab the already
                                         claimed lease.
            InvalidResourceError: Resource is not known to the LeaseService.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not work.
        """
        req = self._make_acquire_request(resource)
        return self.call(self._stub.AcquireLease, req, self._handle_acquire_success,
                         self._handle_acquire_errors, copy_request=False, **kwargs)

    def acquire_async(self, resource=_RESOURCE_BODY, **kwargs):
        """Async version of acquire() function."""
        req = self._make_acquire_request(resource)
        return self.call_async(self._stub.AcquireLease, req, self._handle_acquire_success,
                               self._handle_acquire_errors, copy_request=False, **kwargs)

    def take(self, resource=_RESOURCE_BODY, **kwargs):
        """Take the lease for the given resource.

        Args:
            resource: Resource for the lease.

        Returns:
            Taken Lease object.

        Raises:
            InvalidResourceError: Resource is not known to the LeaseService.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not
                                          work.
        """
        req = self._make_take_request(resource)
        # The take response is handled by the same success handler as acquire: both
        # wrap response.lease and add it to the wallet.
        return self.call(self._stub.TakeLease, req, self._handle_acquire_success,
                         self._handle_take_errors, copy_request=False, **kwargs)

    def take_async(self, resource=_RESOURCE_BODY, **kwargs):
        """Async version of the take() function."""
        req = self._make_take_request(resource)
        return self.call_async(self._stub.TakeLease, req, self._handle_acquire_success,
                               self._handle_take_errors, copy_request=False, **kwargs)

    def return_lease(self, lease, **kwargs):
        """Return an acquired lease.

        The lease is removed from the wallet (if one is set) before the RPC is issued.

        Args:
            lease (Lease object): Lease to return. This should be a Lease class object, and not the proto.

        Raises:
            InvalidResourceError: Resource is not known to the LeaseService.
            NotActiveLeaseError: Lease is not the active lease.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not
                                          work.
        """
        if self.lease_wallet:
            self.lease_wallet.remove(lease)
        req = self._make_return_request(lease)
        return self.call(self._stub.ReturnLease, req, None, self._handle_return_errors,
                         copy_request=False, **kwargs)

    def return_lease_async(self, lease, **kwargs):
        """Async version of the return_lease() function."""
        if self.lease_wallet:
            self.lease_wallet.remove(lease)
        req = self._make_return_request(lease)
        # Must go through call_async like every other *_async method; using call()
        # here made the "async" variant a blocking RPC.
        return self.call_async(self._stub.ReturnLease, req, None, self._handle_return_errors,
                               copy_request=False, **kwargs)

    def retain_lease(self, lease, **kwargs):
        """Retain the lease.

        Args:
            lease: Lease to retain.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_retain_request(lease)
        return self.call(self._stub.RetainLease, req, None, common.common_lease_errors,
                         copy_request=False, **kwargs)

    def retain_lease_async(self, lease, **kwargs):
        """Async version of the retain_lease() function."""
        req = self._make_retain_request(lease)
        return self.call_async(self._stub.RetainLease, req, None, common.common_lease_errors,
                               copy_request=False, **kwargs)

    def list_leases(self, include_full_lease_info=False, **kwargs):
        """Get a list of the leases.

        Args:
            include_full_lease_info: Whether the returned list of LeaseResources should include
                                     all of the available information about the last lease used.
                                     Defaults to False.

        Returns:
            List of lease resources.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call(self._stub.ListLeases, req, self._list_leases_success,
                         common.common_header_errors, copy_request=False, **kwargs)

    def list_leases_async(self, include_full_lease_info=False, **kwargs):
        """Async version of the list_leases() function."""
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call_async(self._stub.ListLeases, req, self._list_leases_success,
                               common.common_header_errors, copy_request=False, **kwargs)

    def list_leases_full(self, include_full_lease_info=False, **kwargs):
        """Get a list of the leases.

        Args:
            include_full_lease_info: Whether the returned list of LeaseResources should include
                                     all of the available information about the last lease used.
                                     Defaults to False.

        Returns:
            The complete ListLeasesResponse message.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call(self._stub.ListLeases, req, None, common.common_header_errors,
                         copy_request=False, **kwargs)

    def list_leases_full_async(self, include_full_lease_info=False, **kwargs):
        """Async version of the list_leases_full() function."""
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call_async(self._stub.ListLeases, req, None, common.common_header_errors,
                               copy_request=False, **kwargs)

    @staticmethod
    def _make_acquire_request(resource):
        """Return AcquireLeaseRequest message with the given resource."""
        return AcquireLeaseRequest(resource=resource)

    def _handle_acquire_success(self, response):
        """Return lease in an AcquireLeaseResponse message.

        The lease is also added to the wallet when one is set.
        """
        lease = Lease(response.lease)
        if self.lease_wallet:
            self.lease_wallet.add(lease)
        return lease

    @staticmethod
    @common.handle_common_header_errors
    def _handle_acquire_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=AcquireLeaseResponse.Status.Name,
                                    status_to_error=_ACQUIRE_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_take_request(resource):
        """Return TakeLeaseRequest message with the given resource."""
        return TakeLeaseRequest(resource=resource)

    @staticmethod
    @common.handle_common_header_errors
    def _handle_take_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=TakeLeaseResponse.Status.Name,
                                    status_to_error=_TAKE_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_return_request(lease):
        """Return ReturnLeaseRequest message carrying the given Lease's proto."""
        return ReturnLeaseRequest(lease=lease.lease_proto)

    @staticmethod
    @common.handle_common_header_errors
    def _handle_return_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=ReturnLeaseResponse.Status.Name,
                                    status_to_error=_RETURN_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_retain_request(lease):
        """Return RetainLeaseRequest message carrying the given Lease's proto."""
        req = RetainLeaseRequest(lease=lease.lease_proto)
        return req

    @staticmethod
    def _make_list_leases_request(include_full_lease_info):
        """Return ListLeasesRequest message with the given include_full_lease_info flag."""
        return ListLeasesRequest(include_full_lease_info=include_full_lease_info)

    @staticmethod
    def _list_leases_success(response):
        """Return the ``resources`` field of a ListLeasesResponse message."""
        return response.resources
# Sentinel meaning "use the resource list the processor was initialized with".
# A dedicated object is used, rather than None, so that choice is explicit instead of
# implied; it is compared by identity in LeaseWalletRequestProcessor.mutate().
DEFAULT_RESOURCES = object()
class LeaseWalletRequestProcessor(object):
    """LeaseWalletRequestProcessor adds a lease from a wallet to a request.

    Args:
        lease_wallet: The LeaseWallet to read leases from.
        resource_list: List of resources this processor should add to requests. Default None
                       to use the default resource.
    """

    def __init__(self, lease_wallet, resource_list=None):
        self.lease_wallet = lease_wallet
        self.resource_list = (_RESOURCE_BODY,) if resource_list is None else resource_list
        self.logger = logging.getLogger()

    def mutate(self, request, resource_list=DEFAULT_RESOURCES):
        """Add the leases for the necessary resources if no leases have been specified yet.

        Args:
            request: Request message to fill in. It is left untouched when it already
                carries a lease, or has neither a 'lease' nor a 'leases' field.
            resource_list: Resources to attach leases for. Defaults to the list this
                processor was initialized with.
        """
        wants_many, already_set = self.get_lease_state(request)
        if already_set:
            return

        if resource_list is DEFAULT_RESOURCES:
            resource_list = self.resource_list

        resource_count = len(resource_list)
        if not wants_many and resource_count > 1:
            # Only the first resource can be attached to a single-lease request.
            self.logger.error('LeaseWalletRequestProcessor assigned multiple leases, '
                              'but request only wants one')

        if not wants_many:
            single = self.lease_wallet.advance(resource_list[0])
            request.lease.CopyFrom(single.lease_proto)
            return

        for resource in resource_list:
            advanced = self.lease_wallet.advance(resource)
            request.leases.add().CopyFrom(advanced.lease_proto)

    @staticmethod
    def get_lease_state(request):
        """Returns a tuple of ("are there multiple leases in request?", "are they set already?")

        The first element is None when the request has neither a 'lease' nor a 'leases'
        field; in that case the second element is True so that callers skip the request.
        """
        try:
            # ValueError will occur if the request class does not have a field named 'lease'
            has_single = request.HasField('lease')
        except ValueError:
            pass
        else:
            # If we get here, there's only a single lease.
            return False, has_single

        try:
            # AttributeError will occur if the request class does not have a field named 'leases'
            has_many = len(request.leases) > 0
        except AttributeError:
            # If we get here, there's no 'lease' field nor a 'leases' field.
            # There are requests that do not have either field, so just skip them.
            return None, True
        return True, has_many
class LeaseWalletResponseProcessor(object):
    """LeaseWalletResponseProcessor updates the wallet with a LeaseUseResult.

    Args:
        lease_wallet: Lease wallet to use.
    """

    def __init__(self, lease_wallet):
        self.lease_wallet = lease_wallet

    def mutate(self, response):
        """Update the wallet if a response has a lease_use_result.

        A singular 'lease_use_result' attribute takes precedence; otherwise every entry
        of 'lease_use_results' is forwarded. Responses with neither are ignored.
        """
        absent = object()
        single = getattr(response, 'lease_use_result', absent)
        if single is not absent:
            results = [single]
        else:
            results = getattr(response, 'lease_use_results', absent)
            if results is absent:
                # Neither field is present on this response type; nothing to record.
                return
        for use_result in results:
            self.lease_wallet.on_lease_use_result(use_result)
def add_lease_wallet_processors(client, lease_wallet, resource_list=None):
    """Adds LeaseWallet related processors to a gRPC client.

    For services which use leases for access control, this does two things:
        * Advance the lease from the LeaseWallet and attach to a request.
        * Handle the LeaseUseResult from a response and update LeaseWallet.

    Args:
        client: BaseClient derived class for a single service.
        lease_wallet: The LeaseWallet to track from, must be non-None.
        resource_list: List of resources these processors should add to requests. Default None
                       to use a default resource.
    """
    request_processor = LeaseWalletRequestProcessor(lease_wallet, resource_list)
    client.request_processors.append(request_processor)
    response_processor = LeaseWalletResponseProcessor(lease_wallet)
    client.response_processors.append(response_processor)
[docs]class LeaseKeepAlive(object):
"""LeaseKeepAlive issues lease liveness checks on a background thread.
The robot's lease system expects lease-holders to check in at a regular
cadence. If the check-ins do not happen, the robot will treat it as a
communications loss. Typically this will result in the robot stopping,
powering off, and the lease-holder getting their lease revoked.
Using a LeaseKeepAlive object hides most of the details of issuing the
lease liveness check. Developers can also manage liveness checks directly
by using the retain_lease methods on the LeaseClient object.
Args:
lease_client: The LeaseClient object to issue requests on.
lease_wallet: The LeaseWallet to retrieve current leases from,
and to handle any bad LeaseUseResults from. If not specified,
the lease_client's lease_wallet will be used.
resource: The resource to do liveness checks for.
rpc_interval_seconds: Duration in seconds between liveness checks.
keep_running_cb: If specified, should be a callable object that
returns True if the liveness checks should proceed, False
otherwise. LeaseKeepAlive will invoke keep_running_cb on its
background thread. One example of where this could be used is
in an interactive UI - keep_running_cb could stall or return
False if the UI thread is wedged, which prevents the
application from continuing to keep the Lease alive when it is
no longer in a good state.
on_failure_callback: If specified, this should be a callable function object
which takes the error/exception as an input. The function does not
need to return anything. This function can be used to action on any
failures during the keepalive from the RetainLease RPC.
warnings(bool): Used to determine if the _periodic_check_in function will print lease check-in errors.
must_acquire(bool): If True, exceptions when trying to acquire the lease will not be caught.
return_at_exit(bool): If True, return the lease when shutting down.
"""
def __init__(self, lease_client, lease_wallet=None, resource=_RESOURCE_BODY,
             rpc_interval_seconds=2, keep_running_cb=None, host_name="",
             on_failure_callback=None, warnings=True, must_acquire=False, return_at_exit=False):
    """Create a new LeaseKeepAlive object.

    All arguments are validated first; then the lease is acquired if the wallet does
    not already hold it, and the background check-in thread is started.

    Args:
        lease_client: The LeaseClient object to issue requests on.
        lease_wallet: The LeaseWallet to use; defaults to lease_client.lease_wallet.
        resource: The resource to do liveness checks for.
        rpc_interval_seconds: Duration in seconds between liveness checks; must be > 0.
        keep_running_cb: Optional callable; check-ins stop once it returns a falsy value.
        host_name: Label included in the warning logged when a check-in fails.
        on_failure_callback: Optional callable invoked with the exception raised by a
            failed check-in.
        warnings(bool): Whether check-in failures are logged as warnings.
        must_acquire(bool): If True, a failure to acquire the lease is re-raised
            instead of being logged.
        return_at_exit(bool): If True, shutdown() returns the lease.

    Raises:
        ValueError: lease_client, lease_wallet or resource is missing, or
            rpc_interval_seconds is not positive.
    """
    self.host_name = host_name
    self.print_warnings = warnings
    self._return_at_exit = return_at_exit

    if not lease_client:
        raise ValueError("lease_client must be set")
    self._lease_client = lease_client

    if not lease_wallet:
        lease_wallet = lease_client.lease_wallet
    if not lease_wallet:
        raise ValueError("lease_wallet must be set")
    self._lease_wallet = lease_wallet

    if not resource:
        raise ValueError("resource must be set")
    self._resource = resource

    # Validate the interval BEFORE touching the robot: checking it after the acquire
    # would leave a freshly acquired lease in the wallet with no keep-alive thread
    # when the constructor raises.
    if rpc_interval_seconds <= 0.0:
        raise ValueError("rpc_interval_seconds must be > 0, was %f" % rpc_interval_seconds)
    self._rpc_interval_seconds = rpc_interval_seconds

    # If we don't have the lease, acquire it.
    try:
        self._lease_wallet.get_lease(self._resource)
    except Error:
        try:
            self._lease_client.acquire(self._resource)
        except BaseError as exc:
            if must_acquire:
                raise
            _LOGGER.error('Failed to acquire the lease in LeaseKeepAlive: %s', exc)

    self.logger = logging.getLogger()

    self._keep_running = keep_running_cb or (lambda: True)

    self._end_check_in_signal = threading.Event()

    # If the on_failure_callback is not provided, then set the default as a no-op function.
    self._retain_lease_failed_cb = on_failure_callback or (lambda err: None)

    # Configure the thread to do check-ins, and begin checking in.
    self._thread = threading.Thread(target=self._periodic_check_in)
    self._thread.daemon = True
    self._thread.start()
def shutdown(self):
    """Shut the background thread down and stop the liveness checks.

    Can be called multiple times, but subsequent calls are no-ops.
    Blocks until the background thread completes. When return_at_exit was requested,
    the lease is then returned with a 2 second RPC timeout.
    """
    self.logger.debug('Shutting down')
    self._end_periodic_check_in()
    self.wait_until_done()

    if not self._return_at_exit:
        return

    try:
        held_lease = self.lease_wallet.get_lease(self._resource)
        self._lease_client.return_lease(held_lease, timeout=2)
    except (LeaseResponseError, NoSuchLease, LeaseNotOwnedByWallet):
        # These all mean that we don't own the lease anymore, which is fine.
        pass
    except RpcError as exc:
        _LOGGER.error('Failed to return the lease at the end: %s', exc)
def is_alive(self):
    """Return whether the background check-in thread is still running."""
    worker = self._thread
    return worker.is_alive()
@property
def lease_wallet(self):
    """The lease wallet this keep-alive reads its lease from."""
    wallet = self._lease_wallet
    return wallet
def wait_until_done(self):
    """Block until the background thread has exited.

    Client code normally ends the background thread through shutdown() or by
    supplying a keep_running_cb callback to the constructor. Joining directly
    is mostly useful in unit tests that need to be sure the thread has exited.
    """
    worker = self._thread
    worker.join()
def _end_periodic_check_in(self):
"""Stop checking into the Lease system."""
self.logger.debug('Stopping check-in')
self._end_check_in_signal.set()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown()
def _ok(self):
self.logger.debug('Check-in successful')
def _check_in(self):
"""Retain lease associated with the resource in this class."""
lease = self._lease_wallet.get_lease(self._resource)
if not lease:
return None
return self._lease_client.retain_lease(lease)
def _periodic_check_in(self):
"""Periodically check in and retain the lease associated with the resource in this class."""
self.logger.info('Starting lease check-in')
while True:
# Include the time it takes to execute keep_running, in case it takes a significant
# portion of our check in period.
exec_start = time.time()
# Stop doing retention if this is not meant to keep running.
if not self._keep_running():
break
try:
self._check_in()
# We really do want to catch anything.
#pylint: disable=broad-except
except Exception as exc:
if self.print_warnings:
self.logger.warning(
'Generic exception for %s during check-in:\n%s\n'
' (resuming check-in)', self.host_name, exc)
self._retain_lease_failed_cb(exc)
else:
# No errors!
self._ok()
# How long did the RPC and processing of said RPC take?
exec_seconds = time.time() - exec_start
# Block and wait for the stop signal. If we receive it within the check-in period,
# leave the loop. This check must be at the end of the loop!
# Wait up to self._check_in_period seconds, minus the RPC processing time.
# (values < 0 are OK and will return immediately)
if self._end_check_in_signal.wait(self._rpc_interval_seconds - exec_seconds):
break
self.logger.info('Lease check-in stopped')
def test_active_lease(incoming_lease_proto, active_lease, sublease_name=None,
                      allow_super_leases=False):
    """Check if an incoming lease is newer than the current lease.

    Args:
        incoming_lease_proto(lease_pb2.Lease): The incoming lease proto.
        active_lease(Lease): A lease object representing the most recent/newest known lease
                             that the incoming lease should be compared against. May be None
                             when no lease has been seen yet.
        sublease_name(string): If not NoneType, a sublease of the incoming lease will be
                               created (with sublease_name as the client name) and used to
                               compare to the active lease.
        allow_super_leases(boolean): If true, a super lease will still be considered as "ok"/
                                     newer when compared to the active lease.

    Returns:
        A tuple containing the lease use result from comparing the incoming and active leases,
        and then a Lease object made from the incoming lease proto. The lease object will be
        None if the incoming lease proto was invalid. The lease object will be a sublease of
        the incoming lease proto if sublease_name is not NoneType.

    Raises:
        Error: If the active lease object is not a valid lease.
    """
    result = LeaseUseResult()
    result.attempted_lease.CopyFrom(incoming_lease_proto)

    # Build the lease to test: the incoming lease itself, or a sublease of it when a
    # sublease name was given. An invalid proto surfaces as a ValueError.
    try:
        candidate = Lease(incoming_lease_proto)
        if sublease_name is not None:
            candidate = candidate.create_sublease(client_name=sublease_name)
    except ValueError:
        result.status = LeaseUseResult.STATUS_INVALID_LEASE
        return result, None

    if active_lease is None:
        # Nothing to compare against, so the incoming lease is accepted and becomes the
        # latest known lease. There is no previous lease to report.
        result.latest_known_lease.CopyFrom(candidate.lease_proto)
        result.status = LeaseUseResult.STATUS_OK
        return result, candidate

    # The active lease has to be valid for the comparison to mean anything.
    if not active_lease.is_valid_lease():
        raise Error("The active lease object is invalid.")

    # Report the active lease as both the previous lease and, until the incoming lease
    # proves to be newer, the latest known lease.
    result.previous_lease.CopyFrom(active_lease.lease_proto)
    result.latest_known_lease.CopyFrom(active_lease.lease_proto)

    # Compare the (sub)lease under test against the active lease.
    comparison = candidate.compare(active_lease)
    result.status = Lease.compare_result_to_lease_use_result_status(
        comparison, allow_super_leases)

    if result.status != LeaseUseResult.STATUS_OK:
        return result, candidate
    # Only an accepted lease may replace the latest known lease, and a super lease never
    # does (even when allow_super_leases is True), since newer subleases may have been seen.
    if comparison != Lease.CompareResult.SUPER_LEASE:
        result.latest_known_lease.CopyFrom(candidate.lease_proto)
    return result, candidate