NYC Traffic!?!??! Are You Kidding Me?

Tim Spann
Cloudera
11 min read · Feb 18, 2024

Apache NiFi, Python, Traffic, JSON, Web Camera, REST, XML, RSS

NYC Speed Data

The Department of Transportation (DOT) in New York City posts updates to its ever-growing data table of “Average Speed of a Vehicle Traveled Between End Points”. It currently has 13 columns and 79.7 million rows, with more arriving several times a day. I want the entire dataset, but it is only available as a full CSV download (I’ll do that as well) or through a paged REST endpoint that returns 1,000 rows at a time. So we will grab 1,000 rows at a time and loop through a lot of pages. Since the API allows sorting, I will also grab the latest 1,000 rows every hour.

Where does this data come from? Sensors!

What does this data set describe?
Title: New York City Traffic Speed Detectors
Abstract: New York City DOT’s traffic speed detector feed is a free service that allows various user groups (i.e. the general public, private sector, commercial vendors, transportation agencies, researchers, media and others) to download traffic speed information on a regular basis to use in their applications and research efforts. This data feed contains ‘real‐time’ traffic information from locations where NYCDOT has installed sensors, mostly on major arterials and highways within the City limits. NYCDOT uses this information for emergency response and management.

From: https://data.cityofnewyork.us/api/views/i4gi-tjb9/files/cc7f3b15-58b7-46e3-94e7-4c5753c3a8b8?download=true&filename=metadata_trafficspeeds.pdf

The URL we need is like so:

https://data.cityofnewyork.us/resource/i4gi-tjb9.json?$limit=1000&$offset=${i}&$order=data_as_of+DESC

This API is pretty flexible: we can set sorting, limits, offsets, queries and more. The offset is a NiFi variable (${i}) that we increment. The limit is the maximum we can access at once (1,000 rows). I order by the data_as_of field to get the most recent rows first.
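To make the paging concrete, here is a minimal Python sketch that builds the same request URLs outside of NiFi. The function names (build_page_url, page_offsets) are made up for illustration, and no HTTP call is made; a real client would also send the X-App-Token header if it has one.

```python
from urllib.parse import urlencode

BASE_URL = "https://data.cityofnewyork.us/resource/i4gi-tjb9.json"
PAGE_SIZE = 1000  # page size used in this article


def build_page_url(offset, limit=PAGE_SIZE):
    """Build the request URL for one page, newest rows first."""
    params = {"$limit": limit, "$offset": offset, "$order": "data_as_of DESC"}
    # safe="$" leaves the "$" in the parameter names unescaped;
    # the space in the sort clause is encoded as "+".
    return BASE_URL + "?" + urlencode(params, safe="$")


def page_offsets(total_rows, limit=PAGE_SIZE):
    """Offsets 0, 1000, 2000, ... needed to cover total_rows rows."""
    return range(0, total_rows, limit)


if __name__ == "__main__":
    # Print the first three page URLs of a full download.
    for offset in page_offsets(3000):
        print(build_page_url(offset))
```

The hourly "latest 1,000" pull is just the first page: build_page_url(0).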

Below we can see an example of some of the RAW JSON data returned from the API.

[{"id":"434","speed":"47.84","travel_time":"66","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616212","link_points":"40.6020904,-74.1877 40.600331,-74.18943 40.59783,-74.19099 40.592071,-74.193131 40.5907404,-74.19333 40.5902,-74.19332","encoded_poly_line":"abyvFbxxcM~IxIrNvH~b@jLhGf@jBA","encoded_poly_line_lvls":"BBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616212","borough":"Staten Island","link_name":"WSE N VICTORY BLVD - SOUTH AVENUE"}
,{"id":"351","speed":"40.38","travel_time":"133","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616210","link_points":"40.63092,-74.14592 40.62975,-74.14593 40.62877,-74.14579 40.6279506,-74.145671 40.62713,-74.145681 40.62637,-74.14586 40.6254205,-74.14618 40.6247506,-74.146381 40.6241006,-74.146661 40.6234004,-74.14704 40.6226006,-74.147501 40.6217506,-74.148181 40.6208","encoded_poly_line":"gv~vF~rpcMhF@bE[bDWbD@vCb@\\\\|D~@dCf@`Cv@jCjA~CzAhDfCrDjDfJ~IhDhDxB~AlDlBdEdArDp@jC\\\\|Bt@zBCjDeB~DgAzA","encoded_poly_line_lvls":"BBBBBBBBBBBBBBBBBBBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616210","borough":"Staten Island","link_name":"MLK S - SIE W WALKER STREET - RICHMOND AVENUE"}
,{"id":"378","speed":"41.01","travel_time":"65","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616197","link_points":"40.6210105,-74.168861 40.6207604,-74.168 40.6182105,-74.162381 40.6154,-74.15806 40.6149404,-74.15748","encoded_poly_line":"ix\\\\|vFjbucMp@kD\\\\|Ncb@pP_ZzAsB","encoded_poly_line_lvls":"BBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616197","borough":"Staten Island","link_name":"SIE E SOUTH AVENUE - RICHMOND AVENUE"}
,{"id":"383","speed":"0.00","travel_time":"0","status":"-101","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616217","link_points":"40.6167105,-74.15242 40.61572,-74.15276 40.6148106,-74.152971 40.6140506,-74.15282 40.61301,-74.15227 40.61034,-74.150871 40.6096,-74.150101","encoded_poly_line":"m}{vFr{qcMdEbAtDh@vC]nEmBtOwGrCyC","encoded_poly_line_lvls":"BBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616217","borough":"Staten Island","link_name":"SIE W - MLK N WOOLEY AVENUE - WLAKER STREET"}
,{"id":"422","speed":"39.14","travel_time":"227","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616298","link_points":"40.7278806,-73.832761 40.7270705,-73.83231 40.7265105,-73.831911 40.72603,-73.8314 40.724771,-73.829501 40.72428,-73.82886 40.72379,-73.828341 40.72326,-73.827851 40.7224605,-73.82729 40.721951,-73.82696 40.7215205,-73.826681 40.7209605,-73.826371 40.7204","encoded_poly_line":"gtqwFvmsaM`DyAnBoA~AeBzF{J`B_C`BgBhBaB~CoBdBaAtAw@nB}@\\\\|Ac@pAOx@Ez@AfCNj@Bh@LhDRtAExAQhA[x@Y~AcAxE}DbBeAjAg@t@Ur@c@tJgLlAwAdAy@t@q@jEcDvC_B\\\\|EeCp@a@x@_@bMwFtBq@xGcC","encoded_poly_line_lvls":"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616298","borough":"Queens","link_name":"VWE N MP4.63 (Exit 6 - Jamaica Ave) - MP6.39 (Exit 11 Jewel Ave)"}
,{"id":"170","speed":"38.52","travel_time":"98","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616356","link_points":"40.6665206,-73.76246 40.66738,-73.77021 40.66751,-73.77209 40.66752,-73.772861 40.66749,-73.775591 40.66722,-73.78108 40.66673,-73.786471","encoded_poly_line":"wtewFjveaMkDlo@YvJAxCD`Pt@ha@`Bt`@","encoded_poly_line_lvls":"BBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616356","borough":"Queens","link_name":"Belt Pkwy W 182nd St - JFK Expressway"}
,{"id":"171","speed":"25.47","travel_time":"472","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616357","link_points":"40.66673,-73.78649 40.66642,-73.78958 40.66642,-73.78958 40.66642,-73.790421 40.6665006,-73.79161 40.666771,-73.793241 40.666771,-73.793241 40.6667404,-73.796111 40.6667404,-73.796111 40.6667205,-73.799361 40.6668105,-73.799681 40.6669706,-73.79989 40.666","encoded_poly_line":"avewFpljaM\\\\|@hR???fDOlFu@dI??D\\\\|P??BhSQ~@_@h@??uDdDcAf@}ANiEQkIG??{GXeC^gBj@EH??qQlGiSvHuC\\\\|@u]bNwd@jTwQfI{K`GkPxHqPlG","encoded_poly_line_lvls":"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616357","borough":"Queens","link_name":"Belt Pkwy W JFK Expressway - VWE N Jamaica Ave"}
,{"id":"439","speed":"47.84","travel_time":"59","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616200","link_points":"40.5900505,-74.19356 40.59141,-74.19352 40.59413,-74.19266 40.5984005,-74.19099 40.59978,-74.19014 40.60139,-74.188741 40.60213,-74.187981","encoded_poly_line":"yvvvFv\\\\|ycMoGG_PkDuYmIsGiDaIwGsCwC","encoded_poly_line_lvls":"BBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616200","borough":"Staten Island","link_name":"WSE S SOUTH AVENUE - VICTORY BOULEVARD"}
,{"id":"385","speed":"48.46","travel_time":"29","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616208","link_points":"40.6077805,-74.14091 40.60826,-74.132101","encoded_poly_line":"sezvFtsocM_Bav@","encoded_poly_line_lvls":"BB","owner":"NYC_DOT_LIC","transcom_id":"4616208","borough":"Staten Island","link_name":"SIE W BRADLEY AVENUE - WOOLEY AVENUE"}
,{"id":"426","speed":"39.76","travel_time":"173","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616272","link_points":"40.7024204,-73.816481 40.700841,-73.815751 40.69726,-73.81382 40.6959105,-73.812981 40.68863,-73.808861 40.6775105,-73.803261 40.67445,-73.80181","encoded_poly_line":"culwF~gpaMzHqCjUaKlGgDnl@wXndA_b@bRaH","encoded_poly_line_lvls":"BBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616272","borough":"Queens","link_name":"VWE S MP4.63 (Exit 6 Jamaica Ave) - MP2.66 (Exit 2 Roackaway Blvd)"}
,{"id":"453","speed":"42.87","travel_time":"128","status":"0","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616239","link_points":"40.7714206,-73.83345 40.7702704,-73.8354 40.76924,-73.836711 40.7675005,-73.838271 40.76592,-73.839261 40.7654804,-73.83954 40.7649004,-73.839841 40.76456,-73.83995 40.76418,-73.84001 40.7636705,-73.84 40.7633,-73.839941 40.7626006,-73.839671","encoded_poly_line":"kdzwF`rsaMdFdKlEdGzIvHzHdEvAv@rBz@bATjAJdBAhAKjCu@","encoded_poly_line_lvls":"BBBBBBBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616239","borough":"Queens","link_name":"Whitestone Expwy S Exit 14 (Linden Pl) - VWE S MP8.65 (Exit 13 Northern Blvd)"}
,{"id":"338","speed":"0.00","travel_time":"0","status":"-101","data_as_of":"2024-02-13T07:09:09.000","link_id":"4616255","link_points":"40.81376,-73.93128 40.8127206,-73.9316 40.8121104,-73.93169 40.81148,-73.93161 40.81091,-73.931261 40.81044,-73.93079 40.8082604,-73.927901 40.80772,-73.9269 40.80653,-73.923971 40.80503,-73.92024 40.8037906,-73.917471 40.8036,-73.916931 40.803411,-73.915","encoded_poly_line":"_mbxFnufbMnE~@xBP\\\\|BOpBeA\\\\|A}ArLaQjBgElFiQjHiVvFiPd@kBd@_EB{DQcBi@uB{@cCcEmL??wCcI_AyCgEwMo@wAa^{j@eGiJcAiAyB}Bya@gc@_A{@sFgJcC}CK_@cGm{@uAmS","encoded_poly_line_lvls":"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB","owner":"NYC_DOT_LIC","transcom_id":"4616255","borough":"Bronx","link_name":"MDE N RFK Bridge - 142nd St"}

DATA LINK

NY 511 Step-by-Step

DataFlow for Ingesting All 74+ Million Rows (1,000 at a time)

Start of Apache NiFi / Cloudera DataFlow

1 — GenerateFlowFile — start full download process

2 — UpdateAttribute — initialize counter (${i}) to 0

3 — RouteOnAttribute — keep going until the counter reaches 80 million. ${i:lt(80000000)}

4a — UpdateAttribute — increment our counter (${i}) and loop back. ${i:plus(1000)}

4b — InvokeHTTP — Call first with a counter of 0, then 1000, 2000, 3000, … No login is required, but since I have an account, I set my X-App-Token from my profile as a header using a sensitive NiFi attribute.

5 — SplitRecord — Read JSON and write JSON in 1 record splits.

6 — UpdateRecord — add extra fields. /ts = ${now():toNumber()} /uuid = ${uuid}

7 — UpdateRecord — parse lat/long. /parselatlong = substringBefore( /link_points, ' ' )

8 — UpdateRecord — parse out. /latitude = substringBefore( /parselatlong, ',' ) /longitude = substringAfter( /parselatlong, ',' )

9 — ControlRate — maximum rate 5 in 1 second. This is for demos.

10 — PublishKafkaRecord_2_6 — Produce Kafka records to nytrafficspeed

11 — RetryFlowFile — If the Kafka produce fails, let’s try three more times.
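Steps 6 to 8 above can be sketched in plain Python to show what each record looks like after enrichment. This is an illustration, not the NiFi implementation: the helper names are hypothetical, uuid4 stands in for the FlowFile's ${uuid} attribute, and returning the whole value when the separator is missing is an assumption about how the RecordPath functions behave.

```python
import time
import uuid


def substring_before(value, sep):
    """Text before the first sep (whole value if sep is absent, an assumption)."""
    index = value.find(sep)
    return value if index < 0 else value[:index]


def substring_after(value, sep):
    """Text after the first sep (whole value if sep is absent, an assumption)."""
    index = value.find(sep)
    return value if index < 0 else value[index + len(sep):]


def enrich(record):
    """Return a copy of one speed record with ts, uuid, latitude and longitude added."""
    out = dict(record)
    # Step 6: add a millisecond timestamp and a unique id.
    out["ts"] = str(int(time.time() * 1000))
    out["uuid"] = str(uuid.uuid4())  # stand-in for the FlowFile uuid
    # Step 7: link_points is a space-separated list of "lat,long" pairs; keep the first.
    out["parselatlong"] = substring_before(record["link_points"], " ")
    # Step 8: split that first pair into latitude and longitude.
    out["latitude"] = substring_before(out["parselatlong"], ",")
    out["longitude"] = substring_after(out["parselatlong"], ",")
    return out


if __name__ == "__main__":
    sample = {"id": "385", "link_points": "40.6077805,-74.14091 40.60826,-74.132101"}
    print(enrich(sample))
```

Note that only the first point of each link is kept, so the latitude and longitude mark one end of the road segment rather than its midpoint.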

Example Attributes Showing SODA2 Values

Example Rows

Apache Kafka Topic

nytrafficspeed

Streaming Analytics with Flink SQL

Materialized Views with PostgreSQL, REST, JSON, Cloudera SQL Stream Builder

We can build simple queries, joins, aggregates and add many functions.

Query Joining with Full Outer Join of NY Traffic Speed, MTA Buses and TRANSCOM.

SELECT n.speed, n.travel_time, n.borough, n.link_name, n.link_points,
n.latitude, n.longitude, DISTANCE_BETWEEN(CAST(t.latitude as STRING),
CAST(t.longitude as STRING),
m.VehicleLocationLatitude, m.VehicleLocationLongitude) as miles,
t.title, t.`description`, t.pubDate, t.latitude, t.longitude,
m.VehicleLocationLatitude, m.VehicleLocationLongitude,
m.StopPointRef, m.VehicleRef,
m.ProgressRate, m.ExpectedDepartureTime, m.StopPoint,
m.VisitNumber, m.DataFrameRef, m.StopPointName,
m.Bearing, m.OriginAimedDepartureTime, m.OperatorRef,
m.DestinationName, m.ExpectedArrivalTime, m.BlockRef,
m.LineRef, m.DirectionRef, m.ArrivalProximityText,
m.DistanceFromStop, m.EstimatedPassengerCapacity,
m.AimedArrivalTime, m.PublishedLineName,
m.ProgressStatus, m.DestinationRef, m.EstimatedPassengerCount,
m.OriginRef, m.NumberOfStopsAway, m.ts, n.eventTimeStamp, (m.uuid || t.uuid || n.uuid) as allid
FROM jsonmta /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */ m
FULL OUTER JOIN jsontranscom /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */ t
ON (t.latitude >= CAST(m.VehicleLocationLatitude as float) - 0.3)
AND (t.longitude >= CAST(m.VehicleLocationLongitude as float) - 0.3)
AND (t.latitude <= CAST(m.VehicleLocationLatitude as float) + 0.3)
AND (t.longitude <= CAST(m.VehicleLocationLongitude as float) + 0.3)
FULL OUTER JOIN nytrafficspeed /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */ n
ON (n.latitude >= CAST(m.VehicleLocationLatitude as float) - 0.3)
AND (n.longitude >= CAST(m.VehicleLocationLongitude as float) - 0.3)
AND (n.latitude <= CAST(m.VehicleLocationLatitude as float) + 0.3)
AND (n.longitude <= CAST(m.VehicleLocationLongitude as float) + 0.3)
WHERE m.VehicleRef is not null
AND t.title is not null

The above query lets me join three different feeds in real time: the NY speed sensor feed we just built (nytrafficspeed), the MTA bus feed (jsonmta) we built before, and the TRANSCOM feed (jsontranscom), which is pretty awesome.

Traffic Management & Intelligent Transportation System

TRANSCOM is a coalition of 16 transportation and public safety agencies in the New York — New Jersey — Connecticut metropolitan region. It was created in 1986 to provide a cooperative, coordinated approach to regional transportation management.

https://www.xcm.org/home

In the future I may connect this query to one I have for airplanes and weather since they also have Latitude and Longitude.

You can see the join is kind of a hack, as it matches on a range of nearness in lat/long, which is not super accurate. I am looking at some better lat/long nearness calculations.

I have used this DISTANCE_BETWEEN function from:

This is a little better, so we will probably be looking at that one.
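To see why the range join is loose, here is a small Python sketch comparing the ±0.3 degree box used in the join with a great-circle distance. The haversine formula is a stand-in chosen for illustration; the actual DISTANCE_BETWEEN implementation is not shown in this post, and the function names and Earth radius constant below are assumptions.

```python
import math

EARTH_RADIUS_MILES = 3958.8  # mean Earth radius, an approximation


def within_box(lat1, lon1, lat2, lon2, delta=0.3):
    """The join condition: both coordinates differ by at most delta degrees."""
    return abs(lat1 - lat2) <= delta and abs(lon1 - lon2) <= delta


def haversine_miles(lat1, lon1, lat2, lon2):
    """Great-circle distance in miles between two lat/long points in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = phi2 - phi1
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * EARTH_RADIUS_MILES * math.asin(math.sqrt(a))


if __name__ == "__main__":
    # Two points that pass the box test but are far apart on the ground.
    a, b = (40.0, -74.0), (40.25, -73.75)
    print(within_box(*a, *b), round(haversine_miles(*a, *b), 1))
```

Around New York's latitude the box can match points that are more than 20 miles apart, so filtering on a real distance threshold should give much tighter matches.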

Source Code:

https://github.com/tspannhw/FLaNK-Transit

Flink SQL Note:

/*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */

The above is a Flink SQL hint to start from the earliest offset in the Kafka topics, which gets us all available data. This can potentially be petabytes. You can remove the hint and only get messages as they arrive. Even with the full outer joins and the range matching on lat/long, you could get zero rows.

Once the query runs and gives us good data, let’s build a materialized view.

I have created it so there are always rows in the view.

REST/JSON Feed

HTML View of JSON/REST Feed

NYC Cameras

Data URL

https://511ny.org/api/getcameras?key=SIGNUP&format=json

Routing

${Blocked:equalsIgnoreCase("False"):and(${Disabled:equalsIgnoreCase("False")})}
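The routing expression above keeps only cameras that are neither blocked nor disabled. A rough Python equivalent is sketched below, assuming each camera is a dict with Blocked and Disabled keys; the function names are hypothetical. It accepts both JSON booleans and the string form that NiFi attributes carry.

```python
def is_false(value):
    """True for the boolean False or the text "false" in any letter case."""
    return str(value).lower() == "false"


def is_usable(camera):
    """Mirror the routing rule: Blocked is false AND Disabled is false."""
    # A missing key yields None, which is not "false", so the camera is dropped.
    return is_false(camera.get("Blocked")) and is_false(camera.get("Disabled"))


if __name__ == "__main__":
    cameras = [
        {"ID": "Skyline-9962", "Blocked": False, "Disabled": False},
        {"ID": "example-blocked", "Blocked": "True", "Disabled": "False"},
    ]
    print([c["ID"] for c in cameras if is_usable(c)])
```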

NiFi Flow

Slack Message

Camera: ${ID}  ${Name}  ${RoadwayName}
Direction of Travel ${DirectionOfTravel}
Lat/Long: ${Latitude}/${Longitude}
URL: ${Url} ${VideoUrl} ${url}
Date: ${date} ${ending}
File: ${filename}
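The ${...} placeholders in this message happen to use the same syntax as Python's string.Template, so the message can be previewed outside NiFi. This is only a sketch: render_slack_message is a hypothetical helper, and treating missing attributes as empty strings is an assumption about how the flow behaves.

```python
from collections import defaultdict
from string import Template

SLACK_TEMPLATE = Template(
    "Camera: ${ID}  ${Name}  ${RoadwayName}\n"
    "Direction of Travel ${DirectionOfTravel}\n"
    "Lat/Long: ${Latitude}/${Longitude}\n"
    "URL: ${Url} ${VideoUrl} ${url}\n"
    "Date: ${date} ${ending}\n"
    "File: ${filename}"
)


def render_slack_message(attributes):
    """Fill the template; attributes that are missing render as empty strings."""
    values = defaultdict(str, attributes)
    return SLACK_TEMPLATE.substitute(values)


if __name__ == "__main__":
    camera = {
        "ID": "Skyline-9962",
        "Name": "I-87 MP 016.10a SB Gov. Mario M. Cuomo Bridge View 1",
        "RoadwayName": "I-87 - NYS Thruway",
        "DirectionOfTravel": "Southbound",
        "Latitude": 41.07311,
        "Longitude": -73.91839,
        "Url": "https://511ny.org/map/Cctv/9962--43",
    }
    print(render_slack_message(camera))
```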

Example Camera — Apache Kafka Message

{
"Latitude":41.07311,
"Longitude":-73.91839,
"ID":"Skyline-9962",
"Name":"I-87 MP 016.10a SB Gov. Mario M. Cuomo Bridge View 1",
"DirectionOfTravel":"Southbound",
"RoadwayName":"I-87 - NYS Thruway",
"Url":"https://511ny.org/map/Cctv/9962--43",
"VideoUrl":"https://s58.nysdot.skyvdn.com:443/rtplive/TA_168/playlist.m3u8",
"Disabled":false,
"Blocked":false
}

Send to Kafka

Send to Slack

NYC Message Signs — Example Data

[ 
{
"Latitude" : 43.122105,
"Longitude" : -76.138595,
"ID" : "Region3-13030",
"Name" : "F128604",
"Roadway" : "Mobile",
"DirectionOfTravel" : "None",
"Messages" : [ "NO_MESSAGE" ],
"ts" : "1707880547804",
"uuid" : "a800321b-1b6c-4fed-b9bb-74568b41960d"
}, {
"Latitude" : 41.523651,
"Longitude" : -74.020903,
"ID" : "Region8-42080",
"Name" : "US 9WSB North of I84 (Newburgh Beacon Brdg) [VMS-214]",
"Roadway" : "US 9W SB",
"DirectionOfTravel" : "None",
"Messages" : [ "NO_MESSAGE" ],
"ts" : "1707880547805",
"uuid" : "a800321b-1b6c-4fed-b9bb-74568b41960d"
}, {
"Latitude" : 42.65456,
"Longitude" : -78.95493,
"ID" : "NITTEC-1109",
"Name" : "90EW Exit 57A Ramp",
"Roadway" : "Unknown",
"DirectionOfTravel" : "None",
"Messages" : [ "NO_MESSAGE" ],
"ts" : "1707880547805",
"uuid" : "a800321b-1b6c-4fed-b9bb-74568b41960d"
}
]

COMING SOON

RESOURCES

Google ImageFX

Tim Spann
Cloudera

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/