Source code for bosdyn.client.audio_visual

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

import collections
import math

from bosdyn.api import audio_visual_pb2, audio_visual_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
                                  handle_common_header_errors, handle_unset_status_error)

from .exceptions import Error as BaseError
from .exceptions import ResponseError


[docs]class AudioVisualResponseError(ResponseError): """General class of errors for AudioVisual service."""
[docs]class Error(BaseError): """Base class for non-response errors in this module."""
[docs]class NoTimeSyncError(BaseError): """Client has not done timesync with robot."""
[docs]class DoesNotExistError(AudioVisualResponseError): """The specified behavior does not exist."""
[docs]class PermanentBehaviorError(AudioVisualResponseError): """Permanent behaviors cannot be modified or deleted."""
[docs]class BehaviorExpiredError(AudioVisualResponseError): """The specified end_time has already expired."""
[docs]class InvalidBehaviorError(AudioVisualResponseError): """The request contained a behavior with invalid fields."""
[docs]class InvalidClientError(AudioVisualResponseError): """The behavior cannot be stopped because a different client is running it."""
[docs]class AudioVisualClient(BaseClient): """Client for calling the Audio Visual Service.""" default_service_name = 'audio-visual' service_type = 'bosdyn.api.AudioVisualService' def __init__(self): super(AudioVisualClient, self).__init__(audio_visual_service_pb2_grpc.AudioVisualServiceStub) self.timesync_endpoint = None
[docs] def update_from(self, other): """Update instance from another object. Args: other: The object where to copy from. """ super(AudioVisualClient, self).update_from(other) # Grab a timesync endpoint if it is available. try: self.timesync_endpoint = other.time_sync.endpoint except AttributeError: pass # other doesn't have a time_sync accessor
[docs] def run_behavior(self, name, end_time_secs, restart=False, timesync_endpoint=None, **kwargs): """Run a behavior on the robot. Args: name: The name of the behavior to run. end_time_secs: The time that this behavior should stop. restart: If this behavior is already running, should we restart it from the beginning. timesync_endpoint: Timesync endpoint. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: The behavior name specified has not been added to the system. BehaviorExpiredError: The specified end_time has already expired. NoTimeSyncError: Time sync has not been established with the robot yet. """ end_time = self._timestamp_to_robot_time(end_time_secs, timesync_endpoint) req = audio_visual_pb2.RunBehaviorRequest(name=name, end_time=end_time, restart=restart) return self.call(self._stub.RunBehavior, req, error_from_response=_run_behavior_error, copy_request=False, **kwargs)
[docs] def run_behavior_async(self, name, end_time_secs, restart=False, timesync_endpoint=None, **kwargs): """Async version of run_behavior(). Args: name: The name of the behavior to run. end_time_secs: The time that this behavior should stop. restart: If this behavior is already running, should we restart it from the beginning. timesync_endpoint: Timesync endpoint. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: The behavior name specified has not been added to the system. BehaviorExpiredError: The specified end_time has already expired. NoTimeSyncError: Time sync has not been established with the robot yet. """ end_time = self._timestamp_to_robot_time(end_time_secs, timesync_endpoint) req = audio_visual_pb2.RunBehaviorRequest(name=name, end_time=end_time, restart=restart) return self.call_async(self._stub.RunBehavior, req, error_from_response=_run_behavior_error, copy_request=False, **kwargs)
[docs] def stop_behavior(self, name, **kwargs): """Stop a behavior that is currently running. Args: name: The name of the behavior to stop. Raises: RpcError: Problem communicating with the robot. InvalidClientError: A different client is running this behavior.""" req = audio_visual_pb2.StopBehaviorRequest(behavior_name=name) return self.call(self._stub.StopBehavior, req, error_from_response=_stop_behavior_error, copy_request=False, **kwargs)
[docs] def stop_behavior_async(self, name, **kwargs): """Async version of stop_behavior(). Args: name: The name of the behavior to stop. Raises: RpcError: Problem communicating with the robot. InvalidClientError: A different client is running this behavior.""" req = audio_visual_pb2.StopBehaviorRequest(behavior_name=name) return self.call_async(self._stub.StopBehavior, req, error_from_response=_stop_behavior_error, copy_request=False, **kwargs)
[docs] def add_or_modify_behavior(self, name, behavior, **kwargs): """Add or modify an AudioVisualBehavior. Args: name: The name of the behavior to add. behavior: The AudioVisualBehavior proto to add. Returns: The LiveAudioVisualBehavior proto that was just added or modified. Raises: RpcError: Problem communicating with the robot. PermanentBehaviorError: The behavior specified is permanent and cannot be modified. InvalidBehaviorError: The request contained a behavior with invalid fields. """ # Clamp and normalize colors in the behavior before sending the request. led_sequence_group = getattr(behavior, "led_sequence_group", None) if led_sequence_group is not None: behavior.led_sequence_group.CopyFrom(check_color(led_sequence_group)) req = audio_visual_pb2.AddOrModifyBehaviorRequest(name=name, behavior=behavior) return self.call(self._stub.AddOrModifyBehavior, req, value_from_response=_get_live_behavior, error_from_response=_add_or_modify_behavior_error, copy_request=False, **kwargs)
[docs] def add_or_modify_behavior_async(self, name, behavior, **kwargs): """Add or modify an AudioVisualBehavior. Args: name: The name of the behavior to add. behavior: The AudioVisualBehavior proto to add. Returns: The LiveAudioVisualBehavior proto that was just added or modified. Raises: RpcError: Problem communicating with the robot. PermanentBehaviorError: The behavior specified is permanent and cannot be modified. InvalidBehaviorError: The request contained a behavior with invalid fields. """ # Clamp and normalize colors in the behavior before sending the request. led_sequence_group = getattr(behavior, "led_sequence_group", None) if led_sequence_group is not None: behavior.led_sequence_group.CopyFrom(check_color(led_sequence_group)) req = audio_visual_pb2.AddOrModifyBehaviorRequest(name=name, behavior=behavior) return self.call_async(self._stub.AddOrModifyBehavior, req, value_from_response=_get_live_behavior, error_from_response=_add_or_modify_behavior_error, copy_request=False, **kwargs)
[docs] def delete_behaviors(self, behavior_names, **kwargs): """Delete an AudioVisualBehavior. Args: behavior_names: A list of behavior names to delete. Returns: A list of LiveAudioVisualBehavior protos that were deleted. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: A specified behavior name has not been added to the system. PermanentBehaviorError: A specified behavior is permanent and cannot be deleted. """ req = audio_visual_pb2.DeleteBehaviorsRequest(behavior_names=behavior_names) return self.call(self._stub.DeleteBehaviors, req, value_from_response=_get_deleted_behaviors, error_from_response=_delete_behaviors_error, copy_request=False, **kwargs)
[docs] def delete_behaviors_async(self, behavior_names, **kwargs): """Async version of delete_behaviors(). Args: behavior_names: A list of behavior names to delete. Returns: A list of LiveAudioVisualBehavior protos that were deleted. Raises: RpcError: Problem communicating with the robot. DoesNotExistError: A specified behavior name has not been added to the system. PermanentBehaviorError: A specified behavior is permanent and cannot be deleted. """ req = audio_visual_pb2.DeleteBehaviorsRequest(behavior_names=behavior_names) return self.call_async(self._stub.DeleteBehaviors, req, value_from_response=_get_deleted_behaviors, error_from_response=_delete_behaviors_error, copy_request=False, **kwargs)
[docs] def list_behaviors(self, **kwargs): """List all currently added AudioVisualBehaviors. Returns: A list of all LiveAudioVisualBehavior protos. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.ListBehaviorsRequest() return self.call(self._stub.ListBehaviors, req, value_from_response=_get_behavior_list, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def list_behaviors_async(self, **kwargs): """Async version of list_behaviors(). Returns: A list of all LiveAudioVisualBehavior protos. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.ListBehaviorsRequest() return self.call_async(self._stub.ListBehaviors, req, value_from_response=_get_behavior_list, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def get_system_params(self, **kwargs): """Get the current system params. Returns: An AudioVisualSystemParams proto containing the current system param values. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.GetSystemParamsRequest() return self.call(self._stub.GetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def get_system_params_async(self, **kwargs): """Async version of get_system_params(). Returns: An AudioVisualSystemParams proto containing the current system param values. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.GetSystemParamsRequest() return self.call_async(self._stub.GetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def set_system_params(self, enabled=None, max_brightness=None, buzzer_max_volume=None, speaker_max_volume=None, normal_color_association=None, warning_color_association=None, danger_color_association=None, **kwargs): """Set the system params. Args: enabled: [optional] System is enabled or disabled (boolean). max_brightness: [optional] New max_brightness value [0, 1]. buzzer_max_volume: [optional] New buzzer_max_volume value [0, 1]. speaker_max_volume: [optional] New speaker_max_volume value [0, 1]. normal_color_association: [optional] The color to associate with the normal color preset. warning_color_association: [optional] The color to associate with the warning color preset. danger_color_association: [optional] The color to associate with the danger color preset. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.SetSystemParamsRequest() if (enabled is not None): req.enabled.value = enabled if (max_brightness is not None): req.max_brightness.value = max_brightness if (buzzer_max_volume is not None): req.buzzer_max_volume.value = buzzer_max_volume if (speaker_max_volume is not None): req.speaker_max_volume.value = speaker_max_volume if (normal_color_association is not None): req.normal_color_association.CopyFrom(normal_color_association) if (warning_color_association is not None): req.warning_color_association.CopyFrom(warning_color_association) if (danger_color_association is not None): req.danger_color_association.CopyFrom(danger_color_association) return self.call(self._stub.SetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def set_system_params_async(self, enabled=None, max_brightness=None, buzzer_max_volume=None, speaker_max_volume=None, normal_color_association=None, warning_color_association=None, danger_color_association=None, **kwargs): """Async version of set_system_params(). Args: enabled: [optional] System is enabled or disabled (boolean). max_brightness: [optional] New max_brightness value [0, 1]. buzzer_max_volume: [optional] New buzzer_max_volume value [0, 1]. speaker_max_volume: [optional] New speaker_max_volume value [0, 1]. normal_color_association: [optional] The color to associate with the normal color preset. warning_color_association: [optional] The color to associate with the warning color preset. danger_color_association: [optional] The color to associate with the danger color preset. Raises: RpcError: Problem communicating with the robot. """ req = audio_visual_pb2.SetSystemParamsRequest() if (enabled is not None): req.enabled.value = enabled if (max_brightness is not None): req.max_brightness.value = max_brightness if (buzzer_max_volume is not None): req.buzzer_max_volume.value = buzzer_max_volume if (speaker_max_volume is not None): req.speaker_max_volume.value = speaker_max_volume if (normal_color_association is not None): req.normal_color_association.CopyFrom(normal_color_association) if (warning_color_association is not None): req.warning_color_association.CopyFrom(warning_color_association) if (danger_color_association is not None): req.danger_color_association.CopyFrom(danger_color_association) return self.call_async(self._stub.SetSystemParams, req, error_from_response=common_header_errors, copy_request=False, **kwargs)
def _timestamp_to_robot_time(self, timestamp, timesync_endpoint=None): # Create a time converter to convert timestamp to robot time time_converter = None if (timesync_endpoint): time_converter = timesync_endpoint.get_robot_time_converter() elif (self.timesync_endpoint): time_converter = self.timesync_endpoint.get_robot_time_converter() else: raise NoTimeSyncError("No timesync endpoint was passed to audio visual client.") return time_converter.robot_timestamp_from_local_secs(timestamp)
def _get_behavior_list(response): return response.behaviors def _get_live_behavior(response): return response.live_behavior def _get_deleted_behaviors(response): return response.deleted_behaviors _AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR.update({ audio_visual_pb2.RunBehaviorResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.RunBehaviorResponse.STATUS_DOES_NOT_EXIST: error_pair(DoesNotExistError), audio_visual_pb2.RunBehaviorResponse.STATUS_EXPIRED: error_pair(BehaviorExpiredError), }) _AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR.update({ audio_visual_pb2.StopBehaviorResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.StopBehaviorResponse.STATUS_INVALID_CLIENT: error_pair(InvalidClientError) }) _AUDIO_VISUAL_ADD_OR_MODIFY_BEHAVIOR_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_ADD_OR_MODIFY_BEHAVIOR_STATUS_TO_ERROR.update({ audio_visual_pb2.AddOrModifyBehaviorResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.AddOrModifyBehaviorResponse.STATUS_INVALID: error_pair(InvalidBehaviorError), audio_visual_pb2.AddOrModifyBehaviorResponse.STATUS_MODIFY_PERMANENT: error_pair(PermanentBehaviorError), }) _AUDIO_VISUAL_DELETE_BEHAVIORS_STATUS_TO_ERROR = collections.defaultdict( lambda: (AudioVisualResponseError, None)) _AUDIO_VISUAL_DELETE_BEHAVIORS_STATUS_TO_ERROR.update({ audio_visual_pb2.DeleteBehaviorsResponse.STATUS_SUCCESS: (None, None), audio_visual_pb2.DeleteBehaviorsResponse.STATUS_DOES_NOT_EXIST: error_pair(DoesNotExistError), audio_visual_pb2.DeleteBehaviorsResponse.STATUS_DELETE_PERMANENT: error_pair(PermanentBehaviorError), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _run_behavior_error(response): """RunBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.RunBehaviorResponse.Status.Name, status_to_error=_AUDIO_VISUAL_RUN_BEHAVIOR_STATUS_TO_ERROR) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _stop_behavior_error(response): """StopBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.StopBehaviorResponse.Status.Name, status_to_error=_AUDIO_VISUAL_STOP_BEHAVIOR_STATUS_TO_ERROR) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _add_or_modify_behavior_error(response): """AddOrModifyBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.AddOrModifyBehaviorResponse.Status.Name, status_to_error=_AUDIO_VISUAL_ADD_OR_MODIFY_BEHAVIOR_STATUS_TO_ERROR) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _delete_behaviors_error(response): """DeleteBehaviorResponse response to exception.""" return error_factory(response, response.status, status_to_string=audio_visual_pb2.DeleteBehaviorsResponse.Status.Name, status_to_error=_AUDIO_VISUAL_DELETE_BEHAVIORS_STATUS_TO_ERROR)
[docs]def check_color(led_sequence_group): # Check every LED leds = ["center", "front_left", "front_right", "hind_left", "hind_right"] for led in leds: # Get the LED sequence by location led_sequence = getattr(led_sequence_group, led, None) if led_sequence is not None: # Now, normalize the color in the LED sequence by location if led_sequence.HasField("animation_sequence"): for frame, idx in enumerate(led_sequence.animation_sequence.frames): if frame.HasField("color"): color = clamp_and_normalize_color(frame.color) led_sequence.animation_sequence.frames[idx] = color elif led_sequence.HasField("blink_sequence"): if led_sequence.blink_sequence.HasField("color"): led_sequence.blink_sequence.color.CopyFrom( clamp_and_normalize_color(led_sequence.blink_sequence.color)) elif led_sequence.HasField("pulse_sequence"): if led_sequence.pulse_sequence.HasField("color"): led_sequence.pulse_sequence.color.CopyFrom( clamp_and_normalize_color(led_sequence.pulse_sequence.color)) elif led_sequence.HasField("synced_blink_sequence"): for frame, idx in enumerate(led_sequence.synced_blink_sequence.frames): if frame.HasField("color"): color = clamp_and_normalize_color(frame.color) led_sequence.synced_blink_sequence.frames[idx] = color elif led_sequence.HasField("solid_color_sequence"): if led_sequence.solid_color_sequence.HasField("color"): led_sequence.solid_color_sequence.color.CopyFrom( clamp_and_normalize_color(led_sequence.solid_color_sequence.color)) return led_sequence_group
# Scale color so that their Euclidean norm does not exceed max_color_magnitude. # NOTE: max_color_magnitude of 255 (roughly 50% of sqrt(3*255^2)=441.67) is a heuristic chosen to prevent damage to the robot's LEDs. # Exceeding this value may result in damage to the robot's LEDs that will NOT be covered under warranty.
[docs]def clamp_and_normalize_color(color, max_color_magnitude=255): r, g, b = color.rgb.r, color.rgb.g, color.rgb.b norm = math.sqrt(r**2 + g**2 + b**2) if norm > max_color_magnitude and norm > 0: scale = max_color_magnitude / norm scaled_color = audio_visual_pb2.Color( rgb=audio_visual_pb2.Color.RGB(r=int(r * scale), g=int(g * scale), b=int(b * scale))) print(f"Input color {color} scaled by {scale:.2f}. Clamped color: {scaled_color}.") color = scaled_color return color