Building a Simple ETL Pipeline with Kafka and Clickhouse in Go for ML
A simple ETL pipeline in Go that streams JSON data through Kafka into Clickhouse in parallel, then uses Clickhouse’s built-in regression to predict the temperature, based on my own experience
As a follow-up to two articles I posted about Kafka and Clickhouse, here’s a simple application to demonstrate how we could stream realtime data through Kafka to be validated and transformed in Clickhouse to do some basic time-series ML — specifically, how we could use weather data of previous years to forecast the temperature. I confess that I am not very familiar with the mathematics of ML and can only present observations gained through trial-and-error, but the final model was more accurate than I expected.
A Bit about the Technologies:
Kafka is a distributed streaming service that supports multiple producers/consumers. The Kafka broker breaks the “topics” that parties can publish/subscribe to into multiple partitions that are replicated across the cluster. While it does not have in-built ETL tools like Apache Spark, Kafka has remarkably low latency and can stream from multiple sources. I am using Zookeeper to manage the cluster. Although this program will read from a file because it’s easier to control, we would ideally stream data directly from the National Weather Service and other sources into Kafka.
Clickhouse is a column-oriented relational database. In addition to SQL-like commands, it includes a simple stochastic linear regression function that we will be using. I do not expect the weather data to be linear, but I wanted to keep the ML simple enough to be done purely in Clickhouse.
Both are open-source and free to use. I am using their https://hub.docker.com/r/wurstmeister/kafka and https://hub.docker.com/r/clickhouse/clickhouse-server/ images on Docker. I am also using the segmentio/kafka-go and clickhouse/clickhouse-go APIs for Go.
I chose to use Go because of its simple concurrency model, which I wanted to highlight in this project. Channels in Go force synchronization between threads (goroutines) and can prevent data races without explicitly using locks and mutexes (which also exist in Go), while maintaining the flexibility and efficiency of concurrent execution. Here is a simple example of channels in practice, for interested readers.
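As a small illustration of the channel-based synchronization described above, here is a minimal sketch (not taken from the pipeline itself). The function name sumSquares and the sample numbers are made up for the example. Each goroutine works on its own stride of the slice and sends a partial result over a channel, so no mutex is needed to combine the results.

```go
package main

import "fmt"

// sumSquares splits nums across `workers` goroutines. Each goroutine sends its
// partial sum over a channel, so the results are combined without any locks.
func sumSquares(nums []int, workers int) int {
	partials := make(chan int)
	for w := 0; w < workers; w++ {
		go func(start int) {
			sum := 0
			// Stride through the slice: worker 0 takes 0, workers, 2*workers, ...
			for i := start; i < len(nums); i += workers {
				sum += nums[i] * nums[i]
			}
			partials <- sum // blocks until the receiver below takes it
		}(w)
	}
	total := 0
	for w := 0; w < workers; w++ {
		total += <-partials // one receive per worker doubles as a "done" signal
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4}, 2)) // 1 + 4 + 9 + 16
}
```

The same stride-and-signal pattern appears later in the article, where each writer goroutine handles every n-th entry and reports back over a done channel.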
Architecture:
Essentially, we separate the extraction, transformation, and loading among multiple goroutines that pass data to each step through channels. JSON-string data received from Kafka is passed along a channel to be validated and unmarshaled into a struct, which is finally passed through another channel to be inserted into Clickhouse. The struct works in place of a schema to validate each field’s data type.
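The three-stage layout described above can be sketched with only the standard library. This is a simplified stand-in, not the actual program: the Reading struct and the runPipeline function are hypothetical, a plain slice replaces Kafka, and an in-memory slice replaces Clickhouse.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
)

// Reading is a hypothetical stand-in for the article's Entry struct.
type Reading struct {
	Date_time string
	Air_temp  float32
}

// runPipeline pushes raw JSON strings through extract -> transform -> load
// stages connected by channels and returns the records that reached "load".
func runPipeline(raw []string, workers int) []Reading {
	msgs := make(chan []byte)
	entries := make(chan Reading)

	// Extract: stands in for the Kafka readers.
	go func() {
		for _, r := range raw {
			msgs <- []byte(r)
		}
		close(msgs)
	}()

	// Transform: several workers unmarshal in parallel; bad JSON is dropped.
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for m := range msgs {
				var r Reading
				if err := json.Unmarshal(m, &r); err != nil {
					continue
				}
				entries <- r
			}
		}()
	}
	// Close entries only after every transform worker has finished.
	go func() { wg.Wait(); close(entries) }()

	// Load: stands in for the Clickhouse insert.
	var loaded []Reading
	for r := range entries {
		loaded = append(loaded, r)
	}
	return loaded
}

func main() {
	out := runPipeline([]string{
		`{"Date_time":"08/07/24-06:53PM","Air_temp":71.1}`,
		`not json`,
	}, 3)
	fmt.Println(len(out)) // only the valid record survives
}
```

Unlike the real pipeline, which runs until interrupted, this sketch closes its channels so that it terminates once the input is exhausted.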
We read from Kafka with consumer groups rather than readers for specific partitions so that we have more flexibility in the number of threads. The Kafka broker tracks the committed offsets for each partition, and each partition is assigned to only one reader in the group at a time, so under normal operation each message is read only once, even from parallel threads.
The data is further transformed in Clickhouse to prepare it for time-series ML. We use the temperature of previous hours to determine the next hour’s, so we organize the data appropriately to easily feed into the Clickhouse ML method, which takes columns for each parameter to determine their weights. We use a window of the past 30 temperatures and their time offsets from the start of the window, in addition to the month, to determine what the next temperature will be at any given date. I took inspiration from https://towardsdatascience.com/ml-approaches-for-time-series-4d44722e48fe.
Implementation:
set-up.sh (ETL)
#!/usr/bin/bash
topic="temperature_data"
echo "kafka-topics.sh --delete --topic ${topic} --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash
echo "kafka-topics.sh --create --topic ${topic} --partitions 10 --bootstrap-server localhost:9092" |docker exec -i kafka /bin/bash
table="temperatures"
# echo "show tables;" | docker exec -i clickhouse clickhouse-client
echo "truncate table if exists ${table};" |docker exec -i clickhouse clickhouse-client
ch_createtable="create table if not exists ${table} (
Date_time DateTime('America/Los_Angeles'),
Air_temp Float32,
) engine=MergeTree() ORDER BY Date_time;"
# "create table if not exists ${table} (
# Date_time DateTime('America/Los_Angeles'),
# Sea_level_pressure Float32,
# Altimeter Float32,
# Air_temp Float32,
# Relative_humidity Float32,
# Dew_point_temperature Float32,
# Wind_direction UInt16,
# Wind_speed Float32,) engine=MergeTree() ORDER BY Date_time;"
echo ${ch_createtable} | docker exec -i clickhouse clickhouse-client
Create a Kafka topic with about as many partitions as you plan to have goroutines writing to or reading from Kafka. Then, create a table in Clickhouse to hold the temperature data we will read from Kafka later. It’s not necessary to specify the time zone of the data, but I keep it to remind myself of how to do it. ORDER BY Date_time makes the date the sorting key (and, by default, the primary key) that the data is organized by.
json-to-kafka.go (ETL)
The weather data I downloaded was in csv format, so I ran it through an online converter and downloaded the json produced (https://csvjson.com/csv2json).
// https://www.weather.gov/lox/observations_historical
// https://csvjson.com/csv2json
// https://pkg.go.dev/github.com/segmentio/kafka-go#section-documentation
// https://pkg.go.dev/encoding/json#Marshal
package main
import (
"github.com/segmentio/kafka-go"
"context"
"encoding/json"
"os"
"fmt"
"time"
)
type Entry struct {
Date_time string
Sea_level_pressure float32
Altimeter float32
Air_temp float32
Relative_humidity float32
Dew_point_temperature float32
Wind_direction uint16
Wind_speed float32
}
func main() {
const topic = "temperature_data"
const fileName = "KLAX-data_temp-only.json"
var read_data []byte
var entries []Entry
var limit int = 10000
done_ch:=make(chan bool)
var threads int= 10
read_data = readData(fileName)
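if err := json.Unmarshal(read_data, &entries); err != nil { //json to struct slice
panic(err)
}
fmt.Printf("How many entries to use? (/%d)", len(entries))
fmt.Scan(&limit)
if limit < 0 || limit > len(entries) { //avoid slicing out of range
limit = len(entries)
}
entries = entries[:limit]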
for i:=0; i<threads; i++ {
go writeMessages(entries, threads, i, topic, done_ch) //write to the topic
}
for i:=0; i<threads; i++{ //stall until all threads finish
<-done_ch
}
} //end main
//Read the given file and output a []byte of its contents
func readData(fileName string) ([]byte){
data, err := os.ReadFile(fileName)
if err != nil {
panic(err)
}
return data
}
//cont below...
As mentioned earlier, we can use a struct to validate each entry’s data types; JSON fields with no matching struct field are discarded. Be sure to capitalize the field names: lowercase fields do not cause an error in this program, but encoding/json silently skips unexported fields, and I ran into issues in later code because the fields were not exported for an API method to access.
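To make the point about exported fields concrete, here is a small sketch. The Sample struct and the decode helper are hypothetical and exist only for this example. It shows encoding/json filling an exported field, silently skipping an unexported one, dropping an unknown key, and reporting a type mismatch as an error.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Sample is a hypothetical struct: Air_temp is exported, pressure is not.
type Sample struct {
	Air_temp float32
	pressure float32 // unexported: encoding/json never fills this
}

// decode unmarshals one JSON object into a Sample.
func decode(raw string) (Sample, error) {
	var s Sample
	err := json.Unmarshal([]byte(raw), &s)
	return s, err
}

func main() {
	// "pressure" has no exported match and "Extra" has no match at all,
	// so both are ignored without an error.
	s, err := decode(`{"Air_temp": 68.5, "pressure": 1013.2, "Extra": "ignored"}`)
	fmt.Println(s.Air_temp, s.pressure, err)

	// A value of the wrong type is reported as an error.
	_, err = decode(`{"Air_temp": "warm"}`)
	fmt.Println(err != nil)
}
```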
Anyway, we can use the json package’s Unmarshal method to convert the json file to a slice of our “Entry” struct. After trimming the slice to the desired length, we have goroutines to evenly distribute the work in writing to the Kafka topic. Each goroutine opens a Writer to write all its assigned entries after marshaling the structs back into json strings.
//Write its portion of the Entry slice to the topic and signal when finished
func writeMessages(entries []Entry, threads int, thread_num int, topic string, done_ch chan bool){
//modeled off documentation
writer := &kafka.Writer{
Addr: kafka.TCP("localhost:9092", "localhost:9093", "localhost:9094"),
Topic: topic,
Balancer: &kafka.LeastBytes{}, //distribute msgs evenly across partitions
BatchTimeout: 10 * time.Millisecond, //default is 1 second
}
var batch_size int = 0
msgs := []kafka.Message{}
var count int = 0
for i:=thread_num; i < len(entries); i+=threads {
count++
//convert from struct back to json
entry_bytes, _ := json.Marshal(entries[i]) // marshal to json, as a []byte
batch_size++
if batch_size <1000 { //sending batches with individual messages is SLOW
msgs = append(msgs, kafka.Message{ Value: entry_bytes,})
}else{
batch_size = 0
//https://stackoverflow.com/questions/37570665/what-does-mean-when-coming-directly-after-a-slice
err := writer.WriteMessages(context.Background(), msgs...)
if err!= nil {
panic(err)
}
msgs = []kafka.Message{}
msgs = append(msgs, kafka.Message{ Value: entry_bytes,})
}
}
err := writer.WriteMessages(context.Background(), msgs...) //write the remainder of the last batch
if err != nil { //previously this error was overwritten before being checked
panic(err)
}
fmt.Printf("WROTE(%d): %v\n", count, thread_num)
if err = writer.Close(); err != nil {
panic(err)
}
done_ch <- true //signal done writing
}
Two key points to note:
- Don’t use batch sizes that are too small.
- Batches are flushed every 1 second by default (BatchTimeout option).
Overlooking these points will result in slow writes. For 10 threads to write 1000 messages each, compare:
go run json-to-kafka.go 0.60s user 0.58s system 10% cpu 11.726 total
for the default timeout of 1 second and 100 messages per batch, and:
go run json-to-kafka.go 0.52s user 0.53s system 63% cpu 1.659 total
for a timeout of 10 milliseconds and 1000 messages per batch. Now imagine waiting for 100,000 messages to be written with the first set of parameters (it’s about 15 minutes :( ).
We use channels for each goroutine to signal to main when they have finished writing.
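The batching advice above can also be expressed with a small helper that splits a slice into fixed-size batches up front, which some readers may find easier to follow than a running counter. This is a sketch with a hypothetical function name (chunk) and string payloads in place of kafka.Message values; each returned batch would correspond to one WriteMessages call.

```go
package main

import "fmt"

// chunk splits items into consecutive batches of at most size elements.
// The batches share memory with items; nothing is copied.
func chunk(items []string, size int) [][]string {
	if size <= 0 {
		return nil
	}
	var batches [][]string
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items) // the last batch may be shorter
		}
		batches = append(batches, items[start:end])
	}
	return batches
}

func main() {
	for i, b := range chunk([]string{"a", "b", "c", "d", "e"}, 2) {
		fmt.Println(i, b) // each batch would be one WriteMessages call
	}
}
```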
kafka-to-ch.go (ETL)
This is the main portion of the ETL pipeline.
While (or after) our weather data is being written to Kafka, we can start reading from Kafka and inserting it into the database. Here, I unmarshal into a struct with only the date and temperature fields for convenience: quite a few rows in the original weather data are missing one or several of the other fields, which would cause this program to drop those entries during validation if those fields were checked.
// https://pkg.go.dev/github.com/ClickHouse/clickhouse-go/v2
// https://pkg.go.dev/os/signal
package main
import (
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/segmentio/kafka-go"
"time"
"context"
"encoding/json"
"errors"
"os/signal"
"os"
)
// json-to-kafka.go
type Entry struct {
Date_time string
// Sea_level_pressure float32
// Altimeter float32
Air_temp float32
// Relative_humidity float32
// Dew_point_temperature float32
// Wind_direction uint16
// Wind_speed float32
}
func main() {
conn := connect_ch() //establish connection to clickhouse
const number_routines_kafka = 10
const number_routines_ch = 10
const number_routines_tr = 10
const topic = "temperature_data"
const table = "temperatures"
var readers []*kafka.Reader = []*kafka.Reader{}
signal_ch := make(chan os.Signal, 1) //buffered, as signal.Notify does not block when sending
msgs := make(chan []byte)
entries := make(chan Entry)
for i:=0; i<number_routines_kafka; i++ {
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{"localhost:9092", "localhost:9093", "localhost:9094"},
GroupID: "go-consumer-group",
Topic: topic,
MaxBytes: 10e6, // per batch
QueueCapacity: 1000,
ReadBatchTimeout: 10 * time.Millisecond,
// more options available
})
readers = append(readers, reader)
}
go func (signal_ch chan os.Signal){
signal.Notify(signal_ch, os.Interrupt) //catch a ctrl-c
} (signal_ch)
go func (readers []*kafka.Reader, msgs chan []byte){
for i:=0; i<number_routines_kafka; i++ {
go readWithReader(readers[i], msgs, i) //read from Kafka
}
} (readers, msgs)
for i:=0; i<number_routines_tr; i++ {
go transform(msgs, i, entries) //validate data
}
for i:=0; i<number_routines_ch; i++ {
go batchInsert(conn, entries, table, i) //write to Clickhouse
}
<-signal_ch //hang until connections ready to be cleaned up
for i,reader := range readers{
fmt.Printf("Closing reader: %d\n",i)
if err := reader.Close(); err != nil {
fmt.Println("failed to close reader:", err)
}
}
if err := conn.Close(); err != nil {
fmt.Println("failed to close clickhouse connection:", err)
}
fmt.Println("yay")
} //end main
//Connect to Clickhouse
func connect_ch() (clickhouse.Conn) {
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%d", "127.0.0.1", 19000)},
Auth: clickhouse.Auth{
Database: "default",
Username: "default",
Password: "",
},
Debug: false,
})
if err != nil {
panic(err)
}
if err := conn.Ping(context.Background()); err != nil {
panic(err)
}
v, err := conn.ServerVersion()
if err != nil {
panic(err)
}
fmt.Println(v)
return conn
}
//Consume messages as a consumer group and send them along the []byte channel
func readWithReader(reader *kafka.Reader, ch chan []byte, routine_num int){
var count int = 0
for {
entry, err := reader.ReadMessage(context.Background())
if err != nil {
break
}
ch <- entry.Value
count++
fmt.Printf("READ(%d): %v\n", count, routine_num)
}
fmt.Printf("DONE READ(%d): %v\n", count, routine_num)
}
//cont below...
This program is a bit more complex than the previous; here’s the overview from main:
- Multiple Kafka readers of the same consumer group are used in goroutines of readWithReader() and pass the raw consumed messages through the msgs channel as byte slices. At the same time, goroutines of transform() read from this channel to validate and unmarshal each message into an Entry struct, which is passed through the entries channel to the final set of goroutines, batchInsert(), to be inserted into Clickhouse.
- The program also has a goroutine running to catch a ctrl-c (SIGINT) and close the readers and Clickhouse connection for graceful shutdown. Main will hang after creating all the goroutines, waiting for this signal to be passed through the signal_ch channel, before proceeding with the clean-up.
//(copied from main)
go func (signal_ch chan os.Signal){
signal.Notify(signal_ch, os.Interrupt) //catch a ctrl-c
} (signal_ch)
//...
<-signal_ch //hang until connections ready to be cleaned up
for i,reader := range readers{
fmt.Printf("Closing reader: %d\n",i)
if err := reader.Close(); err != nil {
fmt.Println("failed to close reader:", err)
}
}
// other clean up...
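For readers who want to experiment with the shutdown path in isolation, here is a hedged sketch. The helpers waitAndCleanup and simulateInterrupt are hypothetical names, and main injects a fake interrupt so the example exits on its own instead of waiting for a real ctrl-c. The channel is buffered with capacity 1 because the os/signal documentation states that the package does not block when sending to the channel, so the caller must provide enough buffer.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
)

// waitAndCleanup blocks until a signal arrives on sigCh, then runs cleanup.
func waitAndCleanup(sigCh <-chan os.Signal, cleanup func()) os.Signal {
	sig := <-sigCh
	cleanup()
	return sig
}

// simulateInterrupt feeds a fake interrupt through the same path so the
// shutdown logic can be exercised without pressing ctrl-c.
func simulateInterrupt() bool {
	sigCh := make(chan os.Signal, 1)
	sigCh <- os.Interrupt
	cleaned := false
	waitAndCleanup(sigCh, func() { cleaned = true })
	return cleaned
}

func main() {
	// Buffered so a signal delivered before we start receiving is not lost.
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	defer signal.Stop(sigCh)

	// Stand-in for a real ctrl-c so this sketch terminates by itself.
	select {
	case sigCh <- os.Interrupt:
	default: // a real signal is already queued
	}

	sig := waitAndCleanup(sigCh, func() {
		fmt.Println("closing readers and connection") // placeholder cleanup
	})
	fmt.Println("got", sig)
}
```

Note that signal.Notify only registers the channel and returns immediately, so it does not need its own goroutine.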
A few points to note:
- The consumer group of the readers will be automatically created if it doesn’t exist already when running this program. You can run
kafka-consumer-groups.sh --describe --bootstrap-server localhost:9092 --all-groups
in the shell of the running kafka container to see the offsets of messages consumed from the topic.
- The validation during the transform step only checks that the relevant fields are non-zero (fields missing from the JSON are left at the data type’s zero value when unmarshaling). Depending on the data, some entries may be unnecessarily discarded (e.g. if there are actual 0 degree temperatures in the original data).
//Unmarshal the messages from the []byte channel into Entries and validate them
//before sending them along the Entry channel
func transform(ch chan []byte, routine_num int, entries chan Entry){
var count int = 0
for val := range ch{
var temp Entry
err := json.Unmarshal(val, &temp) //to struct
if err != nil{
//mention but continue on
fmt.Println("Error unmarshaling: " + string(val))
continue
}
if validify(&temp) != nil{
fmt.Printf("Error transforming data: %v\n", temp)
continue
}
entries <- temp
count++
fmt.Printf("TRANSFORMED(%d): %v\n", count, routine_num)
}
}
//Check the json string converted to struct has valid data (nonzeroes)
func validify(entry *Entry) (error){
//convert date time to something clickhouse can convert to DateTime
res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
if err != nil{
return err
}
//if any entry is 0 (missing data), discard it
// if(entry.Sea_level_pressure==0||entry.Altimeter==0||
// entry.Air_temp==0||entry.Relative_humidity==0||entry.Dew_point_temperature==0||
// entry.Wind_direction==0||entry.Wind_speed==0){
if(entry.Air_temp==0){
return errors.New("invalid entry")
}
entry.Date_time = res.Format(time.DateTime)
return nil
}
//cont below...
- Parse() from the time package is useful for parsing nonstandard date formats in strings. Write the layout string by expressing Go’s reference time, Jan 2, 2006 at 3:04 PM, in the format you want to parse (for example, "01/02/06-03:04PM") (https://pkg.go.dev/time#Parse).
//copied
res, err := time.Parse("01/02/06-03:04PM", entry.Date_time)
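A self-contained version of this conversion, as a sketch: the helper name normalize and the sample timestamps are made up for the example. It parses the weather file's layout and re-formats the result into the "YYYY-MM-DD hh:mm:ss" form, which is the same layout as the time.DateTime constant used in the program.

```go
package main

import (
	"fmt"
	"time"
)

// normalize converts "MM/DD/YY-hh:mmPM" into "YYYY-MM-DD hh:mm:ss".
func normalize(s string) (string, error) {
	// The layout is Go's reference time written in the input's format.
	t, err := time.Parse("01/02/06-03:04PM", s)
	if err != nil {
		return "", err
	}
	return t.Format("2006-01-02 15:04:05"), nil // same layout as time.DateTime
}

func main() {
	out, err := normalize("08/07/24-06:53PM")
	fmt.Println(out, err) // 2024-08-07 18:53:00 <nil>

	// A string in a different format is rejected.
	_, err = normalize("2024-08-07 18:53")
	fmt.Println(err != nil) // true
}
```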
- AppendStruct() from the clickhouse-go API is useful for mapping the fields of our Entry struct to the columns of a row and appending it to the batch (versus calling Append() and explicitly passing in each corresponding column).
// Insert batches into the table from the Entry channel
func batchInsert(conn clickhouse.Conn, ch chan Entry, table string, routine_num int){
var count int = 0
for val := range ch{
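batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO " + table)
if err != nil {
fmt.Println("failed to prepare batch:", err)
continue
}
if err := batch.AppendStruct(&val); err != nil { // STRUCT FIELDS MUST START WITH UPPERCASE or will not work
fmt.Println("failed to append entry:", err)
continue
}
if err := batch.Send(); err != nil {
fmt.Println("failed to send batch:", err)
continue
}
count++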
fmt.Printf("INSERTED(%d): %d\n", count, routine_num)
}
}
//program end
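Note that batchInsert() as written sends one row per batch. If insert throughput becomes a bottleneck, one option is to accumulate several entries before each send. The sketch below shows only the accumulation logic, with hypothetical names: batchFromChannel does the grouping, and the flush callback stands in for the PrepareBatch, AppendStruct, and Send calls. It assumes size is positive and that flush does not keep a reference to the slice it receives.

```go
package main

import "fmt"

// batchFromChannel groups values from ch into slices of up to size elements
// and hands each one to flush; the final partial batch is flushed when ch
// closes. flush is a hypothetical stand-in for PrepareBatch + AppendStruct + Send.
func batchFromChannel(ch <-chan float32, size int, flush func([]float32) error) error {
	batch := make([]float32, 0, size)
	for v := range ch {
		batch = append(batch, v)
		if len(batch) == size {
			if err := flush(batch); err != nil {
				return err
			}
			batch = batch[:0] // reuse the backing array for the next batch
		}
	}
	if len(batch) > 0 {
		return flush(batch)
	}
	return nil
}

// countFlushes runs n values through the batcher and reports how many
// times flush was called.
func countFlushes(n, size int) int {
	ch := make(chan float32)
	go func() {
		for i := 0; i < n; i++ {
			ch <- float32(i)
		}
		close(ch)
	}()
	calls := 0
	_ = batchFromChannel(ch, size, func(b []float32) error { calls++; return nil })
	return calls
}

func main() {
	fmt.Println(countFlushes(2500, 1000)) // 3 flushes: 1000, 1000, 500
}
```

In the real pipeline the entries channel is never closed, so a partially filled batch would also need a time-based flush, similar in spirit to BatchTimeout on the Kafka writer.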
ML in Clickhouse
predict-set-up.sh (ML)
In addition to the table the previous steps have filled up, we need to derive a table from this data to train the model with.
Here’s a fraction of what our data table looks like now:
Here’s a fraction of what we want to (and will) feed into our model (table model_data):
For each date, the temp array stores the past temperatures in the window (30 temperatures), followed by the temperature of the specific date, and the time array stores the time offsets from the beginning of the window for each. For example, date 8/8/24 3:53 (the first row in the table) has in its temp array the temperatures from dates 8/7/24 2:53, 8/7/24 3:53, 8/7/24 4:53, etc. Since the start of this window is at 8/7/24 2:53, the time offsets for each entry start at 0, then +60 minutes (3600 seconds) from 2:53 -> 3:53, etc. It is important to store the time offsets because, as seen from the original data, the temperatures were not always taken at regular intervals.
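To make the target layout concrete, here is the same windowing idea sketched in Go rather than SQL. All names (Obs, Window, buildWindows, hourly) are hypothetical, and the data is synthetic. Each window holds the previous size temperatures plus the current one, along with each observation's offset in seconds from the start of the window.

```go
package main

import (
	"fmt"
	"time"
)

// Obs is a hypothetical (time, temperature) observation.
type Obs struct {
	At   time.Time
	Temp float32
}

// Window mirrors one row of model_data: temperatures and offsets in seconds
// from the first observation in the window; the last element is the target.
type Window struct {
	Time   time.Time
	Temp   []float32
	DeltaT []int64
}

// buildWindows expects obs sorted by time and emits one window per
// observation that has at least size predecessors.
func buildWindows(obs []Obs, size int) []Window {
	var out []Window
	for i := size; i < len(obs); i++ {
		start := obs[i-size].At
		w := Window{Time: obs[i].At}
		for _, o := range obs[i-size : i+1] {
			w.Temp = append(w.Temp, o.Temp)
			w.DeltaT = append(w.DeltaT, int64(o.At.Sub(start).Seconds()))
		}
		out = append(out, w)
	}
	return out
}

// hourly builds n synthetic observations one hour apart (illustrative data).
func hourly(n int) []Obs {
	base := time.Date(2024, 8, 7, 2, 53, 0, 0, time.UTC)
	obs := make([]Obs, n)
	for i := range obs {
		obs[i] = Obs{At: base.Add(time.Duration(i) * time.Hour), Temp: 60 + float32(i)}
	}
	return obs
}

func main() {
	ws := buildWindows(hourly(5), 3)
	fmt.Println(len(ws), ws[0].Temp, ws[0].DeltaT)
}
```

This sketch simply skips observations that do not have a full window behind them, whereas the SQL version builds those rows first and deletes them afterwards.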
To create this table, let’s make a helper table to find the values for each window. Here’s what we want:
Min_bound represents the start of each window. This format will make grouping together the dates and temperatures for each window later easy.
Windowing functions come in handy.
-- (copied)
MIN(Date_time) OVER (
ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW
) AS min_time,
This gets the date of the entry window_size entries before the current one. Then, for each date and its min_time, we add all dates and their temperatures that fall within that range (using a WHERE clause). While this results in multiple copies of each date, each copy belongs to a different window, denoted by min_bound.
#!/usr/bin/bash
window_size=30 #DO NOT CHANGE w/out updating model command
table="temperatures"
min_bound_table="CREATE TABLE IF NOT EXISTS helper
ENGINE = Memory
AS SELECT
B.min_time AS min_bound,
A.Date_time,
A.Air_temp
FROM
(
SELECT
Date_time,
Air_temp
FROM ${table}
) AS A,
(
SELECT
min_time,
Date_time,
Air_temp,
delta,
a.Date_time
FROM
(
SELECT
Date_time,
Air_temp,
MIN(Date_time) OVER (ORDER BY Date_time ASC ROWS BETWEEN ${window_size} PRECEDING AND CURRENT ROW) AS min_time,
Date_time - min_time AS delta
FROM ${table}
) AS a
) AS B
WHERE (A.Date_time >= B.min_time) AND (A.Date_time <= B.Date_time)
ORDER BY Date_time ASC"
echo ${min_bound_table} | docker exec -i clickhouse clickhouse-client
#cont ...
After creating the helper table, we can use GROUP BY min_bound to divide the entries into their respective windows, and then collapse each window’s rows into arrays with groupArray(). The result is an array of temperatures and an array of time offsets for each date.
CREATE TABLE IF NOT EXISTS model_data
ENGINE = Memory
AS SELECT
MAX(Date_time) AS time,
groupArray(Air_temp) AS temp,
groupArray(Date_time - min_bound) AS delta_t
FROM helper
GROUP BY min_bound
ORDER BY time ASC
However, we need to be careful about the earliest 30 entries in our final table: there were not enough entries to fill their windows, so the earliest entry was duplicated instead. These entries can be identified as having zero time offsets for entries beyond the min bound of the window.
ALTER TABLE model_data (DELETE WHERE (delta_t[2]) = 0)
Our data is now in an appropriate format to feed to our model.
Clickhouse’s stochastic linear regression function takes in the step size, regularization coefficient, batch size, and SGD update method, then the column of the target value, followed by a column for each parameter. In my runs, any method besides “Adam” gave NaNs, and the regularization did not seem to have much impact. I decided on a step of 0.00004, a coefficient of 0.15, and a batch size of 3.
step=0.00004 # 0.00050
norm=0.15
batch=3 # 20
method="Adam"
train=75000 # number of entries to train on
make_model="CREATE TABLE IF NOT EXISTS temp_model
ENGINE = Memory
AS SELECT stochasticLinearRegressionState(${step}, ${norm}, ${batch}, '${method}')
(temp[31], temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS state
FROM (SELECT temp, delta_t, time FROM model_data ORDER BY time ASC LIMIT ${train})"
echo ${make_model} | docker exec -i clickhouse clickhouse-client
I also decided to include the month as a parameter because of a likely correlation between the month and the temperature. In the example code, I train the model with only the first 75,000 of the 100,000 entries.
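To give a feel for what the step size, regularization coefficient, and batch size control, here is a minimal sketch of mini-batch gradient descent for a one-variable linear model. It is deliberately simplified: it uses plain SGD rather than Adam, it is not Clickhouse's implementation, and the function names (fitSGD, demoFit), the synthetic data, and the hyperparameters are all assumptions chosen for the example.

```go
package main

import "fmt"

// fitSGD fits y = w*x + b with plain mini-batch SGD on squared error.
// step, l2 and batch play the roles of the first three arguments of
// stochasticLinearRegression; this is NOT Adam and not Clickhouse's code.
func fitSGD(xs, ys []float64, step, l2 float64, batch, epochs int) (w, b float64) {
	if batch <= 0 {
		batch = 1
	}
	for e := 0; e < epochs; e++ {
		for start := 0; start < len(xs); start += batch {
			end := start + batch
			if end > len(xs) {
				end = len(xs)
			}
			var gw, gb float64
			for i := start; i < end; i++ {
				diff := w*xs[i] + b - ys[i]
				gw += 2 * diff * xs[i]
				gb += 2 * diff
			}
			n := float64(end - start)
			// Average the gradient over the batch, add the L2 penalty on w,
			// then move against the gradient by the step size.
			w -= step * (gw/n + 2*l2*w)
			b -= step * (gb / n)
		}
	}
	return w, b
}

// demoFit trains on noiseless points from y = 2x + 1 (synthetic data).
func demoFit() (float64, float64) {
	var xs, ys []float64
	for i := 0; i < 10; i++ {
		x := float64(i) / 10
		xs = append(xs, x)
		ys = append(ys, 2*x+1)
	}
	return fitSGD(xs, ys, 0.1, 0, 2, 1000)
}

func main() {
	w, b := demoFit()
	fmt.Printf("w=%.3f b=%.3f\n", w, b)
}
```

A larger batch averages the gradient over more rows per update, and a larger step moves the weights further per update, which is one way to think about the smoothing and instability effects described in the results.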
Now, to test the model. The query below evaluates it on the latest 200 entries, which were not part of the training set.
run_model="WITH (
SELECT state
FROM temp_model
) AS model
SELECT
time,
evalMLMethod(model, temp[1], temp[2], temp[3], temp[4], temp[5], temp[6], temp[7], temp[8], temp[9], temp[10],
temp[11], temp[12], temp[13], temp[14], temp[15], temp[16], temp[17], temp[18], temp[19], temp[20],
temp[21], temp[22], temp[23], temp[24], temp[25], temp[26], temp[27], temp[28], temp[29], temp[30],
delta_t[1], delta_t[2], delta_t[3], delta_t[4], delta_t[5], delta_t[6], delta_t[7], delta_t[8], delta_t[9], delta_t[10],
delta_t[11], delta_t[12], delta_t[13], delta_t[14], delta_t[15], delta_t[16], delta_t[17], delta_t[18], delta_t[19], delta_t[20],
delta_t[21], delta_t[22], delta_t[23], delta_t[24], delta_t[25], delta_t[26], delta_t[27], delta_t[28], delta_t[29], delta_t[30],
delta_t[31], toMonth(time)) AS predicted,
temp[31] AS actual
FROM model_data ORDER BY time DESC LIMIT 200"
echo ${run_model} | docker exec -i clickhouse clickhouse-client
The predicted values are in the middle, and the actual values on the right.
Pretty good for this interval, even though it misses the 10 degree spike at 8/7 18:53. I saved the full output as a .csv file to graph later with a command like this:
echo "sql command..." | docker exec -i clickhouse clickhouse-client --format CSV> result.csv
Results:
The predictions shown above are from my best model. Its max and min predictions were pretty much accurate, and the average differences between each predicted temperature and the actual were almost net-zero.
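A near-zero average difference means the errors cancel out, which is not the same as each prediction being close, so it can help to look at the mean absolute error as well. Here is a minimal sketch of both measures; the function name errorStats and the numbers are made up for illustration and are not results from this model.

```go
package main

import (
	"fmt"
	"math"
)

// errorStats returns the mean signed error (bias) and the mean absolute error.
func errorStats(pred, actual []float64) (bias, mae float64) {
	if len(pred) == 0 || len(pred) != len(actual) {
		return math.NaN(), math.NaN()
	}
	for i := range pred {
		d := pred[i] - actual[i]
		bias += d
		mae += math.Abs(d)
	}
	n := float64(len(pred))
	return bias / n, mae / n
}

func main() {
	// Made-up numbers, not results from the article's model.
	pred := []float64{70, 64, 68, 62}
	actual := []float64{66, 68, 64, 66}
	bias, mae := errorStats(pred, actual)
	fmt.Println(bias, mae) // 0 4: errors cancel on average but are 4 degrees each
}
```

The CSV output saved earlier could be fed through a function like this to put a number on how far the predictions are from the actual values.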
Before showing its graph in Excel, here are some of my earlier results, for context:
step=0.00055, norm=0.1, batch=10
Yikes, those spikes. Ignoring the obvious outliers, the upper bound of the predicted values looks like a flat line.
step=0.00010, norm=0.15, batch=5, method="Adam"
This one looks better. The outliers are less extreme and the predicted values follow the general trend.
And now, my final model:
step=0.000040, norm=0.15, batch=3, method="Adam"
Outliers have been reduced, and the predictions follow the sinusoidal trend more closely. If we split the Excel graphs of the data from the final attempt, we can see that the spikes are actually present in the original data, and the model just exaggerates them, as with the sudden dip in June 2016. Ignore the y-axis values in this case:
Over these three graphs, I reduced both the step size and the batch size. Big step sizes tended to yield extreme outliers and results that varied wildly between successive runs. Bigger batch sizes tended to reduce the magnitude of these outliers, but they also averaged out the predictions, resulting in a flat line that changed minimally over the course of a year. (Interestingly enough, only the upper bound was flattened; the minimum values continued to follow the correct trend during the colder months of the year.) Reducing the amount of “flattening” by reducing the batch size, in addition to reducing the magnitude of outliers by reducing the step size, yielded the most accurate results.
Adding more relevant factors, such as the humidity or air pressure, will likely increase the model’s accuracy, but my main focus was the ETL pipeline anyway, so this is good enough.
Thanks for reading! I hope you learned something useful.
Useful Links:
https://pkg.go.dev/github.com/segmentio/kafka-go#section-documentation
https://pkg.go.dev/github.com/ClickHouse/clickhouse-go/v2
https://pkg.go.dev/encoding/json#Marshal
https://www.weather.gov/lox/observations_historical
Time series analysis/forecasting:
https://towardsdatascience.com/ml-approaches-for-time-series-4d44722e48fe