Building a Library of Python Processors

Tim Spann
Cloudera
Jan 26, 2024 · 10 min read

Apache NiFi has a large palette of processors to handle everything from ingesting REST feeds, databases, CDC, sFTP, WebSockets, cloud sources, NoSQL data, logs, raw network data, text files, PDFs, documents, and hundreds of other sources. It also has dozens of processors for enriching, munging, decrypting, unzipping, untarring and cleaning up data. There are even more for record processing: querying, updating and manipulating data with SQL, JSONPath, XSLT and RecordPaths. There are also processors for working with cloud resources, ML, vector stores and GenAI model inference.

This covers a lot, but not everything, so we can quickly spin up new processors using Python 3.x. I am porting useful apps and libraries over very quickly, and you can as well. Let's make building data engineering and Generative AI pipelines easy.

Using HuggingFace, NLP, spaCy, PyTorch and JSON to Parse Company Names from Text Strings
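The shipped processor's internals may differ, but as a rough sketch of the extraction step with spaCy: pull ORG entities out of a text string and emit them as JSON. The en_core_web_sm model and the parse_companies helper are my assumptions for illustration.

import json
import spacy

# Assumes: pip3 install spacy && python3 -m spacy download en_core_web_sm
nlp = spacy.load("en_core_web_sm")

def parse_companies(text: str) -> str:
    # Collect unique ORG (organization) entities found by the NER model
    doc = nlp(text)
    companies = sorted({ent.text for ent in doc.ents if ent.label_ == "ORG"})
    # Return as JSON, suitable for an output attribute like companylist
    return json.dumps(companies)

print(parse_companies("Cloudera and IBM both contribute to Apache NiFi."))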

Using WatsonX SDK To Run Inference Against Production WatsonX.AI LLM Foundation Models
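For the inference call itself, a minimal sketch along these lines should work with the ibm-watsonx-ai SDK; treat the model id, region URL and environment variable names as my assumptions, and check them against your SDK version.

import os
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.foundation_models import ModelInference

# Assumed region and env vars; set WATSONX_APIKEY and WATSONX_PROJECT_ID yourself
credentials = Credentials(
    url="https://us-south.ml.cloud.ibm.com",
    api_key=os.environ["WATSONX_APIKEY"],
)

model = ModelInference(
    model_id="ibm/granite-13b-chat-v2",  # assumed foundation model id
    credentials=credentials,
    project_id=os.environ["WATSONX_PROJECT_ID"],
)

# generate_text returns the generated string for the prompt
print(model.generate_text(prompt="Summarize Apache NiFi in one sentence."))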

Storing and Querying Data from Pinecone and Chroma Vector Databases
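On the Chroma side, the store-and-query loop with the chromadb client looks roughly like this; the collection name and documents are made up for illustration.

import chromadb

# In-memory client; use chromadb.PersistentClient(path=...) to keep data on disk
client = chromadb.Client()
collection = client.get_or_create_collection("medium_articles")

# Chroma embeds the documents with its default embedding function
collection.add(
    ids=["doc1", "doc2"],
    documents=[
        "Apache NiFi moves data between systems.",
        "Chroma is an open source vector database.",
    ],
)

# Similarity search: return the closest match to the query text
results = collection.query(query_texts=["What is NiFi?"], n_results=1)
print(results["documents"])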

Adding System and Process Monitoring

Attribute Values Produced

absolute.path: /Users/tspann/Downloads/servers/medium/posts/
companylist: []
cpu: 26.0
diskusage: 67683.2 MB
document.count: 5
file.creationTime: 2024-01-15T16:27:52-0500
file.group: staff
file.lastAccessTime: 2024-01-15T21:16:20-0500
file.lastModifiedTime: 2024-01-15T21:16:20-0500
file.owner: tspann
file.permissions: rwxrwxrwx
file.size: 45747
filename: 2023-12-19_Ten-Tips-To-Turbo-Charge-Your-Streaming-b4749465ad48.html
memory: 66.9
mime.type: application/json
netaddr:
{"lo0": [[2, "127.0.0.1", "255.0.0.0", null, null], [30, "::1", "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", null, null], [30, "fe80::1%lo0", "ffff:ffff:ffff:ffff::", null, null]], "en0": [[2, "192.168.1.153", "255.255.255.0", "192.168.1.255", null], [18, "bc:d0:74:65:4b:da", null, null, null], [30, "fe80::821:6f53:8208:ee72%en0", "ffff:ffff:ffff:ffff::", null, null]], "anpi2": [[18, "9a:b8:70:7d:1e:1d", null, null, null], [30, "fe80::98b8:70ff:fe7d:1e1d%anpi2", "ffff:ffff:ffff:ffff::", null, null]], "anpi1": [[18, "9a:b8:70:7d:1e:1c", null, null, null], [30, "fe80::98b8:70ff:fe7d:1e1c%anpi1", "ffff:ffff:ffff:ffff::", null, null]], "anpi0": [[18, "9a:b8:70:7d:1e:1b", null, null, null], [30, "fe80::98b8:70ff:fe7d:1e1b%anpi0", "ffff:ffff:ffff:ffff::", null, null]], "en4": [[18, "9a:b8:70:7d:1e:fb", null, null, null]], "en5": [[18, "9a:b8:70:7d:1e:fc", null, null, null]], "en7": [[18, "9a:b8:70:7d:1e:fd", null, null, null]], "en1": [[18, "36:6d:e8:f0:47:00", null, null, null]], "en2": [[18, "36:6d:e8:f0:47:04", null, null, null]], "en3": [[18, "36:6d:e8:f0:47:08", null, null, null]], "bridge0": [[18, "36:6d:e8:f0:47:00", null, null, null]], "ap1": [[18, "be:d0:74:65:4b:da", null, null, null], [30, "fe80::bcd0:74ff:fe65:4bda%ap1", "ffff:ffff:ffff:ffff::", null, null]], "awdl0": [[18, "ba:11:4d:78:20:84", null, null, null], [30, "fe80::b811:4dff:fe78:2084%awdl0", "ffff:ffff:ffff:ffff::", null, null]], "llw0": [[18, "ba:11:4d:78:20:84", null, null, null], [30, "fe80::b811:4dff:fe78:2084%llw0", "ffff:ffff:ffff:ffff::", null, null]], "utun0": [[30, "fe80::5704:5c25:1115:1fca%utun0", "ffff:ffff:ffff:ffff::", null, null]], "utun1": [[30, "fe80::8abc:15c3:6336:ff62%utun1", "ffff:ffff:ffff:ffff::", null, null]], "utun2": [[30, "fe80::ce81:b1c:bd2c:69e%utun2", "ffff:ffff:ffff:ffff::", null, null]]}
netio:
{"lo0": [581977088, 581977088, 1716004025, 1716004025, 0, 0, 0, 0], "gif0": [0, 0, 0, 0, 0, 0, 0, 0], "stf0": [0, 0, 0, 0, 0, 0, 0, 0], "anpi2": [0, 0, 0, 0, 0, 0, 0, 0], "anpi1": [0, 0, 0, 0, 0, 0, 0, 0], "anpi0": [0, 0, 0, 0, 0, 0, 0, 0], "en4": [0, 0, 0, 0, 0, 0, 0, 0], "en5": [0, 0, 0, 0, 0, 0, 0, 0], "en7": [0, 0, 0, 0, 0, 0, 0, 0], "en1": [0, 0, 0, 0, 0, 0, 0, 0], "en2": [0, 0, 0, 0, 0, 0, 0, 0], "en3": [0, 0, 0, 0, 0, 0, 0, 0], "bridge0": [0, 0, 0, 0, 0, 0, 0, 0], "ap1": [106310656, 0, 295591, 0, 0, 0, 0, 0], "en0": [2794720256, 2806706176, 142318847, 303910675, 0, 0, 0, 0], "awdl0": [3589120, 1068032, 17529, 2684, 0, 0, 0, 0], "llw0": [0, 0, 0, 0, 0, 0, 0, 0], "utun0": [2740224, 0, 13696, 0, 0, 0, 0, 0], "utun1": [2740224, 0, 13695, 0, 0, 0, 0, 0], "utun2": [2740224, 0, 13695, 0, 0, 0, 0, 0], "utun3": [0, 0, 0, 0, 0, 0, 0, 0]}
parsedcompany: Empty string set
path: ./
pids:
[0, 1, 107, 115, 117, 118, 142, 143, 144, 157, 158, 174, 176, 244, 258, 277, 278, 293, 294, 304, 306, 308, 309, 310, 313, 315, 317, 322, 327, 333, 334, 339, 342, 343, 344, 345, 346, 347, 348, 350, 351, 353, 354, 355, 357, 361, 362, 363, 367, 369, 370, 371, 373, 374, 377, 379, 382, 385, 386, 388, 392, 393, 394, 404, 406, 407, 408, 413, 416, 419, 420, 431, 467, 477, 480, 489, 492, 493, 494, 495, 496, 499, 503, 511, 516, 520, 526, 532, 535, 537, 546, 555, 599, 604, 606, 608, 610, 614, 615, 620, 621, 623, 629, 642, 644, 647, 650, 651, 654, 662, 665, 669, 670, 671, 674, 685, 689, 690, 692, 702, 704, 707, 712, 714, 729, 732, 788, 791, 802, 809, 810, 821, 843, 849, 851, 855, 986, 1014, 1016, 1017, 1038, 1048, 1060, 1784, 1785, 1813, 1817, 1826, 1836, 1838, 1839, 1841, 1844, 1846, 2160, 2197, 2259, 2266, 2297, 2414, 2523, 2555, 2693, 2720, 2735, 3148, 3149, 3155, 3204, 3210, 3212, 3221, 3826, 3939, 4163, 4285, 4286, 4593, 4628, 4629, 4631, 4710, 4724, 4725, 4726, 4728, 4730, 4731, 4741, 4742, 4748, 4784, 4785, 4786, 4792, 4798, 4803, 4830, 4832, 4833, 4835, 4836, 4838, 4843, 5539, 5540, 6461, 8579, 8848, 8849, 9164, 10766, 11939, 11963, 11964, 12249, 12250, 12252, 12716, 12734, 12735, 12736, 12737, 12738, 12739, 12771, 15387, 17244, 18630, 18641, 19772, 20143, 22855, 23139, 28725, 28726, 31104, 32624, 32626, 32627, 32642, 32644, 33002, 33006, 35338, 35339, 35340, 35662, 38288, 38289, 39092, 39173, 39732, 41013, 41938, 42161, 42171, 43024, 43057, 43076, 43354, 43380, 43388, 43577, 43652, 43853, 45596, 45597, 45759, 46397, 47573, 47577, 47642, 48386, 48695, 50137, 50139, 50141, 50142, 50144, 50145, 50155, 50454, 50456, 51053, 51737, 52687, 52989, 53451, 53513, 53539, 53600, 53861, 53997, 54530, 55366, 55612, 56311, 58225, 58810, 59296, 61539, 61634, 62337, 62338, 62800, 63457, 65306, 66379, 68965, 69022, 69880, 69899, 69902, 69904, 69905, 69910, 69914, 69917, 69951, 70025, 70103, 70108, 72597, 72995, 72997, 73456, 73621, 73630, 73645, 73656, 73704, 73705, 73711, 75147, 75636, 75650, 75823, 76272, 77422, 77756, 78914, 78915, 80974, 82604, 82606, 82653, 82756, 82809, 83133, 83587, 83591, 83593, 83594, 83597, 84828, 84829, 84865, 86309, 86335, 86612, 86619, 86665, 86672, 86673, 87635, 87705, 88016, 88699, 90064, 90736, 91286, 91493, 92145, 92944, 93618, 93786, 93787, 93826, 93828, 93832, 93833, 93834, 93835, 93850, 93851, 93852, 93853, 93855, 93856, 93858, 93867, 93873, 93876, 93877, 93878, 93881, 93882, 93883, 93884, 93899, 93925, 93927, 93928, 93929, 93930, 93931, 93932, 93933, 93934, 93936, 93937, 93938, 93939, 93955, 93956, 93957, 93958, 93959, 93960, 93961, 93962, 93963, 93964, 93965, 93966, 93969, 93970, 93971, 93972, 93973, 93974, 93975, 93976, 93977, 93978, 93979, 93986, 93987, 93988, 94006, 94007, 94008, 94009, 94053, 94199, 94200, 94201, 94202, 94203, 94205, 94206, 94243, 94244, 94245, 94246, 94247, 94327, 94409, 94546, 94577, 94611, 94706, 94707, 94709, 95221, 95548, 95549, 95584, 95610, 95717, 96605, 96632, 96642, 96643, 96645, 96649, 96650, 96651, 96666, 96667, 96668, 96669, 96670, 96672, 96675, 97330, 97498, 97788, 97971, 98196, 98762, 98764, 98775, 98776, 99133, 99876, 99877, 99931, 99941, 99944, 99945, 99947, 99948, 99949, 99950, 99951, 99952, 99953, 99954, 99956, 99957, 99964, 99972, 99984, 99990, 99991]
prompt_text:
Codeless Generative AI Pipelines with Chroma Vector DB & Apache NiFi Codeless Generative AI Pipelines with Chroma Vector DB & Apache NiFi Software: chroma vector database, Apache NiFi, Apache Kafka, Slack Python 3.x Let’s grab some data from right here, yes this article is in the data: It is very easy to start a data pipeline ingesting documents in any format. For this one we will grab HTML documents from a Medium via my RSS feed. RSS is a type of XML, oldy but goody. This is no problem for NiFi. We easily grab all these documents and get them in a format to parse, chunk and store in a vector database. We can also do Pinecone, Milvus and others. NiFi Flow Consume Medium Feed: InvokeHTTP — Get against https://medium.com/@tspann/feed Consume Medium Feed: QueryRecord — convert RSS to JSON Consume Medium Feed: SplitJSON — $.[0].channel.item Consume Medium Feed: EvaluateJSONPath — parse out best fields Consume Medium Feed: UpdateAttribute — Set JSON Consume Medium Feed: UpdateRecord — add prime fields Consume Medium Feed: contentFull Output Port AttributesToJSON: cleanup EvaluateJSONPath: convert to FlowFile ParseDocument (Python): convert HTML FlowFile to pre-load JSON format ChunkDocument (Python): chunk parsed document into smaller pieces if needed. PutChroma (Python): store to Chroma Vector Database. QueryChroma (Python): query the database and match the query results. Kafka Path: PublishKafkaRecord_2_6 — send Chroma results to Kafka chromaresultsmedium Kafka Path: RetryFlowFile — if Apache Kafka send fails try again. Slack Path: SplitRecord — split into 1 record for display Slack Path: EvaluateJSONPath — pull out fields to display Slack Path: PutSlack — send formatted message to #chat group Parsed Document JSON Format "text"
swapmemory: sswap(total=15032385536, used=13471907840, free=1560477696, percent=89.6, sin=4002171568128, sout=125978902528)
users:
[["tspann", "console", null, 1697720344.0, 377], ["tspann", "ttys006", null, 1702400538.0, 50454], ["tspann", "ttys002", null, 1705540161.0, 82604], ["tspann", "ttys004", null, 1701382397.0, 99876], ["tspann", "ttys001", null, 1706054866.0, 8848], ["tspann", "ttys000", null, 1706042484.0, 78914], ["tspann", "ttys005", null, 1701462429.0, 28725], ["tspann", "ttys011", null, 1705515675.0, 45596], ["tspann", "ttys012", null, 1702577978.0, 62337]]
uuid: 24301bdc-8078-4434-9a49-55d834bef317

The monitoring processor is built on the psutil library, which is added to requirements.txt:

# PSUTIL
psutil
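Here is a minimal sketch, assuming psutil, of how attribute values like the ones above can be collected; the system_attributes helper and the exact JSON shaping are mine, not necessarily what the processor does internally.

import json
import psutil

def system_attributes() -> dict:
    # Build a flat dict of values like the cpu, memory, diskusage and pids attributes
    return {
        "cpu": psutil.cpu_percent(interval=1),
        "memory": psutil.virtual_memory().percent,
        "diskusage": f"{psutil.disk_usage('/').free / (1024 * 1024):.1f} MB",
        "swapmemory": str(psutil.swap_memory()),
        "pids": json.dumps(psutil.pids()),
        "users": json.dumps([list(u) for u in psutil.users()]),
        "netio": json.dumps({nic: list(counters) for nic, counters
                             in psutil.net_io_counters(pernic=True).items()}),
    }

print(system_attributes())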

Generate Synthetic Records with Faker

Attribute Values Produced

catchphrase: Ameliorated needs-based matrix
city: West Ashleyshire
clustername: benefit-rate-ask
comment: orchestrate proactive technologies
company: Cruz, Martinez and Edwards
country: Faroe Islands
createddt: 2021-01-01
ean13: 8830281905112
email: coreyschneider@yahoo.com
filename: 61cab42f-7467-4ef9-a2b0-36b0ef8784ea
firstname: Richard
ipv4public: 132.220.100.214
job: Data processing manager
lastname: Reynolds
latitude: -38.857865
licenseplate: ZSE 002
longitude: -56.895629
md5: 98507349ceddb161f01f0548a41ba2fb
password: +9&X&Dqcp7
path: ./
phonenumber: (972)874-5913x48635
postcode: 85790
streetaddress: 21666 Heather Summit
useragent: Opera/9.30.(X11; Linux i686; mni-IN) Presto/2.9.160 Version/12.00
username: grahammathew
uuid: 18c7da9a-af7f-42fc-adbf-376752123a47
uuidkey: 20240125171344_5f5980f6-00c8-41b3-93b3-ccbde917d395
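These values map almost one-to-one onto built-in Faker providers. A minimal sketch of generating a record like the one above (the field names are mine, chosen to mirror the attributes):

from faker import Faker

fake = Faker()

# Each field corresponds to a built-in Faker provider call
record = {
    "catchphrase": fake.catch_phrase(),
    "city": fake.city(),
    "comment": fake.bs(),
    "company": fake.company(),
    "country": fake.country(),
    "ean13": fake.ean13(),
    "email": fake.email(),
    "firstname": fake.first_name(),
    "job": fake.job(),
    "lastname": fake.last_name(),
    "latitude": str(fake.latitude()),
    "licenseplate": fake.license_plate(),
    "longitude": str(fake.longitude()),
    "md5": fake.md5(),
    "password": fake.password(),
    "phonenumber": fake.phone_number(),
    "postcode": fake.postcode(),
    "streetaddress": fake.street_address(),
    "useragent": fake.user_agent(),
    "username": fake.user_name(),
    "uuid": fake.uuid4(),
}
print(record)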

Download a Wiki Page as HTML or WikiFormat (Text)

We are using the Python 3 pip package wikipedia-api. You can install it locally for testing via the pip3 command below; it is also added to requirements.txt.

Currently, I output the results to an attribute called results. Comment if I should change that to FlowFile contents or a different output attribute name.

pip3 install wikipedia-api
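Outside of NiFi, the core of the lookup is just a few lines with wikipedia-api; the user agent string and page name below are placeholders, and the user_agent argument may not be required on older versions of the library.

import wikipediaapi

# wikipedia-api asks callers to identify themselves with a user agent
wiki = wikipediaapi.Wikipedia(
    user_agent="nifi-python-processor-demo (example@example.com)",
    language="en",
    extract_format=wikipediaapi.ExtractFormat.HTML,  # or ExtractFormat.WIKI for text
)

page = wiki.page("Apache NiFi")
if page.exists():
    # page.text holds the article in the chosen extract format
    print(page.text[:500])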
I like to look at all the Python processors; GetWikiData is the one for getting wiki data.
Pick text or html, and put your wiki page name in the Wiki Page property.
Just like any other processor, it can run on all the nodes and be scheduled.

I will be adding a few new ones a week as I go through my old Python code and investigate interesting new processing libraries, sources, sinks and more. This is phun.

Get GTFS Data and Translate from Protobuf to JSON

This processor calls any GTFS REST endpoint and returns a JSON FlowFile.
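The protobuf-to-JSON conversion is straightforward with the official GTFS-realtime bindings; the feed URL below is a hypothetical placeholder for whatever endpoint you configure.

import requests
from google.protobuf.json_format import MessageToJson
from google.transit import gtfs_realtime_pb2

# Assumes: pip3 install gtfs-realtime-bindings requests
url = "https://example.com/gtfs-realtime/feed"  # hypothetical GTFS endpoint

feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(requests.get(url, timeout=30).content)

# MessageToJson renders the protobuf FeedMessage as a JSON string
print(MessageToJson(feed))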

ADDITIONAL RESOURCES

Weekend hacking some Python
