# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to the Spot CAM Health service."""
import logging
_LOGGER = logging.getLogger(__name__)
from bosdyn.api.spot_cam import health_pb2, service_pb2_grpc
from bosdyn.client.common import BaseClient, common_header_errors, handle_common_header_errors
[docs]class HealthClient(BaseClient):
"""A client calling Spot CAM Health service.
"""
default_service_name = 'spot-cam-health'
service_type = 'bosdyn.api.spot_cam.HealthService'
def __init__(self):
super(HealthClient, self).__init__(service_pb2_grpc.HealthServiceStub)
[docs] def clear_bit_events(self, **kwargs):
"""Clear out the events list of the BITStatus structure."""
request = health_pb2.ClearBITEventsRequest()
return self.call(self._stub.ClearBITEvents, request, self._clear_bit_events_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def clear_bit_events_async(self, **kwargs):
"""Async version of clear_bit_events()."""
request = health_pb2.ClearBITEventsRequest()
return self.call_async(self._stub.ClearBITEvents, request,
self._clear_bit_events_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_bit_status(self, **kwargs):
"""Retrieve (system events, degradations) as a tuple of two lists."""
request = health_pb2.GetBITStatusRequest()
return self.call(self._stub.GetBITStatus, request, self._get_bit_status_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_bit_status_async(self, **kwargs):
"""Async version of get_bit_status()."""
request = health_pb2.GetBITStatusRequest()
return self.call_async(self._stub.GetBITStatus, request, self._get_bit_status_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_temperature(self, **kwargs):
"""Retrieve a list of thermometers measuring the temperature (mC) of corresponding on-board devices."""
request = health_pb2.GetTemperatureRequest()
return self.call(self._stub.GetTemperature, request, self._get_temperature_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_temperature_async(self, **kwargs):
"""Async version of get_temperature()."""
request = health_pb2.GetTemperatureRequest()
return self.call_async(self._stub.GetTemperature, request,
self._get_temperature_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_system_log(self, **kwargs):
"""Retrieve a list of thermometers measuring the temperature (mC) of corresponding on-board devices."""
request = health_pb2.GetSystemLogRequest()
return self.call(self._stub.GetSystemLog, request, self._get_system_log_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
[docs] def get_system_log_async(self, **kwargs):
"""Async version of get_system_log()."""
request = health_pb2.GetSystemLogRequest()
return self.call_async(self._stub.GetSystemLog, request, self._get_system_log_from_response,
self._health_error_from_response, copy_request=False, **kwargs)
@staticmethod
def _clear_bit_events_from_response(response):
pass
@staticmethod
def _get_bit_status_from_response(response):
return response.events, response.degradations
@staticmethod
def _get_temperature_from_response(response):
return response.temps
@staticmethod
def _get_system_log_from_response(responses):
total = 0
local_chunks = []
for response in responses:
chunk = response.data
total += len(chunk.data)
_LOGGER.debug('Retrieved {} bytes ({}/{})'.format(len(chunk.data), total,
chunk.total_size))
local_chunks.append(chunk)
return b''.join(chunk.data for chunk in local_chunks)
@staticmethod
@handle_common_header_errors
def _health_error_from_response(response): # pylint: disable=unused-argument
return None