# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""GrpcSeriesWriter is a class for registering a series which stores GRPC request/response pairs."""
from bosdyn.util import timestamp_to_nsec
from .bosdyn import GrpcRequests, GrpcResponses
from .common import PROTOBUF_CONTENT_TYPE
class GrpcServiceWriter:  # pylint: disable=too-many-instance-attributes
    """A class for logging GRPC request and response messages.

    Every distinct protobuf message type (keyed by its descriptor's full name) is
    registered with the data writer as its own message series the first time it is
    logged.  Requests and responses are tracked in separate caches, so the same
    message name may map to one series as a request and another as a response.
    """

    def __init__(self, data_writer, service_name):
        """Create a writer which logs messages for a single service.

        Args:
            data_writer: object used to register series and store messages.  This class
                calls its add_message_series() and write_data() methods.
            service_name: value recorded under the SERVICE_NAME key of every series
                spec registered by this writer.
        """
        self._data_writer = data_writer
        self._service_name = service_name
        # Caches of message full-name -> series index, one per message direction.
        self._request_types = {}
        self._response_types = {}

    def log_request(self, protobuf):
        """Store request protobuf in the file.

        The message is written with the timestamp taken from
        protobuf.header.request_timestamp, converted to nanoseconds.

        Args:
            protobuf: a protobuf request message, not serialized.
        """
        series_index = self._get_series_index(protobuf, is_request=True)
        # TODO: open question carried over from the original implementation -- should a
        # clock correction be applied to this timestamp (what does the C++ code do)?
        # For now the request timestamp is stored exactly as provided.
        timestamp_nsec = timestamp_to_nsec(protobuf.header.request_timestamp)
        self._data_writer.write_data(series_index, timestamp_nsec, protobuf.SerializeToString())

    def log_response(self, protobuf):
        """Store response protobuf in the file.

        The message is written with the timestamp taken from
        protobuf.header.response_timestamp, converted to nanoseconds.

        Args:
            protobuf: a protobuf response message, not serialized.
        """
        series_index = self._get_series_index(protobuf, is_request=False)
        timestamp_nsec = timestamp_to_nsec(protobuf.header.response_timestamp)
        self._data_writer.write_data(series_index, timestamp_nsec, protobuf.SerializeToString())

    def _get_series_index(self, protobuf, is_request):
        """Return the series index for the message's type, registering it on first use.

        Args:
            protobuf: a protobuf message; only its DESCRIPTOR.full_name is read here.
            is_request: True to use the request series/cache, False for responses.

        Returns:
            The series index previously cached for this message name, or the value
            returned by the data writer's add_message_series() for a new message name.
        """
        message_name = protobuf.DESCRIPTOR.full_name  # pylint: disable=no-member
        name_to_index = self._request_types if is_request else self._response_types

        try:
            return name_to_index[message_name]
        except KeyError:
            pass  # First message of this type: fall through and register a new series.

        # The series-type constants are only needed when a new series is registered.
        series_type = GrpcRequests if is_request else GrpcResponses
        series_spec = {
            series_type.SERVICE_NAME: self._service_name,
            series_type.MESSAGE_TYPE: message_name,
        }
        series_index = self._data_writer.add_message_series(series_type.SERIES_TYPE, series_spec,
                                                            content_type=PROTOBUF_CONTENT_TYPE,
                                                            type_name=message_name)
        name_to_index[message_name] = series_index
        return series_index