Never Get Lost in the Stream

Tim Spann
Cloudera
6 min read · Dec 13, 2023

NiFi-Kafka-Flink for getting to work. Can’t we just work remotely?

Let’s look at some previous work.

Image: Bing AI

First, we had to fly.

Second, I wanted to build a system so I could use mass transit anywhere in the world!

Third, when I first started, I planned to build locally and cover only the MTA. New York City’s MTA publishes a ton of different data streams because it runs so many modes of transit. I started with the SIRI feed they provide as JSON. Let’s go deeper.

Image: Bing AI

Looking back at our original source code for MTA access, a lot was left undone. Let’s build a new stream.

The first thing I noticed is that I was not processing all of the subway lines in New York City; there are a lot of them and a lot of data.

We will standardize the formats so we can reuse our new generic GTFS-RT processing.

We keep the list of URLs to call as JSON, split it, and send each URL in the generic GTFS-RT format to be processed by our GTFS-RT processor.

https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-g
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-nqrw
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-bdfm
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-jz
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-l
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-si
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/lirr%2Fgtfs-lirr
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/mnr%2Fgtfs-mnr
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fall-alerts
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fsubway-alerts
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fbus-alerts
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Flirr-alerts
https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/camsys%2Fmnr-alerts
http://nycferry.connexionz.net/rtt/public/utility/gtfsrealtime.aspx/tripupdate
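The split step can be sketched in Python, assuming the URL list arrives as a single JSON array. The work-item keys (url, feed, needs_api_key) are hypothetical names chosen for illustration, not the fields of the author's NiFi flow; the only rule taken from this post is that feeds on the MTA API host need the x-api-key header.

```python
import json
from urllib.parse import unquote, urlparse

# Host of the MTA feeds that require the x-api-key header (from the URL list above).
MTA_API_HOST = "api-endpoint.mta.info"


def split_feed_list(feed_list_json: str) -> list:
    """Split a JSON array of feed URLs into one work item per feed.

    The keys "url", "feed" and "needs_api_key" are hypothetical; they only
    illustrate what a generic GTFS-RT work item might carry.
    """
    items = []
    for url in json.loads(feed_list_json):
        parsed = urlparse(url)
        # Decode the last path segment: "nyct%2Fgtfs-ace" -> "nyct/gtfs-ace".
        feed = unquote(parsed.path.rsplit("/", 1)[-1])
        items.append({
            "url": url,
            "feed": feed,
            "needs_api_key": parsed.hostname == MTA_API_HOST,
        })
    return items


feed_list = json.dumps([
    "https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds/nyct%2Fgtfs-ace",
    "http://nycferry.connexionz.net/rtt/public/utility/gtfsrealtime.aspx/tripupdate",
])
for item in split_feed_list(feed_list):
    print(item["feed"], item["needs_api_key"])
# nyct/gtfs-ace True
# tripupdate False
```

Tagging each item with needs_api_key up front lets the fetch step stay generic: the NYC Ferry feed and the MTA feeds go through the same path, and only the header differs.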

So we just iterate over a long list of NYC feeds (subway lines such as ACE, G, and BDFM, plus LIRR and more) and get them ready to run in our universal GTFS-RT system. There is one caveat to handle: these MTA GTFS-RT feeds require an extra x-api-key header carrying our registered key (https://api.mta.info/#/AccessKey). If you don’t have a key, create one.
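Here is a minimal sketch of adding that header with Python's standard library. It is not the author's NiFi configuration (in NiFi this would be a request header on the HTTP processor); the function names are hypothetical, and the header is only sent when a key is supplied.

```python
import urllib.request
from typing import Optional


def build_feed_request(url: str, api_key: Optional[str]) -> urllib.request.Request:
    """Build a GET request for a GTFS-RT feed, adding x-api-key only when a key is given."""
    headers = {"x-api-key": api_key} if api_key else {}
    return urllib.request.Request(url, headers=headers)


def fetch_feed(url: str, api_key: Optional[str], timeout: float = 30.0) -> bytes:
    """Download the raw GTFS-RT protobuf payload (performs a network call)."""
    with urllib.request.urlopen(build_feed_request(url, api_key), timeout=timeout) as resp:
        return resp.read()
```

A call might look like fetch_feed(url, os.environ.get("MTA_API_KEY")), where MTA_API_KEY is a hypothetical environment variable holding your registered key. The response body is protobuf, not JSON; the gtfs-realtime-bindings package provides gtfs_realtime_pb2.FeedMessage, whose ParseFromString method decodes it.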

Once we have the MTA data in the right format, we can use our universal GTFS-RT processor for trip updates, service alerts, and vehicle positions.
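One way to picture the "universal" part is routing each GTFS-RT FeedEntity by which payload it carries (trip_update, vehicle, or alert). This sketch assumes the feed has already been converted to a dict with snake_case field names, for example via google.protobuf.json_format.MessageToDict(feed, preserving_proto_field_name=True). The transitvehiclepositions topic matches the Flink table in this post; the other two topic names are hypothetical.

```python
# Kafka topic per GTFS-RT entity payload. "transitvehiclepositions" matches the
# Flink table in this post; "transittripupdates" and "transitalerts" are hypothetical.
TOPIC_BY_ENTITY_FIELD = {
    "trip_update": "transittripupdates",
    "vehicle": "transitvehiclepositions",
    "alert": "transitalerts",
}


def route_entities(feed: dict) -> dict:
    """Group FeedEntity payloads by the Kafka topic they should be published to."""
    routed = {topic: [] for topic in TOPIC_BY_ENTITY_FIELD.values()}
    for entity in feed.get("entity", []):
        for field, topic in TOPIC_BY_ENTITY_FIELD.items():
            # The GTFS-RT spec expects one payload field per entity.
            if field in entity:
                routed[topic].append(entity[field])
    return routed
```

Because every agency's feed uses the same FeedMessage structure, this routing does not care whether the entities came from the MTA, NYC Ferry, or anyone else.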

Now we can process MTA data like every other GTFS-RT feed.

{
"stopsequence":"0",
"arrivaltime":"1702078380",
"stopid":"903049",
"tripid":"MV_D3-Weekday-SDon-111300_M3_334",
"tripstartdate":"20231208",
"departuretime":"1702078380",
"triprouteid":"M2",
"locationcountrycode":"US",
"maxlat":"",
"maxlong":"",
"locationmunicipality":"New York City",
"minlong":"",
"locationsubdivisionname":"New York",
"minlat":"",
"ts":"1702072263668",
"uuid":"e9a41230-77e2-4143-b8e9-a8a837ee9a04",
"rundate":"",
"providername":"MTA",
"transitname":"obanyc"
}
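A sketch of how a nested TripUpdate could be flattened into records shaped like the one above, one per stop_time_update, with every value as a string. This is an illustration under assumptions, not the author's processor: the input uses GTFS-RT snake_case field names, ts is taken to be processing time in epoch milliseconds, and the feed-level fields (providername, transitname, location fields) are passed in by the caller through a hypothetical extra argument.

```python
import time
import uuid


def flatten_trip_update(trip_update: dict, extra: dict) -> list:
    """Turn one GTFS-RT TripUpdate (as a dict) into flat, all-string records.

    One record is emitted per stop_time_update. `extra` carries the
    feed-level fields (providername, transitname, location fields) that
    the caller attaches; which fields go there is an assumption.
    """
    trip = trip_update.get("trip", {})
    ts_ms = str(int(time.time() * 1000))  # processing time, epoch milliseconds
    records = []
    for stu in trip_update.get("stop_time_update", []):
        record = {
            "stopsequence": str(stu.get("stop_sequence", "")),
            "arrivaltime": str(stu.get("arrival", {}).get("time", "")),
            "stopid": stu.get("stop_id", ""),
            "tripid": trip.get("trip_id", ""),
            "tripstartdate": trip.get("start_date", ""),
            "departuretime": str(stu.get("departure", {}).get("time", "")),
            "triprouteid": trip.get("route_id", ""),
            "ts": ts_ms,
            "uuid": str(uuid.uuid4()),  # unique id per emitted record
        }
        record.update(extra)
        records.append(record)
    return records
```

Keeping every value a string mirrors the samples in this post and the all-VARCHAR Flink table; casting to numbers and timestamps can then happen in Flink SQL.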

We can see the new data streaming into Flink.

Image: Bing AI

Example Vehicle Positions

{
"route_id" : "B82",
"bearing" : "21.801409",
"directionid" : "0",
"latitude" : "40.608383",
"tripid" : "EN_D3-Weekday-SDon-071200_B82_604",
"vehiclelabel" : "",
"vehicleid" : "MTA NYCT_7149",
"startdate" : "20231212",
"uuid" : "86ee5e4f-a17b-49b5-bf1a-49b6bd70c85f",
"speed" : "",
"longitude" : "-73.95911",
"timestamp" : "1702402015",
"locationcountrycode" : "US",
"maxlat" : "",
"maxlong" : "",
"locationmunicipality" : "New York City",
"minlong" : "",
"locationsubdivisionname" : "New York",
"minlat" : "",
"ts" : "1702402064160",
"rundate" : "",
"providername" : "MTA",
"transitname" : "obanyc"
}

We then define a Flink SQL table over the transitvehiclepositions Kafka topic:

CREATE TABLE `ssb`.`Meetups`.`transitvehiclepositions` (
`route_id` VARCHAR(2147483647),
`bearing` VARCHAR(2147483647),
`directionid` VARCHAR(2147483647),
`latitude` VARCHAR(2147483647),
`tripid` VARCHAR(2147483647),
`vehiclelabel` VARCHAR(2147483647),
`vehicleid` VARCHAR(2147483647),
`startdate` VARCHAR(2147483647),
`uuid` VARCHAR(2147483647),
`speed` VARCHAR(2147483647),
`longitude` VARCHAR(2147483647),
`timestamp` VARCHAR(2147483647),
`locationcountrycode` VARCHAR(2147483647),
`maxlat` VARCHAR(2147483647),
`maxlong` VARCHAR(2147483647),
`locationmunicipality` VARCHAR(2147483647),
`minlong` VARCHAR(2147483647),
`locationsubdivisionname` VARCHAR(2147483647),
`minlat` VARCHAR(2147483647),
`ts` VARCHAR(2147483647),
`rundate` VARCHAR(2147483647),
`providername` VARCHAR(2147483647),
`transitname` VARCHAR(2147483647),
`eventTimeStamp` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA FROM 'timestamp',
WATERMARK FOR `eventTimeStamp` AS `eventTimeStamp` - INTERVAL '3' SECOND
) WITH (
'deserialization.failure.policy' = 'ignore_and_log',
'properties.request.timeout.ms' = '120000',
'format' = 'json',
'properties.bootstrap.servers' = 'kafka:9092',
'connector' = 'kafka',
'properties.transaction.timeout.ms' = '900000',
'topic' = 'transitvehiclepositions',
'scan.startup.mode' = 'group-offsets',
'properties.auto.offset.reset' = 'earliest',
'properties.group.id' = 'vehposflank1'
)
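In this table, eventTimeStamp comes from the Kafka record's timestamp (METADATA FROM 'timestamp'), not from the timestamp field in the JSON payload, and the WATERMARK clause allows events to arrive up to 3 seconds out of order. The sketch below is a simplified model of that bounded out-of-orderness rule for intuition only; it is not Flink's implementation, which emits watermarks periodically and leaves late-data handling to each operator.

```python
from datetime import datetime, timedelta, timezone


class BoundedOutOfOrdernessWatermark:
    """Simplified model of: WATERMARK FOR t AS t - INTERVAL '3' SECOND."""

    def __init__(self, max_delay: timedelta = timedelta(seconds=3)):
        self.max_delay = max_delay
        self.max_event_time = None  # largest event time seen so far

    @property
    def watermark(self):
        """Current watermark: max event time seen minus the allowed delay."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.max_delay

    def observe(self, event_time: datetime) -> bool:
        """Record an event; return True if it is late (at or behind the watermark)."""
        late = self.watermark is not None and event_time <= self.watermark
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        return late


wm = BoundedOutOfOrdernessWatermark()
t0 = datetime(2023, 12, 12, 17, 26, 55, tzinfo=timezone.utc)
print(wm.observe(t0))                         # False
print(wm.observe(t0 - timedelta(seconds=2)))  # False: within the 3 second bound
print(wm.observe(t0 - timedelta(seconds=5)))  # True: behind the watermark
```

A larger interval tolerates more disorder between feeds at the cost of results being emitted later.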

Transit Service Alerts

{
"routeid4":"",
"routeid5":"",
"descriptionlang":"en",
"cause":"",
"stopid":"",
"alertlang":"en",
"alerttext":"Northbound Bx27 stop on Soundview Ave at O'Brien Ave is closed",
"routeid":"BX27",
"effect":"",
"stopid5":"",
"descriptiontext":"Please use the stops on Soundview Ave at Gildersleeve Ave or Patterson Ave.\n\nWhat's happening?\nNYC DDC Sewer Reconstruction Project\n\nNote: Real-time tracking on BusTime may be inaccurate in the service change area.",
"stopid4":"",
"activeperiodend":"1717977600",
"stopid3":"",
"stopid2":"",
"routeid2":"",
"routeid3":"",
"activeperiodstart":"1680321600",
"locationcountrycode":"US",
"maxlat":"",
"maxlong":"",
"locationmunicipality":"New York City",
"minlong":"",
"locationsubdivisionname":"New York",
"minlat":"",
"ts":"1702431671175",
"uuid":"e5196039-5de5-4f40-9167-cd5f915b0e41",
"rundate":"",
"providername":"MTA",
"transitname":"Subway"
}

Trip Updates

{
"stopsequence":"",
"arrivaltime":"1702433710",
"stopid":"D18N",
"tripid":"124000_M..N",
"tripstartdate":"20231212",
"departuretime":"1702433710",
"triprouteid":"M",
"locationcountrycode":"US",
"maxlat":"",
"maxlong":"",
"locationmunicipality":"New York City",
"minlong":"",
"locationsubdivisionname":"New York",
"minlat":"",
"ts":"1702431859805",
"uuid":"971e054a-9e27-47a7-95fa-b1ea66e264fd",
"rundate":"",
"providername":"MTA",
"transitname":"Subway BDFM"
}
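Because arrivaltime is in epoch seconds while ts has 13 digits (epoch milliseconds), a consumer has to align units before comparing them. Assuming ts is the time the record was produced, this hypothetical helper computes how far ahead the predicted arrival was:

```python
def arrival_lead_ms(record: dict) -> int:
    """Milliseconds from when the record was produced (ts) to the predicted arrival."""
    arrival_ms = int(record["arrivaltime"]) * 1000  # arrivaltime: epoch seconds
    return arrival_ms - int(record["ts"])           # ts: epoch milliseconds


sample = {"arrivaltime": "1702433710", "ts": "1702431859805"}
lead = arrival_lead_ms(sample)
print(lead)                     # 1850195
print(round(lead / 60000, 1))   # 30.8
```

For the M train sample above, the predicted arrival at stop D18N was about 31 minutes out when the record was captured.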

RESOURCES

DATA ACCESS

https://api-endpoint.mta.info/Dataservice/mtagtfsfeeds

ROUTES, TRIPS & STOPS

http://nycferry.connexionz.net/rtt/public/utility/gtfs.aspx

REAL TIME ALERTS

http://nycferry.connexionz.net/rtt/public/utility/gtfsrealtime.aspx/alert

REAL TIME TRIP UPDATES

http://nycferry.connexionz.net/rtt/public/utility/gtfsrealtime.aspx/tripupdate


Principal Developer Advocate, Cloudera. Principal Engineer - Big Data, IoT, Deep Learning, Streaming, Machine Learning, Cloud. https://www.datainmotion.dev/