Source code for bosdyn.client.bddf_download

# Copyright (c) 2023 Boston Dynamics, Inc.  All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).

"""Code for downloading robot data in bddf format."""
import logging
import re
import ssl
import sys
from urllib.parse import urlencode
from urllib.request import Request, urlopen

from bosdyn.client.time_sync import (NotEstablishedError, TimeSyncClient, TimeSyncEndpoint,
                                     robot_time_range_from_nanoseconds, timespec_to_robot_timespan)
from bosdyn.util import TIME_FORMAT_DESC

LOGGER = logging.getLogger()

REQUEST_CHUNK_SIZE = 10 * (1024**2)  # This value is not guaranteed.
REQUEST_TIMEOUT = 20  # Seconds.

DEFAULT_OUTPUT = "./download.bddf"


def _print_help_timespan():
    print("""\
A timespan is {{timeval}} or {{timeval}}-{{timeval}}.

{}

For example:
  '5m'                    From 5 minutes ago until now.
  '20201107-20201108'     All of 2020/11/07.
""".format(TIME_FORMAT_DESC))


def _bddf_url(hostname):
    return 'https://{}/v1/data-buffer/bddf/'.format(hostname)


def _http_headers(robot):
    return {"Authorization": "Bearer {}".format(robot.user_token)}


def _request_timespan_from_time_range(time_range):
    ret = {}
    # pylint: disable=no-member
    if time_range.HasField('start'):
        ret['from_sec'] = str(time_range.start.seconds)
    if time_range.HasField('end'):
        ret['to_sec'] = str(time_range.end.seconds)
    return ret


def _request_timespan_from_spec(timespec, time_sync_endpoint):
    return _request_timespan_from_time_range(
        timespec_to_robot_timespan(timespec, time_sync_endpoint))


def _request_timespan_from_nanoseconds(start_nsec, end_nsec, time_sync_endpoint):
    return _request_timespan_from_time_range(
        robot_time_range_from_nanoseconds(start_nsec, end_nsec, time_sync_endpoint))


[docs]def download_data( # pylint: disable=too-many-arguments,too-many-locals robot, hostname, start_nsec=None, end_nsec=None, timespan_spec=None, output_filename=None, robot_time=False, channel=None, message_type=None, grpc_service=None, show_progress=False): """ Download data from robot in bddf format Args: robot: API robot object hostname: hostname/ip-address of robot start_nsec: start time of log end_nsec: end time of log timespan_spec: if start_time, end_time are None, string representing the timespan to download robot_time: if True, timespan is in robot_clock, if False, in host clock channel: if set, limit data to download to a specific channel message_type: if set, limit data by specified message-type grpc_service: if set, limit GRPC log data by name of service Returns: output filename, or None on error """ time_sync_endpoint = None if not robot_time: # Establish time sync with robot to obtain skew. time_sync_client = robot.ensure_client(TimeSyncClient.default_service_name) time_sync_endpoint = TimeSyncEndpoint(time_sync_client) if not time_sync_endpoint.establish_timesync(): raise NotEstablishedError("time sync not established") # Now assemble the query to obtain a bddf file. # Get the parameters for limiting the timespan of the response. if start_nsec or end_nsec: get_params = _request_timespan_from_nanoseconds(start_nsec, end_nsec, time_sync_endpoint) else: get_params = _request_timespan_from_spec(timespan_spec, time_sync_endpoint) # Optional parameters for limiting the messages if channel: get_params['channel'] = channel if message_type: get_params['type'] = message_type if grpc_service: get_params['grpc_service'] = grpc_service # Request the data. url = _bddf_url(hostname) + '?{}'.format(urlencode(get_params)) request = Request(url, headers=_http_headers(robot)) context = ssl._create_unverified_context() # pylint: disable=protected-access with urlopen(request, context=context, timeout=REQUEST_TIMEOUT) as resp: if resp.status != 200: LOGGER.error("%s %s response: %d", url, get_params, resp.status) return None outfile = output_filename if output_filename else _output_filename(resp) with open(outfile, 'wb') as fid: while True: chunk = resp.read(REQUEST_CHUNK_SIZE) if len(chunk) == 0: break if show_progress: print('.', end='', flush=True) fid.write(chunk) if show_progress: print() return outfile
def _output_filename(response): """Get output filename either from http response, or default value.""" content = response.headers['Content-Disposition'] if len(content) < 2: LOGGER.debug("Content-Disposition not set correctly.") return DEFAULT_OUTPUT match = re.search(r'filename=\"?([^\"]+)', content) if not match: return DEFAULT_OUTPUT return match.group(1)
[docs]def main(): """Command-line interface""" # pylint: disable=import-outside-toplevel import argparse from bosdyn.client import InvalidLoginError, create_standard_sdk from bosdyn.client.util import add_common_arguments, authenticate parser = argparse.ArgumentParser() parser.add_argument('-T', '--timespan', default='5m', help='Time span (default last 5 minutes)') parser.add_argument('--help-timespan', action='store_true', help='Print time span formatting options') parser.add_argument('-c', '--channel', help='Specify channel for data (default=all)') parser.add_argument('-t', '--type', help='Specify message type (default=all)') parser.add_argument('-s', '--service', help='Specify service name (default=all)') parser.add_argument('-o', '--output', help='Output file name (default is "download.bddf"') parser.add_argument('-R', '--robot-time', action='store_true', help='Specified timespan is in robot time') add_common_arguments(parser, credentials_no_warn=True) options = parser.parse_args() options.verbose = level = logging.DEBUG if options.verbose else logging.INFO logging.basicConfig(level=level, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') if options.help_timespan: _print_help_timespan() return 0 # Create a robot object. sdk = create_standard_sdk('bddf') robot = sdk.create_robot(options.hostname) # Use the robot object to authenticate to the robot. # A JWT Token is required to download log data. try: if options.username or options.password: robot.authenticate(options.username, options.password) else: authenticate(robot) except InvalidLoginError as err: LOGGER.error("Cannot authenticate to robot to obtain token: %s", err) return 1 output_filename = download_data(robot, options.hostname, timespan_spec=options.timespan, robot_time=options.robot_time, channel=options.channel, message_type=options.type, grpc_service=options.service, show_progress=True) if not output_filename: return 1 LOGGER.info("Wrote '%s'.", output_filename) return 0
if __name__ == "__main__": sys.exit(main())