Source code for bosdyn.client.robot_state

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to use the robot state service."""

from bosdyn.api import robot_state_pb2, robot_state_service_pb2_grpc
from bosdyn.client.common import BaseClient, common_header_errors


[docs]class RobotStateClient(BaseClient): """Client for the RobotState service.""" default_service_name = 'robot-state' service_type = 'bosdyn.api.RobotStateService' def __init__(self): super(RobotStateClient, self).__init__(robot_state_service_pb2_grpc.RobotStateServiceStub)
[docs] def get_robot_state(self, **kwargs): """Obtain current state of the robot. Returns: RobotState: The current robot state. Raises: RpcError: Problem communicating with the robot. """ req = self._get_robot_state_request() return self.call(self._stub.GetRobotState, req, _get_robot_state_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_robot_state_async(self, **kwargs): """Async version of get_robot_state()""" req = self._get_robot_state_request() return self.call_async(self._stub.GetRobotState, req, _get_robot_state_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_robot_metrics(self, **kwargs): """Obtain robot metrics, such as distance traveled or time powered on. Returns: All of the current robot metrics. Raises: RpcError: Problem communicating with the robot. """ req = self._get_robot_metrics_request() return self.call(self._stub.GetRobotMetrics, req, _get_robot_metrics_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_robot_metrics_async(self, **kwargs): """Async version of get_robot_state()""" req = self._get_robot_metrics_request() return self.call_async(self._stub.GetRobotMetrics, req, _get_robot_metrics_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_robot_hardware_configuration(self, **kwargs): """Obtain current hardware configuration of robot. Returns: The hardware configuration, which includes the link names. Raises: RpcError: Problem communicating with the robot. """ req = self._get_robot_hardware_configuration_request() return self.call(self._stub.GetRobotHardwareConfiguration, req, _get_robot_hardware_configuration_value, common_header_errors, copy_request=False, **kwargs)
[docs] def get_robot_hardware_configuration_async(self, **kwargs): """Async version of get_robot_hardware_configuration()""" req = self._get_robot_hardware_configuration_request() return self.call_async(self._stub.GetRobotHardwareConfiguration, req, _get_robot_hardware_configuration_value, common_header_errors, copy_request=False, **kwargs)
@staticmethod def _get_robot_state_request(): return robot_state_pb2.RobotStateRequest() @staticmethod def _get_robot_metrics_request(): return robot_state_pb2.RobotMetricsRequest() @staticmethod def _get_robot_hardware_configuration_request(): return robot_state_pb2.RobotHardwareConfigurationRequest() @staticmethod def _get_robot_link_model_request(link_name): return robot_state_pb2.RobotLinkModelRequest(link_name=link_name)
[docs]class RobotStateStreamingClient(BaseClient): """Client for the RobotState service. This client is in BETA and may undergo changes in future releases. """ default_service_name = 'robot-state-streaming' service_type = 'bosdyn.api.RobotStateStreamingService' def __init__(self): super(RobotStateStreamingClient, self).__init__(robot_state_service_pb2_grpc.RobotStateStreamingServiceStub)
[docs] def get_robot_state_stream(self, **kwargs): """Returns an iterator providing current state updates of the robot.""" req = self._get_robot_state_stream_request() return self._stub.GetRobotStateStream(req)
@staticmethod def _get_robot_state_stream_request(): return robot_state_pb2.RobotStateStreamRequest()
def _get_robot_state_value(response): return response.robot_state def _get_robot_state_stream_value(response): return response def _get_robot_metrics_value(response): return response.robot_metrics def _get_robot_hardware_configuration_value(response): return response.hardware_configuration def _get_robot_link_model_value(response): return response.link_model
[docs]def has_arm(state_client, timeout=None): """Check if the robot has an arm attached. Args: state_client: RobotStateClient to query for robot state. timeout: Number of seconds to wait for RPC response. Returns: bool: Returns True if robot has an arm, False otherwise. Raises: RpcError: A problem occurred trying to communicate with the robot. """ state = state_client.get_robot_state(timeout=timeout) return state.HasField("manipulator_state")