# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
import io
import json
import logging
import os
import ssl
import time
from pathlib import Path
from urllib.error import URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Struct
import bosdyn.client
import bosdyn.client.util
from bosdyn.api import data_acquisition_pb2, data_acquisition_store_pb2
from bosdyn.client.exceptions import ResponseError
# Root logger (getLogger() is called without a name). The helpers in this file use it to
# report errors and progress information.
_LOGGER = logging.getLogger()
def issue_acquire_data_request(data_acq_client, acquisition_requests, group_name, action_name,
                               metadata=None, data_timestamp=None):
    """Issue an AcquireData request and return right away, without waiting for completion.

    Args:
        data_acq_client: DataAcquisition client used to send the acquisition request.
        acquisition_requests: Acquisition requests to include in the request message.
        group_name: Group name for the acquisitions.
        action_name: Action name for the acquisitions.
        metadata: Metadata to include in the request message.
        data_timestamp: Timestamp to use for the acquisitions. If None the timestamp will be
            generated by the data acquisition client.

    Returns:
        A (request id, action id) tuple. The action id is a CaptureActionId built from the
        arguments. The request id is None when the AcquireData rpc raised a ResponseError.
    """
    # The capture action id is returned to the caller so the capture can be queried later.
    capture_action_id = data_acquisition_pb2.CaptureActionId(
        action_name=action_name,
        group_name=group_name,
        timestamp=data_timestamp,
    )

    try:
        acquired_request_id = data_acq_client.acquire_data(
            acquisition_requests=acquisition_requests,
            action_name=action_name,
            group_name=capture_action_id.group_name,
            metadata=metadata,
            data_timestamp=data_timestamp,
        )
    except ResponseError as err:
        # Report the failure and signal it to the caller through a None request id.
        print("Exception raised by issue_acquire_data_request: " + str(err))
        return None, capture_action_id

    return acquired_request_id, capture_action_id
def acquire_and_process_request(data_acquisition_client, acquisition_requests, group_name,
                                action_name, metadata=None, block_until_complete=True,
                                data_timestamp=None):
    """Send an acquisition request and optionally block until the acquisition completes.

    If blocking, the GetStatus RPC is polled (every 0.2 seconds) to monitor the status of the
    acquisition request until it reaches a terminal state.

    Args:
        data_acquisition_client (DataAcquisitionClient): The client used to send the
            acquisition requests.
        acquisition_requests (data_acquisition_pb2.AcquisitionRequestList): Acquisition requests
            to include in the request message.
        group_name (string): Group name for the acquisitions.
        action_name (string): Action name for the acquisitions.
        metadata (data_acquisition_pb2.Metadata): Metadata to include in the request message.
        block_until_complete (Boolean): If true, don't return until the acquisition reaches a
            terminal status.
        data_timestamp: Timestamp to use for the acquisitions. If None the timestamp will be
            generated by the data acquisition client.

    Returns:
        Boolean indicating if the acquisition completed successfully or not. When not blocking,
        True only means the AcquireData request was accepted. False is returned when the
        AcquireData request fails, when GetStatus raises a ResponseError, or when the request
        times out, reports a data error, is unknown to the service, or is cancelled.
    """
    # Make the acquire data request. This will return our current request id.
    request_id, _ = issue_acquire_data_request(data_acquisition_client, acquisition_requests,
                                               group_name, action_name, metadata,
                                               data_timestamp)
    if not request_id:
        # The AcquireData request failed for some reason. No need to attempt to
        # monitor the status.
        return False

    if not block_until_complete:
        return True

    status_response_type = data_acquisition_pb2.GetStatusResponse

    # Monitor the status of the data acquisition. Every exit from this loop is an explicit
    # return, so no statement is needed after it.
    print("Waiting for acquisition (id: %s) to complete." % str(request_id))
    while True:
        try:
            get_status_response = data_acquisition_client.get_status(request_id)
        except ResponseError as err:
            print("Exception: %s" % str(err))
            return False

        status = get_status_response.status
        print("Current status is: %s" % status_response_type.Status.Name(status))
        if status == status_response_type.STATUS_COMPLETE:
            return True
        if status == status_response_type.STATUS_TIMEDOUT:
            print("Unrecoverable request timeout: %s" % get_status_response)
            return False
        if status == status_response_type.STATUS_DATA_ERROR:
            print("Data error was received: %s" % get_status_response)
            return False
        if status == status_response_type.STATUS_REQUEST_ID_DOES_NOT_EXIST:
            print(
                "The acquisition request id %s is unknown: %s" % (request_id, get_status_response))
            return False
        if status == status_response_type.STATUS_ACQUISITION_CANCELLED:
            # A cancelled request never completes; without this check the loop would poll
            # forever if the acquisition is cancelled while we are waiting on it.
            print("The acquisition request was cancelled: %s" % get_status_response)
            return False

        # Still in progress: wait briefly before polling again.
        time.sleep(0.2)
def cancel_acquisition_request(data_acq_client, request_id):
    """Cancel an acquisition request and wait until the cancellation is confirmed.

    After the CancelAcquisition RPC succeeds, the GetStatus RPC is polled (every 0.2 seconds)
    until the request reports STATUS_ACQUISITION_CANCELLED or GetStatus raises a ResponseError.

    Args:
        data_acq_client: DataAcquisition client used to send the requests.
        request_id: The id number for the AcquireData request to cancel. A falsy id is treated
            as invalid and nothing is sent.

    Returns:
        None.
    """
    if not request_id:
        # The incoming request id is invalid. No need to attempt to cancel the request or
        # monitor the status.
        return

    try:
        is_cancelled_response = data_acq_client.cancel_acquisition(request_id)
        print("Status of the request to cancel the data-acquisition in progress: " +
              data_acquisition_pb2.CancelAcquisitionResponse.Status.Name(
                  is_cancelled_response.status))
    except ResponseError as err:
        print("ResponseError raised when cancelling: " + str(err))
        # Don't attempt to wait for the cancellation success status.
        return

    # Monitor the status of the cancellation to confirm it was successfully cancelled.
    while True:
        try:
            get_status_response = data_acq_client.get_status(request_id)
        except ResponseError as err:
            print("Exception: " + str(err))
            break

        print("Request " + str(request_id) + " status: " +
              data_acquisition_pb2.GetStatusResponse.Status.Name(get_status_response.status))
        if get_status_response.status == data_acquisition_pb2.GetStatusResponse.STATUS_ACQUISITION_CANCELLED:
            print("The request is fully cancelled.")
            break

        # Pause between polls so the service is not flooded with back-to-back GetStatus RPCs.
        # This matches the polling interval used by acquire_and_process_request.
        time.sleep(0.2)
def clean_filename(filename):
    """Strip characters that are not allowed in a filename.

    Args:
        filename(string): Original filename to clean.

    Returns:
        The filename with every occurrence of the characters :*?<>| removed. All other
        characters are kept in their original order.
    """
    forbidden_chars = ":*?<>|"
    kept_chars = []
    for char in filename:
        if char in forbidden_chars:
            continue
        kept_chars.append(char)
    return "".join(kept_chars)
def make_time_query_params(start_time_secs, end_time_secs, robot):
    """Build time-range query params for a download request.

    Args:
        start_time_secs(float): The start time for the download data range.
        end_time_secs(float): The end time for the download range.
        robot (Robot): The robot object; its time_sync is used to convert both local times
            to robot timestamps.

    Returns:
        The query params (data_acquisition_store_pb2.DataQueryParams) for the time-range download.
    """
    range_start = robot.time_sync.robot_timestamp_from_local_secs(start_time_secs)
    range_end = robot.time_sync.robot_timestamp_from_local_secs(end_time_secs)
    # Echo the converted range so the user can see which robot-time window is requested.
    print(range_start.ToJsonString(), range_end.ToJsonString())

    time_range_query = data_acquisition_store_pb2.TimeRangeQuery(from_timestamp=range_start,
                                                                 to_timestamp=range_end)
    return data_acquisition_store_pb2.DataQueryParams(time_range=time_range_query)
def make_time_query_params_from_group_name(group_name, data_store_client):
    """Create time-based query params for the download request using the group name.

    The capture actions stored for the group are listed, and the earliest and latest non-zero
    capture timestamps (widened by 3 seconds on each side) become the time range.

    Args:
        group_name(string): The group name for the data to be downloaded.
        data_store_client(DataAcquisitionStoreClient): The data store client, used to get the
            action ids for the group name.

    Returns:
        The query params (data_acquisition_store_pb2.DataQueryParams) for the time-range
        download, or None if the capture actions could not be listed or none of them carries
        a timestamp.
    """
    group_action_id = data_acquisition_pb2.CaptureActionId(group_name=group_name)
    group_query = data_acquisition_store_pb2.DataQueryParams(
        action_ids=data_acquisition_store_pb2.ActionIdQuery(action_ids=[group_action_id]))
    try:
        saved_capture_actions = data_store_client.list_capture_actions(group_query)
    except Exception as err:  # Best-effort helper: report any failure and return None.
        _LOGGER.error("Failed to list the capture action ids for group_name %s: %s", group_name,
                      err)
        return None

    # Collect (seconds, timestamp) pairs for every capture that has a timestamp. These times
    # are already in the robot's clock and do not need to be converted using timesync.
    stamped_captures = []
    for capture_action in saved_capture_actions:
        timestamp = capture_action.timestamp
        time_secs = timestamp.seconds + timestamp.nanos / 1e9
        if time_secs == 0:
            # The plugin captures don't seem to set a timestamp, so ignore them when determining
            # the start/end times for what to download.
            continue
        stamped_captures.append((time_secs, timestamp))

    if not stamped_captures:
        _LOGGER.error("Could not find a start/end time from the list of capture action ids: %s",
                      saved_capture_actions)
        return None

    # min/max guarantee start <= end, so no separate ordering check is needed.
    start_secs, start_timestamp = min(stamped_captures, key=lambda entry: entry[0])
    end_secs, end_timestamp = max(stamped_captures, key=lambda entry: entry[0])
    _LOGGER.info("Downloading data with a start time of %s seconds and end time of %s seconds.",
                 start_secs, end_secs)

    # Build the time range first and then widen it. The message constructor copies the
    # timestamps, so the stored capture actions are left untouched, and the start and end
    # are independent even when both come from the same capture (otherwise -3 and +3 would
    # cancel out on a shared object).
    time_range = data_acquisition_store_pb2.TimeRangeQuery(from_timestamp=start_timestamp,
                                                           to_timestamp=end_timestamp)
    # Adjust the start/end time by a few seconds each to give buffer room.
    time_range.from_timestamp.seconds -= 3
    time_range.to_timestamp.seconds += 3

    # Make the download data request with a time query parameter.
    return data_acquisition_store_pb2.DataQueryParams(time_range=time_range)
def download_data_REST(query_params, hostname, token, destination_folder='.',
                       additional_params=None):
    """Retrieve all data for a query from the DataBuffer REST API and write it to a file.

    The data is saved under "<destination_folder>/REST/", using the file name announced in
    the response's Content-Disposition header.

    Args:
        query_params(bosdyn.api.DataQueryParams): Query parameters to use to retrieve metadata from
            the DataStore service. Must be time-based query parameters only.
        hostname(string): Hostname to specify in URL where the DataBuffer service is running.
        token(string): User token to specify in https GET request for authentication.
        destination_folder(string): Folder where to download the data.
        additional_params(dict): Additional GET parameters to append to the URL. The dict is
            not modified.

    Returns:
        Boolean indicating if the data was downloaded successfully or not. False is returned
        when the server reports no content (HTTP 204), when the Content-Disposition header is
        missing or malformed, or when a URLError/IOError occurs.
    """
    try:
        url = 'https://{}/v1/data-buffer/daq-data/'.format(hostname)
        absolute_path = Path(destination_folder).absolute()
        folder = Path(absolute_path.parent, clean_filename(absolute_path.name), 'REST')
        folder.mkdir(parents=True, exist_ok=True)

        headers = {"Authorization": "Bearer {}".format(token)}
        # Work on a copy so the caller's dict is never mutated by the update below.
        get_params = dict(additional_params) if additional_params else {}
        if query_params.HasField('time_range'):
            get_params.update({
                'from_nsec': query_params.time_range.from_timestamp.ToNanoseconds(),
                'to_nsec': query_params.time_range.to_timestamp.ToNanoseconds()
            })
        chunk_size = 10 * (1024**2)  # This value is not guaranteed.
        url = url + '?{}'.format(urlencode(get_params))
        request = Request(url, headers=headers)
        # NOTE(review): certificate verification is disabled here through a private ssl helper,
        # presumably because the server uses a self-signed certificate -- confirm. The bearer
        # token is sent over this unverified connection.
        context = ssl._create_unverified_context()
        with urlopen(request, context=context) as resp:
            print("Download request HTTPS status code: %s" % resp.status)
            if resp.status == 204:
                print("No content available for the specified download time range (in seconds): "
                      "[%d, %d]" % (query_params.time_range.from_timestamp.ToNanoseconds() / 1.0e9,
                                    query_params.time_range.to_timestamp.ToNanoseconds() / 1.0e9))
                return False

            content = resp.headers['Content-Disposition']
            if not content or len(content) < 2:
                print("ERROR: Content-Disposition is not set correctly")
                return False
            start_ind = content.find('\"')
            if start_ind == -1:
                print("ERROR: Content-Disposition does not have a \"")
                return False
            start_ind += 1
            # Stop at the matching closing quote, so the name does not depend on the quote
            # being the very last character of the header; take the rest if it is missing.
            end_ind = content.find('\"', start_ind)
            if end_ind == -1:
                end_ind = len(content)
            # Keep only the final path component so a server-supplied name cannot place the
            # file outside of the destination folder.
            file_name = Path(clean_filename(content[start_ind:end_ind])).name
            if not file_name or file_name == '..':
                # Default file name used when the response does not provide a usable one.
                file_name = "download.zip"
            download_file = Path(folder, file_name)

            with open(str(download_file), 'wb') as fid:
                # Stream the body to disk until read() returns an empty chunk.
                for chunk in iter(lambda: resp.read(chunk_size), b''):
                    print('.', end='', flush=True)
                    fid.write(chunk)
    except URLError as rest_error:
        print("REST Exception:\n")
        print(rest_error)
        return False
    except IOError as io_error:
        print("IO Exception:\n")
        print(io_error)
        return False

    # Data downloaded and saved to local disc successfully.
    return True