# Source code for bosdyn.client.lease

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Lease clients"""

import collections
import enum
import logging
import threading
import time

from bosdyn.api import lease_pb2
from bosdyn.api.lease_pb2 import AcquireLeaseRequest, AcquireLeaseResponse
from bosdyn.api.lease_pb2 import Lease as LeaseProto
from bosdyn.api.lease_pb2 import (LeaseUseResult, ListLeasesRequest, RetainLeaseRequest,
                                  ReturnLeaseRequest, ReturnLeaseResponse, TakeLeaseRequest,
                                  TakeLeaseResponse)
from bosdyn.api.lease_service_pb2_grpc import LeaseServiceStub

from . import common
from .exceptions import Error as BaseError
from .exceptions import ResponseError, RpcError

# Module-level logger, named after this module; used for lease acquire/return failures below.
_LOGGER = logging.getLogger(__name__)


# NOTE: the docstrings of these exception classes are read through ``__doc__`` and used as the
# error messages in the status-to-error tables of this module, so their wording is runtime text.


class LeaseResponseError(ResponseError):
    """General class of errors for LeaseResponseError service."""


class InvalidLeaseError(LeaseResponseError):
    """The provided lease is invalid."""


class DisplacedLeaseError(LeaseResponseError):
    """Lease is older than the current lease."""


class InvalidResourceError(LeaseResponseError):
    """Resource is not known to the LeaseService."""


class NotAuthoritativeServiceError(LeaseResponseError):
    """LeaseService is not authoritative so Acquire should not work."""


class ResourceAlreadyClaimedError(LeaseResponseError):
    """Use TakeLease method to forcefully grab the already claimed lease."""


class RevokedLeaseError(LeaseResponseError):
    """Lease is stale because the lease-holder did not check in regularly enough."""


class UnmanagedResourceError(LeaseResponseError):
    """LeaseService does not manage this resource."""


class WrongEpochError(LeaseResponseError):
    """Lease is for the wrong epoch."""


class NotActiveLeaseError(LeaseResponseError):
    """Lease is not the active lease."""
class Error(Exception):
    """Base non-response error for lease module."""


class NoSuchLease(Error):
    """The requested lease does not exist.

    Args:
        resource: Name of the resource for which no lease was found.
    """

    def __init__(self, resource):
        # Forward the argument to Exception so that ``args`` is populated; without it repr() is
        # uninformative and copy/pickle cannot reconstruct the exception.
        super(NoSuchLease, self).__init__(resource)
        self.resource = resource

    def __str__(self):
        return 'No lease for resource "{}"'.format(self.resource)


class LeaseNotOwnedByWallet(Error):
    """The lease is not owned by the wallet.

    Args:
        resource: Name of the resource whose lease is not owned.
        lease_state: The state object found for the resource. Its ``lease_status`` attribute is
            reported in the message when present.
    """

    def __init__(self, resource, lease_state):
        # Forward the arguments to Exception so that ``args`` is populated (see NoSuchLease).
        super(LeaseNotOwnedByWallet, self).__init__(resource, lease_state)
        self.resource = resource
        self.lease_state = lease_state

    def __str__(self):
        try:
            state = self.lease_state.lease_status
        except AttributeError:
            # lease_state may be None or an object without a status.
            state = '<unknown>'
        return 'Lease on "{}" has state ({})'.format(self.resource, state)
# Lookup tables from a response status to an (exception class, message) pair.
# A status that is not listed falls back to a generic ResponseError with no message; STATUS_OK maps
# to (None, None), i.e. no error.
_ACQUIRE_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        AcquireLeaseResponse.STATUS_OK: (None, None),
        AcquireLeaseResponse.STATUS_RESOURCE_ALREADY_CLAIMED:
            (ResourceAlreadyClaimedError, ResourceAlreadyClaimedError.__doc__),
        AcquireLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        AcquireLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    })

_TAKE_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        TakeLeaseResponse.STATUS_OK: (None, None),
        TakeLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        TakeLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    })

_RETURN_LEASE_STATUS_TO_ERROR = collections.defaultdict(
    lambda: (ResponseError, None),
    {
        ReturnLeaseResponse.STATUS_OK: (None, None),
        ReturnLeaseResponse.STATUS_INVALID_RESOURCE:
            (InvalidResourceError, InvalidResourceError.__doc__),
        ReturnLeaseResponse.STATUS_NOT_ACTIVE_LEASE:
            (NotActiveLeaseError, NotActiveLeaseError.__doc__),
        ReturnLeaseResponse.STATUS_NOT_AUTHORITATIVE_SERVICE:
            (NotAuthoritativeServiceError, NotAuthoritativeServiceError.__doc__),
    })
class Lease(object):
    """Leases are used to coordinate access to shared resources on a Boston Dynamics robot.

    A service will grant access to the shared resource if the lease which accompanies a request is
    "more recent" than any previously seen leases. Recency is determined using a sequence of
    monotonically increasing numbers, similar to a Lamport logical clock.

    Args:
        lease_proto: bosdyn.api.Lease protobuf object.
        ignore_is_valid_check (bool): If True, skip validation of lease_proto so that a Lease can
            wrap an unset or incomplete proto. Use is_valid_lease() to check it afterwards.

    Raises:
        ValueError if lease_proto is not present or valid (and the check is not ignored).
    """

    def __init__(self, lease_proto, ignore_is_valid_check=False):
        if not ignore_is_valid_check and not self.is_valid_proto(lease_proto):
            raise ValueError('invalid lease_proto: {}'.format(lease_proto))
        self.lease_proto = lease_proto

    class CompareResult(enum.Enum):
        """Enum for comparison results between two leases."""
        SAME = 1
        SUPER_LEASE = 2
        SUB_LEASE = 3
        OLDER = 4
        NEWER = 5
        DIFFERENT_RESOURCES = 6
        DIFFERENT_EPOCHS = 7

    def compare(self, other_lease, ignore_resources=False):
        """Compare two different lease objects.

        Args:
            other_lease: The lease to compare this lease with.
            ignore_resources (bool): If True, the resource names are not compared, so
                DIFFERENT_RESOURCES is never returned.

        Returns:
            * CompareResult.SAME if this lease is exactly the same as other_lease.
            * CompareResult.SUPER_LEASE if this lease is a "super-lease" of the other lease - in
              other words, the other lease is a sublease of this lease.
            * CompareResult.SUB_LEASE if this lease is a "sub-lease" of the other lease.
            * CompareResult.OLDER if this lease is older than other_lease. other_lease may be a
              sublease of this lease.
            * CompareResult.NEWER if this lease is newer than other_lease. This lease may be a
              sublease of other_lease.
            * CompareResult.DIFFERENT_RESOURCES if this lease is for a different resource than
              other_lease. There is no way to compare recency/time of Leases for two different
              resources.
            * CompareResult.DIFFERENT_EPOCHS if this lease is for a different epoch than
              other_lease. There is no way to compare recency/time of Leases for two different
              epochs.
        """
        this_proto = self.lease_proto
        other_proto = other_lease.lease_proto

        # Sequences are only valid for leases with the same resource and epoch.
        if not ignore_resources and this_proto.resource != other_proto.resource:
            return self.CompareResult.DIFFERENT_RESOURCES
        if this_proto.epoch != other_proto.epoch:
            return self.CompareResult.DIFFERENT_EPOCHS

        # If any sequence numbers are different within the common subset of sequence lengths,
        # then one Lease is newer than the other. zip() stops at the shorter sequence.
        for sequence_num, other_sequence_num in zip(this_proto.sequence, other_proto.sequence):
            if sequence_num < other_sequence_num:
                return self.CompareResult.OLDER
            if sequence_num > other_sequence_num:
                return self.CompareResult.NEWER

        # At this point, the sequence numbers are the same within the common subset. If one Lease
        # has more sequence numbers than the other Lease, it is a sublease of that lease and
        # considered newer.
        sequence_size = len(this_proto.sequence)
        other_sequence_size = len(other_proto.sequence)
        if sequence_size < other_sequence_size:
            return self.CompareResult.SUPER_LEASE
        if sequence_size > other_sequence_size:
            return self.CompareResult.SUB_LEASE

        # Leases are the same.
        return self.CompareResult.SAME

    def create_newer(self):
        """Creates a new Lease which is newer than this Lease.

        The last element of the sequence is incremented by one on a copy of the proto; this
        lease is left unmodified.

        Returns:
            A new Lease object where self.compare(returned_lease) would return OLDER.
        """
        incr_lease_proto = LeaseProto()
        incr_lease_proto.CopyFrom(self.lease_proto)
        incr_lease_proto.sequence[-1] = self.lease_proto.sequence[-1] + 1
        return Lease(incr_lease_proto)

    def create_sublease(self, client_name=None):
        """Creates a sublease of this lease.

        Args:
            client_name (string): Optional argument to pass a client name to be appended to
                the new lease's set of clients which have used it.

        Returns:
            A new Lease object where self.compare(returned_lease) would return SUPER_LEASE (the
            returned lease is a sublease of this lease).
        """
        sub_lease_proto = LeaseProto()
        sub_lease_proto.CopyFrom(self.lease_proto)
        sub_lease_proto.sequence.append(0)
        if client_name is not None:
            sub_lease_proto.client_names.append(client_name)
        return Lease(sub_lease_proto)

    def is_valid_lease(self):
        """Checks whether the wrapped lease proto is valid (see is_valid_proto)."""
        return Lease.is_valid_proto(self.lease_proto)

    @staticmethod
    def is_valid_proto(lease_proto):
        """Checks whether this lease is valid.

        Args:
            lease_proto: The lease proto to check; may be None.

        Returns:
            bool indicating that this lease has a valid resource and sequence.
        """
        # bool() so that callers get True/False rather than whichever operand ended the chain
        # (None, an empty string, or the sequence container itself).
        return bool(lease_proto and lease_proto.resource and lease_proto.sequence)

    @staticmethod
    def compare_result_to_lease_use_result_status(compare_result, allow_super_leases):
        """Determines the comparable LeaseUseResult.Status enum value based on the CompareResult enum.

        Args:
            compare_result (Lease.CompareResult): Result of comparing the incoming lease with the
                active lease.
            allow_super_leases (boolean): If true, a super lease will still be considered as
                "ok"/newer when compared to the active lease.

        Raises:
            bosdyn.client.lease.Error: Raised if there is an unknown compare result enum value.

        Returns:
            The corresponding LeaseUseResult.Status enum value.
        """
        if compare_result == Lease.CompareResult.DIFFERENT_EPOCHS:
            return LeaseUseResult.STATUS_WRONG_EPOCH
        if compare_result == Lease.CompareResult.DIFFERENT_RESOURCES:
            # If the incoming lease's resource doesn't match the active lease resource,
            # then mark it as unmanaged.
            return LeaseUseResult.STATUS_UNMANAGED
        if compare_result == Lease.CompareResult.SUPER_LEASE:
            # In some cases we may want to allow a super lease, so check an optional boolean
            # to see if that is the case. In the normal case, a super lease is considered older.
            if allow_super_leases:
                return LeaseUseResult.STATUS_OK
            return LeaseUseResult.STATUS_OLDER
        if compare_result == Lease.CompareResult.OLDER:
            return LeaseUseResult.STATUS_OLDER
        if (compare_result == Lease.CompareResult.SAME or
                compare_result == Lease.CompareResult.SUB_LEASE or
                compare_result == Lease.CompareResult.NEWER):
            return LeaseUseResult.STATUS_OK
        # Shouldn't hit here. The above set of checks should be exhaustive.
        raise Error("The comparison result of the leases is unknown/unaccounted for.")
class LeaseState(object):
    """State of lease ownership in the wallet.

    Args:
        lease_status(LeaseState.Status): The ownership status of the lease.
        lease_owner(lease_pb2.LeaseOwner): The name of the owner of the lease.
        lease (Lease): The original lease used to initialize the LeaseState, before
            applying any subleasing/incrementing.
        lease_current(Lease): The newest version of the lease (subleased, and
            incremented from the original lease). If omitted while `lease` is given, a sublease
            of `lease` is created and used.
        client_name(string): The name of the client using this lease.
    """

    class Status(enum.Enum):
        """Ownership status of a lease tracked by the wallet."""
        UNOWNED = 0
        REVOKED = 1
        SELF_OWNER = 2
        OTHER_OWNER = 3
        NOT_MANAGED = 4

    # Deprecated. Provided for backwards compatibility.
    STATUS_UNOWNED = Status.UNOWNED
    STATUS_REVOKED = Status.REVOKED
    STATUS_SELF_OWNER = Status.SELF_OWNER
    STATUS_OTHER_OWNER = Status.OTHER_OWNER
    STATUS_NOT_MANAGED = Status.NOT_MANAGED

    def __init__(self, lease_status, lease_owner=None, lease=None, lease_current=None,
                 client_name=None):
        self.lease_status = lease_status
        self.lease_owner = lease_owner
        self.lease_original = lease
        self.client_name = client_name
        if lease_current:
            self.lease_current = lease_current
        elif lease:
            # Only the original lease is known: work on a sublease of it.
            self.lease_current = self.lease_original.create_sublease(self.client_name)
        else:
            self.lease_current = None

    def create_newer(self):
        """Create newer version of the Lease.

        Returns:
            Instance of itself if lease_current was not passed, or a new LeaseState whose
            lease_current is one step newer. Status, owner, original lease and client name are
            carried over to the new state.
        """
        if not self.lease_current:
            return self
        # client_name must be forwarded explicitly, otherwise the new state forgets it.
        return LeaseState(self.lease_status, self.lease_owner, self.lease_original,
                          self.lease_current.create_newer(), client_name=self.client_name)

    def update_from_lease_use_result(self, lease_use_result):
        """Update internal instance of LeaseState from given lease.

        Args:
            lease_use_result: LeaseUseResult from the server.

        Returns:
            Updated internal instance of LeaseState: a new LeaseState when the result changes
            the ownership status, otherwise this instance unchanged.
        """
        if lease_use_result.status == lease_use_result.STATUS_OLDER:
            if self.lease_current:
                # latest_known_lease may be unset in the result, so skip validation on
                # construction and check validity explicitly instead of raising ValueError.
                latest_known_lease = Lease(lease_use_result.latest_known_lease,
                                           ignore_is_valid_check=True)
                if latest_known_lease.is_valid_lease():
                    cmp = self.lease_current.compare(latest_known_lease)
                    if cmp is Lease.CompareResult.NEWER or cmp is Lease.CompareResult.SAME:
                        # The attempted lease was older, but the lease in the wallet has been
                        # updated in the meantime to something that is newer than what the robot
                        # has seen, so this OLDER result is no longer relevant.
                        return self
                # The lease from the lease wallet was an older lease.
                return LeaseState(LeaseState.Status.OTHER_OWNER,
                                  lease_owner=lease_use_result.owner)
        elif lease_use_result.status == lease_use_result.STATUS_WRONG_EPOCH:
            if self.lease_current:
                attempted_lease = Lease(lease_use_result.attempted_lease)
                # Only react if the result refers to the lease currently in this state.
                if attempted_lease.compare(self.lease_current) is Lease.CompareResult.SAME:
                    return LeaseState(LeaseState.Status.UNOWNED)
        elif lease_use_result.status == lease_use_result.STATUS_REVOKED:
            if self.lease_current:
                attempted_lease = Lease(lease_use_result.attempted_lease)
                if attempted_lease.compare(self.lease_current) is Lease.CompareResult.SAME:
                    return LeaseState(LeaseState.Status.REVOKED)
        # The LeaseState is not modified.
        return self
# Name of the resource used when a caller does not specify one.
_RESOURCE_BODY = 'body'


class LeaseWallet(object):
    """Thread-safe storage of Leases.

    Lease states are stored per resource name; access to the map goes through an internal lock.
    """

    def __init__(self):
        self._lease_state_map = {}
        self._lock = threading.Lock()
        self.client_name = None

    def add(self, lease):
        """Store a lease in the wallet, replacing any state already held for its resource.

        Args:
            lease: Lease to add in the wallet.
        """
        with self._lock:
            self._add_lease_locked(lease)

    def _add_lease_locked(self, lease, current=False):
        """Record `lease` as self-owned. The caller must hold the lock; `current` is unused."""
        state = LeaseState(LeaseState.Status.SELF_OWNER, lease=lease,
                           client_name=self.client_name)
        self._lease_state_map[lease.lease_proto.resource] = state

    def remove(self, lease):
        """Drop the state stored for the lease's resource, if there is any.

        Args:
            lease: Lease to remove from the wallet.
        """
        resource = lease.lease_proto.resource
        with self._lock:
            self._lease_state_map.pop(resource, None)

    def advance(self, resource=_RESOURCE_BODY):
        """Advance the lease for a specific resource and store the advanced state.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Advanced lease for the resource.

        Raises:
            NoSuchLease: The wallet holds no lease state for the resource.
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        with self._lock:
            advanced_state = self._get_owned_lease_state_locked(resource).create_newer()
            self._lease_state_map[resource] = advanced_state
            return advanced_state.lease_current

    def get_lease(self, resource=_RESOURCE_BODY):
        """Get the current lease for a specific resource.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease for the resource.

        Raises:
            NoSuchLease: The wallet holds no lease state for the resource.
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        with self._lock:
            owned_state = self._get_owned_lease_state_locked(resource)
            return owned_state.lease_current

    def get_lease_state(self, resource=_RESOURCE_BODY):
        """Get the lease state for a specific resource.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The requested lease does not exist.
        """
        with self._lock:
            return self._get_lease_state_locked(resource)

    def _get_lease_state_locked(self, resource):
        """Look up the lease state of a resource; the caller must hold the lock.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The requested lease does not exist.
        """
        try:
            state = self._lease_state_map[resource]
        except KeyError:
            raise NoSuchLease(resource)
        return state

    def _get_owned_lease_state_locked(self, resource):
        """Look up the lease state of a resource and require that the wallet owns the lease.

        The caller must hold the lock.

        Args:
            resource: The resource that the Lease is for.

        Returns:
            Lease state for the resource.

        Raises:
            NoSuchLease: The wallet holds no lease state for the resource.
            LeaseNotOwnedByWallet: The lease is not owned by the wallet.
        """
        state = self._get_lease_state_locked(resource)
        if state.lease_status == LeaseState.Status.SELF_OWNER:
            return state
        raise LeaseNotOwnedByWallet(resource, state)

    def on_lease_use_result(self, lease_use_result, resource=None):
        """Update the lease state based on result of using the lease.

        Nothing happens when the wallet holds no state for the resource.

        Args:
            lease_use_result: LeaseUseResult from the server.
            resource: Resource to update, e.g. 'body'. Default to None to use the resource
                specified by the lease_use_result.
        """
        resource = resource or lease_use_result.attempted_lease.resource
        with self._lock:
            current_state = self._lease_state_map.get(resource, None)
            if not current_state:
                return
            self._lease_state_map[resource] = current_state.update_from_lease_use_result(
                lease_use_result)

    def set_client_name(self, client_name):
        """Set the client name that will be issuing the leases."""
        with self._lock:
            self.client_name = client_name
class LeaseClient(common.BaseClient):
    """Client to the lease service.

    Args:
        lease_wallet: Lease wallet to use. When set, acquired/taken leases are added to it and
            returned leases are removed from it.
    """
    default_service_name = 'lease'
    service_type = 'bosdyn.api.LeaseService'

    def __init__(self, lease_wallet=None):
        super(LeaseClient, self).__init__(LeaseServiceStub)
        self.lease_wallet = lease_wallet

    def acquire(self, resource=_RESOURCE_BODY, **kwargs):
        """Acquire a lease for the given resource.

        Args:
            resource: Resource for the lease.

        Returns:
            Acquired Lease object.

        Raises:
            ResourceAlreadyClaimedError: Use TakeLease method to forcefully grab the already
                                         claimed lease.
            InvalidResourceError: Resource is not known to the LeaseService.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not
                                          work.
        """
        req = self._make_acquire_request(resource)
        return self.call(self._stub.AcquireLease, req, self._handle_acquire_success,
                         self._handle_acquire_errors, copy_request=False, **kwargs)

    def acquire_async(self, resource=_RESOURCE_BODY, **kwargs):
        """Async version of acquire() function."""
        req = self._make_acquire_request(resource)
        return self.call_async(self._stub.AcquireLease, req, self._handle_acquire_success,
                               self._handle_acquire_errors, copy_request=False, **kwargs)

    def take(self, resource=_RESOURCE_BODY, **kwargs):
        """Take the lease for the given resource.

        Args:
            resource: Resource for the lease.

        Returns:
            Taken Lease object.

        Raises:
            InvalidResourceError: Resource is not known to the LeaseService.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not
                                          work.
        """
        req = self._make_take_request(resource)
        return self.call(self._stub.TakeLease, req, self._handle_acquire_success,
                         self._handle_take_errors, copy_request=False, **kwargs)

    def take_async(self, resource=_RESOURCE_BODY, **kwargs):
        """Async version of the take() function."""
        req = self._make_take_request(resource)
        return self.call_async(self._stub.TakeLease, req, self._handle_acquire_success,
                               self._handle_take_errors, copy_request=False, **kwargs)

    def return_lease(self, lease, **kwargs):
        """Return an acquired lease.

        The lease is removed from the wallet (if one is set) before the request is sent.

        Args:
            lease (Lease object): Lease to return. This should be a Lease class object, and not
                                  the proto.

        Raises:
            InvalidResourceError: Resource is not known to the LeaseService.
            NotActiveLeaseError: Lease is not the active lease.
            NotAuthoritativeServiceError: LeaseService is not authoritative so Acquire should not
                                          work.
        """
        if self.lease_wallet:
            self.lease_wallet.remove(lease)
        req = self._make_return_request(lease)
        return self.call(self._stub.ReturnLease, req, None, self._handle_return_errors,
                         copy_request=False, **kwargs)

    def return_lease_async(self, lease, **kwargs):
        """Async version of the return_lease() function."""
        if self.lease_wallet:
            self.lease_wallet.remove(lease)
        req = self._make_return_request(lease)
        # Must go through call_async like every other *_async method; using call() here would
        # block and hand back the response instead of the async result.
        return self.call_async(self._stub.ReturnLease, req, None, self._handle_return_errors,
                               copy_request=False, **kwargs)

    def retain_lease(self, lease, **kwargs):
        """Retain the lease.

        Args:
            lease: Lease to retain.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_retain_request(lease)
        return self.call(self._stub.RetainLease, req, None, common.common_lease_errors,
                         copy_request=False, **kwargs)

    def retain_lease_async(self, lease, **kwargs):
        """Async version of the retain_lease() function."""
        req = self._make_retain_request(lease)
        return self.call_async(self._stub.RetainLease, req, None, common.common_lease_errors,
                               copy_request=False, **kwargs)

    def list_leases(self, include_full_lease_info=False, **kwargs):
        """Get a list of the leases.

        Args:
            include_full_lease_info: Whether the returned list of LeaseResources should include
                                     all of the available information about the last lease used.
                                     Defaults to False.

        Returns:
            List of lease resources.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call(self._stub.ListLeases, req, self._list_leases_success,
                         common.common_header_errors, copy_request=False, **kwargs)

    def list_leases_async(self, include_full_lease_info=False, **kwargs):
        """Async version of the list_leases() function."""
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call_async(self._stub.ListLeases, req, self._list_leases_success,
                               common.common_header_errors, copy_request=False, **kwargs)

    def list_leases_full(self, include_full_lease_info=False, **kwargs):
        """Get a list of the leases, returning the full response.

        Args:
            include_full_lease_info: Whether the returned list of LeaseResources should include
                                     all of the available information about the last lease used.
                                     Defaults to False.

        Returns:
            The complete ListLeasesResponse message.

        Raises:
            InternalServerError: Service experienced an unexpected error state.
            LeaseUseError: Request was rejected due to using an invalid lease.
        """
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call(self._stub.ListLeases, req, None, common.common_header_errors,
                         copy_request=False, **kwargs)

    def list_leases_full_async(self, include_full_lease_info=False, **kwargs):
        """Async version of the list_leases_full() function."""
        req = self._make_list_leases_request(include_full_lease_info)
        return self.call_async(self._stub.ListLeases, req, None, common.common_header_errors,
                               copy_request=False, **kwargs)

    @staticmethod
    def _make_acquire_request(resource):
        """Return AcquireLeaseRequest message with the given resource."""
        return AcquireLeaseRequest(resource=resource)

    def _handle_acquire_success(self, response):
        """Return lease in an AcquireLeaseResponse message.

        Also used for TakeLease responses. The lease is added to the wallet when one is set.
        """
        lease = Lease(response.lease)
        if self.lease_wallet:
            self.lease_wallet.add(lease)
        return lease

    @staticmethod
    @common.handle_common_header_errors
    def _handle_acquire_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=AcquireLeaseResponse.Status.Name,
                                    status_to_error=_ACQUIRE_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_take_request(resource):
        """Return TakeLeaseRequest message with the given resource."""
        return TakeLeaseRequest(resource=resource)

    @staticmethod
    @common.handle_common_header_errors
    def _handle_take_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=TakeLeaseResponse.Status.Name,
                                    status_to_error=_TAKE_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_return_request(lease):
        """Return ReturnLeaseRequest message carrying the proto of the given Lease."""
        return ReturnLeaseRequest(lease=lease.lease_proto)

    @staticmethod
    @common.handle_common_header_errors
    def _handle_return_errors(response):
        """Return a custom exception based on response, None if no error."""
        return common.error_factory(response, response.status,
                                    status_to_string=ReturnLeaseResponse.Status.Name,
                                    status_to_error=_RETURN_LEASE_STATUS_TO_ERROR)

    @staticmethod
    def _make_retain_request(lease):
        """Return RetainLeaseRequest message carrying the proto of the given Lease."""
        req = RetainLeaseRequest(lease=lease.lease_proto)
        return req

    @staticmethod
    def _make_list_leases_request(include_full_lease_info):
        """Return ListLeasesRequest message with the given include_full_lease_info flag."""
        return ListLeasesRequest(include_full_lease_info=include_full_lease_info)

    @staticmethod
    def _list_leases_success(response):
        """Return the `resources` field of a ListLeasesResponse message."""
        return response.resources
# Sentinel meaning "use the resources this processor was initialized with". A dedicated object is
# used instead of None so that the choice is explicit at the call site.
DEFAULT_RESOURCES = object()


class LeaseWalletRequestProcessor(object):
    """LeaseWalletRequestProcessor adds a lease from a wallet to a request.

    Args:
        lease_wallet: The LeaseWallet to read leases from.
        resource_list: List of resources this processor should add to requests. Default None to
                       use the default resource.
    """

    def __init__(self, lease_wallet, resource_list=None):
        self.lease_wallet = lease_wallet
        self.resource_list = (_RESOURCE_BODY,) if resource_list is None else resource_list
        self.logger = logging.getLogger()

    def mutate(self, request, resource_list=DEFAULT_RESOURCES):
        """Add the leases for the necessary resources if no leases have been specified yet.

        Requests that already carry a lease, or have no lease field at all, are left untouched.
        Each lease written into the request is first advanced in the wallet.
        """
        wants_many, already_filled = self.get_lease_state(request)
        if already_filled:
            return

        if resource_list is DEFAULT_RESOURCES:
            resource_list = self.resource_list

        resource_count = len(resource_list)
        if not wants_many and resource_count > 1:
            self.logger.error('LeaseWalletRequestProcessor assigned multiple leases, '
                              'but request only wants one')

        if not wants_many:
            # Single 'lease' field: only the first resource is used.
            only_lease = self.lease_wallet.advance(resource_list[0])
            request.lease.CopyFrom(only_lease.lease_proto)
            return

        for resource_name in resource_list:
            advanced = self.lease_wallet.advance(resource_name)
            request.leases.add().CopyFrom(advanced.lease_proto)

    @staticmethod
    def get_lease_state(request):
        """Returns a tuple of ("are there multiple leases in request?", "are they set already?")

        The first element is None when the request has neither a 'lease' nor a 'leases' field; in
        that case the second element is True so that the request is skipped.
        """
        try:
            # ValueError will occur if the request class does not have a field named 'lease'.
            single_is_set = request.HasField('lease')
        except ValueError:
            pass
        else:
            # There's only a single lease.
            return False, single_is_set

        try:
            # AttributeError will occur if the request class does not have a field named 'leases'.
            lease_count = len(request.leases)
        except AttributeError:
            # There's no 'lease' field nor a 'leases' field.
            # There are requests that do not have either field, so just skip them.
            return None, True
        return True, lease_count > 0
class LeaseWalletResponseProcessor(object):
    """LeaseWalletResponseProcessor updates the wallet with a LeaseUseResult.

    Args:
        lease_wallet: Lease wallet to use.
    """

    def __init__(self, lease_wallet):
        self.lease_wallet = lease_wallet

    def mutate(self, response):
        """Update the wallet if a response has a lease_use_result.

        A single 'lease_use_result' attribute is preferred; otherwise every entry of
        'lease_use_results' is forwarded. Responses with neither attribute are ignored.
        """
        absent = object()
        try:
            # AttributeError will occur if the response does not have a 'lease_use_result'.
            results = (response.lease_use_result,)
        except AttributeError:
            results = getattr(response, 'lease_use_results', absent)
            if results is absent:
                # Neither attribute exists; there are responses like that, so just return.
                return
        for use_result in results:
            self.lease_wallet.on_lease_use_result(use_result)
def add_lease_wallet_processors(client, lease_wallet, resource_list=None):
    """Adds LeaseWallet related processors to a gRPC client.

    For services which use leases for access control, this does two things:
        * Advance the lease from the LeaseWallet and attach to a request.
        * Handle the LeaseUseResult from a response and update LeaseWallet.

    Args:
        * client: BaseClient derived class for a single service.
        * lease_wallet: The LeaseWallet to track from, must be non-None.
        * resource_list: List of resources these processors should add to requests. Default None
                         to use a default resource.
    """
    # Outgoing requests get a lease attached from the wallet...
    request_processor = LeaseWalletRequestProcessor(lease_wallet, resource_list)
    client.request_processors.append(request_processor)
    # ...and incoming responses feed their lease use results back into it.
    response_processor = LeaseWalletResponseProcessor(lease_wallet)
    client.response_processors.append(response_processor)
[docs]class LeaseKeepAlive(object): """LeaseKeepAlive issues lease liveness checks on a background thread. The robot's lease system expects lease-holders to check in at a regular cadence. If the check-ins do not happen, the robot will treat it as a communications loss. Typically this will result in the robot stopping, powering off, and the lease-holder getting their lease revoked. Using a LeaseKeepAlive object hides most of the details of issuing the lease liveness check. Developers can also manage liveness checks directly by using the retain_lease methods on the LeaseClient object. Args: lease_client: The LeaseClient object to issue requests on. lease_wallet: The LeaseWallet to retrieve current leases from, and to handle any bad LeaseUseResults from. If not specified, the lease_client's lease_wallet will be used. resource: The resource to do liveness checks for. rpc_interval_seconds: Duration in seconds between liveness checks. keep_running_cb: If specified, should be a callable object that returns True if the liveness checks should proceed, False otherwise. LeaseKeepAlive will invoke keep_running_cb on its background thread. One example of where this could be used is in an interactive UI - keep_running_cb could stall or return False if the UI thread is wedged, which prevents the application from continuing to keep the Lease alive when it is no longer in a good state. on_failure_callback: If specified, this should be a callable function object which takes the error/exception as an input. The function does not need to return anything. This function can be used to action on any failures during the keepalive from the RetainLease RPC. warnings(bool): Used to determine if the _periodic_check_in function will print lease check-in errors. must_acquire(bool): If True, exceptions when trying to acquire the lease will not be caught. return_at_exit(bool): If True, return the lease when shutting down. 
""" def __init__(self, lease_client, lease_wallet=None, resource=_RESOURCE_BODY, rpc_interval_seconds=2, keep_running_cb=None, host_name="", on_failure_callback=None, warnings=True, must_acquire=False, return_at_exit=False): """Create a new LeaseKeepAlive object.""" self.host_name = host_name self.print_warnings = warnings self._return_at_exit = return_at_exit if not lease_client: raise ValueError("lease_client must be set") self._lease_client = lease_client if not lease_wallet: lease_wallet = lease_client.lease_wallet if not lease_wallet: raise ValueError("lease_wallet must be set") self._lease_wallet = lease_wallet if not resource: raise ValueError("resource must be set") self._resource = resource # If we don't have the lease, acquire it. try: self._lease_wallet.get_lease(self._resource) except Error: try: self._lease_client.acquire(self._resource) except BaseError as exc: if must_acquire: raise _LOGGER.error('Failed to acquire the lease in LeaseKeepAlive: %s', exc) if rpc_interval_seconds <= 0.0: raise ValueError("rpc_interval_seconds must be > 0, was %f" % rpc_interval_seconds) self._rpc_interval_seconds = rpc_interval_seconds self.logger = logging.getLogger() self._keep_running = keep_running_cb or (lambda: True) self._end_check_in_signal = threading.Event() # If the on_failure_callback is not provided, then set the default as a no-op function. self._retain_lease_failed_cb = on_failure_callback or (lambda err: None) # Configure the thread to do check-ins, and begin checking in. self._thread = threading.Thread(target=self._periodic_check_in) self._thread.daemon = True self._thread.start()
[docs] def shutdown(self): """Shut the background thread down and stop the liveness checks. Can be called multiple times, but subsequent calls are no-ops. Blocks until the background thread completes. """ self.logger.debug('Shutting down') self._end_periodic_check_in() self.wait_until_done() if self._return_at_exit: try: self._lease_client.return_lease(self.lease_wallet.get_lease(self._resource), timeout=2) except (LeaseResponseError, NoSuchLease, LeaseNotOwnedByWallet): pass # These all mean that we don't own the lease anymore, which is fine. except RpcError as exc: _LOGGER.error('Failed to return the lease at the end: %s', exc)
[docs] def is_alive(self): return self._thread.is_alive()
@property
def lease_wallet(self):
    """The lease wallet this keep-alive reads its lease from (read-only)."""
    wallet = self._lease_wallet
    return wallet
def wait_until_done(self):
    """Block until the background check-in thread has exited.

    Client code normally ends the thread through shutdown() or through the
    keep_running_cb callback given to the constructor; this method only joins the
    thread, which is mostly useful in unit tests that must be sure the thread is gone.
    """
    worker = self._thread
    worker.join()
def _end_periodic_check_in(self):
    """Stop checking into the Lease system."""
    self.logger.debug('Stopping check-in')
    self._end_check_in_signal.set()


def __enter__(self):
    """Enter the ``with`` block; the keep-alive object itself is the context value."""
    return self


def __exit__(self, exc_type, exc_val, exc_tb):
    """Shut down on leaving the ``with`` block; exceptions are not suppressed."""
    self.shutdown()


def _ok(self):
    """Log that a check-in completed without error."""
    self.logger.debug('Check-in successful')


def _check_in(self):
    """Retain lease associated with the resource in this class.

    Returns:
        The result of the retain_lease call, or None when the wallet returns a falsy lease.
    """
    lease = self._lease_wallet.get_lease(self._resource)
    if not lease:
        return None
    return self._lease_client.retain_lease(lease)


def _periodic_check_in(self):
    """Periodically check in and retain the lease associated with the resource in this class.

    Runs until the keep-running callback returns a falsy value or the stop signal is set.
    A failed check-in is logged (when warnings are enabled) and reported to the failure
    callback, after which the loop resumes.
    """
    self.logger.info('Starting lease check-in')
    while True:
        # Include the time it takes to execute keep_running, in case it takes a significant
        # portion of our check in period. A monotonic clock is used so that wall-clock
        # adjustments (e.g. time sync) cannot stretch or collapse the wait below.
        exec_start = time.monotonic()

        # Stop doing retention if this is not meant to keep running.
        if not self._keep_running():
            break

        try:
            self._check_in()
        # We really do want to catch anything.
        except Exception as exc:  # pylint: disable=broad-except
            if self.print_warnings:
                self.logger.warning(
                    'Generic exception for %s during check-in:\n%s\n'
                    '        (resuming check-in)', self.host_name, exc)
            self._retain_lease_failed_cb(exc)
        else:
            # No errors!
            self._ok()

        # How long did the RPC and processing of said RPC take?
        exec_seconds = time.monotonic() - exec_start

        # Block and wait for the stop signal. If we receive it within the check-in period,
        # leave the loop. This check must be at the end of the loop!
        # Wait up to self._rpc_interval_seconds seconds, minus the RPC processing time.
        # (values < 0 are OK and will return immediately)
        if self._end_check_in_signal.wait(self._rpc_interval_seconds - exec_seconds):
            break
    self.logger.info('Lease check-in stopped')
def test_active_lease(incoming_lease_proto, active_lease, sublease_name=None,
                      allow_super_leases=False):
    """Compare an incoming lease against the currently active lease.

    Args:
        incoming_lease_proto(lease_pb2.Lease): The incoming lease proto.
        active_lease(Lease): The most recent known lease the incoming lease is compared
                             against, or None if there is no active lease yet.
        sublease_name(string): When not None, a sublease of the incoming lease is created with
                               this client name and that sublease is what gets compared.
        allow_super_leases(boolean): Passed to the status conversion so that a super lease can
                                     still be reported as ok.

    Returns:
        A tuple (LeaseUseResult, Lease). The Lease is built from the incoming proto (or is its
        sublease when sublease_name is given) and is None if the incoming proto was invalid.

    Raises:
        Error: If the active lease object is not a valid lease.
    """
    result = LeaseUseResult()
    result.attempted_lease.CopyFrom(incoming_lease_proto)

    # Build the lease (and optional sublease) to test with; an invalid proto raises ValueError.
    try:
        candidate = Lease(incoming_lease_proto)
        if sublease_name is not None:
            candidate = candidate.create_sublease(client_name=sublease_name)
    except ValueError:
        result.status = LeaseUseResult.STATUS_INVALID_LEASE
        return result, None

    if active_lease is None:
        # Nothing to compare against: the incoming lease is accepted and becomes the latest
        # known lease. No previous lease is recorded, as this is the first for the resource.
        result.latest_known_lease.CopyFrom(candidate.lease_proto)
        result.status = LeaseUseResult.STATUS_OK
        return result, candidate

    # The active lease must itself be valid; an invalid one is a caller error.
    if not active_lease.is_valid_lease():
        raise Error("The active lease object is invalid.")

    # The active lease is both the previous lease and, until the incoming lease proves
    # newer, the latest known lease.
    result.previous_lease.CopyFrom(active_lease.lease_proto)
    result.latest_known_lease.CopyFrom(active_lease.lease_proto)

    comparison = candidate.compare(active_lease)
    result.status = Lease.compare_result_to_lease_use_result_status(comparison,
                                                                   allow_super_leases)

    # Leave the active lease as the latest known one unless the incoming lease was accepted.
    # An accepted super lease (possible when allow_super_leases is True) is also not promoted,
    # since newer subleases may already have been seen.
    if (result.status != LeaseUseResult.STATUS_OK or
            comparison == Lease.CompareResult.SUPER_LEASE):
        return result, candidate

    result.latest_known_lease.CopyFrom(candidate.lease_proto)
    return result, candidate