Machine Learning Over Streaming Kafka® Data-Part 2: Introduction to Batch Training and TensorFlow

Paul Brebner
Open Source Journal
6 min read · Jul 26, 2023

As I mentioned in Part 1 of this series, we are looking at Machine Learning (ML) over streaming Apache Kafka® data. But rather than just jumping in, and immediately going over a fast-flowing waterfall (in a barrel, which people have actually attempted!), I first need to get a good understanding of TensorFlow with some “still” (static and unchanging) data and batch learning. This will be easier and more repeatable before we encounter Kafka and streaming, changing data.

Niagara Falls Daredevil — I assume he was one of the few survivors — unless this was a publicity photo before he went over the falls. (Source: Shutterstock)

In this blog you will learn the basic steps for batch learning over static data with TensorFlow, including how to set up a Python TensorFlow environment, import pandas and NumPy, define the data columns and the class to be learned, read the data into a pandas DataFrame, split the data into training/evaluation sets, create a model, and train and evaluate the model.

Experiment 1: Batch Processing With TensorFlow

Here’s the basic TensorFlow batch code, designed to use 1 week of drone delivery data. Oh, and watch out: it’s in Python (a programming language I’m no expert in).

First, you have to set up a Python and TensorFlow environment on your computer. I followed these instructions with the addition of the following extra commands:

python3 -m pip install --upgrade pip

pip install scikit-learn

pip install pandas
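Once those are installed, a quick sanity check (just my own minimal sketch, assuming a recent Python 3 environment) is to import the main libraries and print their versions:

import tensorflow as tf
import pandas as pd
import numpy as np

print("TensorFlow", tf.__version__)
print("pandas", pd.__version__)
print("NumPy", np.__version__)

If all three imports succeed, the environment is ready for the steps below.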

Then there is a series of steps required, including

  • Reading the file into an internal data structure
  • Defining columns and which one is the class to be learned
  • Splitting the data into training and test data subsets
  • Defining and compiling a model (which is initially untrained), and finally
  • Training and then evaluating the model.

Now, here are the basic steps using the example Python code from above:

1. Import Pandas and Nump(t)ies

My word processor wanted to autocorrect “numpy” to “Numpty” — an alteration of numbskull, with the ending remodeled on the pattern of Humpty Dumpty (Source: Shutterstock)

First, import pandas and NumPy (two weird-sounding things I’ll explain in more detail below):

import os
from datetime import datetime
import time
import threading
import json
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
import tensorflow as tf
import tensorflow_io as tfio

2. Define Columns

Next, define the columns that the CSV data has. In my case, the first column is the ‘class’ to be learned: the label indicating whether a shop is busy or not busy in a given hour. The rest are features (we have 5 extra randomly generated features just to add some complexity):

# define data columns
DCOLUMNS = [
    # label (the class to be learned)
    'class',
    # features
    'shop_id',
    'shop_type',
    'shop_location',
    'weekday',
    'hour',
    'avgTime',
    'avgDistance',
    'avgRating',
    'another1',
    'another2',
    'another3',
    'another4',
    'another5'
]
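For reference, the CSV file is then expected to have one value per column in exactly this order. Here is a purely made-up example row (the values are mine, just to illustrate the shape of the data, and are not from the real data set):

# class,shop_id,shop_type,shop_location,weekday,hour,avgTime,avgDistance,avgRating,another1,another2,another3,another4,another5
# 1,42,3,7,2,14,12.5,3.2,4.5,0.11,0.57,0.93,0.08,0.61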

3. Read the File into Pandas

(Source: Shutterstock)

Next, read the CSV file into a pandas DataFrame. A DataFrame is just a 2-dimensional, size-mutable, potentially heterogeneous tabular data structure; basically just a 2D array, I suspect.
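To see what that means in practice, here’s a tiny throwaway example of my own (not part of the delivery data) showing that a DataFrame is essentially labeled rows and columns sitting over a 2D NumPy array:

import pandas as pd

tiny = pd.DataFrame({"class": [1, 0], "hour": [14, 3]})
print(tiny.shape)       # (2, 2): 2 rows, 2 columns
print(tiny.to_numpy())  # the underlying 2D array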

For the actual file-reading code, I borrowed some code from here:

# read csv file
fname = 'week1.csv'
drone_iterator = pd.read_csv(fname, header=None, names=DCOLUMNS, chunksize=100000)
drone_df = next(drone_iterator)
print(drone_df.head())
print("size ", drone_df.size)
print("data types ", drone_df.dtypes)
l1 = len(drone_df)
l2 = len(drone_df.columns)
l3 = len(drone_df[drone_df["class"]==0])
l4 = len(drone_df[drone_df["class"]==1])
print("records ", l1, " columns ", l2, " class 0 ", l3, " class 1 ", l4)
NUM_COLUMNS = len(drone_df.columns)
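One thing worth noting (my reading of the pandas API, rather than something from the original code) is that passing chunksize to read_csv returns an iterator rather than a DataFrame, and next() only fetches the first chunk of up to 100,000 rows. To process a larger file in full you would loop over the chunks instead, something like:

# process every chunk of up to 100,000 rows, not just the first one
for chunk_df in pd.read_csv(fname, header=None, names=DCOLUMNS, chunksize=100000):
    print(len(chunk_df))  # replace with per-chunk processing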

4. Split the Data

(Source: Shutterstock)
Next, we need to split the data into 2 subsets: one for training and one for evaluation. If you use all the data for training, you will potentially overfit the model. If that happens, it won’t work as well for new, unseen observations, and you also won’t have any data left over for evaluation:

# split the data into training and test sets
train_df, test_df = train_test_split(drone_df, test_size=0.4, shuffle=True)
print("Number of training samples: ", len(train_df))
print("Number of testing samples: ", len(test_df))
print(train_df.head())
print(test_df.head())

The train_test_split function takes arguments including the data to split, the ratio to use for test data, and whether to shuffle the data before making the split. If you don’t shuffle, then it will simply take the first 60% for training and the last 40% for testing in this example.
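If you want to see the difference, here’s a quick variant (purely illustrative, not part of the original experiment) that splits without shuffling and inspects the row order:

# without shuffling, the split is purely positional
first_df, last_df = train_test_split(drone_df, test_size=0.4, shuffle=False)
print(first_df.index[:5])  # rows from the start of the file
print(last_df.index[:5])   # rows from the final 40% of the file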

5. Create a Model With the Adam Optimizer

The creation of Adam — by a Robot (by an AI — custom image via Paul Brebner and DALL-E)

Next, we need to design, build and compile a model:

# design the model
# Set the parameters
OPTIMIZER = "adam"
# the final layer uses a sigmoid activation, so the loss expects probabilities, not logits
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS = ['accuracy', tf.keras.metrics.TruePositives(), tf.keras.metrics.TrueNegatives(), tf.keras.metrics.FalsePositives(), tf.keras.metrics.FalseNegatives()]
# 32 is the default batch size
BATCH_SIZE = 32
# EPOCHS is the maximum number of passes over the training data
EPOCHS = 200

# design/build the model
model = tf.keras.Sequential([
    # one input per feature column (the 'class' label column is excluded)
    tf.keras.layers.Input(shape=(NUM_COLUMNS - 1,)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.2),
    tf.keras.layers.Dense(256, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dropout(0.4),
    tf.keras.layers.Dense(1, activation='sigmoid')
])
model.summary()

# compile the model
model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)

As you can see, many weird and wonderful settings are required; this one used the Adam optimization algorithm. Fun fact: the Adam algorithm actually has nothing to do with Adam and Eve; instead, the name Adam is derived from “adaptive moment estimation”.

Batch size and epochs are important concepts: the batch size is the number of observations used at once for each training step, and the number of epochs is the number of complete passes over the training data.
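To make that concrete with some purely hypothetical numbers (the real record count depends on the week of data and isn’t shown here): if the training set had 60,000 rows, then with a batch size of 32 each epoch would involve roughly 1,875 gradient updates, and 200 epochs would be about 375,000 updates:

# hypothetical training set size, just to illustrate the batch/epoch relationship
rows = 60000
steps_per_epoch = int(np.ceil(rows / BATCH_SIZE))  # 1875 batches per full pass over the data
total_updates = steps_per_epoch * EPOCHS           # 375000 if all 200 epochs run
print(steps_per_epoch, total_updates)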

6. Train/Fit the Model

Bodybuilding training is all about the Reps (repetitions) = Epochs (Source: Shutterstock)

Next, we must get the data ready for training — and then actually train it by calling “fit”:

# get data ready for training
drone_features = train_df.copy()
drone_labels = drone_features.pop('class')
drone_features = np.array(drone_features)

Rather than training for a fixed number of epochs, it’s better to allow the model to stop early when a metric stops improving for a given number of epochs (the patience):

callback = tf.keras.callbacks.EarlyStopping(monitor='accuracy', patience=10)
model.fit(drone_features, drone_labels, batch_size=BATCH_SIZE, epochs=EPOCHS, callbacks=[callback])

The fit method needs two inputs: the first containing the features only, and the second containing the labels (the class to be learned). We prepare drone_features and drone_labels for this. drone_features is converted to a NumPy array (NumPy is the fundamental package for scientific computing in Python; its fundamental datatype is an n-dimensional array of homogeneous data types).

The other important trick here is that rather than calling fit with a fixed number of epochs, we define a callback which stops learning whenever a condition is met: in this case, when the accuracy doesn’t improve for 10 epochs. I discovered that this can significantly improve the accuracy of the training, and reduce the training time if the model converges faster than expected.
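A common variant (my suggestion, not something the original code does) is to hold back part of the training data for validation and monitor that instead, so the stopping decision is based on data the model isn’t fitting directly:

# stop when the validation loss stops improving, and keep the best weights seen
callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
model.fit(drone_features, drone_labels, batch_size=BATCH_SIZE, epochs=EPOCHS, validation_split=0.2, callbacks=[callback])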

Another thing to note is that every time the fit method is called, the model is updated with the data provided; by default it doesn’t start training from scratch each time.
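For example (a sketch only, reusing the variables from above rather than genuinely new data), calling fit a second time simply continues training from the current weights, which is exactly the behaviour the later incremental and streaming experiments in this series rely on:

# a second fit call continues from the current weights rather than restarting
model.fit(drone_features, drone_labels, batch_size=BATCH_SIZE, epochs=1)
# to start training from scratch you would rebuild and recompile the model
# (i.e. re-run the tf.keras.Sequential([...]) and model.compile(...) steps above)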

7. Evaluate the Model

Now it’s time to see how well the model performs on the data we left over for testing. We must prepare it in the same way as the training data, and then call the evaluate method:

# prepare the test data for evaluation
drone_features_test = test_df.copy()
drone_labels_test = drone_features_test.pop('class')
drone_features_test = np.array(drone_features_test)
print('testing...')
print(model.evaluate(drone_features_test, drone_labels_test))
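The evaluate call returns the loss followed by the metrics in the order they were compiled earlier, so (assuming that exact compile step) you can unpack the results and, for example, cross-check the reported accuracy against the raw confusion counts:

# results are [loss, accuracy, true positives, true negatives, false positives, false negatives]
loss, acc, tp, tn, fp, fn = model.evaluate(drone_features_test, drone_labels_test, verbose=0)
print("accuracy reported: ", acc)
print("accuracy from counts: ", (tp + tn) / (tp + tn + fp + fn))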

So, there we have the basic steps for batch learning with TensorFlow.

But how did this all perform? I certainly had a few hypotheses myself, but even I was a bit surprised at what unfolded.

In the next part, we’ll explore performance metrics, show an example trace, and find out how long training actually takes.

Follow the series: Machine Learning Over Streaming Kafka® Data

Part 1: Introduction

Part 2: Introduction to Batch Training and TensorFlow

Part 3: Introduction to Batch Training and TensorFlow Results

Part 4: Introduction to Incremental Training With TensorFlow

Part 5: Incremental TensorFlow Training With Kafka Data

Part 6: Incremental TensorFlow Training With Kafka Data and Concept Drift

Originally published at https://www.instaclustr.com on July 26, 2023.


Paul Brebner
Open Source Journal

Open Source Technology Evangelist at Instaclustr (by Spot by NetApp). Previously, computer scientist working in R&D in distributed systems, performance, etc.