Events, Streams, Flows and Maps

Tim Spann
Cloudera
Apr 23, 2024

How to crawl, walk and run with any data, anywhere, anytime with open source tools

I was looking deeper into my Irish Transit flows when I noticed I had only scratched the surface of the Irish Rail data. I wanted to see more and examine streams of real data from real places. I love data with latitude and longitude or addresses, as this is real-world. Bridging the real and the virtual makes me feel like I am programming in the Matrix, but without the cool clothes. Hmmm... wardrobe update time for the next event?

Anyway, when you have location data, it needs to be visualized on a map. Usually I use Flink SQL to build a materialized view as a JSON REST endpoint and build my own dashboard. Or, with just a few clicks, I can build a SQL app in Cloudera SQL Stream Builder and visualize that with Cloudera Data Visualization.

That is really easy, so I’ll do it for the industrial-strength, secure cloud version of this article. For this initial one, I will keep it small, local and open source. It just needs Apache NiFi 2.0.0-M2 running locally, JDK 21, Python 3.10+ and some HTML with OpenStreetMap, CSS, LeafletJS and JavaScript.

DataFlow — Combined

Two NiFi Modules
We can call REST endpoints for Current Trains and All the Stations and then convert them from XML to JSON.

Current Date in Format for Irish Rail

${now():format("dd MMMM yyyy"):urlEncode()}

We need to get today’s data from Irish Rail, which is in the above format. NiFi gives us the current date via now().
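
To make that date format concrete outside of NiFi, here is a minimal JavaScript sketch of the same idea: a two-digit day, the full English month name and a four-digit year, then URL-encoded. The function names are my own for illustration, and the exact encoding of the space may differ from what NiFi's urlEncode() produces.

```javascript
// Month names spelled out in English, matching the "MMMM" pattern letter.
const MONTHS = [
  'January', 'February', 'March', 'April', 'May', 'June',
  'July', 'August', 'September', 'October', 'November', 'December',
];

// Format a Date as "dd MMMM yyyy" using the local calendar day.
// (Hypothetical helper name, not part of NiFi or the Irish Rail API.)
function formatIrishRailDate(date) {
  const day = String(date.getDate()).padStart(2, '0');
  return `${day} ${MONTHS[date.getMonth()]} ${date.getFullYear()}`;
}

// URL-encode the formatted date so it can be used as a query parameter value.
function encodedIrishRailDate(date) {
  return encodeURIComponent(formatIrishRailDate(date));
}

console.log(encodedIrishRailDate(new Date()));
```

For April 23, 2024 this yields 23%20April%202024, which is the kind of value the later train movements call expects in its date parameter.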

Example flow for stations

Stations DataFlow

Our main flow walks from all the stations in Irish Rail, to their details, to the movements of the trains, and then plots it all in a simple dashboard with OpenStreetMap. We also send each event to Slack.

We start with the simple getAllStationsXML call, which returns all the stations as XML. We convert that to JSON, clean it up and then start augmenting it with two deeper REST URLs of live data.

Primary URL

https://api.irishrail.ie/realtime/realtime.asmx/getAllStationsXML

Second URL

https://api.irishrail.ie/realtime/realtime.asmx/getStationDataByCodeXML?StationCode=${StationCode:trim()}

Third URL

https://api.irishrail.ie/realtime/realtime.asmx/getTrainMovementsXML?TrainId=${Traincode:trim()}&TrainDate=${urltrain}
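
As a rough sketch of how those three URLs chain together, here is the same substitution done in plain JavaScript. The helper names and the sample codes are made up for illustration; in the flow itself NiFi Expression Language fills in the attributes. I also URL-encode the codes, which the NiFi expressions above do not do.

```javascript
const BASE = 'https://api.irishrail.ie/realtime/realtime.asmx';

// Primary URL: every station as XML.
function allStationsUrl() {
  return `${BASE}/getAllStationsXML`;
}

// Second URL: live data for one station, trimming the code like ${StationCode:trim()}.
function stationDataUrl(stationCode) {
  return `${BASE}/getStationDataByCodeXML?StationCode=${encodeURIComponent(stationCode.trim())}`;
}

// Third URL: movements for one train. The date is expected to be
// URL-encoded already, like the ${urltrain} attribute.
function trainMovementsUrl(trainCode, encodedTrainDate) {
  return `${BASE}/getTrainMovementsXML?TrainId=${encodeURIComponent(trainCode.trim())}&TrainDate=${encodedTrainDate}`;
}

// Sample values are placeholders, not taken from the article.
console.log(allStationsUrl());
console.log(stationDataUrl(' MHIDE '));
console.log(trainMovementsUrl('E109 ', '23%20April%202024'));
```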

Filename:

stations.${filename:append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}

We use JSON Path to split up the large chunks of JSON into individual events and enrich them with the parent station data.

Station Split:

$.*.objStation

Second Split:

$.objStationData.*

Third Split:

$.objTrainMovements.*
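
Here is a small JavaScript sketch of what the split-and-enrich step does conceptually. It is not how NiFi implements it. The root key name, the sample values and the helper names are assumptions for illustration; only field names such as StationDesc, StationCode, StationLatitude, StationLongitude, Traincode and Status come from the flow.

```javascript
// Assumed shape after the XML-to-JSON conversion; the root key name and the
// sample values are made up for illustration.
const stationsDoc = {
  ArrayOfObjStation: {
    objStation: [
      { StationDesc: 'Station A', StationCode: 'AAA', StationLatitude: 53.1, StationLongitude: -6.1 },
      { StationDesc: 'Station B', StationCode: 'BBB', StationLatitude: 53.2, StationLongitude: -6.2 },
    ],
  },
};

// Rough equivalent of $.*.objStation: look under every root-level key for
// the named child and flatten the matches into one list of events.
function splitUnderAnyRoot(doc, key) {
  return Object.values(doc).flatMap((child) =>
    child && child[key] !== undefined ? child[key] : []
  );
}

// Rough equivalent of $.objStationData.* : every element under one named key.
function splitChildren(doc, key) {
  const node = doc[key];
  return node == null ? [] : Object.values(node);
}

// Copy the parent station's identifying fields onto each child event.
// Fields already present on the child win over the parent's copy.
function enrichWithStation(station, events) {
  const { StationDesc, StationCode, StationLatitude, StationLongitude } = station;
  return events.map((event) => ({
    StationDesc, StationCode, StationLatitude, StationLongitude, ...event,
  }));
}

const stations = splitUnderAnyRoot(stationsDoc, 'objStation');
const stationDataDoc = { objStationData: [{ Traincode: 'T1', Status: 'En Route' }] };
const enriched = enrichWithStation(stations[0], splitChildren(stationDataDoc, 'objStationData'));
console.log(enriched);
```

Carrying the station's coordinates on every child event is what lets the dashboard plot a train record without a second lookup.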

Cache

irishrailfull

Finally, we push this data to a NiFi cache. We use the local, development option of the built-in DistributedMapCache, so we don’t need a cloud or other servers. We could also choose Couchbase, Cassandra, Hazelcast or Redis.

Cache options

Now that we have our data cached, a non-streaming consumer can read it. That’s what we will do with jQuery, LeafletJS and DataTables, which can consume JSON from our NiFi endpoint.

HTML Visualization Snippet

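    // Snippet from the full page: xhr, map, customerIcon, i and trainlocation
    // (the parsed JSON from the NiFi endpoint) are set up earlier in the script.
    while (i < trainlocation.length) {
        // Drop a marker at the station coordinates with a popup describing the train.
        marker = new L.marker([trainlocation[i].StationLatitude, trainlocation[i].StationLongitude], { icon: customerIcon })
            .addTo(map)
            .bindPopup(' ' + trainlocation[i].StationDesc +
                ' @ ' + trainlocation[i].LocationFullName +
                ' on ' + trainlocation[i].TrainDate + ' - ' +
                trainlocation[i].Status + ' - Destination Time: ' +
                trainlocation[i].DestinationTime + ' Leaving at: ' +
                trainlocation[i].ExpectedDeparture)
            .openPopup();

        i++;
    }
    } // end of an enclosing block opened earlier in the full page

    }; // end of the enclosing function opened earlier in the full page

    // Request the cached JSON from the NiFi HTTP endpoint.
    xhr.open('GET', 'http://localhost:9871/rail');
    xhr.send();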

The full HTML page is available on GitHub and shows how we easily build a table and map of our transit data.
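
If you want to try the popup logic from the snippet without a browser or Leaflet, a hedged sketch like this separates the data shaping from the map calls. The function names and sample record are hypothetical, and skipping records without usable coordinates is my own addition, not something the original page does.

```javascript
// Build the same popup text as the snippet, from one cached record.
function buildPopupText(rec) {
  return ' ' + rec.StationDesc +
    ' @ ' + rec.LocationFullName +
    ' on ' + rec.TrainDate + ' - ' +
    rec.Status + ' - Destination Time: ' +
    rec.DestinationTime + ' Leaving at: ' +
    rec.ExpectedDeparture;
}

// Turn records into plain marker descriptions, dropping any record whose
// coordinates are not numeric or are both zero (assumed to mean "unknown").
function toMarkerSpecs(records) {
  return records
    .map((rec) => ({
      lat: Number(rec.StationLatitude),
      lng: Number(rec.StationLongitude),
      popup: buildPopupText(rec),
    }))
    .filter((spec) =>
      Number.isFinite(spec.lat) && Number.isFinite(spec.lng) &&
      !(spec.lat === 0 && spec.lng === 0)
    );
}

// Sample record with made-up values, using the field names from the snippet.
const sampleRecords = [
  { StationDesc: 'Station A', LocationFullName: 'Somewhere', TrainDate: '23 Apr 2024',
    Status: 'En Route', DestinationTime: '10:30', ExpectedDeparture: '10:05',
    StationLatitude: '53.45', StationLongitude: '-6.15' },
  { StationDesc: 'Station B', StationLatitude: '', StationLongitude: '' },
];

console.log(toMarkerSpecs(sampleRecords));
```

In the page, each resulting spec would feed one L.marker([lat, lng]) call with bindPopup(popup).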

We split our flow depending on whether we are handling a station call or a train call. We could separate these out.
Split into records
This is Station data parsing
Split
We then enrich our station data with an additional HTTP REST call and send the data to Kafka. We also merge a chunk of JSON data together and cache it. This cache will feed a REST request for recent data. We will most likely update every 15–45 seconds. After watching the data, we can decide if we need it more frequently. For demos, once a minute or even once every 5 minutes is good.
MergeRecord -> PutDistributedMapCache (on local cache)
Our two calls to InvokeHTTP for our real-time Irish Rail XML
REST Call to getCurrentTrainsXML
Set the name and source for this feed
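
The merge-and-cache step boils down to "bundle recent records into one JSON document and store it under a known key". Here is a minimal in-memory sketch of that pattern in JavaScript. The class and function names are hypothetical stand-ins, not NiFi APIs; only the cache key irishrailfull comes from the flow.

```javascript
// A tiny stand-in for a map cache: one value per key, last write wins.
class LocalMapCache {
  constructor() {
    this.entries = new Map();
  }

  put(key, value) {
    this.entries.set(key, value);
  }

  // Returns null when nothing has been cached under the key yet.
  get(key) {
    return this.entries.has(key) ? this.entries.get(key) : null;
  }
}

// Merge individual records into a single JSON array document.
function mergeRecords(records) {
  return JSON.stringify(records);
}

const cache = new LocalMapCache();

// Each refresh replaces the previous batch under the same key.
// The records here are made-up placeholders.
cache.put('irishrailfull', mergeRecords([{ Traincode: 'T1' }, { Traincode: 'T2' }]));
console.log(cache.get('irishrailfull'));
```

Because every refresh overwrites the same key, a reader always gets the most recent batch, which fits the refresh interval described above.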

Handle HTTP Requests from Web Sites and Apps

This is for people who don’t want to consume real-time events from Kafka.

A full web site runs in NiFi and returns your cached JSON to the REST caller
Clean and Route
And then respond
The context map keeps track of web users
Capture any passed in key and server path
Check for JSON
Share with Context Map from HTTP Request
Station and Train Detailed Data
See train and station information
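
The request handling steps in the captions above (capture the path, look up the cache, check for JSON, respond) can be sketched as one plain function. This is only an illustration in JavaScript, not NiFi's HandleHttpRequest or HandleHttpResponse. The mapping from /rail to the irishrailfull key and the status codes are my assumptions.

```javascript
// Assumed mapping from server path to cache key.
const ROUTES = new Map([['/rail', 'irishrailfull']]);

// Decide what to send back for a request path, given a Map used as the cache.
function handleRequest(path, cache) {
  const key = ROUTES.get(path);
  if (key === undefined) {
    return { status: 404, contentType: 'text/plain', body: 'Unknown path' };
  }

  const cached = cache.get(key);
  if (typeof cached !== 'string') {
    return { status: 503, contentType: 'text/plain', body: 'No data cached yet' };
  }

  // Check for JSON before responding, so callers never get a broken payload.
  try {
    JSON.parse(cached);
  } catch (err) {
    return { status: 503, contentType: 'text/plain', body: 'Cached data is not valid JSON' };
  }

  return { status: 200, contentType: 'application/json', body: cached };
}

// Made-up cache content for the demo.
const demoCache = new Map([['irishrailfull', '[{"Traincode":"T1"}]']]);
console.log(handleRequest('/rail', demoCache));
```

Keeping the decision in a pure function like this makes it easy to wire into any HTTP server and to test without opening a port.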

Trains DataFlow

http://api.irishrail.ie/realtime/realtime.asmx/getCurrentTrainsXML

Filename:

stations.${filename:append(${now():format('yyyyMMddHHmmSS'):append(${md5}):append('.json')})}

Slack Template

Irish Rail - Current Train Tracking
Message: ${PublicMessage:replaceAll("(\\\\r\\\\n|\\\\n)"," "):replaceAll("\\\n", " "):trim()}
Request: ${invokehttp.request.url} ${invokehttp.status.message} ${invokehttp.tx.id}
Lat/Long: ${TrainLatitude}/${TrainLongitude} Heading ${Direction}
Train: ${TrainCode} Status: ${TrainStatus}
Train Date: ${TrainDate}
Start Date/Time/TS: ${startdate} / ${starttime} / ${timestamp} / ${Date}
IDs: ${uuid} ${recordid} TripID ${tripid} File: ${filename}
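
The interesting part of that template is the message cleanup. Here is a hedged JavaScript sketch of my reading of the two replaceAll calls: turn both escaped \r\n or \n text sequences and real line breaks into spaces, then trim. The function names are hypothetical and only a few of the template lines are reproduced.

```javascript
// Replace literal backslash sequences (the two characters "\" and "n") and
// real line breaks with spaces, then trim the ends.
function cleanPublicMessage(message) {
  return message
    .replace(/\\r\\n|\\n/g, ' ') // escaped sequences embedded in the text
    .replace(/\r?\n/g, ' ')      // actual line-break characters
    .trim();
}

// Build a shortened version of the Slack message from flow attributes.
function buildSlackText(attrs) {
  return [
    'Irish Rail - Current Train Tracking',
    `Message: ${cleanPublicMessage(attrs.PublicMessage)}`,
    `Lat/Long: ${attrs.TrainLatitude}/${attrs.TrainLongitude} Heading ${attrs.Direction}`,
    `Train: ${attrs.TrainCode} Status: ${attrs.TrainStatus}`,
    `Train Date: ${attrs.TrainDate}`,
  ].join('\n');
}

// Made-up attribute values for the demo.
console.log(buildSlackText({
  PublicMessage: 'T1\\n10:50 - A to B\\nArrived ',
  TrainLatitude: '53.1', TrainLongitude: '-6.1', Direction: 'Northbound',
  TrainCode: 'T1', TrainStatus: 'R', TrainDate: '23 Apr 2024',
}));
```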

Slack Output

Train
Flow for Data Processing for the Trains URL

We push the data to Kafka, Slack and to a cache to use for our REST calls.

For our trains dashboard we display all the information we have on individual trains.

It was not that hard after all. We have our two streams of data filling a cache, Slack and Kafka.

Resources

My previous article that touches on Irish Transit data feeds.

Other Data To Examine

https://api.irishrail.ie/realtime/

https://api.irishrail.ie/realtime/realtime.asmx/getAllStationsXML

https://api.irishrail.ie/realtime/realtime.asmx/getAllStationsXML_WithStationType?StationType=A

https://api.irishrail.ie/realtime/realtime.asmx/getCurrentTrainsXML

https://data.tii.ie/Datasets/Its/DatexII/WeatherData/Content.xml

https://data.gov.ie/showcase/irish-rail-realtime

Detailed Screen Shots of NiFi Flow

Tim Spann
Cloudera

Principal Developer Advocate, Zilliz. Milvus, Attu, Towhee, GenAI, Big Data, IoT, Deep Learning, Streaming, Machine Learning. https://www.datainmotion.dev/