I met André Wakko at the fantastically bonkers workshop Insecure Territories with Bengt Sjolén, Martin Howse and Brendan Howell.
I noticed that he was sonorizing wi-fi traffic, so I cheekily introduced him to my GPS project: making some kind of sound piece out of the information that a GPS device mostly throws away, namely information about the satellites that are sending their timing signals across the vast silent distances of space.
The first script, gpsdevice.py, is one I only modified from the IBM script credited in its header. My own script requires it: it handles talking to an attached GPS device and parsing the output.
#!/usr/bin/env python
#
# Adapted by Daniel Belasco Rogers (danbelasco@yahoo.co.uk)
# from the following script:
# Copyright (c) IBM Corporation, 2006. All Rights Reserved.
# Author: Simon Johnston (skjohn@us.ibm.com)
import serial
import datetime
import sys
# Usual location (address) of gps plugged into usb (linux)
GPSADDR = "/dev/ttyUSB0"
# Baud rate of device - 115200 for dataloggers, 4800 for Garmin etrex
BAUD = 115200

class GPSDevice(object):
    """ General GPS Device interface for connecting to serial port GPS devices
    using the default communication params specified by the National Marine
    Electronics Association (NMEA) specifications.
    """

    def __init__(self, commport):
        """ GPSDevice(port)
        Connects to the serial port specified, as the port numbers are
        zero-based on windows the actual device string would be "COM" +
        port+1.
        """
        self.commport = commport
        self.port = None

    def open(self):
        """ open() open the GPS device port with the serial I/O
        parameters BAUD,8,N,1 (115200 for data loggers; 4800, the
        NMEA 0183 default, for a Garmin)
        """
        nmea_params = {
            'port': self.commport,
            'baudrate': BAUD,
            'bytesize': serial.EIGHTBITS,
            'parity': serial.PARITY_NONE,
            'stopbits': serial.STOPBITS_ONE
        }
        if self.port:
            print 'Device port is already open'
            sys.exit(2)
        try:
            # serial.Serial() opens the port itself when a port name is
            # given, so no separate self.port.open() call is needed
            self.port = serial.Serial(**nmea_params)
        except serial.SerialException:
            print """
Problem connecting to GPS
Is device connected and in NMEA transfer mode?
"""
            sys.exit(2)

    def read(self):
        """ read() -> dict read a single NMEA sentence from the device
        returning the data as a dictionary. The 'sentence' key will
        identify the sentence type itself with other parameters
        extracted and nicely formatted where possible.
        """
        sentence = 'error'
        line = self._read_raw()
        if line:
            record = self._validate(line)
            if record:
                if record[0] in _decode_func:
                    return _decode_func[record[0]](record)
                else:
                    sentence = record[0]
        return {
            'sentence': sentence
        }

    def read_all(self):
        """ read_all() -> dict A generator allowing the user to read
        data from the device in a for loop rather than having to craft
        their own looping method.
        """
        while 1:
            try:
                record = self.read()
            except IOError:
                # a plain return ends the generator cleanly
                return
            yield record

    def close(self):
        """ close() Close the port, note you can no longer read from
        the device until you re-open it.
        """
        if not self.port:
            print 'Device port not open, cannot close'
            sys.exit()
        self.port.close()
        self.port = None

    def _read_raw(self):
        """ _read_raw() -> str Internal method which reads a line from
        the device (line ends in \\r\\n).
        """
        if not self.port:
            print 'Device port not open, cannot read'
            sys.exit()
        return self.port.readline()

    def _checksum(self, data):
        """ _checksum(data) -> str Internal method which calculates
        the XOR checksum over the sentence (as a string, not including
        the leading '$' or the final 3 characters, the '*' and
        checksum itself).
        """
        checksum = 0
        for character in data:
            checksum = checksum ^ ord(character)
        hex_checksum = "%02x" % checksum
        return hex_checksum.upper()

    def _validate(self, sentence):
        """ _validate(sentence) -> list or None
        Internal method. Checks the talker prefix and the checksum and
        returns the comma-separated fields, or None if the sentence is
        not usable.
        """
        # strip() returns a new string, so keep the result; this also
        # removes the trailing '\r\n'
        sentence = sentence.strip()
        if not sentence.startswith('$GP'):
            # Note that sentences that start with '$P' are proprietary
            # formats and are described as $P<MID><SID> where MID is the
            # manufacturer identifier (Magellan is MGN etc.) and then the
            # SID is the manufacturer's sentence identifier.
            return None
        star = sentence.rfind('*')
        if star >= 0:
            check = sentence[star + 1:]
            sentence = sentence[1:star]
            calculated = self._checksum(sentence)
            if calculated != check:
                return None
        else:
            # no checksum present: just drop the leading '$'
            sentence = sentence[1:]
        # drop the 'GP' talker id so the first field is e.g. 'RMC'
        sentence = sentence[2:]
        return sentence.split(',')

# The internal decoder functions start here.

def format_date(datestr):
    """ format_date(datestr) -> str
    Internal function. Turn GPS DDMMYY into an ISO date (YYYY-MM-DD)
    """
    if datestr == '':
        return ''
    year = int(datestr[4:])
    now = datetime.date.today()
    if year + 2000 > now.year:
        year = year + 1900
    else:
        year = year + 2000
    the_date = datetime.date(year, int(datestr[2:4]), int(datestr[:2]))
    return the_date.isoformat()


def format_time(timestr):
    """ format_time(timestr) -> str Internal function. Turn GPS HHMMSS
    into HH:MM:SS UTC
    """
    if timestr == '':
        return ''
    utc_str = ' +00:00'
    the_time = datetime.time(int(timestr[:2]),
                             int(timestr[2:4]),
                             int(timestr[4:6]))
    return the_time.strftime('%H:%M:%S') #+ utc_str


def format_latlong(data, direction):
    """ format_latlong(data, direction) -> str
    Internal function. Turn GPS DDMM.mmmm (degrees and decimal minutes)
    into decimal degrees DD.dddddd. The variable 'hours' holds degrees.
    """
    # Check to see if it's DMM.mmmm or DDMM.mmmm or DDDMM.mmmm
    if data == '':
        return 0 # this to stop blowing up on empty string (Garmin etrex)
    dot = data.find('.')
    if (dot > 5) or (dot < 3):
        raise ValueError('Incorrect formatting of "%s"' % data)
    hours = data[0:dot-2]
    mins = float(data[dot-2:])
    if hours[0] == '0':
        hours = hours[1:]
    if direction in ['S', 'W']:
        hours = '-' + hours
    # minutes / 60 gives the fraction of a degree; scale to 6 digits
    decimal = mins / 60.0 * 100.0
    decimal = decimal * 10000.0
    return '%s.%06d' % (hours, decimal)


def _convert(v, f, d):
    """ a multi-purpose function that converts
    into a number of data types
    v = value
    f = data type e.g. int, float, str
    d = default value if it doesn't work
    """
    try:
        return f(v)
    except (TypeError, ValueError):
        return d


def _decode_gsv(data):
    """ _decode_gsv(data) -> dict
    Internal function.
    data[0] = sentence
    data[1] = number of sentences
    data[2] = sentence number
    data[3] = satellites in view
    """
    if data[3] == '00':
        print """
GPS not receiving enough satellites or outputting strange values
Try turning GPS off and back on again before re-starting script
"""
        sys.exit(2)
    sats = []
    # compare as numbers, not as strings
    total = _convert(data[1], int, 0)
    number = _convert(data[2], int, 0)
    if number < total: # if this isn't the last sentence
        blockno = 4 # the number of blocks in a full sentence
    else: # this IS the last sentence
        # get the remaining number of blocks:
        blockno = _convert(data[3], int, 0) % 4
        if blockno == 0:
            blockno = 4
    #print 'number of satellites: %s' % data[3]
    #print 'number of sentences: %s' % data[1]
    #print 'number of satellites in last sentence: %s' % blockno
    for i in range(blockno * 4): # iterate through the sentence
        sats.append(_convert(data[i + 4], int, 0))
    return {
        'sentence': data[0],
        'Sentence_no.': number,
        'NumberOfSentences': total,
        'inview': _convert(data[3], int, 0),
        'satellite_data_list': sats
    }


def _decode_rmc(data):
    """ Simply parses the rmc sentence into a dictionary that makes it
    easier to query for values
    """
    return {
        'sentence': data[0],
        'time': format_time(data[1]),
        'active': data[2],
        'latitude': _convert(('%s' % format_latlong(data[3], data[4])),
                             float, 0),
        'longitude': _convert(('%s' % format_latlong(data[5], data[6])),
                              float, 0),
        'knots': _convert(data[7], float, 0),
        'bearing': _convert(data[8], float, 0),
        'date': format_date(data[9]),
        'mag_var': '%s,%s' % (data[10], data[11])
    }

# dictionary that maps the sentences onto functions
_decode_func = {
    'GSV': _decode_gsv,
    'RMC': _decode_rmc
}


def main():
    """
    test the script by printing the outputs
    """
    # setup and connect to gps
    gps = GPSDevice(GPSADDR)
    gps.open()
    for record in gps.read_all():
        print record


# test function
if __name__ == '__main__':
    sys.exit(main())
If you want to attach a GPS and find you're having problems, this is the script to dig into. For instance, if you want to attach a Garmin GPS device via a serial-to-USB adapter, you'll need to change the global variable 'BAUD' from 115200 to 4800 in gpsdevice.py.
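When digging into connection problems it helps to know what gpsdevice.py accepts as a valid sentence. Here is a minimal sketch of the XOR checksum test, written for Python 3 rather than the Python 2 of the scripts in this post. The function names and the sample GSV body are made up for illustration; they are not part of gpsdevice.py and the body was not captured from a real device.

```python
from functools import reduce
from operator import xor


def nmea_checksum(body):
    """XOR every character between '$' and '*'; return two upper-case hex digits."""
    return "%02X" % reduce(xor, (ord(ch) for ch in body), 0)


def is_valid(sentence):
    """Return True if the sentence carries a checksum that matches its body."""
    sentence = sentence.strip()
    if not sentence.startswith("$") or "*" not in sentence:
        return False
    body, _, check = sentence[1:].rpartition("*")
    return nmea_checksum(body) == check.upper()


# Hypothetical GSV body, invented for this example
body = "GPGSV,3,1,12,06,06,027,00,03,08,040,00,07,58,065,00,19,02,067,00"
good = "$%s*%s\r\n" % (body, nmea_checksum(body))
bad = good.replace("027", "028")  # corrupt one digit in transit

print(is_valid(good))  # True
print(is_valid(bad))   # False
```

If a device at the wrong baud rate produces output at all, it is usually garbage that fails this kind of test, which is why a mismatch tends to show up as a stream of 'error' records.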
My script is below. PirateSatelliteServer.py handles further parcelling of the satellite values and, if required, sends them over OSC (Open Sound Control) to a receiving programme. I have been using Pure Data to receive the values, but André plans to use SuperCollider. Processing would also be a good candidate.
#!/usr/bin/env python
#
# Daniel Belasco Rogers 2011 (danbelasco@yahoo.co.uk)
#
# The intention of this script is to send the output of a connected
# GPS (I usually use a Qstarz Q1000x) to a listening programme
# (puredata, supercollider, processing) in neatly packaged bundles
#
# Options:
# -f --file Write the osc bundles to a pickle file for replaying
# later with my script 'SatelliteLogReader.py'
# -p --prn order by prn (unique satellite number) instead of
# azimuth (default)
# -d --dumb Do not send gps values over osc (for
# debugging if you haven't set up a receiving script yet)
#
# Example output (carriage returns are mine):
# /RMC ['12:58:27', 52.496204376220703, 13.445575714111328, 0.0, 0.0],
# /SAT1 [6, 6, 27, 0], /SAT2 [3, 8, 40,0],
# /SAT3 [7, 58, 65, 0], /SAT4 [19, 2, 67, 0],
# /SAT5 [13, 18, 98, 0], /SAT6 [28, 25, 159, 0],
# /SAT7 [8, 82, 168, 0], /SAT8 [10, 43,174, 0],
# /SAT9 [2, 7, 227, 0], /SAT10 [5, 62, 263, 0],
# /SAT11 [26,34, 284, 0], /SAT12 [21, 5, 339, 0]
#
# Last Update: Thu 21 Apr 2011 03:08:16 PM CEST
#
# A re-write of splitbundleGSV-RMC-newOSC.py
# for Andre Wakko and my project 'Pirate Satellite Synthesizer'
#
# Requires pyOSC and gpsdevice (should be bundled with this script)
import cPickle
import sys
import os.path
from time import sleep
from optparse import OptionParser

try:
    import OSC
except ImportError:
    print """Please install pyOSC from the following location:
https://trac.v2.nl/wiki/pyOSC"""
    # nothing below can work without pyOSC, so stop here
    sys.exit(1)
try:
    from gpsdevice import *
except ImportError:
    print """
This script requires the script 'gpsdevice.py' to be in the same
folder or in the PYTHONPATH
"""
    sys.exit(1)

# Port and address for sending osc messages
# Address is localhost (this machine by default)
# Remember to set port number in receiving programme
OSCADDR = "127.0.0.1"
OSCPORT = 57120
# Usual location (address) of gps plugged into usb (linux)
GPSADDR = "/dev/ttyUSB0"

def reorderList(List, prn):
    '''
    takes an unsorted list of satellite values and returns a
    sorted list by azimuth or prn
    List = list of satellite values
    prn = Boolean, from options if true, order by prn not azimuth
    Dict = dictionary created for sorting purposes only
    '''
    Dict = {}
    for i in range(0, len(List), 4):
        if prn:
            Dict[List[i]] = (List[i],
                             List[i + 1],
                             List[i + 2],
                             List[i + 3])
        else:
            Dict[List[i + 2]] = (List[i],
                                 List[i + 1],
                                 List[i + 2],
                                 List[i + 3])
    keys = Dict.keys()
    keys.sort()
    sortedList = [Dict[key] for key in keys]
    List = []
    for item in sortedList:
        for element in item:
            List.append(element)
    return List


def askyesno(question):
    """
    handles yes/no questions
    question = what to ask the user at the prompt
    answer = a boolean True for yes and False for no
    """
    while 1:
        answer = raw_input(question).lower()
        if answer in ('y', 'yes', 'n', 'no'):
            return answer in ('y', 'yes')
        print '\nplease answer y or n\n'


def main():
    """
    the main loop - consider splitting these functions up for greater
    clarity
    """
    ############################################################
    # script options
    ############################################################
    usage = "usage: %prog '-f=filename' '-d'"
    # pass the usage string in, otherwise it is never shown
    parser = OptionParser(usage=usage)
    parser.add_option("-f", "--file", dest="filename",
                      help="write report to FILE", metavar="FILE")
    parser.add_option("-d", "--dumb", dest="dumb",
                      help="don't send messages to OSC", action="store_true",
                      default=False)
    parser.add_option("-p", "--prn", dest="prn",
                      help="sort by prn (default by azi)", action="store_true",
                      default=False)
    (options, args) = parser.parse_args()
    if options.filename:
        if (os.path.isfile(options.filename)):
            answer = askyesno('file already exists - overwrite? y/n ')
            if not answer:
                return 0
        log = open(options.filename, 'wb')
    ############################################################
    # set up osc and gps connections
    ############################################################
    # Osc message address and port
    send_address = (OSCADDR, OSCPORT)
    # OSC basic client
    osc = OSC.OSCClient()
    # set the address for all following messages
    osc.connect(send_address)
    # setup and connect to gps
    gps = GPSDevice(GPSADDR)
    gps.open()
    ############################################################
    # start reading the values and sending them
    ############################################################
    # initialise local vars
    GSVcollection = []
    FirstRMC = True
    # initiate the bundle
    bundle = OSC.OSCBundle()
    # main read / send loop
    try:
        for record in gps.read_all():
            if record['sentence'] == 'RMC':
                # on receiving an RMC sentence, gather everything
                # together and send it.
                if not FirstRMC:
                    # Skip reorderList if all the azi values are zero
                    if (GSVcollection[2] != 0 and
                        GSVcollection[6] != 0 and
                        GSVcollection[10] != 0 and
                        GSVcollection[14] != 0):
                        # sort complete GSV sentence by azimuth
                        GSVcollection = reorderList(GSVcollection, options.prn)
                    #bundle.setAddress("/SAT")
                    count = 0
                    for i in range(0, len(GSVcollection), 4):
                        count += 1
                        print count, GSVcollection[i:i + 4]
                        bundle.setAddress("/SAT%d" % count)
                        bundle.append(GSVcollection[i:i + 4])
                    GSVcollection = []
                    # Send over OSC if the dumb option is off
                    if not options.dumb:
                        osc.send(bundle)
                    print bundle
                    if options.filename:
                        cPickle.dump(bundle, log)
                    # clear the bundle otherwise it just fills up
                    bundle.clearData()
                else:
                    # the below only happens once - the first time an RMC
                    # sentence is received
                    FirstRMC = False
                RMClist = [
                    record['time'],
                    record['latitude'],
                    record['longitude'],
                    record['knots'],
                    record['bearing'],
                ]
                # append to bundle with RMC prefix
                bundle.append({'addr': "/RMC", 'args': RMClist})
            if record['sentence'] == 'GSV':
                if not FirstRMC:
                    for item in record['satellite_data_list']:
                        GSVcollection.append(item)
    except KeyboardInterrupt:
        osc.close()
        gps.close()
        if options.filename:
            log.close()
        print """
user interrupt, shutting down
"""
        # sys.exit needs the parentheses to actually be called
        sys.exit(0)


if __name__ == '__main__':
    sys.exit(main())
So, what do you do with it? Make sure that you have saved 'gpsdevice.py' and 'PirateSatelliteServer.py' in the same folder. Plug your GPS data logger into your Linux computer via USB and turn it on. Then start your receiving script, start PirateSatelliteServer and begin to experiment with sonorizing the values. If you just want to test that PirateSatelliteServer is running, start it with the '-d' option, which won't send anything over OSC. If you get anywhere, please tell me, I'd love to hear about it.
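If you have no receiving patch yet and only want to confirm that bundles are actually leaving PirateSatelliteServer, a few lines of Python 3 can stand in as a listener. This is only a sketch using the standard library: it does not decode the OSC contents, it just waits for one UDP datagram on the address and port set in OSCADDR and OSCPORT and checks for the '#bundle' marker that every OSC bundle starts with. The function names are my own for this example.

```python
import socket


def looks_like_osc_bundle(datagram):
    """OSC bundles always begin with the 8 bytes '#bundle' plus a NUL."""
    return datagram.startswith(b"#bundle\x00")


def wait_for_bundle(host="127.0.0.1", port=57120, timeout=5.0):
    """Wait up to `timeout` seconds for one UDP datagram; return it, or None."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.bind((host, port))
        sock.settimeout(timeout)
        try:
            datagram, _sender = sock.recvfrom(65535)
        except socket.timeout:
            return None
        return datagram
    finally:
        sock.close()


# Example use (left commented out because it blocks while waiting):
# data = wait_for_bundle()
# print(data is not None and looks_like_osc_bundle(data))
```

Run it while PirateSatelliteServer is sending (that is, without '-d'). Only one programme can listen on the port at a time, so close your Pure Data or SuperCollider patch first if it is already bound to 57120.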
These are the options for the script. They are also documented in the comments of the file itself.
-f --file Write the OSC bundles to a pickle file for replaying later with my script 'SatelliteLogReader.py'
-p --prn Order by PRN (unique satellite number) instead of azimuth (default)
-d --dumb Do not send GPS values over OSC (for debugging if you haven't set up a receiving script yet)
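To make the difference between the default ordering and '-p' concrete, here is a small Python 3 sketch of the same idea as reorderList: cut the flat list into groups of four and sort the groups by azimuth or by PRN. It is not the code from PirateSatelliteServer.py; the function name is invented, and it uses sorted groups instead of a dictionary, so two satellites that happen to share an azimuth are both kept.

```python
def sort_satellites(flat, by_prn=False):
    """Sort a flat [prn, elevation, azimuth, snr, ...] list by azimuth or PRN."""
    # one tuple per satellite
    groups = [tuple(flat[i:i + 4]) for i in range(0, len(flat), 4)]
    key_index = 0 if by_prn else 2  # position of PRN or azimuth in each group
    groups.sort(key=lambda sat: sat[key_index])
    # flatten back into the layout the rest of the script expects
    return [value for sat in groups for value in sat]


unsorted = [19, 2, 67, 0, 6, 6, 27, 0, 3, 8, 40, 0, 7, 58, 65, 0]

print(sort_satellites(unsorted))
# [6, 6, 27, 0, 3, 8, 40, 0, 7, 58, 65, 0, 19, 2, 67, 0]
print(sort_satellites(unsorted, by_prn=True))
# [3, 8, 40, 0, 6, 6, 27, 0, 7, 58, 65, 0, 19, 2, 67, 0]
```

Sorting by azimuth means /SAT1, /SAT2 and so on sweep around the compass, while sorting by PRN keeps each satellite in a stable order relative to the others for as long as it stays in view.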
Example output (carriage returns are mine):
/RMC ['12:58:27', 52.496204376220703, 13.445575714111328, 0.0, 0.0],
/SAT1 [6, 6, 27, 0], /SAT2 [3, 8, 40,0], /SAT3 [7, 58, 65, 0],
/SAT4 [19, 2, 67, 0], /SAT5 [13, 18, 98, 0], /SAT6 [28, 25, 159, 0],
/SAT7 [8, 82, 168, 0], /SAT8 [10, 43,174, 0], /SAT9 [2, 7, 227, 0],
/SAT10 [5, 62, 263, 0], /SAT11 [26,34, 284, 0], /SAT12 [21, 5, 339, 0]
The values are as follows:
RMC = [time of fix (UTC), latitude (decimal degrees), longitude (decimal degrees), speed (knots), bearing]
SATn = [PRN (pseudo-random noise number, the satellite's ID), elevation, azimuth, signal to noise]
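On the receiving side it can be easier to work with names than with list positions. A minimal Python 3 sketch, using values from the example output: the field names are labels I have chosen for this example, and the RMC order follows the order in which PirateSatelliteServer.py builds RMClist (time, latitude, longitude, knots, bearing).

```python
from collections import namedtuple

# Field names are illustrative labels, not part of the OSC messages
RMC = namedtuple("RMC", "time latitude longitude knots bearing")
Satellite = namedtuple("Satellite", "prn elevation azimuth snr")

fix = RMC('12:58:27', 52.496204376220703, 13.445575714111328, 0.0, 0.0)
sats = [
    Satellite(6, 6, 27, 0),     # /SAT1
    Satellite(7, 58, 65, 0),    # /SAT3
    Satellite(8, 82, 168, 0),   # /SAT7
]

# Satellites high in the sky (elevation above 45 degrees)
overhead = [sat.prn for sat in sats if sat.elevation > 45]

print(fix.latitude, fix.longitude)
print(overhead)  # [7, 8]
```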
This is the setup I have been using:
- eeePC 501 running 'easy peasy' (2.6.32-30-generic #59-Ubuntu SMP Tue Mar 1 21:30:21 UTC 2011 i686 GNU/Linux ) OR
- Lenovo ThinkPad t61p running Ubuntu 10.04 (Lucid)
- GPS device: Qstarz Q1000X data logger attached to computer with USB
If you don't have a GPS, check out SatelliteLogReader.py in the next post which reads the pickled output of PirateSatelliteServer.py from log files, complete with test data which you can download from here.
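The '-f' option writes one pickle after another into the same file, so reading a log back means calling load repeatedly until the file runs out. The sketch below shows that technique in Python 3 with plain lists in an in-memory buffer. It is not SatelliteLogReader.py: the real logs contain pyOSC bundle objects pickled by Python 2, so pyOSC would need to be importable to unpickle them, and the helper name here is invented.

```python
import io
import pickle


def read_records(stream):
    """Yield each object pickled into the stream, one pickle.load() per dump."""
    while True:
        try:
            yield pickle.load(stream)
        except EOFError:
            # no more pickles in the stream
            return


# Stand-in for a log file: two made-up records dumped back to back
log = io.BytesIO()
for record in (["/SAT1", [6, 6, 27, 0]], ["/SAT2", [3, 8, 40, 0]]):
    pickle.dump(record, log)

log.seek(0)
for record in read_records(log):
    print(record)
```

As with any pickle file, only load logs that come from a source you trust.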