Boston, Where’s My Bus? LLM + Streaming to the Rescue

Tim Spann
Cloudera
May 7, 2024 · 13 min read

Apache NiFi, MBTA, Real-Time Transit, Kafka, Postgresql, Flink, Flink SQL, KNiFe AI, FLaNK AI, Python, GTFS, GTFS-RT, Trips, Alerts, Vehicles, Latitude/Longitude, Maps, Math, Buses, Conferences, Data.

So where can I find a bus from the Conference venue?

I am at the venue and I need to catch some public transit somewhere, anywhere. At some point it will be to South Station to catch a train.

The event is at:

Hyatt Regency Boston
One Avenue de Lafayette
Boston, MA 02111

It turns out that combining real-time Q&A, Slack, maps, turning words into locations, and calling the right transit API and/or LLM is not trivial. I needed a few great open source libraries and a number of Python processors, so that's what I built: some API calls and some new components for processing and parsing my Slack question.

Model Used: HuggingFace Mixtral 8x7B

An important question that can be answered without AI is which bus stops are near me. Thanks to some math and some helpful PostgreSQL queries, I can get the nearest stops to me.

The first thing I need to do is translate my question into my location as latitude and longitude. Once I have done that, I can use my function to calculate the nearest stops. How did we get all the bus stops? Fortunately, the MBTA (Massachusetts Bay Transportation Authority) provides a list of all of their stops as part of its standard GTFS feeds.

It's a zip file of CSV files, which we can easily automate loading into PostgreSQL with NiFi.

https://cdn.mbta.com/MBTA_GTFS.zip

Our flow downloads the zip, unzips it, and parses out the individual files; then we convert them from CSV to JSON. We then split them into individual records and set a primary key for each one. This requires a manual switch statement in the advanced settings of UpdateAttribute. With AI we could assemble the insert statements automatically; perhaps that's a future update.
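If you want to see the shape of that load outside NiFi, here is a minimal Python sketch that pulls the GTFS zip and upserts stops.txt into the lookup table. The connection string is illustrative, the column list is trimmed to just what the lookup function below uses, and the real flow loads every file in the feed.

# A rough equivalent of the NiFi load, assuming psycopg2 is installed
# and a local PostgreSQL database; connection details are illustrative.
import csv, io, urllib.request, zipfile
import psycopg2

GTFS_URL = "https://cdn.mbta.com/MBTA_GTFS.zip"

with urllib.request.urlopen(GTFS_URL) as resp:
    archive = zipfile.ZipFile(io.BytesIO(resp.read()))

# stops.txt carries stop_id, stop_name, stop_desc, stop_lat, stop_lon, ...
stops = csv.DictReader(io.TextIOWrapper(archive.open("stops.txt"), "utf-8-sig"))

conn = psycopg2.connect("dbname=mbta user=postgres")
with conn, conn.cursor() as cur:
    cur.execute("""CREATE TABLE IF NOT EXISTS mbtalookupstops (
        stop_id text PRIMARY KEY, stop_name text, stop_desc text,
        stop_lat text, stop_lon text, location_type text, stop_url text)""")
    for r in stops:
        cur.execute("""INSERT INTO mbtalookupstops
            (stop_id, stop_name, stop_desc, stop_lat, stop_lon, location_type, stop_url)
            VALUES (%s, %s, %s, %s, %s, %s, %s)
            ON CONFLICT (stop_id) DO UPDATE SET
              stop_name = EXCLUDED.stop_name, stop_desc = EXCLUDED.stop_desc,
              stop_lat = EXCLUDED.stop_lat, stop_lon = EXCLUDED.stop_lon,
              location_type = EXCLUDED.location_type, stop_url = EXCLUDED.stop_url""",
            (r["stop_id"], r["stop_name"], r.get("stop_desc", ""), r["stop_lat"],
             r["stop_lon"], r.get("location_type", ""), r.get("stop_url", "")))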

Next we assemble our update statement for the table (with a create in case this is the first time we have run this). As you can imagine, things occasionally change, so I have NiFi scheduled to refresh the data once a day. We can adjust that, since the first processor supports a full cron-style scheduler.

We set the primary key and the table name from the file
This takes the SQL generated by the last steps and executes it

We can now use our tables as lookups. The important one is mbtalookupstops:

Following some example SQL I found out there, I built a function against that table.

-- DROP FUNCTION public.location_distance(text, text);

CREATE OR REPLACE FUNCTION public.location_distance(latitude text, longitude text)
RETURNS TABLE(distance text, stop_name text, stop_desc text, stop_lat text, stop_lon text, location_type text, stop_url text)
LANGUAGE sql
AS $function$
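-- <@> is the point-to-point distance operator from the earthdistance extension.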
select to_char(float8 (point(cast($1 as float),cast($2 as float)) <@> point(stop_lat::float,stop_lon::float)), 'FM999999999.00') as distance,
stop_name, stop_desc, stop_lat, stop_lon, location_type, stop_url
from mbtalookupstops m
order by float8 (point(cast($1 as float),cast($2 as float)) <@> point(stop_lat::float,stop_lon::float))
limit 5

$function$
;

This calculates the distance between the point (lat/long) that you pass in and the point (lat/long) stored in the table for each stop.

select * from location_distance('${latitude}', '${longitude}')

All we have to do is make a regular SQL query against it from NiFi.

We call that from NiFi via the ExecuteSQLRecord processor.

This will give us five result records.
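For intuition, the same great-circle math can be sketched directly in Python with the haversine formula. The coordinates below are the geocoded venue and the Downtown Crossing stop taken from the sample record later in this post; the Postgres function relies on the <@> operator rather than this hand-rolled version.

# Haversine sketch of the nearest-stop distance math (illustrative only).
from math import radians, sin, cos, asin, sqrt

def haversine_km(lat1, lon1, lat2, lon2):
    # Great-circle distance between two lat/lon points in kilometers.
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371.0 * asin(sqrt(a))

hyatt = (42.3535897, -71.06057971743516)   # geocoded venue
stop = (42.355253, -71.060521)             # Downtown Crossing - Macy's
print(round(haversine_km(*hyatt, *stop), 3), "km")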

Detailed Slack Solution Step-by-Step

Tim, you skipped a lot of details. Okay, so let’s show you how to round trip from request to result.

I have one setup for ingesting and parsing all of my Slack interactions and then distributing them to the apps that use them.

Our generic Slack NiFi app receives, parses, transforms, enriches, cleanses and distributes messages to Kafka, models and, if necessary, a vector database.

For bus queries, I skip our RAG setup, since my Pinecone vector database (it could also be Milvus, Weaviate, ChromaDB, PGVector, …) only stores my Medium articles and slides. Those don't make for good results for bus queries. I do let my original query go to my model to get some generic LLM results, which are not bad. We also send the message to our Boston receiver.

flank-slack-messages-prompts

A — ConsumeKafka_2_6 — Continuously receive Slack messages from Kafka

B — EvaluateJSONPath — extract key fields as attributes to use elsewhere.

C — ExtractEntities — custom Python processor. This Python processor uses spaCy to parse out entities, including organizations, locations, people, money and more. These are added as individual attributes. We use the messagetext extracted from Slack in step B (a minimal spaCy sketch follows this list).

${messagetext:trim()}

D — ParseAddresses — custom Python processor

E — UpdateAttribute — In this processor, we build a location string from the parsed entities:

${facs:trim():append(' '):append(${orgs:trim():append(' '):append(${gpes:trim()})})}

F — RouteOnAttribute — route to correct question

${messagetext:toUpper():contains('BUS')}

${messagetext:toUpper():contains('STOCK')}

${messagetext:toUpper():contains('WEATHER')}
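Here is a minimal sketch of what the ExtractEntities step (C above) does with spaCy, assuming the small English model is installed (python -m spacy download en_core_web_sm). The real processor pulls more entity types (dates, events, products, quantities, …) than this subset.

import spacy

nlp = spacy.load("en_core_web_sm")

def extract_entities(messagetext: str) -> dict:
    # Map a subset of spaCy entity labels to the attribute names used in the flow.
    label_map = {"ORG": "orgs", "GPE": "gpes", "FAC": "facs",
                 "PERSON": "persons", "MONEY": "moneys"}
    ents = {name: [] for name in label_map.values()}
    for ent in nlp(messagetext).ents:
        if ent.label_ in label_map:
            ents[label_map[ent.label_]].append(ent.text)
    # NiFi attributes are plain strings, so join multi-valued entities.
    return {k: " ".join(v) for k, v in ents.items()}

print(extract_entities("What buses are near Hyatt Regency Boston, Boston, MA?"))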

Capture Latitude and Longitude and then send to weather and bus

1 — InvokeHTTP — Call the first geocoding service (US Census 2020):

https://geocoding.geo.census.gov/geocoder/locations/onelineaddress?address=${location:trim():urlEncode()}&benchmark=2020&format=json

2 — EvaluateJsonPath — Extract primary fields for latitude and longitude.

$.result.addressMatches.[0].coordinates.x
$.result.addressMatches.[0].coordinates.y

3 — RouteOnAttribute — If latitude is empty, try something else to geocode the address; otherwise pass on to success.

${latitude:trim():equals('')}

4 — InvokeHTTP — Let's try another service: geocode from geocode.maps.co.

https://geocode.maps.co/search?q=${location:trim():urlEncode()}&api_key=RegisterForAKey

4b — RouteOnAttribute — The happy path: the first service worked, so send this on to the next step, whether that is looking up a bus stop, a stock quote or the current weather.

${messagetext:toUpper():contains('BUS')}
${messagetext:toUpper():contains('STOCK')}
${messagetext:toUpper():contains('WEATHER')}

5 — RouteOnAttribute — The first geocoder failed, so we had to try again in step 4; now we check those results to see if we got a full result.

${filesize:length():lt(100)}

6 — QueryRecord — We received some JSON rows. Let’s limit to the first one.

SELECT lat as latitude, lon as longitude, display_name
FROM FLOWFILE
ORDER BY importance DESC
LIMIT 1

6b — AddressToLatLong — Try one more time, but with my custom Python processor. We pass in the location.

${location:trim()}

This processor is defined here:

It uses geopy.geocoders.Nominatim, which is backed by OpenStreetMap and is very solid (see the sketch after these steps).

7b — RouteOnAttribute — We check to see if latitude is empty.

${latitude:trim():equals('')}

7 — SplitRecord — We take the results of QueryRecord and split them down to ensure it's just one record.

8 — EvaluateJsonPath — We parse out latitude and longitude. Now we send it on to step 4b.
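Outside of NiFi, the same geocoding fallback chain looks roughly like this in Python: try the US Census geocoder first, then fall back to Nominatim via geopy. The URLs mirror the steps above; error handling is simplified and the user_agent string is illustrative.

import json, urllib.parse, urllib.request
from geopy.geocoders import Nominatim  # assumes geopy is installed

def census_geocode(location: str):
    # Step 1: US Census onelineaddress geocoder.
    url = ("https://geocoding.geo.census.gov/geocoder/locations/onelineaddress"
           "?address=" + urllib.parse.quote(location.strip())
           + "&benchmark=2020&format=json")
    with urllib.request.urlopen(url) as resp:
        matches = json.loads(resp.read())["result"]["addressMatches"]
    if not matches:
        return None
    coords = matches[0]["coordinates"]
    return str(coords["y"]), str(coords["x"])  # (latitude, longitude)

def nominatim_geocode(location: str):
    # Step 6b fallback: OpenStreetMap's Nominatim via geopy.
    result = Nominatim(user_agent="flank-bus-demo").geocode(location.strip())
    return (str(result.latitude), str(result.longitude)) if result else None

location = "Hyatt Regency Boston, Boston, MA"
print(census_geocode(location) or nominatim_geocode(location))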

Nearest Bus — AI Enhanced Query

This new flow finds the nearest two bus stops based on the current lat/long location. This is the path that answers this Slack question:

What buses are near Hyatt Regency Boston, Boston, MA?

User asks in Slack group

The results are a threaded reply to your English natural language query.

NiFi Slack Bot Output

The first two results are from this flow and the last is from our previous LLM call to Mixtral 8x7B-Instruct-v0.1 running on HuggingFace.
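In the flow, PublishSlack (step 5 below) posts that threaded reply. For reference, a bare-bones version of the same call with the Slack SDK looks like this; the token handling is illustrative, and the channel and thread timestamp values come from the sample record later in this post.

import os
from slack_sdk import WebClient  # assumes slack_sdk is installed

client = WebClient(token=os.environ["SLACK_BOT_TOKEN"])
client.chat_postMessage(
    channel="C06MCRGT0JF",           # messagechannel attribute
    thread_ts="1713903821.788869",   # ts of the original question, so the reply threads
    text="Nearest Buses to Hyatt Regency Boston ...",
)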

1 — ExecuteSQLRecord — Call my custom PostgreSQL function to get nearby stops.

select * from location_distance('${latitude}', '${longitude}')

See Source Code: https://github.com/tspannhw/FLaNKAI-Boston

JSON Results

2 — QueryRecord — Retrieve the top two results for now.

SELECT * FROM FLOWFILE LIMIT 2

3 — SplitRecord — split to single record. Reader — JSON, Writer — JSON.

4 — EvaluateJsonPath — Extract attributes from the record.

5 — PublishSlack — Send our output to Slack channel.

Nearest Buses to ${location} (inside this geofenced box ${boundingbox})
You are currently at a ${displayname} which is a ${addresstype} found at this location @ ${latitude}/${longitude}.
This nearby bus stop is ${distance} km(s) away.
It is called ${stopname} @ ${stoplat}/${stoplon}. [${stopdesc}]
${stopurl}
========Message: ${messagetext} from ${messagerealname} ${messageusername} @ ${messageusertz}
========= Dates: ${date} TS: ${ts} KT: ${kafka.timestamp}
======== Parsed: Dates: ${dates} Events: ${events} Facs: ${facs} GPE: ${gpes} LOC: ${locs} MONEY: ${moneys}
======== Parsed: ORG: ${orgs} PERSON: ${persons} PRODUCT: ${products} QUANTITY: ${quantities}
=== OSM Details: ${osmclass} ${osmid} ${osmimportance} ${osmlicense} ${osmname} ${osmtype} ${place_id} ${placerank} ${locationtype}

6 — AttributesToJSON — Build a new JSON record from attributes.

addresstype, boundingbox, date, dates, displayname, distance, events, executesql.query.duration, executesql.query.executiontime, executesql.query.fetchtime, executesql.row.count, facs, gpes, inputs, invokehttp.request.duration, invokehttp.tx.id, kafka.timestamp, last-modified, latitude, location, locationtype, locs, longitude, messagechannel, messageid, messagerealname, messagetext, messageusername, messageusertz, moneys, orgs, osmclass, osmid, osmimportance, osmname, osmtype, persons, place_id, placerank, primaryaddress, products, quantities, slack.ts, stopdesc, stoplat, stoplon, stopname, stopurl, times, ts, uuid

7 — PublishKafkaRecord_2_6 — Publish to Kafka topic mbtarequestedbus.

{
"date" : "Tue, 23 Apr 2024 20:23:45 GMT",
"locs" : "",
"stopurl" : "",
"invokehttp.tx.id" : "839291ce-62a6-40cc-8d74-90a2509f6536",
"osmtype" : "hotel",
"inputs" : "What buses are near Hyatt Regency Boston, Boston, MA?",
"quantities" : "",
"osmimportance" : "9.99999999995449e-06",
"uuid" : "579e5cd8-b710-4a7d-a4fe-7174c3b36fb2",
"kafka.timestamp" : "1713903822638",
"executesql.query.executiontime" : "18",
"events" : "",
"place_id" : "18081472",
"longitude" : "-71.06057971743516",
"messagerealname" : "Timothy Spann",
"stoplon" : "-71.060521",
"dates" : "",
"messageusername" : "tspann",
"stopdesc" : "Downtown Crossing - Macy's",
"executesql.query.duration" : "19",
"persons" : "",
"osmid" : "240550058",
"messagetext" : "What buses are near Hyatt Regency Boston, Boston, MA?",
"orgs" : "Hyatt Regency Boston",
"osmclass" : "building",
"primaryaddress" : "",
"distance" : ".04",
"facs" : "",
"latitude" : "42.3535897",
"messageid" : "1987f07d-1108-41e3-9fe3-6a247455cbbb",
"invokehttp.request.duration" : "408",
"stopname" : "Downtown Crossing - Macy's",
"executesql.row.count" : "5",
"products" : "",
"osmname" : "Hyatt Regency Boston",
"last-modified" : "Tue, 23 Apr 2024 20:23:45 GMT",
"times" : "",
"placerank" : "30",
"addresstype" : "building",
"boundingbox" : "['42.3532601', '42.3539761', '-71.0611654', '-71.0603267']",
"gpes" : "Boston, MA",
"slack.ts" : "1713903826.241579",
"stoplat" : "42.355253",
"executesql.query.fetchtime" : "1",
"messagechannel" : "C06MCRGT0JF",
"displayname" : "Hyatt Regency Boston, Chauncy Street, Downtown Crossing, Downtown Boston, Boston, Suffolk County, Massachusetts, 02111, United States",
"messageusertz" : "America/New_York",
"location" : " Hyatt Regency Boston Boston, MA",
"locationtype" : "2",
"moneys" : "",
"ts" : "1713903821.788869"
}
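Any downstream app can read those records straight off the topic. A minimal consumer sketch with kafka-python, assuming a broker at localhost:9092 (the broker address and security settings are illustrative):

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "mbtarequestedbus",
    bootstrap_servers="localhost:9092",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
    auto_offset_reset="earliest",
)

for message in consumer:
    record = message.value
    print(record["stopname"], record["distance"], record["location"])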

Enhancement thoughts:

Have Flink SQL query the Kafka topic mbta-alerts, then push any alerts that match recent requests to Slack and notify the users who asked questions in the last hour.

Have NiFi cache alerts at stops in a cache server and then access the most recent ones when posting the Slack response.

Include an OpenStreetMap map in the Slack results.

Send results as an HTML-formatted email with images.

Include the weather at that stop automatically if it is important (such as rain, snow, temperature, etc.).

Stocks

For the stock flow, we just reuse the one we have from this article.

Weather

For the weather flow, we just reuse the one we have from this article.

Video Walk Through

Source Code

Feed URLs

Data Flow Diagram: Feeds -> NiFi -> Slack/Postgresql/Iceberg/Kafka -> Flink

Cloudera DataFlow built on Apache NiFi Flow

MBTA GTFS Real-Time Alerts

MBTA GTFS Real-Time Trip Updates

MBTA GTFS Real-Time Vehicle Position

Boston Vehicle Position Row

{"currentstatus":"INCOMING_AT",
"route_id":"Red","bearing":"85.0",
"directionid":"1","scheduled":"","latitude":"42.3963","stopid":"70061",
"tripid":"","label":"1872","starttime":"","startdate":"",
"uuid":"21ae5366-486f-4655-82fc-33e34e561c89","speed":"",
"recordid":"R-547B5839","currentstopsequence":"220",
"gtfsurl":"https://cdn.mbta.com/realtime/VehiclePositions.pb",
"occupancypercentage":"","routeid":"Red","vehiclelabel":"1826",
"vehicleid":"R-547B5839","occupancystatus":"NO_DATA_AVAILABLE",
"carriagesequence":"5","longitude":"-71.14008","timestamp":"1711775699",
"ts":"1711775722199","route_long_name":"Red Line"}

BOSTON KAFKA RECORDS

SLACK OUTPUT

HTML/JQuery/DataTables.Net/LeafletJS Dashboard

MBTA OpenStreetMaps and Tables

SEE LIVE DEMO AND PRESENTATION AT

RESOURCES
