Python script that sends live GPS values over OSC

After meeting André Wakko at the fantastically bonkers workshop Insecure Territories with Bengt Sjolén, Martin Howse and Brendan Howell, I noticed that he was sonorizing wi-fi traffic, and I cheekily introduced him to my GPS project: making some kind of sound piece out of the information that a GPS device mostly throws away, namely information about the satellites that are sending their timing signals across the vast silent distances of space.

The first script, gpsdevice.py, is one I only adapted from this source (the original author is credited in its header). My own script requires it: it handles talking to an attached GPS device and parsing the output.

#!/usr/bin/env python
#
# Adapted by Daniel Belasco Rogers (danbelasco@yahoo.co.uk)
# from the following script:
# Copyright (c) IBM Corporation, 2006. All Rights Reserved.
# Author: Simon Johnston (skjohn@us.ibm.com)

import serial
import datetime
import sys

# Usual location (address) of gps plugged into usb (linux)
GPSADDR = "/dev/ttyUSB0"
# Baud rate of device - 115200 for dataloggers, 4800 for Garmin etrex
BAUD = 115200


class GPSDevice(object):
    """ General GPS Device interface for connecting to serial port GPS devices
        using the default communication params specified by the National Marine
        Electronics Association (NMEA) specifications.
    """

    def __init__(self, commport):
        """ GPSDevice(port)
            Connects to the serial port specified, as the port numbers are
            zero-based on windows the actual device string would be "COM" +
            port+1.
        """
        self.commport = commport
        self.port = None

    def open(self):
        """ open() open the GPS device port, the NMEA default serial
            I/O parameters are 4800,8,N,1 (as used by the Garmin etrex),
            but BAUD above is set to 115200 for data loggers.
        """
        nmea_params = {
            'port': self.commport,
            'baudrate': BAUD,
            'bytesize': serial.EIGHTBITS,
            'parity': serial.PARITY_NONE,
            'stopbits': serial.STOPBITS_ONE
        }
        if self.port:
            print 'Device port is already open'
            sys.exit(2)
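        try:
            # serial.Serial opens the port itself when given a port name,
            # so a separate open() call is not needed (newer pyserial
            # versions raise an error because the port is already open)
            self.port = serial.Serial(**nmea_params)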
        except serial.SerialException:
            print """
Problem connecting to GPS
Is device connected and in NMEA transfer mode?
"""
            sys.exit(2)

    def read(self):
        """ read() -> dict read a single NMEA sentence from the device
        returning the data as a dictionary. The 'sentence' key will
        identify the sentence type itself with other parameters
        extracted and nicely formatted where possible.
        """
        sentence = 'error'
        line = self._read_raw()
        if line:
            record = self._validate(line)
            if record:
                if record[0] in _decode_func:
                    return _decode_func[record[0]](record)
                else:
                    sentence = record[0]
        return {
            'sentence': sentence
        }

    def read_all(self):
        """ read_all() -> dict A generator allowing the user to read
        data from the device in a for loop rather than having to craft
        their own looping method.
        """
        while 1:
            try:
                record = self.read()
            except IOError:
                # a plain return is the idiomatic way to end a generator
                return
            yield record

    def close(self):
        """ close() Close the port, note you can no longer read from
            the device until you re-open it.
        """
        if not self.port:
            print 'Device port not open, cannot close'
            sys.exit()
        self.port.close()
        self.port = None

    def _read_raw(self):
        """ _read_raw() -> str Internal method which reads a line from
            the device (line ends in \r\n).
        """
        if not self.port:
            print 'Device port not open, cannot read'
            sys.exit()
        return self.port.readline()

    def _checksum(self, data):
        """ _checksum(data) -> str Internal method which calculates
        the XOR checksum over the sentence (as a string, not including
        the leading '$' or the final 3 characters, the ',' and
        checksum itself).
        """
        checksum = 0
        for character in data:
            checksum = checksum ^ ord(character)
        hex_checksum = "%02x" % checksum
        return hex_checksum.upper()

    def _validate(self, sentence):
        """ _validate(sentence) -> str
            Internal method.
        """
        # strip() returns a new string, so the result has to be kept
        sentence = sentence.strip()
        if sentence.endswith('\r\n'):
            sentence = sentence[:len(sentence)-2]
        if not sentence.startswith('$GP'):
            # Note that sentences that start with '$P' are proprietary
            # formats and are described as $P<MID><SID> where MID is the
            # manufacturer identifier (Magellan is MGN etc.) and then the
            # SID is the manufacturer's sentence identifier.
            return None
        star = sentence.rfind('*')
        if star >= 0:
            check = sentence[star + 1:]
            sentence = sentence[1:star]
            # 'calculated' avoids shadowing the builtin sum()
            calculated = self._checksum(sentence)
            if calculated != check:
                return None
        sentence = sentence[2:]
        return sentence.split(',')


# The internal decoder functions start here.

def format_date(datestr):
    """ format_date(datestr) -> str
        Internal function. Turn GPS DDMMYY into ISO format YYYY-MM-DD
    """
    if datestr == '':
        return ''
    year = int(datestr[4:])
    now = datetime.date.today()
    if year + 2000 > now.year:
        year = year + 1900
    else:
        year = year + 2000
    the_date = datetime.date(year, int(datestr[2:4]), int(datestr[:2]))
    return the_date.isoformat()


def format_time(timestr):
    """ format_time(timestr) -> str Internal function. Turn GPS HHMMSS
        into HH:MM:SS UTC
    """
    if timestr == '':
        return ''
    utc_str = ' +00:00'
    the_time = datetime.time(int(timestr[:2]),
                             int(timestr[2:4]),
                             int(timestr[4:6]))
    return the_time.strftime('%H:%M:%S') #+ utc_str


def format_latlong(data, direction):
    """ format_latlong(data, direction) -> str

        Internal function. Turn GPS DDMM.mmmm (degrees and decimal
        minutes) into decimal degrees DD.dddddd

    """
    # Check to see if it's HMM.nnnn or HHMM.nnnn or HHHMM.nnnn
    if data == '':
        return 0 # this to stop blowing up on empty string (Garmin etrex)
    dot = data.find('.')
    if (dot > 5) or (dot < 3):
        raise ValueError('Incorrect formatting of "%s"' % data)
    hours = data[0:dot-2]
    mins = float(data[dot-2:])
    if hours[0] == '0':
        hours = hours[1:]
    if direction in ['S', 'W']:
        hours = '-' + hours
    decimal = mins / 60.0 * 100.0
    decimal = decimal * 10000.0
    return '%s.%06d' % (hours, decimal)


def _convert(v, f, d):
    """ a multi-purpose function that converts
        into a number of data types
        v = value
        f = data type e.g int, float string
        d = default value if it doesn't work
    """
    try:
        return f(v)
    except:
        return d


def _decode_gsv(data):
    """ _decode_gsv(data) -> dict
        Internal function.
        data[0] = sentence
        data[1] = number of sentences
        data[2] = sentence number
        data[3] = satellites in view
    """
    if data[3] == '00':
        print """
GPS not receiving enough satellites or outputting strange values
Try turning GPS off and back on again before re-starting script
"""
        sys.exit(2)
    sats = []
    # compare as integers; the raw fields are strings
    total = _convert(data[1], int, 0)
    current = _convert(data[2], int, 0)
    if current < total:   # if this isn't the last sentence
        blockno = 4       # the number of blocks in a full sentence
    else:                 # this IS the last sentence
        # get the remaining number of blocks:
        blockno = _convert(data[3], int, 0) % 4
        if blockno == 0:
            blockno = 4
        #print 'number of satellites: %s' % data[3]
        #print 'number of sentences: %s' % data[1]
        #print 'number of satellites in last sentence: %s' % blockno
    for i in range(blockno * 4): # iterate through the sentence
        sats.append(_convert(data[i + 4], int, 0))
    return {
        'sentence': data[0],
        'Sentence_no.': _convert(data[2], int, 0),
        'NumberOfSentences': _convert(data[1], int, 0),
        'inview': _convert(data[3], int, 0),
        'satellite_data_list': sats
    }


def _decode_rmc(data):
    """ Simply parses the rmc sentence into a dictionary that makes it
    easier to query for values
    """
    return {
        'sentence': data[0],
        'time': format_time(data[1]),
        'active': data[2],
        'latitude': _convert(('%s' % format_latlong(data[3], data[4])),
                             float, 0),
        'longitude': _convert(('%s' % format_latlong(data[5], data[6])),
                              float, 0),
        'knots': _convert(data[7], float, 0),
        'bearing': _convert(data[8], float, 0),
        'date': format_date(data[9]),
        'mag_var': '%s,%s' % (data[10], data[11])
    }

# dictionary that maps the sentences onto functions
_decode_func = {
    'GSV': _decode_gsv,
    'RMC': _decode_rmc
}


def main():
    """
    test the script by printing the outputs
    """
    # setup and connect to gps
    gps = GPSDevice(GPSADDR)
    gps.open()
    for record in gps.read_all():
        print record


# test function
if __name__ == '__main__':
    sys.exit(main())

If you attach a GPS and run into problems, this is the script to dig into. For instance, if you want to attach a Garmin GPS device via a serial-to-USB adapter, you'll need to change the global variable 'BAUD' from 115200 to 4800 in gpsdevice.py.
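When digging into gpsdevice.py it can help to check raw NMEA lines without a GPS attached. Below is a minimal standalone sketch of the same XOR checksum test that `_validate` performs. It is written for Python 3 (the scripts in this post are Python 2), and the function names `nmea_checksum` and `nmea_checksum_ok`, as well as the sample sentence, are assumptions made up for illustration.

```python
def nmea_checksum(body):
    """XOR of every character between '$' and '*', as two hex digits."""
    checksum = 0
    for character in body:
        checksum ^= ord(character)
    return '%02X' % checksum


def nmea_checksum_ok(line):
    """Return True if a line such as '$GPGSV,...*7A' carries a valid checksum."""
    line = line.strip()
    if not line.startswith('$') or '*' not in line:
        return False
    # everything after the leading '$' and before the last '*' is checked
    body, _, given = line[1:].rpartition('*')
    return nmea_checksum(body) == given.upper()


# Build a made-up GSV sentence with a correct checksum, then corrupt it
body = 'GPGSV,3,1,12,06,06,027,00,03,08,040,00,07,58,065,00,19,02,067,00'
good = '$%s*%s' % (body, nmea_checksum(body))
bad = good.replace('027', '028', 1)

print(nmea_checksum_ok(good))  # True
print(nmea_checksum_ok(bad))   # False
```

If lines read straight from the serial port fail this check most of the time, a wrong baud rate is a likely suspect.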

My script is below. PirateSatelliteServer.py handles further parcelling of the satellite values and sends them, if required, over OSC (OpenSoundControl) to a receiving programme. I have been using Pure Data to receive the values, but André plans to use SuperCollider. Processing would also be a good candidate.
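Before wiring up a real receiving programme, it can be useful to confirm that something is arriving on the port at all. The following is a rough sketch, assuming Python 3 and nothing but the standard library: pyOSC's client sends over UDP, and an OSC bundle starts with the string `#bundle` followed by a null byte, so a plain socket is enough for a first check. The function names `is_osc_bundle` and `wait_for_packet` are hypothetical; the address and port are copied from the defaults in the script.

```python
import socket

OSCADDR = "127.0.0.1"  # same defaults as in PirateSatelliteServer.py
OSCPORT = 57120


def is_osc_bundle(packet):
    """OSC bundles begin with '#bundle' padded with one null byte."""
    return packet.startswith(b'#bundle\x00')


def wait_for_packet(addr=OSCADDR, port=OSCPORT, timeout=5.0):
    """Wait for one UDP packet; return its bytes, or None on timeout."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(timeout)
    try:
        sock.bind((addr, port))
        packet, _sender = sock.recvfrom(65535)
        return packet
    except socket.timeout:
        return None
    finally:
        sock.close()
```

With the server running (without '-d'), `wait_for_packet()` should return some bytes for which `is_osc_bundle` is true. Only one programme can listen on the port at a time, so stop any other receiver first.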

#!/usr/bin/env python
#
# Daniel Belasco Rogers 2011 (danbelasco@yahoo.co.uk)
#
# The intention of this script is to send the output of a connected
# GPS (I usually use a Qstarz Q1000x) to a listening programme
# (puredata, supercollider, processing) in neatly packaged bundles
#
# Options:
# -f --file Write the osc bundles to a pickle file for replaying
# later with my script 'SatelliteLogReader.py'
# -p --prn order by prn (unique satellite number) instead of
# azimuth (default)
# -d --dumb Do not send gps values over osc (for
# debugging if you haven't set up a receiving script yet)
#
# Example output (carriage returns are mine):
# /RMC ['12:58:27', 52.496204376220703, 13.445575714111328, 0.0, 0.0],
# /SAT1 [6, 6, 27, 0], /SAT2 [3, 8, 40,0],
# /SAT3 [7, 58, 65, 0], /SAT4 [19, 2, 67, 0],
# /SAT5 [13, 18, 98, 0], /SAT6 [28, 25, 159, 0],
# /SAT7 [8, 82, 168, 0], /SAT8 [10, 43,174, 0],
# /SAT9 [2, 7, 227, 0], /SAT10 [5, 62, 263, 0],
# /SAT11 [26,34, 284, 0], /SAT12 [21, 5, 339, 0]
#
# Last Update: Thu 21 Apr 2011 03:08:16 PM CEST
#
# A re-write of splitbundleGSV-RMC-newOSC.py
# for Andre Wakko and my project 'Pirate Satellite Synthesizer'
#
# Requires pyOSC and gpsdevice (should be bundled with this script)

try:
    import OSC
except ImportError:
    print """Please install pyOSC from the following location:
https://trac.v2.nl/wiki/pyOSC"""
    # nothing below can work without pyOSC, so stop here
    raise SystemExit(1)

import cPickle
import sys
import os.path
from time import sleep
from optparse import OptionParser
try:
    from gpsdevice import *
except ImportError:
    print """
This script requires the script 'gpsdevice.py' to be in the same
folder or in the PYTHONPATH
"""
    # nothing below can work without gpsdevice, so stop here
    raise SystemExit(1)

# Port and address for sending osc messages
# Address is localhost (this machine by default)
# Remember to set port number in receiving programme
OSCADDR = "127.0.0.1"
OSCPORT = 57120
# Usual location (address) of gps plugged into usb (linux)
GPSADDR = "/dev/ttyUSB0"


def reorderList(List, prn):
    '''
    takes an unsorted list of satellite values and returns a
    sorted list by azimuth or prn

    List = flat list of satellite values, four per satellite
           (prn, elevation, azimuth, signal to noise)
    prn = Boolean, from options if true, order by prn not azimuth
    '''
    # index of the value to sort on within each block of four
    sortIndex = 0 if prn else 2
    blocks = [List[i:i + 4] for i in range(0, len(List), 4)]
    # sorting the blocks directly (rather than via a dictionary keyed
    # on azimuth) keeps satellites that share the same azimuth
    blocks.sort(key=lambda block: block[sortIndex])
    return [element for block in blocks for element in block]


def askyesno(question):
    """
    handles yes/no questions
    question = what to ask the user at the prompt
    answer = a boolean True for yes and False for no
    """
    while 1:
        answer = raw_input(question).lower()
        if answer in ('y', 'yes', 'n', 'no'):
            if answer in ('y', 'yes'):
                answer = True
            if answer in ('n', 'no'):
                answer = False
            return answer
        print '\nplease answer y or n\n'


def main():
    """
    the main loop - consider splitting these functions up for greater
    clarity
    """
    ############################################################
    # script options
    ############################################################
    usage = "usage: %prog [-f FILE] [-d] [-p]"
    parser = OptionParser(usage=usage)
    parser.add_option("-f", "--file", dest="filename",
                      help="write report to FILE", metavar="FILE")
    parser.add_option("-d", "--dumb", dest="dumb",
                      help="don't send messages to OSC", action="store_true",
                      default=False)
    parser.add_option("-p", "--prn", dest="prn",
                      help="sort by prn (default by azi)", action="store_true",
                      default=False)
    (options, args) = parser.parse_args()
    if options.filename:
        if (os.path.isfile(options.filename)):
            answer = askyesno('file already exists - overwrite? y/n ')
            if not answer:
                return 0
        log = open(options.filename, 'wb')
    ############################################################
    # set up osc and gps connections
    ############################################################
    # Osc message address and port
    send_address = (OSCADDR, OSCPORT)
    # OSC basic client
    osc = OSC.OSCClient()
    # set the address for all following messages
    osc.connect(send_address)
    # setup and connect to gps
    gps = GPSDevice(GPSADDR)
    gps.open()
    ############################################################
    # start reading the values and sending them
    ############################################################
    # initialise local vars
    GSVcollection = []
    FirstRMC = True
    # initiate the bundle
    bundle = OSC.OSCBundle()
    # main read / send loop
    try:
        for record in gps.read_all():
            if record['sentence'] == 'RMC':
                # on receiving an RMC sentence, gather everything
                # together and send it.
                if FirstRMC == False:
                    # Skip reorderList if any of the first four azi
                    # values is zero; the slice also copes with fewer
                    # than four satellites in view
                    azimuths = GSVcollection[2:16:4]
                    if azimuths and all(azimuths):
                        # sort complete GSV sentence by azimuth
                        GSVcollection = reorderList(GSVcollection, options.prn)
                    #bundle.setAddress("/SAT")
                    count = 0
                    for i in range(0, len(GSVcollection), 4):
                        count += 1
                        print count, GSVcollection[i:i + 4]
                        bundle.setAddress("/SAT%d" % count)
                        bundle.append(GSVcollection[i:i + 4])
                    GSVcollection = []
                    # Send over OSC if the dumb option is off
                    if options.dumb == False:
                        osc.send(bundle)
                    print bundle
                    if options.filename:
                        cPickle.dump(bundle, log)
                    #  clear the bundle otherwise it just fills up
                    bundle.clearData()
                # the below only happens once - the first time an RMC
                # sentence is received
                elif FirstRMC == True:
                    FirstRMC = False
                RMClist = [
                record['time'],
                record['latitude'],
                record['longitude'],
                record['knots'],
                record['bearing'],
                ]
                # append to bundle with RMC prefix
                bundle.append({'addr': "/RMC", 'args': RMClist})
            if record['sentence'] == 'GSV':
                if FirstRMC == False:
                    for item in record['satellite_data_list']:
                        GSVcollection.append(item)
    except KeyboardInterrupt:
        osc.close()
        gps.close()
        if options.filename:
            log.close()
        print """
user interrupt, shutting down
"""
        sys.exit()


if __name__ == '__main__':
    sys.exit(main())

So, what do you do with it? Make sure that you have saved 'gpsdevice.py' and 'PirateSatelliteServer.py' in the same folder. Plug your GPS data logger into your Linux computer via USB and turn it on. Then start your receiving script, start PirateSatelliteServer and begin to experiment with sonorizing the values. If you just want to test that PirateSatelliteServer is running, start it with the '-d' option, which won't send anything over OSC. If you get anywhere, please tell me, I'd love to hear about it.

These are the options for the script. They are also documented in the comments of the file itself.

  • -f --file: write the OSC bundles to a pickle file for replaying later with my script 'SatelliteLogReader.py'
  • -p --prn: order by PRN (unique satellite number) instead of azimuth (the default)
  • -d --dumb: do not send GPS values over OSC (for debugging if you haven't set up a receiving script yet)
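The '-f' option dumps one pickle per bundle into the same file, so reading the log back means calling `load` repeatedly until the file runs out. Here is a minimal sketch of that pattern, assuming Python 3 and using plain lists as stand-ins for the OSC bundles. It is not SatelliteLogReader.py, and the names `dump_bundles` and `read_bundles` are hypothetical. A real log written by the Python 2 script contains pyOSC objects, so reading it would also need a compatible Python with pyOSC importable.

```python
import os
import pickle
import tempfile


def dump_bundles(bundles, filename):
    """Write each bundle as its own pickle, one after the other."""
    with open(filename, 'wb') as log:
        for bundle in bundles:
            pickle.dump(bundle, log)


def read_bundles(filename):
    """Yield each pickled object in turn until the end of the file."""
    with open(filename, 'rb') as log:
        while True:
            try:
                yield pickle.load(log)
            except EOFError:
                return


# Round trip with made-up stand-in data in a temporary folder
demo_path = os.path.join(tempfile.mkdtemp(), 'demo.pickle')
dump_bundles([['/SAT1', 6, 6, 27, 0], ['/SAT2', 3, 8, 40, 0]], demo_path)
for bundle in read_bundles(demo_path):
    print(bundle)
```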

Example output (carriage returns are mine):
/RMC ['12:58:27', 52.496204376220703, 13.445575714111328, 0.0, 0.0],
/SAT1 [6, 6, 27, 0], /SAT2 [3, 8, 40,0], /SAT3 [7, 58, 65, 0],
/SAT4 [19, 2, 67, 0], /SAT5 [13, 18, 98, 0], /SAT6 [28, 25, 159, 0],
/SAT7 [8, 82, 168, 0], /SAT8 [10, 43,174, 0], /SAT9 [2, 7, 227, 0],
/SAT10 [5, 62, 263, 0], /SAT11 [26,34, 284, 0], /SAT12 [21, 5, 339, 0]

The values are as follows:
RMC = [time of fix, latitude (decimal degrees), longitude (decimal degrees), speed (knots), bearing]
SATn = [PRN (pseudo-random noise number, which identifies the satellite), elevation, azimuth, signal to noise]
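On the receiving side, the four values per satellite are easier to work with once they have names. A small sketch, assuming Python 3; the `Satellite` tuple and the `group_satellites` helper are names invented here, and the sample values are taken from the example output above.

```python
from collections import namedtuple

# Field order follows the SATn layout described above
Satellite = namedtuple('Satellite', 'prn elevation azimuth snr')


def group_satellites(values):
    """Split a flat list of values into one Satellite per four values.

    Any incomplete group at the end of the list is ignored.
    """
    return [Satellite(*values[i:i + 4])
            for i in range(0, len(values) - 3, 4)]


flat = [6, 6, 27, 0, 3, 8, 40, 0, 7, 58, 65, 0]
sats = group_satellites(flat)

# e.g. pick the satellite highest above the horizon
highest = max(sats, key=lambda sat: sat.elevation)
print(highest.prn)  # 7
```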

This is the setup I have been using:

  • eeePC 501 running 'easy peasy' (2.6.32-30-generic #59-Ubuntu SMP Tue Mar 1 21:30:21 UTC 2011 i686 GNU/Linux ) OR
  • Lenovo ThinkPad t61p running Ubuntu 10.04 (Lucid)
  • GPS device: Qstarz Q1000X data logger attached to computer with USB

If you don't have a GPS, check out SatelliteLogReader.py in the next post which reads the pickled output of PirateSatelliteServer.py from log files, complete with test data which you can download from here.
