You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
509 lines
15 KiB
509 lines
15 KiB
#!@PYSHEBANG@
|
|
# @GENERATED@
|
|
#
|
|
#
|
|
# This file is Copyright 2010 by the GPSD project
|
|
# SPDX-License-Identifier: BSD-2-clause
|
|
|
|
# This code runs compatibly under Python 2 and 3.x for x >= 2.
|
|
# Preserve this property!
|
|
# Codacy D203 and D211 conflict, I choose D203
|
|
# Codacy D212 and D213 conflict, I choose D212
|
|
|
|
"""gpsfake -- test harness for gpsd.
|
|
|
|
Simulates one or more GPSes, playing back logfiles.
|
|
Most of the logic for this now lives in gps.fake,
|
|
factored out so we can write other test programs with it.
|
|
"""
|
|
from __future__ import absolute_import, print_function, division
|
|
|
|
from distutils import spawn
|
|
import argparse
|
|
import os
|
|
import platform
|
|
import pty
|
|
import socket
|
|
import sys
|
|
import time
|
|
|
|
# pylint wants local modules last
|
|
try:
|
|
import gps
|
|
import gps.fake as gpsfake # The "as" pacifies pychecker
|
|
import gps.misc # for polybyte() polystr()
|
|
except ImportError as e:
|
|
sys.stderr.write(
|
|
"gpsfake: can't load Python gps libraries -- check PYTHONPATH.\n")
|
|
sys.stderr.write("%s\n" % e)
|
|
sys.exit(1)
|
|
|
|
gps_version = '@VERSION@'
|
|
if gps.__version__ != gps_version:
|
|
sys.stderr.write("gpsfake: ERROR: need gps module version %s, got %s\n" %
|
|
(gps_version, gps.__version__))
|
|
sys.exit(1)
|
|
|
|
try:
|
|
my_input = raw_input
|
|
except NameError:
|
|
my_input = input
|
|
|
|
# Get version of stdout for bytes data (NOP in Python 2)
|
|
bytesout = gps.get_bytes_stream(sys.stdout)
|
|
|
|
|
|
class Baton(object):
|
|
|
|
"""Ship progress indications to stderr."""
|
|
# By setting this > 1 we reduce the frequency of the twirl
|
|
# and speed up test runs. Should be relatively prime to the
|
|
# number of baton states, otherwise it will cause beat artifacts
|
|
# in the twirling.
|
|
SPINNER_INTERVAL = 11
|
|
|
|
def __init__(self, prompt, endmsg=None):
|
|
"""Init class Baton."""
|
|
self.stream = sys.stderr
|
|
self.stream.write(prompt + "...")
|
|
if os.isatty(self.stream.fileno()):
|
|
self.stream.write(" \b")
|
|
self.stream.flush()
|
|
self.count = 0
|
|
self.endmsg = endmsg
|
|
self.time = time.time()
|
|
|
|
def twirl(self, ch=None):
|
|
"""Twirl the baton."""
|
|
if self.stream is None:
|
|
return
|
|
if os.isatty(self.stream.fileno()):
|
|
if ch:
|
|
self.stream.write(ch)
|
|
self.stream.flush()
|
|
elif self.count % Baton.SPINNER_INTERVAL == 0:
|
|
self.stream.write("-/|\\"[self.count % 4])
|
|
self.stream.write("\b")
|
|
self.stream.flush()
|
|
self.count = self.count + 1
|
|
|
|
def end(self, mesg=None):
|
|
"""Write end message."""
|
|
if mesg is None:
|
|
mesg = self.endmsg
|
|
if self.stream:
|
|
self.stream.write("...(%2.2f sec) %s.\n"
|
|
% (time.time() - self.time, mesg))
|
|
|
|
|
|
def hexdump(s):
|
|
"""Convert string to hex"""
|
|
rep = ""
|
|
for c in s:
|
|
rep += "%02x" % ord(c)
|
|
return rep
|
|
|
|
|
|
def check_xterm():
|
|
"""Check whether xterm and DISPLAY are available."""
|
|
xterm = spawn.find_executable('xterm')
|
|
return xterm and os.access(xterm, os.X_OK) and os.environ.get('DISPLAY')
|
|
|
|
|
|
def fakehook(linenumber, fakegps):
|
|
"""Do the real work."""
|
|
if not fakegps.testload.sentences:
|
|
sys.stderr.write("fakegps: no sentences in test load.\n")
|
|
raise SystemExit(1)
|
|
if linenumber % len(fakegps.testload.sentences) == 0:
|
|
if options.singleshot and linenumber > 0:
|
|
return False
|
|
if options.progress:
|
|
baton.twirl('*\b')
|
|
elif not options.singleshot:
|
|
if not options.quiet:
|
|
sys.stderr.write("gpsfake: log cycle of %s begins.\n"
|
|
% fakegps.testload.name)
|
|
time.sleep(options.cycle)
|
|
if options.linedump and fakegps.testload.legend:
|
|
ml = fakegps.testload.sentences[
|
|
linenumber % len(fakegps.testload.sentences)].strip()
|
|
if not fakegps.testload.textual:
|
|
ml = hexdump(ml)
|
|
announce = ((fakegps.testload.legend %
|
|
(linenumber % len(fakegps.testload.sentences) + 1)) +
|
|
gps.polystr(ml))
|
|
if options.promptme:
|
|
my_input(announce + "? ")
|
|
else:
|
|
print(announce)
|
|
if options.progress:
|
|
baton.twirl()
|
|
return True
|
|
|
|
|
|
if __name__ == '__main__':
|
|
description = 'Fake gpsd from a log file.'
|
|
usage = '%(prog)s [OPTIONS] logfile...'
|
|
epilog = ('BSD terms apply: see the file COPYING in the distribution root'
|
|
' for details.')
|
|
|
|
parser = argparse.ArgumentParser(
|
|
description=description,
|
|
epilog=epilog,
|
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
|
usage=usage)
|
|
parser.add_argument(
|
|
'-?',
|
|
action="help",
|
|
help='show this help message and exit'
|
|
)
|
|
parser.add_argument(
|
|
'-1',
|
|
'--singleshot',
|
|
dest='singleshot',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Logfile is interpreted once only rather than repeatedly. '
|
|
'[Default %(default)s]')
|
|
)
|
|
parser.add_argument(
|
|
'-b',
|
|
'--baton',
|
|
dest='progress',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Enable a twirling-baton progress indicator. '
|
|
'[Default %(default)s]')
|
|
)
|
|
parser.add_argument(
|
|
'-c',
|
|
'--cycle',
|
|
default=0.0,
|
|
dest='cycle',
|
|
metavar='CYCLE',
|
|
type=float,
|
|
help=('Sets the delay between sentences in seconds. '
|
|
'[Default %(default)s]'),
|
|
)
|
|
parser.add_argument(
|
|
'-D',
|
|
'--debug',
|
|
action='append',
|
|
dest='debug',
|
|
metavar='OPT',
|
|
help='Pass a -D option OPT to the daemon.',
|
|
)
|
|
parser.add_argument(
|
|
'-g',
|
|
'--gdb',
|
|
dest='gdb',
|
|
default=False,
|
|
action="store_true",
|
|
help='Run the gpsd instance within gpsfake under control of gdb.',
|
|
)
|
|
parser.add_argument(
|
|
'-G',
|
|
'--lldb',
|
|
dest='lldb',
|
|
default=False,
|
|
action="store_true",
|
|
help='Run the gpsd instance within gpsfake under control of lldb.',
|
|
)
|
|
parser.add_argument(
|
|
'-i',
|
|
'--promptme',
|
|
dest='promptme',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Single-stepping through logfile. [Default %(default)s]'),
|
|
)
|
|
parser.add_argument(
|
|
'-l',
|
|
'--linedump',
|
|
dest='linedump',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Dump a line or packet number just before each sentence. '
|
|
'[Default %(default)s]'),
|
|
)
|
|
parser.add_argument(
|
|
'-m',
|
|
'--monitor',
|
|
default=None,
|
|
dest='mon',
|
|
metavar='PROG',
|
|
help='Specifies a monitor program under which the daemon is run.',
|
|
)
|
|
parser.add_argument(
|
|
'-n',
|
|
'--nowait',
|
|
dest='nowait',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Start the daemon reading from gpsfake without '
|
|
'waiting for a client.'),
|
|
)
|
|
parser.add_argument(
|
|
'-o',
|
|
'--options',
|
|
dest='options',
|
|
metavar='="OPT"',
|
|
help=('Pass options ="OPT" to the daemon.\n'
|
|
'The equal sign and Quotes required.'),
|
|
)
|
|
parser.add_argument(
|
|
'-p',
|
|
'--pipe',
|
|
dest='pipe',
|
|
default=False,
|
|
action="store_true",
|
|
help='Sets watcher mode and dump to stdout. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-P',
|
|
'--port',
|
|
default=None,
|
|
dest='port',
|
|
metavar='PORT',
|
|
type=int,
|
|
help="Sets the daemon's listening port to PORT [Default %(default)s]",
|
|
)
|
|
parser.add_argument(
|
|
'-q',
|
|
'--quiet',
|
|
dest='quiet',
|
|
default=False,
|
|
action="store_true",
|
|
help='Act in a quiet manner. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-r',
|
|
'--clientinit',
|
|
default='?WATCH={"json":true,"nmea":true}',
|
|
dest='client_init',
|
|
metavar='STR',
|
|
help=('Specifies an initialization command to use in pipe mode. '
|
|
'[Default %(default)s]'),
|
|
)
|
|
parser.add_argument(
|
|
'-s',
|
|
'--speed',
|
|
default=4800,
|
|
dest='speed',
|
|
metavar='SPEED',
|
|
type=int,
|
|
help='Sets the baud rate for the slave tty. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-S',
|
|
'--slow',
|
|
dest='slow',
|
|
default=False,
|
|
action="store_true",
|
|
help=('Insert realistic delays in the test input. '
|
|
'[Default %(default)s]'),
|
|
)
|
|
parser.add_argument(
|
|
'-t',
|
|
'--tcp',
|
|
dest='tcp',
|
|
default=False,
|
|
action="store_true",
|
|
help='Force TCP. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-T',
|
|
'--sysinfo',
|
|
dest='sysinfo',
|
|
default=False,
|
|
action="store_true",
|
|
help='Print some system information and exit.',
|
|
)
|
|
parser.add_argument(
|
|
'-u',
|
|
'--udp',
|
|
dest='udp',
|
|
default=False,
|
|
action="store_true",
|
|
help='Force UDP. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-v',
|
|
'--verbose',
|
|
dest='verbose',
|
|
default=0,
|
|
action='count',
|
|
help='Verbose. Repeat for more verbosity. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-W',
|
|
'--timeout',
|
|
default=60,
|
|
dest='timeout',
|
|
metavar='SEC',
|
|
type=int,
|
|
help='Specify timeout. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'-V', '--version',
|
|
action='version',
|
|
version="%(prog)s: Version " + gps_version + "\n",
|
|
help='Output version to stderr, then exit'
|
|
)
|
|
parser.add_argument(
|
|
'-x',
|
|
'--predump',
|
|
dest='predump',
|
|
default=False,
|
|
action="store_true",
|
|
help='Dump packets as gpsfake gathers them. [Default %(default)s]',
|
|
)
|
|
parser.add_argument(
|
|
'arguments',
|
|
metavar='logfile',
|
|
nargs='*',
|
|
help='Logfile(s) to read and feed to gpsd. At least one required.'
|
|
)
|
|
options = parser.parse_args()
|
|
|
|
if options.sysinfo:
|
|
sys.stdout.write("sys %s platform %s\nWRITE_PAD = %.5f\n"
|
|
% (sys.platform, platform.platform(),
|
|
gpsfake.GetDelay(options.slow)))
|
|
|
|
raise SystemExit(0)
|
|
|
|
if not options.arguments:
|
|
sys.stderr.write("gpsfake: requires at least one logfile argument.\n")
|
|
raise SystemExit(0)
|
|
|
|
if options.promptme:
|
|
options.linedump = True
|
|
|
|
# debug options to pass to gpsd
|
|
doptions = []
|
|
if options.debug:
|
|
for dopt in options.debug:
|
|
doptions.append("-D " + dopt)
|
|
if options.options:
|
|
for opt in options.options:
|
|
doptions.append(opt)
|
|
if options.nowait:
|
|
doptions.append("-n")
|
|
doptions = ' '.join(doptions)
|
|
|
|
monitor = ()
|
|
timeout = 0
|
|
if options.gdb:
|
|
if check_xterm():
|
|
monitor = "xterm -e gdb -tui --args "
|
|
else:
|
|
monitor = "gdb --args "
|
|
timeout = 0
|
|
elif options.lldb:
|
|
if check_xterm():
|
|
monitor = "xterm -e lldb -- "
|
|
else:
|
|
monitor = "lldb -- "
|
|
timeout = 0
|
|
elif options.mon:
|
|
monitor = options.mon + " "
|
|
|
|
if options.timeout:
|
|
timeout = options.timeout
|
|
|
|
if not options.tcp and not options.udp:
|
|
try:
|
|
pty.openpty()
|
|
except (AttributeError, OSError):
|
|
sys.stderr.write('gpsfake: ptys not available, falling back'
|
|
' to UDP.\n')
|
|
options.udp = True
|
|
|
|
if options.progress:
|
|
baton = Baton("Processing %s" % ",".join(options.arguments), "done")
|
|
elif not options.quiet:
|
|
sys.stderr.write("Processing %s\n" % ",".join(options.arguments))
|
|
|
|
# Don't allocate a private port when cycling logs for client testing.
|
|
if options.port is None and not options.pipe:
|
|
options.port = int(gps.GPSD_PORT)
|
|
|
|
test = gpsfake.TestSession(options=doptions,
|
|
prefix=monitor,
|
|
predump=options.predump,
|
|
port=options.port,
|
|
slow=options.slow,
|
|
tcp=options.tcp,
|
|
timeout=timeout,
|
|
udp=options.udp,
|
|
verbose=options.verbose)
|
|
|
|
if options.pipe:
|
|
test.reporter = bytesout.write
|
|
if options.verbose:
|
|
options.progress = False
|
|
test.progress = sys.stderr.write
|
|
test.spawn()
|
|
try:
|
|
for logfile in options.arguments:
|
|
try:
|
|
test.gps_add(logfile, speed=options.speed, pred=fakehook,
|
|
oneshot=options.singleshot)
|
|
except gpsfake.TestLoadError as e:
|
|
sys.stderr.write("gpsfake: " + e.msg + "\n")
|
|
raise SystemExit(1)
|
|
except gpsfake.PacketError as e:
|
|
sys.stderr.write("gpsfake: " + e.msg + "\n")
|
|
raise SystemExit(1)
|
|
except gpsfake.DaemonError as e:
|
|
sys.stderr.write("gpsfake: " + e.msg + "\n")
|
|
raise SystemExit(1)
|
|
except IOError as e:
|
|
if e.filename is None:
|
|
sys.stderr.write("gpsfake: unknown internal I/O error %s\n"
|
|
% e)
|
|
else:
|
|
sys.stderr.write("gpsfake: no such file as %s or "
|
|
"file unreadable\n" % e.filename)
|
|
raise SystemExit(1)
|
|
except OSError:
|
|
sys.stderr.write("gpsfake: can't open pty.\n")
|
|
raise SystemExit(1)
|
|
|
|
try:
|
|
if options.pipe:
|
|
test.client_add(options.client_init + "\n")
|
|
# Give daemon time to get ready for the feeds.
|
|
# Without a delay here there's a window for test
|
|
# sentences to arrive before the watch takes effect.
|
|
# This needs to increase if leading sentences in
|
|
# test loads aren't being processed.
|
|
# Until the ISYNC driver was introduced, 1 sec was
|
|
# sufficient here. The extra 0.4s allows for the
|
|
# additional two 200ms delays introduced by the
|
|
# calls to gpsd_set_speed() in isync_detect()
|
|
time.sleep(1.4)
|
|
test.run()
|
|
except socket.error as msg:
|
|
sys.stderr.write("gpsfake: socket error %s.\n" % msg)
|
|
raise SystemExit(1)
|
|
except gps.client.json_error as e:
|
|
sys.stderr.write("gpsfake: JSON error on line %s is %s.\n"
|
|
% (repr(e.data), e.explanation))
|
|
raise SystemExit(1)
|
|
except KeyboardInterrupt:
|
|
sys.stderr.write("gpsfake: aborted\n")
|
|
raise SystemExit(1)
|
|
finally:
|
|
test.cleanup()
|
|
|
|
if options.progress:
|
|
baton.end()
|
|
|
|
# The following sets edit modes for GNU EMACS
|
|
# Local Variables:
|
|
# mode:python
|
|
# End:
|
|
# vim: set expandtab shiftwidth=4
|
|
|