Source code for bosdyn.client.common

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Contains elements common to all service clients."""
import concurrent
import copy
import functools
import logging
import socket
import types

import grpc
from deprecated.sphinx import deprecated

from bosdyn.api.header_pb2 import CommonError
from bosdyn.deprecated import moved_to

from .channel import TransportError, translate_exception
from .data_chunk import chunk_message, parse_from_chunks
from .exceptions import (CustomParamError, Error, InternalServerError, InvalidRequestError,
                         LeaseUseError, LicenseError, ResponseError, UnsetStatusError)

_LOGGER = logging.getLogger(__name__)

from bosdyn.api import data_chunk_pb2, license_pb2

DEFAULT_RPC_TIMEOUT = 30  # seconds


[docs]def common_header_errors(response): """Return an exception based on common response header. None if no error.""" if response.header.error.code == CommonError.CODE_OK: return None if response.header.error.code == CommonError.CODE_UNSPECIFIED: return UnsetStatusError(response) if response.header.error.code == CommonError.CODE_INTERNAL_SERVER_ERROR: return InternalServerError(response) if response.header.error.code == CommonError.CODE_INVALID_REQUEST: return InvalidRequestError(response) return ResponseError(response)
[docs]def streaming_common_header_errors(response_iterator): """Return an exception based on common response header for a streaming response iterator. None if no error.""" for response in response_iterator: error = common_header_errors(response) if error is not None: return error # No common header error found. return None
[docs]def common_lease_errors(response): """Return an exception based on lease use result. None if no error.""" if hasattr(response, 'lease_use_result'): # On the off chance the protobuf message has a lease_use_result field but the instance does # not have it filled out... if response.HasField('lease_use_result'): lease_use_results = [response.lease_use_result] else: lease_use_results = [] elif hasattr(response, 'lease_use_results'): lease_use_results = response.lease_use_results else: # This means you're using the wrong error handler. return InternalServerError(response, 'No LeaseUseResult field found!') for result in lease_use_results: if result.status != result.STATUS_OK: return LeaseUseError(response, result) return None
[docs]def streaming_common_lease_errors(response_iterator): """Return an exception based on lease use result for a streaming response iterator. None if no error.""" for response in response_iterator: error = common_lease_errors(response) if error is not None: return error # No lease use error found. return None
[docs]def custom_params_error(response, status_value=None, status_field_name='status', error_field_name='custom_param_error', total_response=None): """Return an exception based on having a custom parameter status and message. None if no error.""" if status_value is None: status_value = response.STATUS_CUSTOM_PARAMS_ERROR if getattr(response, status_field_name) == status_value: return CustomParamError(total_response or response, getattr(response, error_field_name)) return None
[docs]def error_pair(error_message): """Creates a pair of an error class and the associated docstring as the error message which can be used by the error_factory. Args: error_message: A class that inherits from the python Error class. Returns: The tuple of the error class and it's associated docstring. """ return (error_message, error_message.__doc__)
[docs]def error_factory(response, status, status_to_string, status_to_error): """Return an error based on the status field of the given response. Since most callers of this function are "response to error" callbacks, any exceptions raised by this function are a considered a serious problem. Strongly consider using collections.defaultdict for the status_to_error mapping, and/or wrapping calls to this function in try/except blocks. Args: response: Protobuf message to examine or an iterator of protobuf responses. status: Status from the protobuf message. status_to_string: Function that converts numeric status value to string. May raise ValueError, in which case just the numeric code is included in a default error message. status_to_error: mapping of status -> (error_constructor, error_message) error_constructor must take arguments "response" and "error_message". (and ideally will subclass from ResponseError.) Returns: None if status_to_error[status] maps to (None, _). Otherwise, an instance of an error determined by status_to_error. """ error_type, message = status_to_error[status] # This status is not an error. if error_type is None: return None # This status doesn't have a default error message, let's make one. if message is None: try: status_str = status_to_string(status) except ValueError: message = 'Code: {} (Protobuf definition mismatch?)'.format(status) else: message = 'Code: {} ({})'.format(status, status_str) # Determine if this is a streaming response or a regular response. if isinstance(response, types.GeneratorType): for resp in response: err = error_type(response=resp, error_message=message) if err is not None: return err return None else: return error_type(response=response, error_message=message)
[docs]def handle_unset_status_error(unset, field='status', statustype=None): """Decorate "error from response" functions to handle unset status field errors.""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # See if the given field is the given "unset" value. if isinstance(args[0], list): for resp in args[0]: _statustype = statustype if statustype else resp if getattr(resp, field) == getattr(_statustype, unset): return UnsetStatusError(resp) else: _statustype = statustype if statustype else args[0] if getattr(args[0], field) == getattr(_statustype, unset): return UnsetStatusError(args[0]) return func(*args, **kwargs) return wrapper return decorator
[docs]def handle_common_header_errors(func): """Decorate "error from response" functions to handle typical header errors.""" @functools.wraps(func) def wrapper(*args, **kwargs): # Look for errors in the common response, before looking for specific errors. # pylint: disable=no-value-for-parameter if isinstance(args[0], list): return streaming_common_header_errors(*args) or func(*args, **kwargs) else: return common_header_errors(*args) or func(*args, **kwargs) return wrapper
[docs]def handle_lease_use_result_errors(func): """Decorate "error from response" functions to handle typical lease errors.""" @functools.wraps(func) def wrapper(*args, **kwargs): # pylint: disable=no-value-for-parameter if isinstance(args[0], list): return streaming_common_lease_errors(*args) or func(*args, **kwargs) else: return common_lease_errors(*args) or func(*args, **kwargs) return wrapper
[docs]def handle_custom_params_errors(*args, status_value=None, status_field_name='status', error_field_name='custom_param_error'): """Decorate "error from response" functions to handle custom param errors.""" def decorator(func): @functools.wraps(func) def wrapper(*args, **kwargs): # pylint: disable=no-value-for-parameter return custom_params_error(*args, status_value=status_value, status_field_name=status_field_name, error_field_name=error_field_name) or func(*args, **kwargs) return wrapper if len(args) == 1 and callable(args[0]): # No arguments, this is the decorator return decorator(args[0]) return decorator
[docs]def handle_license_errors(func): """Decorate "error from response" functions to handle typical license errors.""" @functools.wraps(func) def wrapper(*args, **kwargs): return common_license_errors(*args) or func(*args, **kwargs) return wrapper
[docs]def handle_license_errors_if_present(func): """Decorate "error from response" functions to handle typical license errors. Does not raise an error for STATUS_UNKNOWN. Use for responses that may only sometimes fill out the license status.""" @functools.wraps(func) def wrapper(*args, **kwargs): return common_license_errors(*args, allow_unset=True) or func(*args, **kwargs) return wrapper
[docs]def common_license_errors(response, allow_unset=False): license_status = response.license_status if allow_unset and license_status == license_pb2.LicenseInfo.STATUS_UNKNOWN: return None elif license_status != license_pb2.LicenseInfo.STATUS_VALID: return LicenseError(response) return None
[docs]def maybe_raise(exc): """raise the provided exception if it is not None""" if exc is not None: raise exc
[docs]def process_kwargs(func): @functools.wraps(func) def processor(self, rpc_method, request, value_from_response=None, error_from_response=None, **kwargs): if kwargs.get("disable_value_handler"): value_from_response = None kwargs.pop("disable_value_handler", None) if kwargs.get("disable_error_handler"): error_from_response = None kwargs.pop("disable_error_handler", None) return func(self, rpc_method, request, value_from_response=value_from_response, error_from_response=error_from_response, **kwargs) return processor
[docs]class BaseClient(object): """Helper base class for all clients to Boston Dynamics services.""" _SPLIT_SERVICE = '.' _SPLIT_METHOD = '/' def __init__(self, stub_creation_func, name=None): self._service_type_short = getattr(self.__class__, 'service_type', 'BaseClient').split(BaseClient._SPLIT_SERVICE)[-1] self._channel = None self._logger = None self._name = name self._stub = None self._stub_creation_func = stub_creation_func self.logger = logging.getLogger(self._name or 'bosdyn.{}'.format(self._service_type_short)) self.request_processors = [] self.response_processors = [] self.lease_wallet = None self.client_name = None self.executor = None
[docs] @staticmethod @deprecated(reason='Forces serialization even if the logging is not happening. Do not use.', version='3.3.0') def request_trim_for_log(req): return '\n{}\n'.format(req)
[docs] @staticmethod @deprecated(reason='Forces serialization even if the logging is not happening. Do not use.', version='3.3.0') def response_trim_for_log(resp): return '\n{}\n'.format(resp)
@property def channel(self): if self._channel is None: raise Error('Client channel is unset!') return self._channel @channel.setter def channel(self, channel): self._channel = channel self._stub = self._stub_creation_func(channel)
[docs] def update_from(self, other): """Adopt key objects like processors, logger, and wallet from other.""" self.request_processors = other.request_processors + self.request_processors self.response_processors = other.response_processors + self.response_processors self.logger = other.logger.getChild(self._name or self._service_type_short) self.lease_wallet = other.lease_wallet self.client_name = other.client_name self.executor = other.executor
[docs] def update_request_iterator(self, request_iterator, logger, rpc_method, is_blocking, copy_request=True): for request in request_iterator: request = self._apply_request_processors(request, copy_request=copy_request) if is_blocking: logger.debug('blocking request: %s\n%s', rpc_method._method, request) else: logger.debug('async request: %s\n%s', rpc_method._method, request) yield request
[docs] def update_response_iterator(self, response_iterator, logger, rpc_method, is_blocking): try: for response in response_iterator: response = self._apply_response_processors(copy.deepcopy(response)) if is_blocking: logger.debug('blocking response: %s\n%s', rpc_method._method, response) else: logger.debug('async response: %s\n%s', rpc_method._method, response) yield response except TransportError as e: # Iterating through the response_iterator is the point that transport exceptions will # be thrown for streaming rpcs if any are going to occur. # Here we make sure that they're translated to our more meaningful exceptions. # Any ResponseErrors or other exception types can be let through untranslated. # Use the "raise from None" pattern to reset the exception's context, which produces # confusing stack traces. raise translate_exception(e) from None
[docs] @process_kwargs def call(self, rpc_method, request, value_from_response=None, error_from_response=None, assemble_type=None, copy_request=True, **kwargs): """Returns result of calling rpc_method(request, kwargs) after running processors. value_from_response and error_from_response should not raise their own exceptions! Additionally, value_from_response and error_from_response that are not common handlers must accept streaming responses if it is a grpc streaming response. """ logger = self._get_logger(rpc_method) if isinstance(rpc_method, grpc.StreamUnaryMultiCallable) or isinstance( rpc_method, grpc.StreamStreamMultiCallable): # The incoming request is a streaming request. request = self.update_request_iterator(request, logger, rpc_method, is_blocking=True, copy_request=copy_request) else: request = self._apply_request_processors(request, copy_request=copy_request) logger.debug('blocking request: %s\n%s', rpc_method._method, request) try: timeout = kwargs.pop('timeout', DEFAULT_RPC_TIMEOUT) response = rpc_method(request, timeout=timeout, **kwargs) except TransportError as e: # Use the "raise from None" pattern to reset the exception's context, which produces # confusing stack traces. raise translate_exception(e) from None if isinstance(rpc_method, grpc.UnaryStreamMultiCallable) or isinstance( rpc_method, grpc.StreamStreamMultiCallable): # The outgoing response is a streaming response. if assemble_type is not None: # Assemble the data chunks into a message before passing to non-streaming handlers. msg = assemble_type() # For server streaming response RPCs, transport errors are not raised during the rpc call. # We cannot explicitly check for them until the RPC deadline has been exceeded. # To make due, we attempt to parse the response and catch transport errors raised while iterating through the responses. try: parse_from_chunks(response, msg) except TransportError as e: raise translate_exception(e) from None msg = self._apply_response_processors(msg) logger.debug('response: %s\n%s', rpc_method._method, msg) return self.handle_response(msg, error_from_response, value_from_response) else: responses = self.update_response_iterator(response, logger, rpc_method, is_blocking=True) return self.handle_response_streaming(list(responses), error_from_response, value_from_response) else: response = self._apply_response_processors(response) logger.debug('response: %s\n%s', rpc_method._method, response) return self.handle_response(response, error_from_response, value_from_response)
[docs] def handle_response(self, response, error_from_response, value_from_response): if error_from_response is not None: exc = error_from_response(response) else: exc = None if exc is not None: raise exc # pylint: disable=raising-bad-type if value_from_response is None: return response return value_from_response(response)
[docs] def handle_response_streaming(self, response, error_from_response, value_from_response): if error_from_response is not None: exc = error_from_response(response) else: exc = None if exc is not None: raise exc # pylint: disable=raising-bad-type if value_from_response is None: return response return value_from_response(response)
[docs] @process_kwargs def call_async(self, rpc_method, request, value_from_response=None, error_from_response=None, copy_request=True, **kwargs): """Returns a Future for rpc_method(request, kwargs) after running processors. value_from_response and error_from_response should not raise their own exceptions! call_async does not accept streaming rpcs, see 'call_async_streaming'. """ request = self._apply_request_processors(request, copy_request=copy_request) logger = self._get_logger(rpc_method) logger.debug('async request: %s\n%s', rpc_method._method, request) timeout = kwargs.pop('timeout', DEFAULT_RPC_TIMEOUT) response_future = rpc_method.future(request, timeout=timeout, **kwargs) def on_finish(fut): try: result = fut.result() except Exception as exc: # pylint: disable=broad-except logger.debug('async exception: %s\n%s\n', rpc_method._method, exc) else: try: self._apply_response_processors(result) except Exception: # pylint: disable=broad-except logger.exception("Error applying response processors.") else: logger.debug('async response: %s\n%s', rpc_method._method, result) response_future.add_done_callback(on_finish) return FutureWrapper(response_future, value_from_response, error_from_response)
[docs] @process_kwargs def call_async_streaming(self, rpc_method, request, value_from_response=None, error_from_response=None, assemble_type=None, copy_request=False, **kwargs): """Returns a Future for rpc_method(request, kwargs) after running processors. value_from_response and error_from_response should not raise their own exceptions. A version of 'call_async' for streaming rpcs. True async streaming calls are not supported by python grpc. Instead, this call creates a thread that runs the synchronous 'call' function. """ request = self._apply_request_processors(request, copy_request=copy_request) if self.executor is None: self.executor = concurrent.futures.ThreadPoolExecutor() future = self.executor.submit(self.call, rpc_method, request, assemble_type=assemble_type, copy_request=copy_request, **kwargs) return FutureWrapper(future, value_from_response, error_from_response, is_streaming=True)
def _apply_request_processors(self, request, copy_request=True): if request is None: return if copy_request: request = copy.deepcopy(request) for proc in self.request_processors: proc.mutate(request) return request def _apply_response_processors(self, response): if response is None: return for proc in self.response_processors: proc.mutate(response) return response def _get_logger(self, rpc_method): method_name = getattr(rpc_method, '_method', None) if method_name: method_name_short = str(method_name.decode()).rsplit(BaseClient._SPLIT_METHOD, 1)[-1] # This returns the same instance if it's been created before. return self.logger.getChild(method_name_short) return self.logger chunk_message = moved_to(chunk_message, version='3.3.0')
[docs]class FutureWrapper(): """Wraps a Future to aid more complicated clients' async calls.""" def __init__(self, future, value_from_response, error_from_response, is_streaming=False): self.original_future = future self._error_from_response = error_from_response self._value_from_response = value_from_response self._is_streaming = is_streaming def __repr__(self): return self.original_future.__repr__()
[docs] def cancel(self): return self.original_future.cancel()
[docs] def cancelled(self): return self.original_future.cancelled()
[docs] def running(self): return self.original_future.running()
[docs] def done(self): return self.original_future.done()
[docs] def traceback(self, **kwargs): return self.original_future.traceback(**kwargs)
[docs] def add_done_callback(self, cb, assemble_type=None): """Add callback executed on FutureWrapper when future is done.""" self.original_future.add_done_callback(lambda not_used_original_future: cb(self))
[docs] def result(self, **kwargs): """Get the result of the value_from_response(future.result()).""" error = self.exception() if error is not None: raise error base_result = self.original_future.result(**kwargs) if self._value_from_response is None: return base_result return self._value_from_response(base_result)
[docs] def exception(self, **kwargs): """Get exceptions from the Future, or from custom response processing.""" error = self.original_future.exception(**kwargs) if error is None: if self._error_from_response is None: return None return self._error_from_response(self.original_future.result()) # 'call_async_streaming' uses the non-async 'call' function. 'call' does all of it's # own error handling so just return any errors from that call as is. if self._is_streaming: return error return translate_exception(error)
[docs]def get_self_ip(robot_hostname): """ Get the IP address of the ethernet or WiFi interface used to talk to the robot.""" s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: # doesn't even have to be reachable s.connect((robot_hostname, 1)) ip = s.getsockname()[0] except socket.error: ip = '127.0.0.1' finally: s.close() return ip