# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""Contains elements common to all service clients."""
import concurrent.futures
import copy
import functools
import logging
import socket
import types
import grpc
from deprecated.sphinx import deprecated
from bosdyn.api.header_pb2 import CommonError
from bosdyn.deprecated import moved_to
from .channel import TransportError, translate_exception
from .data_chunk import chunk_message, parse_from_chunks
from .exceptions import (CustomParamError, Error, InternalServerError, InvalidRequestError,
LeaseUseError, LicenseError, ResponseError, UnsetStatusError)
from bosdyn.api import data_chunk_pb2, license_pb2
_LOGGER = logging.getLogger(__name__)
DEFAULT_RPC_TIMEOUT = 30  # seconds
def common_lease_errors(response):
"""Return an exception based on lease use result. None if no error."""
if hasattr(response, 'lease_use_result'):
# On the off chance the protobuf message has a lease_use_result field but the instance does
# not have it filled out...
if response.HasField('lease_use_result'):
lease_use_results = [response.lease_use_result]
else:
lease_use_results = []
elif hasattr(response, 'lease_use_results'):
lease_use_results = response.lease_use_results
else:
# This means you're using the wrong error handler.
return InternalServerError(response, 'No LeaseUseResult field found!')
for result in lease_use_results:
if result.status != result.STATUS_OK:
return LeaseUseError(response, result)
return None
def streaming_common_lease_errors(response_iterator):
"""Return an exception based on lease use result for a streaming
response iterator. None if no error."""
for response in response_iterator:
error = common_lease_errors(response)
if error is not None:
return error
# No lease use error found.
return None
def custom_params_error(response, status_value=None, status_field_name='status',
error_field_name='custom_param_error', total_response=None):
"""Return an exception based on having a custom parameter status and message.
None if no error."""
if status_value is None:
status_value = response.STATUS_CUSTOM_PARAMS_ERROR
if getattr(response, status_field_name) == status_value:
return CustomParamError(total_response or response, getattr(response, error_field_name))
return None
def error_pair(error_message):
"""Creates a pair of an error class and the associated docstring as the error message
which can be used by the error_factory.
Args:
error_message: A class that inherits from the python Error class.
Returns:
The tuple of the error class and it's associated docstring.
"""
return (error_message, error_message.__doc__)
def error_factory(response, status, status_to_string, status_to_error):
"""Return an error based on the status field of the given response.
Since most callers of this function are "response to error" callbacks, any exceptions
    raised by this function are considered a serious problem. Strongly consider using
collections.defaultdict for the status_to_error mapping, and/or wrapping calls to this
function in try/except blocks.
Args:
response: Protobuf message to examine or an iterator of protobuf responses.
status: Status from the protobuf message.
status_to_string: Function that converts numeric status value to string. May raise
ValueError, in which case just the numeric code is included in a default
error message.
status_to_error: mapping of status -> (error_constructor, error_message)
error_constructor must take arguments "response" and "error_message".
(and ideally will subclass from ResponseError.)
Returns:
None if status_to_error[status] maps to (None, _).
Otherwise, an instance of an error determined by status_to_error.
"""
error_type, message = status_to_error[status]
# This status is not an error.
if error_type is None:
return None
# This status doesn't have a default error message, let's make one.
if message is None:
try:
status_str = status_to_string(status)
except ValueError:
message = 'Code: {} (Protobuf definition mismatch?)'.format(status)
else:
message = 'Code: {} ({})'.format(status, status_str)
# Determine if this is a streaming response or a regular response.
if isinstance(response, types.GeneratorType):
for resp in response:
err = error_type(response=resp, error_message=message)
if err is not None:
return err
return None
else:
return error_type(response=response, error_message=message)
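# Illustrative sketch (not part of the SDK source): a typical "response to error" callback built
# with error_pair and error_factory, using collections.defaultdict as the docstring above
# suggests. The response fields and nested Status enum are hypothetical placeholders; real
# clients substitute their own protobuf response type.
def _example_error_from_response(response):
    import collections
    # Unrecognized codes fall back to a generic ResponseError with an auto-generated message.
    status_to_error = collections.defaultdict(lambda: (ResponseError, None))
    status_to_error.update({
        response.STATUS_OK: (None, None),
        response.STATUS_INVALID_REQUEST: error_pair(InvalidRequestError),
    })
    return error_factory(response, response.status, status_to_string=type(response).Status.Name,
                         status_to_error=status_to_error)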
def handle_unset_status_error(unset, field='status', statustype=None):
"""Decorate "error from response" functions to handle unset status field errors."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# See if the given field is the given "unset" value.
if isinstance(args[0], list):
for resp in args[0]:
_statustype = statustype if statustype else resp
if getattr(resp, field) == getattr(_statustype, unset):
return UnsetStatusError(resp)
else:
_statustype = statustype if statustype else args[0]
if getattr(args[0], field) == getattr(_statustype, unset):
return UnsetStatusError(args[0])
return func(*args, **kwargs)
return wrapper
return decorator
def handle_common_header_errors(func):
"""Decorate "error from response" functions to handle typical header errors."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# Look for errors in the common response, before looking for specific errors.
# pylint: disable=no-value-for-parameter
if isinstance(args[0], list):
return streaming_common_header_errors(*args) or func(*args, **kwargs)
else:
return common_header_errors(*args) or func(*args, **kwargs)
return wrapper
def handle_lease_use_result_errors(func):
"""Decorate "error from response" functions to handle typical lease errors."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
# pylint: disable=no-value-for-parameter
if isinstance(args[0], list):
return streaming_common_lease_errors(*args) or func(*args, **kwargs)
else:
return common_lease_errors(*args) or func(*args, **kwargs)
return wrapper
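# Illustrative sketch (not part of the SDK source): the decorators above are typically stacked on
# a service client's "error from response" callback so common header errors, lease failures, and
# unset statuses are all checked before any service-specific statuses. The callback below is a
# hypothetical placeholder that reports no service-specific error.
@handle_common_header_errors
@handle_lease_use_result_errors
@handle_unset_status_error(unset='STATUS_UNKNOWN')
def _example_command_error_from_response(response):
    # Service-specific status checks (e.g. via error_factory) would go here.
    return None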
def handle_custom_params_errors(*args, status_value=None, status_field_name='status',
error_field_name='custom_param_error'):
"""Decorate "error from response" functions to handle custom param errors."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
# pylint: disable=no-value-for-parameter
return custom_params_error(*args, status_value=status_value,
status_field_name=status_field_name,
error_field_name=error_field_name) or func(*args, **kwargs)
return wrapper
if len(args) == 1 and callable(args[0]):
        # Called without parentheses: args[0] is the function being decorated.
return decorator(args[0])
return decorator
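# Illustrative sketch (not part of the SDK source): for responses that carry a
# STATUS_CUSTOM_PARAMS_ERROR status and a custom_param_error field, the decorator above converts
# that status into a CustomParamError before the wrapped handler runs. The handler below is a
# hypothetical placeholder.
@handle_custom_params_errors
def _example_params_error_from_response(response):
    return None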
def handle_license_errors(func):
"""Decorate "error from response" functions to handle typical license errors."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
return common_license_errors(*args) or func(*args, **kwargs)
return wrapper
def handle_license_errors_if_present(func):
"""Decorate "error from response" functions to handle typical license errors.
Does not raise an error for STATUS_UNKNOWN.
Use for responses that may only sometimes fill out the license status."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
return common_license_errors(*args, allow_unset=True) or func(*args, **kwargs)
return wrapper
def common_license_errors(response, allow_unset=False):
    """Return an exception based on the license status of the response. None if no error."""
license_status = response.license_status
if allow_unset and license_status == license_pb2.LicenseInfo.STATUS_UNKNOWN:
return None
elif license_status != license_pb2.LicenseInfo.STATUS_VALID:
return LicenseError(response)
return None
def maybe_raise(exc):
    """Raise the provided exception if it is not None."""
if exc is not None:
raise exc
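# Illustrative sketch (not part of the SDK source): the helpers above can also be applied manually
# to a response obtained outside of BaseClient.call(), e.g. one replayed from a log. 'response'
# is a hypothetical protobuf response instance.
def _example_check_response(response):
    maybe_raise(common_lease_errors(response))
    return response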
def print_response(func):
"""Decorate "error from response" functions to print for debugging specific messages."""
def print_message(response):
print(response)
def print_streaming_message(response_iterator):
for response in response_iterator:
print_message(response)
@functools.wraps(func)
def wrapper(*args, **kwargs):
# pylint: disable=no-value-for-parameter
if isinstance(args[0], types.GeneratorType):
print_streaming_message(*args)
else:
print_message(*args)
return func(*args, **kwargs)
return wrapper
def process_kwargs(func):
    """Decorate call()/call_async() variants to honor the disable_value_handler and disable_error_handler kwargs."""
@functools.wraps(func)
def processor(self, rpc_method, request, value_from_response=None, error_from_response=None,
**kwargs):
if kwargs.get("disable_value_handler"):
value_from_response = None
kwargs.pop("disable_value_handler", None)
if kwargs.get("disable_error_handler"):
error_from_response = None
kwargs.pop("disable_error_handler", None)
return func(self, rpc_method, request, value_from_response=value_from_response,
error_from_response=error_from_response, **kwargs)
return processor
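# Illustrative sketch (not part of the SDK source): process_kwargs lets callers of decorated
# call()/call_async() methods opt out of a client's default handlers for a single RPC by passing
# flags through **kwargs. The client and its get_thing() wrapper are hypothetical placeholders.
def _example_get_thing_raw(client):
    # Skips the client's value_from_response handler and its response-status error checks,
    # returning the raw response protobuf. Transport errors are still raised.
    return client.get_thing(disable_value_handler=True, disable_error_handler=True)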
class BaseClient(object):
"""Helper base class for all clients to Boston Dynamics services."""
_SPLIT_SERVICE = '.'
_SPLIT_METHOD = '/'
def __init__(self, stub_creation_func, name=None):
self._service_type_short = getattr(self.__class__, 'service_type',
'BaseClient').split(BaseClient._SPLIT_SERVICE)[-1]
self._channel = None
self._logger = None
self._name = name
self._stub = None
self._stub_creation_func = stub_creation_func
self.logger = logging.getLogger(self._name or 'bosdyn.{}'.format(self._service_type_short))
self.request_processors = []
self.response_processors = []
self.lease_wallet = None
self.client_name = None
self.executor = None
    @staticmethod
@deprecated(reason='Forces serialization even if the logging is not happening. Do not use.',
version='3.3.0')
def request_trim_for_log(req):
return '\n{}\n'.format(req)
    @staticmethod
@deprecated(reason='Forces serialization even if the logging is not happening. Do not use.',
version='3.3.0')
def response_trim_for_log(resp):
return '\n{}\n'.format(resp)
@property
def channel(self):
if self._channel is None:
raise Error('Client channel is unset!')
return self._channel
@channel.setter
def channel(self, channel):
self._channel = channel
self._stub = self._stub_creation_func(channel)
    def update_from(self, other):
"""Adopt key objects like processors, logger, and wallet from other."""
self.request_processors = other.request_processors + self.request_processors
self.response_processors = other.response_processors + self.response_processors
self.logger = other.logger.getChild(self._name or self._service_type_short)
self.lease_wallet = other.lease_wallet
self.client_name = other.client_name
self.executor = other.executor
    def update_request_iterator(self, request_iterator, logger, rpc_method, is_blocking,
copy_request=True):
for request in request_iterator:
request = self._apply_request_processors(request, copy_request=copy_request)
if is_blocking:
logger.debug('blocking request: %s\n%s', rpc_method._method, request)
else:
logger.debug('async request: %s\n%s', rpc_method._method, request)
yield request
    def update_response_iterator(self, response_iterator, logger, rpc_method, is_blocking):
try:
for response in response_iterator:
response = self._apply_response_processors(copy.deepcopy(response))
if is_blocking:
logger.debug('blocking response: %s\n%s', rpc_method._method, response)
else:
logger.debug('async response: %s\n%s', rpc_method._method, response)
yield response
except TransportError as e:
# Iterating through the response_iterator is the point that transport exceptions will
# be thrown for streaming rpcs if any are going to occur.
# Here we make sure that they're translated to our more meaningful exceptions.
# Any ResponseErrors or other exception types can be let through untranslated.
# Use the "raise from None" pattern to reset the exception's context, which produces
# confusing stack traces.
raise translate_exception(e) from None
    @process_kwargs
def call(self, rpc_method, request, value_from_response=None, error_from_response=None,
assemble_type=None, copy_request=True, **kwargs):
"""Returns result of calling rpc_method(request, kwargs) after running processors.
value_from_response and error_from_response should not raise their own exceptions!
        Additionally, value_from_response and error_from_response handlers that are not the
        common handlers must accept streaming responses when the RPC is a gRPC streaming call.
"""
logger = self._get_logger(rpc_method)
if isinstance(rpc_method, grpc.StreamUnaryMultiCallable) or isinstance(
rpc_method, grpc.StreamStreamMultiCallable):
# The incoming request is a streaming request.
request = self.update_request_iterator(request, logger, rpc_method, is_blocking=True,
copy_request=copy_request)
else:
request = self._apply_request_processors(request, copy_request=copy_request)
logger.debug('blocking request: %s\n%s', rpc_method._method, request)
try:
timeout = kwargs.pop('timeout', DEFAULT_RPC_TIMEOUT)
response = rpc_method(request, timeout=timeout, **kwargs)
except TransportError as e:
# Use the "raise from None" pattern to reset the exception's context, which produces
# confusing stack traces.
raise translate_exception(e) from None
if isinstance(rpc_method, grpc.UnaryStreamMultiCallable) or isinstance(
rpc_method, grpc.StreamStreamMultiCallable):
# The outgoing response is a streaming response.
if assemble_type is not None:
# Assemble the data chunks into a message before passing to non-streaming handlers.
msg = assemble_type()
# For server streaming response RPCs, transport errors are not raised during the rpc call.
# We cannot explicitly check for them until the RPC deadline has been exceeded.
                # To make do, we attempt to parse the response and catch transport errors raised
                # while iterating through the responses.
try:
parse_from_chunks(response, msg)
except TransportError as e:
raise translate_exception(e) from None
msg = self._apply_response_processors(msg)
logger.debug('response: %s\n%s', rpc_method._method, msg)
return self.handle_response(msg, error_from_response, value_from_response)
else:
responses = self.update_response_iterator(response, logger, rpc_method,
is_blocking=True)
return self.handle_response_streaming(list(responses), error_from_response,
value_from_response)
else:
response = self._apply_response_processors(response)
logger.debug('response: %s\n%s', rpc_method._method, response)
return self.handle_response(response, error_from_response, value_from_response)
    def handle_response(self, response, error_from_response, value_from_response):
if error_from_response is not None:
exc = error_from_response(response)
else:
exc = None
if exc is not None:
raise exc # pylint: disable=raising-bad-type
if value_from_response is None:
return response
return value_from_response(response)
    def handle_response_streaming(self, response, error_from_response, value_from_response):
if error_from_response is not None:
exc = error_from_response(response)
else:
exc = None
if exc is not None:
raise exc # pylint: disable=raising-bad-type
if value_from_response is None:
return response
return value_from_response(response)
    @process_kwargs
def call_async(self, rpc_method, request, value_from_response=None, error_from_response=None,
copy_request=True, **kwargs):
"""Returns a Future for rpc_method(request, kwargs) after running processors.
value_from_response and error_from_response should not raise their own exceptions!
        call_async does not accept streaming RPCs; see 'call_async_streaming'.
"""
request = self._apply_request_processors(request, copy_request=copy_request)
logger = self._get_logger(rpc_method)
logger.debug('async request: %s\n%s', rpc_method._method, request)
timeout = kwargs.pop('timeout', DEFAULT_RPC_TIMEOUT)
response_future = rpc_method.future(request, timeout=timeout, **kwargs)
def on_finish(fut):
try:
result = fut.result()
except Exception as exc: # pylint: disable=broad-except
logger.debug('async exception: %s\n%s\n', rpc_method._method, exc)
else:
try:
self._apply_response_processors(result)
except Exception: # pylint: disable=broad-except
logger.exception("Error applying response processors.")
else:
logger.debug('async response: %s\n%s', rpc_method._method, result)
response_future.add_done_callback(on_finish)
return FutureWrapper(response_future, value_from_response, error_from_response)
    @process_kwargs
def call_async_streaming(self, rpc_method, request, value_from_response=None,
error_from_response=None, assemble_type=None, copy_request=False,
**kwargs):
"""Returns a Future for rpc_method(request, kwargs) after running processors.
value_from_response and error_from_response should not raise their own exceptions.
        A version of 'call_async' for streaming RPCs. True asynchronous streaming calls are not
        supported by Python gRPC, so this call runs the synchronous 'call' function in a thread.
"""
request = self._apply_request_processors(request, copy_request=copy_request)
if self.executor is None:
self.executor = concurrent.futures.ThreadPoolExecutor()
future = self.executor.submit(self.call, rpc_method, request, assemble_type=assemble_type,
copy_request=copy_request, **kwargs)
return FutureWrapper(future, value_from_response, error_from_response, is_streaming=True)
def _apply_request_processors(self, request, copy_request=True):
if request is None:
return
if copy_request:
request = copy.deepcopy(request)
for proc in self.request_processors:
proc.mutate(request)
return request
def _apply_response_processors(self, response):
if response is None:
return
for proc in self.response_processors:
proc.mutate(response)
return response
def _get_logger(self, rpc_method):
method_name = getattr(rpc_method, '_method', None)
if method_name:
method_name_short = str(method_name.decode()).rsplit(BaseClient._SPLIT_METHOD, 1)[-1]
# This returns the same instance if it's been created before.
return self.logger.getChild(method_name_short)
return self.logger
chunk_message = moved_to(chunk_message, version='3.3.0')
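# Illustrative sketch (not part of the SDK source): a minimal concrete client built on BaseClient.
# The service name, stub, request/response types, and 'thing' field are hypothetical placeholders;
# real clients follow this pattern with their generated gRPC stubs and decorated error handlers.
class _ExampleClient(BaseClient):
    default_service_name = 'example'
    service_type = 'bosdyn.api.ExampleService'
    def __init__(self):
        # example_service_pb2_grpc / example_pb2 below are hypothetical generated modules.
        super(_ExampleClient, self).__init__(example_service_pb2_grpc.ExampleServiceStub)
    def get_thing(self, **kwargs):
        request = example_pb2.GetThingRequest()
        return self.call(self._stub.GetThing, request,
                         value_from_response=lambda response: response.thing,
                         error_from_response=common_header_errors, **kwargs)
    def get_thing_async(self, **kwargs):
        """Async version of get_thing(); returns a FutureWrapper."""
        request = example_pb2.GetThingRequest()
        return self.call_async(self._stub.GetThing, request,
                               value_from_response=lambda response: response.thing,
                               error_from_response=common_header_errors, **kwargs)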
class FutureWrapper():
"""Wraps a Future to aid more complicated clients' async calls."""
def __init__(self, future, value_from_response, error_from_response, is_streaming=False):
self.original_future = future
self._error_from_response = error_from_response
self._value_from_response = value_from_response
self._is_streaming = is_streaming
def __repr__(self):
return self.original_future.__repr__()
    def cancel(self):
return self.original_future.cancel()
    def cancelled(self):
return self.original_future.cancelled()
    def running(self):
return self.original_future.running()
    def done(self):
return self.original_future.done()
    def traceback(self, **kwargs):
return self.original_future.traceback(**kwargs)
    def add_done_callback(self, cb, assemble_type=None):
"""Add callback executed on FutureWrapper when future is done."""
self.original_future.add_done_callback(lambda not_used_original_future: cb(self))
    def result(self, **kwargs):
"""Get the result of the value_from_response(future.result())."""
error = self.exception()
if error is not None:
raise error
base_result = self.original_future.result(**kwargs)
if self._value_from_response is None:
return base_result
return self._value_from_response(base_result)
    def exception(self, **kwargs):
"""Get exceptions from the Future, or from custom response processing."""
error = self.original_future.exception(**kwargs)
if error is None:
if self._error_from_response is None:
return None
return self._error_from_response(self.original_future.result())
        # 'call_async_streaming' uses the non-async 'call' function. 'call' does all of its
        # own error handling, so just return any errors from that call as-is.
if self._is_streaming:
return error
return translate_exception(error)
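# Illustrative sketch (not part of the SDK source): typical consumption of the FutureWrapper
# returned by call_async(). The client and its get_thing_async() method are hypothetical
# placeholders (see the example client above).
def _example_use_future(client):
    fut = client.get_thing_async()
    # The callback receives the FutureWrapper itself rather than the raw grpc future, so
    # wrapped.result() already applies value_from_response.
    fut.add_done_callback(lambda wrapped: _LOGGER.debug('async result ready: %s', wrapped))
    # Blocks for the processed value; exception() surfaces translated transport errors and any
    # error produced by error_from_response.
    return fut.result()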
def get_self_ip(robot_hostname):
    """Get the IP address of the ethernet or WiFi interface used to talk to the robot."""
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect((robot_hostname, 1))
ip = s.getsockname()[0]
except socket.error:
ip = '127.0.0.1'
finally:
s.close()
return ip