Transit Watch: Real-Time Feeds

Tim Spann
12 min read · Oct 28, 2022

Source Code: https://github.com/tspannhw/pulsar-transit-function

In today’s article, we will cover transportation data from mass transit and highways. We will read mass transit status, alerts, and regional highway data from RSS feeds.

As part of a FLiPN Stack application, we use Apache NiFi to read RSS/XML feeds from a few different REST endpoints. We access the TRANSCOM highway data from the TRANSCOM coalition.

Building an event-driven application for streams of transit data is a very simple process.

Below is a walkthrough of the entire process.

Step 1: Collect RSS/XML Feeds for NJ Transit (Bus, Rail, and Light Rail)

Step 2: Collect RSS/XML Feed for TRANSCOM Data (roads, tunnels, …)

Apache NiFi has data lineage/provenance that you can inspect live while data is in the stream. You can validate that the incoming data arrives as clean, well-formed JSON.
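To make the RSS-to-JSON step more concrete, here is a minimal Python sketch of the kind of flattening that happens before records are published. It is not the NiFi flow from the repository: the sample feed, the `rss_items_to_records` helper, and its parameters are illustrative assumptions, and only the field names follow the records shown later in this article.

```python
import json
import uuid
import xml.etree.ElementTree as ET

# Hypothetical sample feed; real feeds carry more elements per item.
SAMPLE_RSS = """<rss version="2.0">
  <channel>
    <title>Sample advisories</title>
    <item>
      <title>NY 5 eastbound:Construction</title>
      <description>Bridge work, right lane closed</description>
      <link>https://www.511nj.org/home</link>
      <guid>43.034178,-76.062942</guid>
      <pubDate>2022-10-20T09:00:06</pubDate>
    </item>
  </channel>
</rss>"""

# RSS child elements copied into each record.
FIELDS = ("title", "description", "link", "guid", "pubDate")


def rss_items_to_records(xml_text, servicename, ts):
    """Flatten each RSS <item> into a flat dict that serializes to JSON."""
    root = ET.fromstring(xml_text)
    records = []
    for item in root.iter("item"):
        # Missing elements become empty strings rather than None.
        record = {name: (item.findtext(name) or "").strip() for name in FIELDS}
        record["servicename"] = servicename
        record["uuid"] = str(uuid.uuid4())  # unique id, also usable as a message key
        record["ts"] = str(ts)              # kept as a string, like the schemas in this article
        records.append(record)
    return records


records = rss_items_to_records(SAMPLE_RSS, "transcom", 1666291837244)
print(json.dumps(records[0], indent=2))
```

Each dictionary can then be serialized with `json.dumps` and handed to whatever publishes to Pulsar.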

Step 3: NiFi publishes data to Apache Pulsar topics

Most production systems will want a scripted, repeatable process to create tenants, namespaces, and topics. By default, NiFi can create these topics for you, but for production you’ll want to manage this yourself and also decide on the number of partitions for each topic.

Create a Pulsar Topic

bin/pulsar-admin topics create persistent://public/default/transcom
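One way to approach the scripted, repeatable setup is to generate the `pulsar-admin` invocations from a small description of what you need. The sketch below is only an illustration under assumptions: the `build_setup_commands` helper, the `demo` tenant, the `feeds` namespace, and the partition count of 4 are all hypothetical. It only builds the commands and does not run them.

```python
import shlex


def build_setup_commands(tenant, namespace, topics, admin="bin/pulsar-admin"):
    """Return pulsar-admin invocations as argument lists.

    `topics` maps a topic name to a partition count; 0 means non-partitioned.
    Creating a tenant or namespace that already exists fails, so skip those
    entries (or ignore the error) when re-running against an existing cluster.
    """
    ns = f"{tenant}/{namespace}"
    commands = [
        [admin, "tenants", "create", tenant],
        [admin, "namespaces", "create", ns],
    ]
    for name, partitions in topics.items():
        full_name = f"persistent://{ns}/{name}"
        if partitions > 0:
            commands.append([admin, "topics", "create-partitioned-topic",
                             full_name, "--partitions", str(partitions)])
        else:
            commands.append([admin, "topics", "create", full_name])
    return commands


# Hypothetical layout: one plain topic and one topic with 4 partitions.
commands = build_setup_commands("demo", "feeds", {"transcom": 0, "transit": 4})
for command in commands:
    print(shlex.join(command))
```

To actually apply the setup, each argument list could be passed to `subprocess.run(command, check=True)`.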

Step 4: Check the topic

We can use Pulsar Manager or StreamNative Cloud to monitor and check the statistics for our topics.

bin/pulsar-client consume "persistent://public/default/transit" -s ts-reader -n 0 --subscription-type "Shared" --subscription-position "Earliest" --subscription-mode "Durable" --schema-type "auto_consume"

----- got message -----
key:[e5b94097-ac5a-4378-95a9-a8479e0a0cd2], properties:[language=Java, processor=transit], content:{companyname=transcom, advisoryAlert=NULL, link=https://www.511nj.org/home, description=TRANSCOM, Jersey City: hockey game on UBS Arena at (Hempstead) New York Islanders vs. Seattle Kraken, Tuesday February 7th, 2023, 07:30 PM thru 10:30 PM, guid=40.711673,-73.726249, servicename=transcom, title=UBS Arena :hockey game, pubDate=2022-08-16T10:42:45, uuid=e5b94097-ac5a-4378-95a9-a8479e0a0cd2, ts=1666291837242}
----- got message -----
key:[b778a1de-519a-4f60-bd34-c1b212784971], properties:[language=Java, processor=transit], content:{companyname=transcom, advisoryAlert=NULL, link=https://www.511nj.org/home, description=NYSDOT - Region 3: Construction , bridge work on NY 5 eastbound between I-481 SB Ramp (De Witt) and I-481 NB Ramp (De Witt) 1 Right lane of 2 lanes closed until 3:00 PM, guid=43.034178,-76.062942, servicename=transcom, title=NY 5 eastbound:Construction, pubDate=2022-10-20T09:00:06, uuid=b778a1de-519a-4f60-bd34-c1b212784971, ts=1666291837244}

It is easy to consume and point-check our data using the standard command line interface for Pulsar.
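If you want to point-check the command line output programmatically, the sketch below pulls the key, the properties, and the `ts` value out of one consumer line and converts `ts` to a UTC timestamp. It assumes the `key:[...], properties:[...], content:{...}` layout shown above and that `ts` is epoch milliseconds; the sample line is shortened, and the helper names are made up for this example.

```python
import re
from datetime import datetime, timezone

# Shortened version of one consumer output line from above.
LINE = ("key:[b778a1de-519a-4f60-bd34-c1b212784971], "
        "properties:[language=Java, processor=transit], "
        "content:{companyname=transcom, title=NY 5 eastbound:Construction, "
        "ts=1666291837244}")

PATTERN = re.compile(
    r"^key:\[(?P<key>[^\]]*)\], "
    r"properties:\[(?P<props>[^\]]*)\], "
    r"content:\{(?P<content>.*)\}$"
)


def parse_consumer_line(line):
    """Return key, properties and ts (ms) from one line, or None if it does not match."""
    match = PATTERN.match(line.strip())
    if match is None:
        return None
    props = dict(pair.split("=", 1) for pair in match["props"].split(", ") if pair)
    # The content is not split on commas because descriptions contain commas.
    ts_match = re.search(r"\bts=(\d+)", match["content"])
    ts_ms = int(ts_match.group(1)) if ts_match else None
    return {"key": match["key"], "properties": props, "ts_ms": ts_ms}


def millis_to_utc(ts_ms):
    """Convert epoch milliseconds to a timezone-aware UTC datetime."""
    seconds, millis = divmod(ts_ms, 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=millis * 1000)


parsed = parse_consumer_line(LINE)
print(parsed["key"], millis_to_utc(parsed["ts_ms"]).isoformat())
```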

Step 5: Enhance, Enrich, Normalize, Transform and Routing

We could stop here, since the data is now available to any number of consumers, from Java to Python to Go to Kotlin to Scala to NodeJS to C# to Spark to Flink to NiFi to …. For my use cases, I want to run some real-time analytics, build some dashboards, and show some transit items on a map. The esteemed author and data genius David showed me the magic of WebSockets plus LeafletJS at KubeCon, so I had to use it.

The easiest way to do the enrichments I need is to use Pulsar Functions. These are serverless functions written in Java, Python, or Go. SQL will be joining the mix soon. I chose Java as I am on a bit of a Java kick and had some existing code that did some similar enrichments to build from.

We add schemas to the Rail, Light Rail, and Bus records for their new clean topics. We also add a clean topic for Transcom records. Finally, I create a new transit topic that contains all of the different sources and will have more in the future.
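The routing idea can be sketched in a few lines. This is a Python illustration, not the Java function from the repository, and it rests on an assumption: that each record’s `servicename` matches the prefix of its clean topic (as `transcom` does for `transcom-clean`). The `target_topics` helper and the `KNOWN_SERVICES` set are hypothetical.

```python
NAMESPACE = "persistent://public/default"
COMBINED_TOPIC = f"{NAMESPACE}/transit"

# Assumption: servicename values line up with the "<name>-clean" topic names.
KNOWN_SERVICES = {"transcom", "newjerseybus", "newjerseylightrail", "newjerseyrail"}


def target_topics(record):
    """Return the topics a record should be published to.

    Known services go to their own clean topic plus the combined transit
    topic; anything else returns an empty list so the caller can decide
    whether to drop or park the record.
    """
    service = (record.get("servicename") or "").strip().lower()
    if service not in KNOWN_SERVICES:
        return []
    return [f"{NAMESPACE}/{service}-clean", COMBINED_TOPIC]


print(target_topics({"servicename": "transcom", "title": "NY 5 eastbound:Construction"}))
```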

For Transit topic records, we add the results of running Apache OpenNLP 2.0’s Named Entity Recognition for locations, using the English 1.5.0 model. The result is stored in the guid field and provides generic location data such as city names and street names. It’s not enough to map, but it’s a good start.

Our live maps already work for Transcom data, since it includes latitude and longitude. The map code is as simple as it gets: just HTML, JavaScript, and the powerful LeafletJS.
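For illustration, here is a small Python sketch of how a coordinate pair can be pulled out of the raw TRANSCOM `guid` value (for example `40.711673,-73.726249`), while rejecting the free-text locations that appear in other records. The `parse_lat_lon` helper is an assumption made for this example; the actual enrichment lives in the Java function.

```python
def parse_lat_lon(guid):
    """Return (latitude, longitude) as floats, or None if guid is not a coordinate pair."""
    parts = [part.strip() for part in guid.split(",")]
    if len(parts) != 2:
        return None
    try:
        lat, lon = float(parts[0]), float(parts[1])
    except ValueError:
        return None
    # Reject values outside valid ranges (this also filters out NaN and infinity).
    if not (-90.0 <= lat <= 90.0 and -180.0 <= lon <= 180.0):
        return None
    return lat, lon


print(parse_lat_lon("40.711673,-73.726249"))   # coordinate pair from a TRANSCOM record
print(parse_lat_lon("Canastota"))              # free-text location, not mappable
```

Only records for which this returns a pair can be placed on the map directly; the rest would need geocoding.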

Step 6: Pulsar Functions Status

bin/pulsar-admin functions status --name TransitParser

{
  "numInstances" : 1,
  "numRunning" : 1,
  "instances" : [ {
    "instanceId" : 0,
    "status" : {
      "running" : true,
      "error" : "",
      "numRestarts" : 0,
      "numReceived" : 1421,
      "numSuccessfullyProcessed" : 1421,
      "numUserExceptions" : 0,
      "latestUserExceptions" : [ ],
      "numSystemExceptions" : 0,
      "latestSystemExceptions" : [ ],
      "averageLatency" : 13.630545629838124,
      "lastInvocationTime" : 1666388452170,
      "workerId" : "c-standalone-fw-127.0.0.1-8080"
    }
  } ]
}
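The status output lends itself to an automated health check. The sketch below parses a trimmed copy of the JSON above and applies a few deliberately strict rules; the `is_healthy` helper and its rules are assumptions for illustration, not an official Pulsar check.

```python
import json

# Trimmed copy of the status output shown above.
STATUS_JSON = """
{
  "numInstances": 1,
  "numRunning": 1,
  "instances": [{
    "instanceId": 0,
    "status": {
      "running": true,
      "error": "",
      "numReceived": 1421,
      "numSuccessfullyProcessed": 1421,
      "numUserExceptions": 0,
      "numSystemExceptions": 0
    }
  }]
}
"""


def is_healthy(status):
    """Return True if every instance is running and has processed everything cleanly."""
    if status["numRunning"] != status["numInstances"]:
        return False
    for instance in status["instances"]:
        s = instance["status"]
        if not s["running"] or s["error"]:
            return False
        if s["numUserExceptions"] or s["numSystemExceptions"]:
            return False
        # Strict rule: on a busy function this can be briefly unequal
        # while messages are still in flight.
        if s["numSuccessfullyProcessed"] != s["numReceived"]:
            return False
    return True


status = json.loads(STATUS_JSON)
print("healthy" if is_healthy(status) else "needs attention")
```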

Step 7: Check the output of our function

bin/pulsar-client consume "persistent://public/default/transit" -s ts-reader -n 0  --subscription-type "Shared" --subscription-position "Earliest" --subscription-mode "Durable" --schema-type "auto_consume"

----- got message -----
key:[9153a42c-010d-41ef-88ac-bf1b1ec3c258], properties:[language=Java, processor=transit], content:{companyname=transcom, advisoryAlert=NULL, link=https://www.511nj.org/home, description=NYSDOT - Region 2: construction on NY 13 both directions between I-90; on Ramp (Lenox) and Village of Canastota; Town of Lenox (Lenox) More specifically between NYS Thruway Exit 34 and Warners Road. Motorists will encounter lane closures in both directions with a temporary signal in place, Continuous Monday April 12th, 2021 7:00 AM thru Wednesday November 30th, 2022 5:30 PM, guid=Canastota, servicename=transcom, title=NY 13 both directions:construction, pubDate=2022-09-30T07:01:48, uuid=9153a42c-010d-41ef-88ac-bf1b1ec3c258, ts=1666388173654}
----- got message -----
key:[efcd44ac-dd09-476a-b9f1-883bf83c1c34], properties:[language=Java, processor=transit], content:{companyname=transcom, advisoryAlert=NULL, link=https://www.511nj.org/home, description=NYSDOT - Region 11: construction on I-278 eastbound from Atlantic Avenue (New York) to Exit 28B - Brooklyn Bridge (New York) Lane Reduction, Continuous Monday August 30th, 2021 12:00 AM thru Saturday December 31st, 2022 11:59 PM 1 Left lane of 4 lanes closed, guid=Atlantic Avenue, New York, New York, servicename=transcom, title=I-278 eastbound:construction, pubDate=2021-10-04T09:54:58, uuid=efcd44ac-dd09-476a-b9f1-883bf83c1c34, ts=1666388173649}

Step 8: Query the Data With Pulsar SQL / Trino / Presto

This is an easy way to validate our data and check that our schemas are well formed.

presto:public/default> select * from transit;

presto:public/default> select __publish_time__, __key__, __producer_name__, servicename, description, title, ts, uuid, pubdate, link, guid from transit;
__publish_time__ | __key__ | __producer_name__ | servicename |
-------------------------+--------------------------------------+-------------------+-------------+-------
2022-10-21 12:01:17.294 | ec632b86-d81b-4f97-96a3-86237f3e94f1 | standalone-2-444 | transcom | NJ PAC
2022-10-21 12:01:17.304 | eacbf7da-9a24-4a45-960a-bd1d065c03d9 | standalone-2-444 | transcom | Barcla
2022-10-21 12:01:17.314 | 73b5f73d-39c9-4c0b-ab49-e296465a09bd | standalone-2-444 | transcom | NJ DOT
2022-10-21 12:01:17.324 | 28e788e3-4722-4d11-9d04-7ce178c99cf3 | standalone-2-444 | transcom | CT DOT
2022-10-21 12:01:17.333 | f06c0b9a-29b0-4ab3-bda2-759539426089 | standalone-2-444 | transcom | Barcla
2022-10-21 12:01:17.345 | 7420ef47-b1ce-4495-bde9-d61b9e150148 | standalone-2-444 | transcom | Barcla
2022-10-21 12:01:17.354 | d3782d30-ce93-424f-9cf8-e9c858a594d1 | standalone-2-444 | transcom | Barcla
2022-10-21 12:01:17.364 | 4875b8dd-49b1-44b8-ae6c-4b392fd3f3bc | standalone-2-444 | transcom | Barcla
2022-10-21 12:01:17.374 | 2390bdd6-f858-4f7d-b86e-9bb99d0ab808 | standalone-2-444 | transcom | NYSDOT
2022-10-21 12:01:17.384 | cf7a733d-bb47-41db-9734-9578d1758011 | standalone-2-444 | transcom | NJ PAC
2022-10-21 12:01:17.395 | 42fc9792-35d6-4023-8de8-c276419d939e | standalone-2-444 | transcom | NYSDOT
2022-10-21 12:01:17.404 | c544bbb9-17bf-4595-bc95-72a2ed682286 | standalone-2-444 | transcom | NJ Tur
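The same kind of schema sanity check can be scripted. As a hedged illustration, the Python sketch below compares a record’s field names against the ten columns of the transit schema listed in this article. It ignores case, because the SQL query above refers to the column as `pubdate` while the records use `pubDate`. The `missing_fields` helper and the sample record are made up for this example.

```python
# The ten fields of the transit schema as listed in this article.
EXPECTED_FIELDS = (
    "advisoryAlert", "companyname", "description", "guid", "link",
    "pubDate", "servicename", "title", "ts", "uuid",
)


def missing_fields(record, expected=EXPECTED_FIELDS):
    """Return the expected field names that the record lacks, ignoring case."""
    present = {name.lower() for name in record}
    return [name for name in expected if name.lower() not in present]


# Hypothetical row with lower-cased column names and two fields absent.
row = {
    "servicename": "transcom",
    "description": "NYSDOT - Region 3: Construction",
    "title": "NY 5 eastbound:Construction",
    "ts": "1666291837244",
    "uuid": "b778a1de-519a-4f60-bd34-c1b212784971",
    "pubdate": "2022-10-20T09:00:06",
    "link": "https://www.511nj.org/home",
    "guid": "43.034178,-76.062942",
}
print(missing_fields(row))
```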

Step 9: Real-Time Analytics with Apache Flink SQL

It is easy to run continuous SQL queries against any number of topics, each of which appears in Flink as a virtual table.

CREATE CATALOG pulsar WITH (
'type' = 'pulsar',
'service-url' = 'pulsar://pulsar1:6650',
'admin-url' = 'http://pulsar1:8080',
'format' = 'json'
);

USE CATALOG pulsar;

set table.dynamic-table-options.enabled = true;

SHOW TABLES;

describe `transcom-clean`;
+-------------+--------+------+-----+--------+-----------+
|        name |   type | null | key | extras | watermark |
+-------------+--------+------+-----+--------+-----------+
| description | STRING | true |     |        |           |
|    latitude | STRING | true |     |        |           |
|   longitude | STRING | true |     |        |           |
|       point | STRING | true |     |        |           |
|     pubDate | STRING | true |     |        |           |
|       title | STRING | true |     |        |           |
|          ts | STRING | true |     |        |           |
|        uuid | STRING | true |     |        |           |
+-------------+--------+------+-----+--------+-----------+
8 rows in set

describe `newjerseybus-clean`;
+---------------+--------+------+-----+--------+-----------+
|          name |   type | null | key | extras | watermark |
+---------------+--------+------+-----+--------+-----------+
| advisoryAlert | STRING | true |     |        |           |
|   companyname | STRING | true |     |        |           |
|   description | STRING | true |     |        |           |
|          guid | STRING | true |     |        |           |
|          link | STRING | true |     |        |           |
|       pubDate | STRING | true |     |        |           |
|   servicename | STRING | true |     |        |           |
|         title | STRING | true |     |        |           |
|            ts | STRING | true |     |        |           |
|          uuid | STRING | true |     |        |           |
+---------------+--------+------+-----+--------+-----------+
10 rows in set

desc `newjerseylightrail-clean`;
+---------------+--------+------+-----+--------+-----------+
|          name |   type | null | key | extras | watermark |
+---------------+--------+------+-----+--------+-----------+
| advisoryAlert | STRING | true |     |        |           |
|   companyname | STRING | true |     |        |           |
|   description | STRING | true |     |        |           |
|          guid | STRING | true |     |        |           |
|          link | STRING | true |     |        |           |
|       pubDate | STRING | true |     |        |           |
|   servicename | STRING | true |     |        |           |
|         title | STRING | true |     |        |           |
|            ts | STRING | true |     |        |           |
|          uuid | STRING | true |     |        |           |
+---------------+--------+------+-----+--------+-----------+
10 rows in set

desc `newjerseyrail-clean`;
+---------------+--------+------+-----+--------+-----------+
|          name |   type | null | key | extras | watermark |
+---------------+--------+------+-----+--------+-----------+
| advisoryAlert | STRING | true |     |        |           |
|   companyname | STRING | true |     |        |           |
|   description | STRING | true |     |        |           |
|          guid | STRING | true |     |        |           |
|          link | STRING | true |     |        |           |
|       pubDate | STRING | true |     |        |           |
|   servicename | STRING | true |     |        |           |
|         title | STRING | true |     |        |           |
|            ts | STRING | true |     |        |           |
|          uuid | STRING | true |     |        |           |
+---------------+--------+------+-----+--------+-----------+
10 rows in set

select description, pubDate, title, ts from `newjerseyrail-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */ ;

select description, pubDate, title, latitude, longitude, ts from `transcom-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */ ;

-- Join key chosen for illustration: pubDate is one of the columns both tables share.
select *
FROM `newjerseyrail-clean` AS rail LEFT JOIN `transcom-clean` AS transcom
ON rail.pubDate = transcom.pubDate;

select description, pubDate, title, ts, servicename
FROM `newjerseyrail-clean`
UNION
select description, pubDate, title, ts, servicename
FROM `newjerseylightrail-clean`
UNION
select description, pubDate, title, ts, servicename
FROM `newjerseybus-clean`;


select servicename, description, title, pubDate, ts
FROM `newjerseyrail-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */
UNION
select servicename, description, title, pubDate, ts
FROM `newjerseylightrail-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */
UNION
select servicename, description, title, pubDate, ts
FROM `newjerseybus-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */
UNION
SELECT 'transcom' as servicename, description, title, pubDate, ts
FROM `transcom-clean` /*+ OPTIONS('scan.startup.mode' = 'earliest') */;
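If the UNION queries above are unfamiliar, the following Python sketch mimics what they do on a handful of in-memory rows: each source is projected to the same five columns, TRANSCOM rows get a constant `servicename` (they have no such column), and exact duplicate rows are dropped, which is the difference between UNION and UNION ALL. The sample rows and helper names are invented for illustration; this is not how Flink executes the query.

```python
COLUMNS = ("servicename", "description", "title", "pubDate", "ts")


def project(row, servicename=None):
    """Keep only the shared columns, optionally filling in a constant servicename."""
    projected = {name: row.get(name) for name in COLUMNS}
    if servicename is not None:
        projected["servicename"] = servicename
    return projected


def union(*sources, distinct=True):
    """Concatenate rows; with distinct=True drop exact duplicates (UNION vs UNION ALL)."""
    seen = set()
    result = []
    for rows in sources:
        for row in rows:
            key = tuple(row[name] for name in COLUMNS)
            if distinct and key in seen:
                continue
            seen.add(key)
            result.append(row)
    return result


# Hypothetical rows: the rail source contains the same alert twice.
rail = [
    {"servicename": "rail", "description": "Delay", "title": "Line A", "pubDate": "2022-10-20", "ts": "1"},
    {"servicename": "rail", "description": "Delay", "title": "Line A", "pubDate": "2022-10-20", "ts": "1"},
]
transcom = [
    {"description": "Construction", "title": "NY 5 eastbound", "pubDate": "2022-10-20", "ts": "2",
     "latitude": "43.034178", "longitude": "-76.062942"},
]

rail_rows = [project(r) for r in rail]
transcom_rows = [project(r, servicename="transcom") for r in transcom]
combined = union(rail_rows, transcom_rows)
print(len(combined), [row["servicename"] for row in combined])
```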


describe `transit`;

+---------------+--------+------+-----+--------+-----------+
|          name |   type | null | key | extras | watermark |
+---------------+--------+------+-----+--------+-----------+
| advisoryAlert | STRING | true |     |        |           |
|   companyname | STRING | true |     |        |           |
|   description | STRING | true |     |        |           |
|          guid | STRING | true |     |        |           |
|          link | STRING | true |     |        |           |
|       pubDate | STRING | true |     |        |           |
|   servicename | STRING | true |     |        |           |
|         title | STRING | true |     |        |           |
|            ts | STRING | true |     |        |           |
|          uuid | STRING | true |     |        |           |
+---------------+--------+------+-----+--------+-----------+
10 rows in set

select servicename, description, title, pubDate, ts, uuid, guid, link
from transit /*+ OPTIONS('scan.startup.mode' = 'earliest') */;

Step 10: WebSocket Direct Consuming Pulsar Data in Web Pages
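Pulsar’s WebSocket API exposes a consumer endpoint under `/ws/v2/consumer/...`, delivers each message as a JSON frame whose `payload` is base64-encoded, and expects an acknowledgement frame containing the `messageId`. The web page does this in JavaScript; the Python sketch below only illustrates the URL layout and the frame handling without opening a connection. The host, the `map-reader` subscription name, the sample frame, and the assumption that the payload is a JSON record are all illustrative.

```python
import base64
import json
from urllib.parse import quote, urlencode


def consumer_url(host, tenant, namespace, topic, subscription, subscription_type="Shared"):
    """Build the WebSocket consumer URL for a persistent topic."""
    path = "/".join(quote(part, safe="") for part in (tenant, namespace, topic, subscription))
    query = urlencode({"subscriptionType": subscription_type})
    return f"ws://{host}/ws/v2/consumer/persistent/{path}?{query}"


def decode_frame(frame_text):
    """Decode one frame into (record, ack_text).

    Assumes the message payload is a JSON document; the ack text is what the
    client sends back so the broker can mark the message as consumed.
    """
    frame = json.loads(frame_text)
    record = json.loads(base64.b64decode(frame["payload"]))
    ack_text = json.dumps({"messageId": frame["messageId"]})
    return record, ack_text


# Hypothetical frame, built here so the example runs without a broker.
sample_record = {"title": "NY 5 eastbound:Construction",
                 "latitude": "43.034178", "longitude": "-76.062942"}
sample_frame = json.dumps({
    "messageId": "CAAQAw==",
    "payload": base64.b64encode(json.dumps(sample_record).encode("utf-8")).decode("ascii"),
    "properties": {},
})

url = consumer_url("localhost:8080", "public", "default", "transcom-clean", "map-reader")
record, ack = decode_frame(sample_frame)
print(url)
print(record["title"], ack)
```

In the browser, the same steps map to `new WebSocket(url)`, `atob` plus `JSON.parse` in the `onmessage` handler, and `ws.send` for the acknowledgement, after which the latitude and longitude can be handed to LeafletJS.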

For more on this journey of real-time data, see:

One for adding your own local environmental data via MQTT and a simple device.

TRANSCOM is a coalition of 16 transportation and public safety agencies in the New York - New Jersey - Connecticut metropolitan region. It was created in 1986 to provide a cooperative, coordinated approach to regional transportation management.

This is a free service that allows various user groups (i.e., the general public, commercial vendors, transportation agencies, researchers, media, and others) to access TRANSCOM real-time event and link (travel time) data for use in their applications. The data feed contains 'real-time' event information provided by member agencies of TRANSCOM listed below:

TRANSCOM Member Agencies
Connecticut Department of Transportation, Metropolitan Transportation Authority, MTA – Bridges and Tunnels, MTA – New York City Transit, New Jersey Department of Transportation, New Jersey Transit, New York City Department of Transportation, New York City Police Department, New York State Bridge Authority, New York State Department of Transportation, New York State Police, New York State Thruway Authority, Port Authority Trans-Hudson Corp (PATH), The Port Authority of New York and New Jersey.

Register for Access

For full access to the documentation and data, please register. After your registration has been processed, you will have access to the full site’s content! For organizations, we request that you only request a single account for all users, and share this information internally. If there are valid reasons for multiple accounts, please explain the circumstances in the comments section of the registration form. Please ensure you have included the domain data.xcmdata.org on any SPAM filter exceptions so you will receive the confirmations and any announcements. Duplicate requests will be ignored - if you have questions, use the link below!

To register for access:

https://data.xcmdata.org/DEWeb/Pages/index

Tim Spann

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/