Source code for bosdyn.client.fault

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""For clients to use the fault service."""
import collections

from bosdyn.api import fault_service_pb2_grpc, service_fault_pb2
from bosdyn.client.common import (BaseClient, common_header_errors, error_factory, error_pair,
                                  handle_common_header_errors, handle_unset_status_error)

from .exceptions import ResponseError


[docs]class FaultResponseError(ResponseError): """General class of errors for directory registration responses."""
[docs]class ServiceFaultAlreadyExistsError(FaultResponseError): """The specified service fault id already exists as an active fault on the robot."""
[docs]class ServiceFaultDoesNotExistError(FaultResponseError): """The specified service fault id does not match any active service faults on the robot."""
[docs]class FaultClient(BaseClient): """Client for the Fault service.""" default_service_name = 'fault' service_type = 'bosdyn.api.FaultService' def __init__(self): super(FaultClient, self).__init__(fault_service_pb2_grpc.FaultServiceStub)
[docs] def trigger_service_fault(self, service_fault, **kwargs): """Broadcast a new service fault through the robot. Args: service_fault (bosdyn.api.ServiceFault): Populated fault message to broadcast. Returns: An instance of bosdyn.api.TriggerServiceFaultResponse Raises: RpcError: Problem communicating with the robot. ServiceFaultAlreadyExistsError: The service fault already exists. FaultResponseError: Something went wrong during the fault trigger. """ req = service_fault_pb2.TriggerServiceFaultRequest() req.fault.CopyFrom(service_fault) return self.call(self._stub.TriggerServiceFault, req, None, error_from_response=_trigger_service_fault_error, copy_request=False, **kwargs)
[docs] def trigger_service_fault_async(self, service_fault, **kwargs): """Async version of trigger_service_fault()""" req = service_fault_pb2.TriggerServiceFaultRequest() req.fault.CopyFrom(service_fault) return self.call_async(self._stub.TriggerServiceFault, req, None, error_from_response=_trigger_service_fault_error, copy_request=False, **kwargs)
[docs] def clear_service_fault(self, service_fault_id, clear_all_service_faults=False, clear_all_payload_faults=False, **kwargs): """Clear a service fault from the robot state. Args: service_fault_id (bosdyn.api.ServiceFaultId): ServiceFault to clear. clear_all_service_faults (bool): Clear all faults associated with the service name. clear_all_payload_faults (bool): Clear all faults associated with the payload guid. Returns: An instance of bosdyn.api.ClearServiceFaultResponse Raises: RpcError: Problem communicating with the robot. ServiceFaultDoesNotExistError: The service fault does not exist in active service faults. FaultResponseError: Something went wrong during the fault clear. """ req = service_fault_pb2.ClearServiceFaultRequest() req.fault_id.CopyFrom(service_fault_id) req.clear_all_service_faults = clear_all_service_faults req.clear_all_payload_faults = clear_all_payload_faults return self.call(self._stub.ClearServiceFault, req, None, error_from_response=_clear_service_fault_error, copy_request=False, **kwargs)
[docs] def clear_service_fault_async(self, service_fault_id, clear_all_service_faults=False, clear_all_payload_faults=False, **kwargs): """Async version of clear_service_fault()""" req = service_fault_pb2.ClearServiceFaultRequest() req.fault_id.CopyFrom(service_fault_id) req.clear_all_service_faults = clear_all_service_faults req.clear_all_payload_faults = clear_all_payload_faults return self.call_async(self._stub.ClearServiceFault, req, None, error_from_response=_clear_service_fault_error, copy_request=False, **kwargs)
_TRIGGER_STATUS_TO_ERROR = collections.defaultdict(lambda: (FaultResponseError, None)) _TRIGGER_STATUS_TO_ERROR.update({ service_fault_pb2.TriggerServiceFaultResponse.STATUS_OK: (None, None), service_fault_pb2.TriggerServiceFaultResponse.STATUS_FAULT_ALREADY_ACTIVE: error_pair(ServiceFaultAlreadyExistsError), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _trigger_service_fault_error(response): """Return an exception based on response from Trigger RPC, None if no error.""" return error_factory(response, response.status, status_to_string=service_fault_pb2.TriggerServiceFaultResponse.Status.Name, status_to_error=_TRIGGER_STATUS_TO_ERROR) _CLEAR_STATUS_TO_ERROR = collections.defaultdict(lambda: (FaultResponseError, None)) _CLEAR_STATUS_TO_ERROR.update({ service_fault_pb2.ClearServiceFaultResponse.STATUS_OK: (None, None), service_fault_pb2.ClearServiceFaultResponse.STATUS_FAULT_NOT_ACTIVE: error_pair(ServiceFaultDoesNotExistError), }) @handle_common_header_errors @handle_unset_status_error(unset='STATUS_UNKNOWN') def _clear_service_fault_error(response): """Return an exception based on response from Clear RPC, None if no error.""" return error_factory(response, response.status, status_to_string=service_fault_pb2.ClearServiceFaultResponse.Status.Name, status_to_error=_CLEAR_STATUS_TO_ERROR)