Streaming data into Kafka S01/E01 — Loading CSV file
(Last update: 2023-10-07)
Ingesting data files into Apache Kafka is a very common task. Among the many file formats out there, CSV is probably the most popular one for moving data between different systems, thanks to its simplicity and to the fact that it can be used to export data from one (small) database and import it into another.
A CSV file is nothing more than a text file (with a .csv extension). Each line of the file represents a data record, and each record consists of one or more fields separated by a comma (or another separator).
Here is an example snippet:
40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987;U2;Rock
In addition, a CSV file can contain a header line to indicate the name of each field.
title;album;duration;release;artist;type
In this article, we will see how to integrate such a file into Apache Kafka. Of course, we are not going to reinvent the wheel: many open-source tools already exist to do this.
We will use the Kafka Connect framework, which is part of the Apache Kafka project. Kafka Connect has been designed to move data in and out of Kafka using connectors.
Kafka Connect File Pulse connector
The Kafka Connect File Pulse connector makes it easy to parse, transform, and stream data files into Kafka. It supports several file formats, but we will focus on CSV.
For a broad overview of FilePulse and more information, you can check out the project documentation.
How to use the connector
The easiest and fastest way to get started with the Connect FilePulse connector is to use the Docker image available on Docker Hub.
$ docker pull streamthoughts/kafka-connect-file-pulse:latest
You can download the docker-compose.yml file available in the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.
$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d
Once you have started the Docker containers, you can check that the connector is installed on the Kafka Connect worker available at http://localhost:8083.
$ curl -s localhost:8083/connector-plugins|jq '.[].class'| grep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
You can also install the connector either from the GitHub Releases page or from Confluent Hub.
Ingesting data
First, let’s create the JSON configuration file (connect-config.json) to run the FilePulse connector:
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
  "fs.listing.interval.ms": "10000",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "file.filter.regex.pattern": ".*\\.csv$",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "skip.headers": "1",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "offset.attributes.string": "name",
  "topic": "musics-filepulse-csv-00",
  "tasks.max": 1
}
Next, use the curl command to create a connector using the configuration created above:
$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
-d @./connect-config.json | jq
Then, check that the connector is working properly by running:
$ curl -s localhost:8083/connectors/source-csv-filepulse-00/status|jq '.connector.state'
"RUNNING"
For the moment, our connector is up and running, but there’s no data to process. So, let’s copy a CSV file into the Docker container directory /tmp/kafka-connect/examples/ (the directory configured through fs.listing.directory.path):
# (1) Download the CSV file
$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/
$ curl -sSL $GITHUB_REPO_MASTER/datasets/quickstart-musics-dataset.csv -o musics-dataset.csv
# (2) Copy the CSV file into the Docker container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples/
$ docker cp musics-dataset.csv connect://tmp/kafka-connect/examples/musics-dataset-00.csv
Let’s check if we have some data in the output topic musics-filepulse-csv-00:
$ docker run --tty \
--network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload
(output)
{
"headers": {
"array": [
{
"string": "title;album;duration;release;artist;type"
}
]
},
"message": {
"string": "Zoo Station;Achtung Baby;04:36;1991;U2;Rock"
}
}
Note: In the example above, we are using kafkacat to consume messages. The option -o-1 is used to only consume the latest message.
As we can see, our topic contains one Avro message for each line of the CSV file. This is because we have configured our connector to use the LocalRowFileInputReader (see the tasks.reader.class configuration).
Each record contains two fields:
message: this field is of type string and represents a single line of the input file.
headers: this field is of type array and contains the first header line of the input file. It is automatically added by the RowFileInputReader because we have configured it with skip.headers=1.
Headers
Connect FilePulse adds some headers to each record it sends, containing metadata about the source file from which the data was extracted.
This can be useful for debugging, but also for data lineage.
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .headers
(output)
[
"connect.file.name",
"musics-dataset-00.csv",
"connect.file.uri",
"file:/tmp/kafka-connect/examples/musics-dataset-00.csv",
"connect.file.contentLength",
"6588",
"connect.file.lastModified",
"1696681334000",
"connect.file.system.inode",
"26360981",
"connect.file.system.hostname",
"50eaed1b1bc1",
"connect.task.hostname",
"50eaed1b1bc1",
"connect.file.hash.digest",
"1466679696",
"connect.file.hash.algorithm",
"CRC32"
]
Parsing data
One of the main advantages of Connect FilePulse is that it allows you to define complex pipelines for analyzing, transforming and enriching data through the use of processing filters.
For this blog post, we are going to use the CSVFilter to parse each line extracted from our input file. Also, because the first line of the CSV file is a header, you can set the property extract.column.name to name the record's fields based on the headers field.
First, update the connector configuration as follows:
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
  "fs.listing.interval.ms": "10000",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "file.filter.regex.pattern": ".*\\.csv$",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "skip.headers": "1",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "offset.attributes.string": "name",
  "topic": "musics-filepulse-csv-01",
  "filters": "ParseLine",
  "filters.ParseLine.extract.column.name": "headers",
  "filters.ParseLine.trim.column": "true",
  "filters.ParseLine.separator": ";",
  "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "tasks.max": 1
}
Then, create a new connector named source-csv-filepulse-01:
$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-01/config \
-d @./connect-config.json | jq
Finally, consume the connector's output topic musics-filepulse-csv-01:
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-01 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload
(output)
{
"album": {
"string": "Achtung Baby"
},
"artist": {
"string": "U2"
},
"duration": {
"string": "04:36"
},
"release": {
"string": "1991"
},
"title": {
"string": "Zoo Station"
},
"type": {
"string": "Rock"
}
}
Filtering data
Sometimes you may want to keep only the lines in a file whose field has a particular value. For this article, let’s imagine we need to keep only the songs from the band AC/DC.
To do this, we’ll extend our filter chain to use the DropFilter, which discards all messages satisfying a given condition without throwing an exception.
First, update the connector configuration as follows:
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
  "fs.listing.interval.ms": "10000",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "file.filter.regex.pattern": ".*\\.csv$",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "skip.headers": "1",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "offset.attributes.string": "name",
  "topic": "musics-filepulse-csv-02",
  "filters": "ParseLine, KeepACDC",
  "filters.ParseLine.extract.column.name": "headers",
  "filters.ParseLine.trim.column": "true",
  "filters.ParseLine.separator": ";",
  "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.KeepACDC.type": "io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
  "filters.KeepACDC.if": "{{ equals($value.artist, 'AC/DC') }}",
  "filters.KeepACDC.invert": "true",
  "tasks.max": 1
}
The property if is set with a Simple Connect Expression Language (SCEL) expression, a basic expression language provided by the Connect FilePulse connector to access and manipulate a record's fields. Because invert is set to true, the DropFilter keeps only the records matching the condition and drops all the others.
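As a further illustration of SCEL (not part of the pipeline we are building here), a similar DropFilter could keep only the songs of type Rock; the filter name KeepRock is our own:

```json
{
  "filters.KeepRock.type": "io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
  "filters.KeepRock.if": "{{ equals($value.type, 'Rock') }}",
  "filters.KeepRock.invert": "true"
}
```

Any filter condition can reference the record's fields through $value in the same way.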
Then, create a new connector (so that our file can be reprocessed):
curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-02/config \
-d @./connect-config.json | jq
Finally, you can check that you get the expected result by executing:
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-02 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload.artist
(output)
{
"string": "AC/DC"
}
Changing the field types
You have probably noticed that at no time have we defined the type of our fields. By default, the connector assumes that all fields are of type string and maybe you are happy with that. But in most cases, you will want to convert a few fields. For example, we can convert the field year
to be of type Integer
.
To do this, you can use the AppendFilter together with a Simple Connect Expression.
As previously, let’s create our final connector configuration:
{
  "connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
  "fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
  "fs.listing.directory.path": "/tmp/kafka-connect/examples/",
  "fs.listing.interval.ms": "10000",
  "fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
  "file.filter.regex.pattern": ".*\\.csv$",
  "tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
  "skip.headers": "1",
  "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
  "tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
  "tasks.file.status.storage.bootstrap.servers": "kafka:29092",
  "tasks.file.status.storage.topic": "connect-file-pulse-status",
  "offset.attributes.string": "name",
  "topic": "musics-filepulse-csv-03",
  "filters": "ParseLine, KeepACDC, ReleaseToInt",
  "filters.ParseLine.extract.column.name": "headers",
  "filters.ParseLine.trim.column": "true",
  "filters.ParseLine.separator": ";",
  "filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
  "filters.KeepACDC.type": "io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
  "filters.KeepACDC.if": "{{ equals($value.artist, 'AC/DC') }}",
  "filters.KeepACDC.invert": "true",
  "filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.ReleaseToInt.field": "$value.release",
  "filters.ReleaseToInt.value": "{{ converts($value.release, 'INTEGER') }}",
  "filters.ReleaseToInt.overwrite": "true",
  "tasks.max": 1
}
Then, create our new connector named source-csv-filepulse-03:
curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-03/config \
-d @./connect-config.json | jq
Finally, consume the topic musics-filepulse-csv-03 to verify that the release field was converted to int:
docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-03 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.release
(output)
{
"int": 1980
}
The AppendFilter is a very handy filter that allows us to quickly modify a record. For example, we could also use it to set the key of each record to the (uppercased) album name by adding the following configuration:
{
  "filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
  "filters.SetKey.field": "$key",
  "filters.SetKey.value": "{{ uppercase($value.album) }}"
}
Note: For this blog post, we have used the filter chain mechanism provided by the Connect File Pulse connector. But it is also possible to use Kafka Connect Single Message Transforms (SMTs) to perform some of the same tasks, for example with the built-in org.apache.kafka.connect.transforms.Cast.
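As a hypothetical sketch, casting the release field with the Cast SMT could look like the following configuration fragment (the transform alias CastRelease is our own):

```json
{
  "transforms": "CastRelease",
  "transforms.CastRelease.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.CastRelease.spec": "release:int32"
}
```

Unlike FilePulse filters, which run inside the connector's processing pipeline, SMTs are applied by the Kafka Connect framework to the records after the connector has produced them.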
Conclusion
We have seen in this article that it is fairly easy to load a CSV file into Kafka without writing a single line of code using Kafka Connect. Also, the Connect File Pulse connector is a powerful solution that allows you to easily manipulate your data before sending it into Apache Kafka.
If you find this project valuable, I kindly ask you to show your support by sharing this article and spreading the word 📣. You can also give a ⭐ to the GitHub repository. 🙏
We also welcome contributions from the community. If you have any ideas or specific project needs, please feel free to reach out and propose them, or contribute directly by creating pull requests (PRs).
Thank you very much.
Follow me on Twitter/X: @fhussonnois