One of the most valuable tools advocacy groups have for ensuring that transit agencies provide high-quality service is a historical database of train and bus movement data. At my day job, I work with data sets like these every day to find ways to improve train and bus service. However, these analyses typically stay internal to New York City Transit. Most large agencies these days (WMATA, MBTA, CTA, SEPTA, etc.) have research groups whose full-time job is to analyze operational data, but most of them face the same restrictions on publishing data that we do. With historical records of real-time data, though, academics and nonprofits can perform their own creative analyses to better hold agencies accountable and offer useful suggestions.

Getting Started

Most agencies, WMATA included, require you to create a developer account to get an API key. This allows the agency to collect more information on API usage and helps them prevent DDoS attacks. Once you have completed the sign up process and verified your e-mail, you have to subscribe to the "Default Tier" under the Products tab on the WMATA developer site. You can see all of the different APIs available here. You can find your API key by clicking on your name in the top right (assuming you are logged in) and clicking profile. You should be able to see your primary key by clicking 'Show'. Testing out your API key is simple with just a few lines of Python code.

import requests

sesh = requests.Session()
headers = {'api_key': 'a2799613073c4ca0aa0453b957389994'}
validate_url = 'https://api.wmata.com/Misc/Validate'

req = sesh.get(validate_url, headers=headers)
print(req.json())
42

When I tried this, I received the answer to the Ultimate Question of Life, the Universe, and Everything.
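Hard-coding the key works for a quick test, but it is easy to leak when sharing a notebook. Below is a minimal sketch of one alternative, assuming you have exported the key in an environment variable. The variable name WMATA_API_KEY and the helper build_headers are my own choices for illustration, not something WMATA prescribes.

```python
import os


def build_headers(env_var="WMATA_API_KEY"):
    """Build the request headers from an environment variable.

    The variable name is an assumption for this sketch; use whatever
    name suits your setup.
    """
    api_key = os.environ.get(env_var)
    if not api_key:
        raise RuntimeError(f"Set the {env_var} environment variable first")
    return {"api_key": api_key}
```

Calling headers = build_headers() then gives a dictionary that can be passed to sesh.get exactly as before.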

Collecting Train Movements

If you go to the train position API page, there is a description of each of the fields in the JSON response. Clicking "Try It" will show you the structure of the JSON response. It is also easy to try it out in Python using the code below.

import pprint

train_position_url = 'https://api.wmata.com/TrainPositions/TrainPositions?contentType=json'

def get_trains(sesh, train_position_url, headers):
    req = sesh.get(train_position_url, headers=headers)
    return req.json()
    
pprint.pprint(get_trains(sesh, train_position_url, headers)['TrainPositions'][0])
    {'CarCount': 8,
     'CircuitId': 2634,
     'DestinationStationCode': None,
     'DirectionNum': 1,
     'LineCode': None,
     'SecondsAtLocation': 93,
     'ServiceType': 'NoPassengers',
     'TrainId': '081'}

These train movements update every 7-10 seconds. There are a few ways we can set up a Python function to query this every 10 seconds. The method below is the most straightforward I have seen. Because it sleeps until a running target time rather than for a flat 10 seconds, delays do not accumulate, but individual cycles may still be slightly off (e.g. the code may occasionally run 10.0001 seconds after the previous cycle).

import time

nexttime = time.time()  # initializing

# switch to a while loop to run indefinitely
for i in range(2):
    pprint.pprint(get_trains(sesh, train_position_url, headers)['TrainPositions'][0])
    nexttime += 10
    sleeptime = nexttime - time.time()
    if sleeptime > 0:
        time.sleep(sleeptime)
    {'CarCount': 8,
     'CircuitId': 2634,
     'DestinationStationCode': None,
     'DirectionNum': 1,
     'LineCode': None,
     'SecondsAtLocation': 93,
     'ServiceType': 'NoPassengers',
     'TrainId': '081'}
    {'CarCount': 8,
     'CircuitId': 2637,
     'DestinationStationCode': None,
     'DirectionNum': 1,
     'LineCode': None,
     'SecondsAtLocation': 1,
     'ServiceType': 'NoPassengers',
     'TrainId': '081'}
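The loop above tracks a running target time, so small delays do not pile up. If a request ever takes longer than the interval, though, the loop will fire several times back to back to catch up. Here is a sketch of one way to skip missed ticks instead; the names next_tick and run_every are hypothetical, and the clock and sleep parameters exist only so the timing can be tested without waiting.

```python
import math
import time


def next_tick(start, interval, now):
    """Return the first scheduled tick strictly after `now`.

    Ticks fall at start + k * interval; any that were missed while a
    slow request was in flight are skipped rather than replayed.
    """
    ticks_done = math.floor((now - start) / interval) + 1
    return start + ticks_done * interval


def run_every(task, interval, iterations, clock=time.time, sleep=time.sleep):
    """Call `task` on a fixed schedule for a set number of iterations."""
    start = clock()
    for _ in range(iterations):
        task()
        now = clock()
        delay = next_tick(start, interval, now) - now
        if delay > 0:
            sleep(delay)
```

For example, run_every(lambda: pprint.pprint(get_trains(sesh, train_position_url, headers)['TrainPositions'][0]), 10, 2) would reproduce the two-iteration loop above.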

Storage and Processing

We now need a way to store all of this data for analysis later. Since the data has a defined structure, and I will hopefully be mapping this geographically later, it makes sense to use PostgreSQL with the PostGIS extension to store the data. For more information on the installation process, see this previous blog post. While it is not important for this post, I typically use DBeaver as my SQL client/DBA tool for PostgreSQL since pgAdmin 4 is so buggy, and pgAdmin III is no longer officially supported. This project will start by focusing on the fairly simple (but performance-sensitive) code to store the real-time data, while common analyses we can do will follow in later articles.

import psycopg2

try:
    conn = psycopg2.connect("dbname='postgres' user='postgres' host='localhost' password='6TOcn0YoBsEkGRjyty'")
except psycopg2.OperationalError:
    print("I am unable to connect to the database")
    raise  # nothing below can work without a connection

cur = conn.cursor()
train_position_create = ("""CREATE TABLE IF NOT EXISTS public.train_positions (
                              train_id varchar NOT NULL,
                              car_count numeric NULL,
                              direction_num numeric NULL,
                              circuit_id numeric NULL,
                              destination_station_code varchar NULL,
                              line_code varchar NULL,
                              seconds_at_location numeric NULL,
                              service_type varchar NULL,
                              request_time timestamp NOT NULL,
                              request_date date NOT NULL
                            )
                            WITH (
                              OIDS=FALSE
                            ) ;
                            CREATE INDEX IF NOT EXISTS train_positions_request_date_idx ON public.train_positions 
                              USING btree (request_date, service_type, line_code) ;
                         """
)
cur.execute(train_position_create)

With the table created, the code below inserts the train position data into the train position table, adding a value for the request time.

from datetime import datetime
request_t = datetime.now()
train_positions = get_trains(sesh, train_position_url, headers)['TrainPositions']
# Tag every record with the time of the request
for item in train_positions:
    item.update({'request_time':request_t})

train_position_insert = """INSERT INTO train_positions(train_id, car_count, direction_num, circuit_id,
                                                       destination_station_code, line_code, seconds_at_location,
                                                       service_type, request_time, request_date)
                           VALUES (%(TrainId)s, %(CarCount)s, %(DirectionNum)s, %(CircuitId)s,
                                   %(DestinationStationCode)s, %(LineCode)s, %(SecondsAtLocation)s,
                                   %(ServiceType)s, %(request_time)s, (%(request_time)s)::date)
                        """

cur.executemany(train_position_insert, train_positions)
conn.commit()

Combining the above code cells, you can easily run the insert continuously (although it has no error handling). Be careful before trying to store a month's worth of train position data on your laptop, since it will quickly use up a lot of storage space, particularly if you are also storing bus positions. To put this in perspective, a single day of NYCT bus GTFS data is about 140 MB zipped. The data will need some more processing to get it into an easier to analyze format, but for now let's move on to getting bus position data.
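Because the combined loop has no error handling, a single dropped connection would stop the collection. Below is a minimal sketch of one way to guard each cycle. It assumes fetch and store are callables wrapping the request and insert code above; those names, and collect itself, are hypothetical.

```python
import logging
import time
from datetime import datetime


def collect(fetch, store, interval=10, iterations=None,
            clock=time.time, sleep=time.sleep):
    """Fetch and store on a fixed interval, logging failures instead of crashing.

    `fetch` returns a list of dicts; `store` persists them. Pass
    iterations=None to run until interrupted. Returns the number of
    cycles that failed.
    """
    failures = 0
    done = 0
    next_time = clock()
    while iterations is None or done < iterations:
        try:
            request_time = datetime.now()
            rows = [dict(row, request_time=request_time) for row in fetch()]
            store(rows)
        except Exception:
            # Log and carry on; the next cycle gets a fresh attempt.
            logging.exception("Collection cycle failed")
            failures += 1
        done += 1
        next_time += interval
        delay = next_time - clock()
        if delay > 0:
            sleep(delay)
    return failures
```

With psycopg2, the store callable should also call conn.rollback() when an insert fails, since a failed statement leaves the current transaction aborted until it is rolled back.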

Collecting Bus Positions

Building off of the train position processing, you can find more bus information on the Bus Position API page. The Bus Position API differs from the train position API in that it provides Lat/Longs instead of track circuit positions.

bus_position_url = 'https://api.wmata.com/Bus.svc/json/jBusPositions'

def get_bus_positions(sesh, bus_position_url, headers):
    req = sesh.get(bus_position_url, headers=headers)
    return req.json()

pprint.pprint(get_bus_positions(sesh, bus_position_url, headers)['BusPositions'][0])
    {'DateTime': '2018-10-28T17:46:33',
     'Deviation': 63.0,
     'DirectionNum': 0,
     'DirectionText': 'EAST',
     'Lat': 39.061886,
     'Lon': -77.11956,
     'RouteID': 'C4',
     'TripEndTime': '2018-10-28T17:06:00',
     'TripHeadsign': 'PRINCE GEORGES PLAZA STATION',
     'TripID': '852570010',
     'TripStartTime': '2018-10-28T15:52:00',
     'VehicleID': '7230'}

You can use the same looping code to collect bus positions that I used to collect train positions.

Storage and Processing

Bus position data has a fairly different structure than the train position data. Beyond the track circuit vs lat/long difference, the data from buses includes the date/time from the last GPS update. While you still may want to worry about timed transfers, there are not the same problems with interlining and merging/diverging that trains have. The code to create the Bus Position table is below, using the same database cursor from above.

bus_position_create = ("""CREATE TABLE IF NOT EXISTS public.bus_positions (
                            service_date date NOT NULL,
                            ping_time timestamp NOT NULL,
                            deviation numeric NULL,
                            direction_num varchar NULL,
                            direction_text varchar NULL,
                            latitude numeric NULL,
                            longitude numeric NULL,
                            route_id varchar NULL,
                            trip_start_time timestamp NULL,
                            trip_end_time timestamp NULL,
                            trip_headsign varchar NULL,
                            trip_id varchar NULL,
                            vehicle_id varchar NOT NULL
                          )
                          WITH (
                            OIDS=FALSE
                          ) ;
                            CREATE INDEX IF NOT EXISTS bus_positions_service_date_idx ON public.bus_positions 
                              USING btree (service_date, direction_text, route_id) ;
                       """
)
cur.execute(bus_position_create)

With the new bus positions table in the database, there is just a little bit of processing to do before the data is ready to insert. In the API, DateTime is the date and time of the last position update, or GPS ping. To make the table easier to index and query, it is also useful to have a dedicated service date column.

bus_positions = get_bus_positions(sesh, bus_position_url, headers)['BusPositions']

bus_position_insert = """INSERT INTO bus_positions(service_date, ping_time, deviation, direction_num,
                                                   direction_text, latitude, longitude, route_id, trip_start_time,
                                                   trip_end_time, trip_headsign, trip_id, vehicle_id)
                           VALUES ((%(DateTime)s)::date, %(DateTime)s, %(Deviation)s, %(DirectionNum)s,
                                   %(DirectionText)s, %(Lat)s, %(Lon)s, %(RouteID)s, %(TripStartTime)s,
                                   %(TripEndTime)s, %(TripHeadsign)s, %(TripID)s, %(VehicleID)s)
                        """

cur.executemany(bus_position_insert, bus_positions)
conn.commit()
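Casting DateTime to a date, as the insert above does, uses the calendar date of the ping. Transit schedules often count trips that run past midnight as part of the previous service day, so a sketch of that alternative follows. The cutoff hour and the function name are assumptions for illustration, not WMATA's documented definition of a service day.

```python
from datetime import datetime, timedelta


def service_date(ping_time, cutoff_hour=0):
    """Return the service date for an ISO 8601 timestamp string.

    With cutoff_hour=0 this matches a plain cast to date. A positive
    cutoff assigns pings before that hour to the previous day.
    """
    ts = datetime.fromisoformat(ping_time)
    return (ts - timedelta(hours=cutoff_hour)).date()
```

With the default cutoff, service_date('2018-10-28T17:46:33') gives the same date as the SQL cast; with cutoff_hour=3, a ping at 1:15 AM would be assigned to the day before.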

The code above will help you aggregate raw bus ping data, but the raw ping data is difficult to use by itself. From my own experience, the raw data table can become large and unwieldy fairly quickly even with appropriate indices. We need to process the data into a more usable format, although most of the details will have to wait for a future article. We will match bus pings to bus stops, which means we need to know which stops the bus is supposed to stop at. This requires tables for the bus stops and for the bus schedules as well.

While the WMATA API allows us to see the routes that typically stop at a bus stop, these routes may change over time. Instead of maintaining multiple versions of bus stops, we will just rely on the schedule table for that information.

bus_stop_create = """CREATE TABLE IF NOT EXISTS public.bus_stops (
                       stop_id varchar NOT NULL,
                       stop_name varchar NOT NULL,
                       lon numeric NULL,
                       lat numeric NULL,
                       PRIMARY KEY (stop_id)
                     )
                     WITH (
                       OIDS=FALSE
                     );
                  """

cur.execute(bus_stop_create)
conn.commit()

bus_stop_url = 'https://api.wmata.com/Bus.svc/json/jStops'

def get_bus_stops(sesh, bus_stop_url, headers):
    req = sesh.get(bus_stop_url, headers=headers)
    return req.json()

bus_stops = get_bus_stops(sesh, bus_stop_url, headers)['Stops']

bus_stop_insert = """INSERT INTO bus_stops(stop_id, stop_name, lon, lat)
                     VALUES (%(StopID)s, %(Name)s, %(Lon)s, %(Lat)s)
                     ON CONFLICT DO NOTHING
                  """
cur.executemany(bus_stop_insert, bus_stops)
conn.commit()
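With the stops loaded, matching a ping to a stop comes down to a distance calculation. The sketch below uses the haversine formula in plain Python as a stand-in. It is one possible approach, not necessarily the one this series will end up using, and PostGIS can do the same work inside the database. The function names are hypothetical.

```python
import math

EARTH_RADIUS_M = 6371000  # mean Earth radius in metres


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two lat/lon points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def nearest_stop(ping, stops):
    """Return (stop, distance_m) for the stop closest to a bus ping.

    `ping` and `stops` use the Lat/Lon keys from the API responses.
    """
    def distance(stop):
        return haversine_m(ping["Lat"], ping["Lon"], stop["Lat"], stop["Lon"])

    best = min(stops, key=distance)
    return best, distance(best)
```

A linear scan like this is fine for experimenting with a handful of pings, but it checks every stop for every ping, so it would be slow across a full day of data.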

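Next we need to create the bus schedule table for a particular day. The bus schedule API takes a route ID and a date, which means we first need a list of routes. The code further down passes only the route ID and leaves the date to the API's default, which should be the current day.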

bus_routes_create = """CREATE TABLE IF NOT EXISTS public.bus_routes (
                         route_id varchar NOT NULL,
                         route_name varchar NULL,
                         line_description varchar NULL,
                         PRIMARY KEY (route_id)
                       )
                       WITH (
                         OIDS=FALSE
                       );"""

cur.execute(bus_routes_create)
conn.commit()

bus_route_url = 'https://api.wmata.com/Bus.svc/json/jRoutes'

def get_bus_routes(sesh, bus_route_url, headers):
    req = sesh.get(bus_route_url, headers=headers)
    return req.json()

bus_routes = get_bus_routes(sesh, bus_route_url, headers)['Routes']

bus_route_insert = """INSERT INTO bus_routes(route_id, route_name, line_description)
                      VALUES (%(RouteID)s, %(Name)s, %(LineDescription)s)
                      ON CONFLICT DO NOTHING
                   """
cur.executemany(bus_route_insert, bus_routes)
conn.commit()

After grabbing a list of routes, which should not change too much over time, we need to pull the schedule for each route. For each route, the API returns a list of trips for direction 0 and for direction 1. Each list item has some basic details about the start and end time of a trip, along with the expected departure time at each stop. To avoid duplicating all of the higher level trip information, the data is broken out into a trip table and a trip-stop table that are connected by a TripID. Joins between these tables should still be very fast, and the split saves on storage space.

bus_schedule_create = """CREATE TABLE IF NOT EXISTS public.bus_schedule_trip (
                           service_date date NOT NULL,
                           trip_id varchar NOT NULL,
                           route_id varchar NOT NULL,
                           direction_num numeric NOT NULL,
                           trip_direction_text varchar NULL,
                           trip_headsign varchar NULL,
                           start_time timestamp NULL,
                           end_time timestamp NULL,
                           PRIMARY KEY (service_date, trip_id)
                         )
                         WITH (
                           OIDS=FALSE
                         );"""

bus_schedule_stop_create = """CREATE TABLE IF NOT EXISTS public.bus_schedule_trip_stop (
                                service_date date NOT NULL,
                                trip_id varchar NOT NULL,
                                stop_id varchar NOT NULL,
                                stop_name varchar NULL,
                                stop_seq numeric NULL,
                                sched_leave_time timestamp NULL,
                                PRIMARY KEY (service_date, trip_id, stop_id)
                              )
                              WITH (
                                OIDS=FALSE
                              );"""

cur.execute(bus_schedule_create)
cur.execute(bus_schedule_stop_create)
conn.commit()

bus_schedule_url = 'https://api.wmata.com/Bus.svc/json/jRouteSchedule'

def get_bus_schedule(sesh, bus_schedule_url, headers, route_id):
    params = {"RouteID":route_id}
    req = sesh.get(bus_schedule_url, headers=headers, params=params)
    return req.json()

bus_schedule_insert = """INSERT INTO bus_schedule_trip (service_date, trip_id, route_id, direction_num,
                                                        trip_direction_text, trip_headsign, start_time, end_time)
                         VALUES ((%(StartTime)s)::date, %(TripID)s, %(RouteID)s, %(DirectionNum)s,
                                 %(TripDirectionText)s, %(TripHeadsign)s, %(StartTime)s, %(EndTime)s)
                         ON CONFLICT DO NOTHING
                        """

bus_schedule_stop_insert = """INSERT INTO bus_schedule_trip_stop (service_date, trip_id, stop_id, stop_name,
                                                                  stop_seq, sched_leave_time)
                              VALUES ((%(Time)s)::date, %(TripID)s, %(StopID)s, %(StopName)s, %(StopSeq)s, %(Time)s)
                              ON CONFLICT DO NOTHING
                           """

for route in bus_routes:
    route_id = route['RouteID']
    route_schedule = get_bus_schedule(sesh, bus_schedule_url, headers, route_id)
    if 'Direction0' in route_schedule:
        route_direction_0 = route_schedule['Direction0']
        cur.executemany(bus_schedule_insert, route_direction_0)
        for bus_trip in route_direction_0:
            trip_id = bus_trip['TripID']
            bus_trip_stops = bus_trip['StopTimes']
            bus_trip_stops = [dict(item, TripID=trip_id) for item in bus_trip_stops]
            cur.executemany(bus_schedule_stop_insert, bus_trip_stops)
    if 'Direction1' in route_schedule:
        route_direction_1 = route_schedule['Direction1']
        cur.executemany(bus_schedule_insert, route_direction_1)
        for bus_trip in route_direction_1:
            trip_id = bus_trip['TripID']
            bus_trip_stops = bus_trip['StopTimes']
            bus_trip_stops = [dict(item, TripID=trip_id) for item in bus_trip_stops]
            cur.executemany(bus_schedule_stop_insert, bus_trip_stops)

conn.commit()

The above code will be fairly slow since it needs to make a new API call for each route, but thankfully it only needs to run once per day. I would recommend running it in a separate Python script so that waiting for it to complete does not disrupt the 10-second cycle needed to capture the real-time data.
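The two direction branches in the schedule loop are nearly identical, which makes them easy to get out of sync. Here is a sketch of one way to separate the reshaping from the database calls, so it can be checked without hitting the API. flatten_schedule is a hypothetical helper, not part of the original code.

```python
def flatten_schedule(route_schedule):
    """Split a route schedule response into trip rows and stop rows.

    Handles whichever of Direction0 / Direction1 are present. Each
    stop row gets the TripID of its parent trip so the two tables
    can be joined later.
    """
    trips, stops = [], []
    for key in ("Direction0", "Direction1"):
        # Tolerate a direction that is missing or empty.
        for trip in route_schedule.get(key) or []:
            trips.append(trip)
            stops.extend(dict(stop, TripID=trip["TripID"])
                         for stop in trip["StopTimes"])
    return trips, stops
```

The two returned lists can then be handed to cur.executemany with bus_schedule_insert and bus_schedule_stop_insert respectively, once per route.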

Next Steps and Final Remarks

While the code above should serve as a good example for storing all of the real-time and schedule data that WMATA provides, it is not battle tested in production yet. The detailed primary keys and indices might cause inserts to be too slow for production use after storing a lot of data, and it will total up to a lot of data quickly.
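To get a feel for how quickly the data adds up, here is a back-of-the-envelope estimate. The vehicle count and bytes per row below are placeholder assumptions, not measured values; swap in numbers from your own tables.

```python
def rows_per_day(vehicles, interval_seconds=10):
    """Rows stored per day when every poll records one row per vehicle."""
    polls_per_day = 24 * 60 * 60 // interval_seconds
    return vehicles * polls_per_day


def gigabytes_per_day(vehicles, bytes_per_row, interval_seconds=10):
    """Approximate daily storage, ignoring index and table overhead."""
    return rows_per_day(vehicles, interval_seconds) * bytes_per_row / 1e9


# Placeholder inputs: 100 vehicles reporting, roughly 100 bytes per row.
print(rows_per_day(100))            # 864000
print(gigabytes_per_day(100, 100))  # 0.0864
```

Indexes and primary keys add to this, so treat the result as a lower bound.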

In the coming weeks, I plan to develop a more robust Python process to store this data, so that I can keep the code running 24/7, and eventually show how to analyze it as well. While I wrote this article as an example of how to start this type of project from scratch, MetroHero has already put together several APIs using supplemental information that the creator has gotten through FOIA requests. For most folks' needs, it makes more sense to start there.