Streaming data into Kafka S01/E01 — Loading CSV file

Florian Hussonnois
StreamThoughts
Aug 13, 2020

(Last update: 2023-10-07)

Ingesting data files in Apache Kafka is a very common task. Among all the various file formats that we can find, CSV is probably the most popular one to move data between different systems. This is due to its simplicity and to the fact that it can be used to export or import data from one (small) database to another.

A CSV file is nothing more than a text file (with a .csv extension). Each line of the file represents a data record and each record consists of one or more fields, separated by a comma (or another separator).

Here is an example excerpt:

40;War;02:38;1983;U2;Rock
Acrobat;Achtung Baby;04:30;1991;U2;Rock
Sunday Bloody Sunday;War;04:39;1983;U2;Rock
With or Without You;The Joshua Tree;04:55;1987;U2;Rock

In addition, a CSV file can contain a header line indicating the name of each field.

title;album;duration;release;artist;type

In this article, we will see how to integrate such a file into Apache Kafka. Of course, we are not going to reinvent the wheel: many open-source tools already exist to do this.

We will use the Kafka Connect framework, which is part of the Apache Kafka project. Kafka Connect has been designed to move data in and out of Kafka using connectors.

Kafka Connect File Pulse connector

Connect File Pulse Logo

The Kafka Connect File Pulse connector makes it easy to parse, transform, and stream data files into Kafka. It supports several file formats, but here we will focus on CSV.

For a broad overview of FilePulse, I suggest you read this article:

For more information, you can check out the documentation here.

How to use the connector

The easiest and fastest way to get started with the Connect FilePulse connector is to use the Docker image available on Docker Hub.

$ docker pull streamthoughts/kafka-connect-file-pulse:latest

You can download the docker-compose.yml file available on the GitHub project repository to quickly start a Confluent Platform with Kafka Connect and the FilePulse connector pre-installed.

$ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/docker-compose.yml
$ docker-compose up -d
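
Before going further, you can quickly check that all the containers defined in the docker-compose.yml are up; the Kafka Connect worker can take a minute or two to be ready. A minimal check (the worker service is named connect in this stack):

$ docker-compose ps
$ docker-compose logs -f connect   # follow the Connect worker logs until the REST API is available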

Once you have started the Docker containers, you can check that the connector is installed on the Kafka Connect worker available at http://localhost:8083.

$ curl -s localhost:8083/connector-plugins|jq '.[].class'| grep FilePulse
"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"

You can also install the connector either from GitHub Releases Page or from Confluent Hub.
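
For example, on a self-managed Kafka Connect installation you can use the Confluent Hub CLI to install the plugin into your plugin path. A minimal sketch, assuming the CLI is available on the worker host and using the plugin coordinates published on Confluent Hub:

# Install the File Pulse plugin on a self-managed Connect worker
$ confluent-hub install streamthoughts/kafka-connect-file-pulse:latest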

Ingesting data

First, let’s create the JSON configuration file (connect-config.json) to run the FilePulse connector:

{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.interval.ms": "10000",
"fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern": ".*\\.csv$",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"skip.headers": "1",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
"tasks.file.status.storage.bootstrap.servers": "kafka:29092",
"tasks.file.status.storage.topic": "connect-file-pulse-status",
"offset.attributes.string": "name",
"topic": "musics-filepulse-csv-00",
"tasks.max": 1
}

Next, use the curl command to create a connector using the configuration created above:

$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-00/config \
-d @./connect-config.json | jq

Then, check that the connector is working properly by running:

$ curl -s localhost:8083/connectors/source-csv-filepulse-00/status|jq  '.connector.state'

"RUNNING"

For the moment, our connector is up and running, but there’s no data to process. So, let’s copy a CSV file into the Docker container directory /tmp/kafka-connect/examples/:

# (1) Download the CSV file
$ export GITHUB_REPO_MASTER=https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/master/
$ curl -sSL $GITHUB_REPO_MASTER/datasets/quickstart-musics-dataset.csv -o musics-dataset.csv

# (2) Copy the CSV file into the Docker container
$ docker exec -it connect mkdir -p /tmp/kafka-connect/examples/
$ docker cp musics-dataset.csv connect://tmp/kafka-connect/examples/musics-dataset-00.csv
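
As a quick sanity check, you can verify that the file is now visible in the directory the connector is watching:

$ docker exec -it connect ls -l /tmp/kafka-connect/examples/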

Let’s check whether we have some data in the output topic musics-filepulse-csv-00:

$ docker run --tty \
--network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload

(output)

{
"headers": {
"array": [
{
"string": "title;album;duration;release;artist;type"
}
]
},
"message": {
"string": "Zoo Station;Achtung Baby;04:36;1991;U2;Rock"
}
}

Note: In the example above, we are using kafkacat to consume messages. The option -o-1 is used to consume only the latest message.
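
If you prefer the Confluent command-line tools, the same messages can also be read with kafka-avro-console-consumer. This is only a sketch: it assumes the tool is available in the schema-registry container of the docker-compose stack and that the broker is reachable from it at kafka:29092 (the listener used in the connector configuration):

$ docker exec -it schema-registry kafka-avro-console-consumer \
  --bootstrap-server kafka:29092 \
  --topic musics-filepulse-csv-00 \
  --from-beginning \
  --property schema.registry.url=http://localhost:8081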

As we can see, our topic contains one Avro message for each line of the CSV file. This is because we have configured our connector to use the LocalRowFileInputReader (see the tasks.reader.class configuration). A way to inspect the registered Avro schema is shown right after the field list below.

Each record contains two fields:

  • message: This field is of type string and represents a single line of the input file.
  • headers: This field is of type array and contains the first header line of the input file. This field is automatically added by the RowFileInputReader because we have configured it with skip.headers=1.
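
Because the value is written in Avro, the corresponding schema is registered in the Schema Registry. You can inspect it through the registry’s REST API, assuming the default TopicNameStrategy (the subject is the topic name suffixed with -value):

$ curl -s http://localhost:8081/subjects/musics-filepulse-csv-00-value/versions/latest | jq '.schema | fromjson'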

Headers

Connect FilePulse adds headers to each record it sends, containing metadata about the original source file from which the data was extracted.

This can be useful for debugging but also for data lineage.

docker run --tty  --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-00 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .headers

(output)

[
"connect.file.name",
"musics-dataset-00.csv",
"connect.file.uri",
"file:/tmp/kafka-connect/examples/musics-dataset-00.csv",
"connect.file.contentLength",
"6588",
"connect.file.lastModified",
"1696681334000",
"connect.file.system.inode",
"26360981",
"connect.file.system.hostname",
"50eaed1b1bc1",
"connect.task.hostname",
"50eaed1b1bc1",
"connect.file.hash.digest",
"1466679696",
"connect.file.hash.algorithm",
"CRC32"
]

Parsing data

One of the main advantages of Connect FilePulse is that it allows you to define complex pipelines for analyzing, transforming and enriching data through the use of processing filters.

For this blog post, we are going to use the CSVFilter to parse each line extracted from our input file. Also, because the first line of the CSV file is a header, we can set the property extract.column.name to name the record’s fields after the values held in the headers field.

First, update the connector configuration as follows:

{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.interval.ms": "10000",
"fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern": ".*\\.csv$",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"skip.headers": "1",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
"tasks.file.status.storage.bootstrap.servers": "kafka:29092",
"tasks.file.status.storage.topic": "connect-file-pulse-status",
"offset.attributes.string": "name",
"topic": "musics-filepulse-csv-01",
"filters":"ParseLine",
"filters.ParseLine.extract.column.name": "headers",
"filters.ParseLine.trim.column": "true",
"filters.ParseLine.separator": ";",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"tasks.max": 1
}

Then, create a new connector named source-csv-filepulse-01:

$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-01/config \
-d @./connect-config.json | jq

Finally, consume the connector output topic musics-filepulse-csv-01:

docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-01 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload

(output)

{
"album": {
"string": "Achtung Baby"
},
"artist": {
"string": "U2"
},
"duration": {
"string": "04:36"
},
"release": {
"string": "1991"
},
"title": {
"string": "Zoo Station"
},
"type": {
"string": "Rock"
}
}

Filtering data

Sometimes you may want to keep only the lines of a file that have a field with a particular value. For this article, let’s imagine we only need to keep the songs from the band AC/DC.

To do this, we’ll extend our filter chain to use the DropFilter, which discards all messages satisfying a given condition without throwing an exception.

First, update the connector configuration as follows:

{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.interval.ms": "10000",
"fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern": ".*\\.csv$",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"skip.headers": "1",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
"tasks.file.status.storage.bootstrap.servers": "kafka:29092",
"tasks.file.status.storage.topic": "connect-file-pulse-status",
"offset.attributes.string": "name",
"topic": "musics-filepulse-csv-02",
"filters":"ParseLine, KeepACDC",
"filters.ParseLine.extract.column.name": "headers",
"filters.ParseLine.trim.column": "true",
"filters.ParseLine.separator": ";",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
"filters.KeepACDC.if":"{{ equals($value.artist, 'AC/DC') }}",
"filters.KeepACDC.invert":"true",
"tasks.max": 1
}

The property if is set with a Simple Connect Expression Language (SCEL) expression. SCEL is a basic expression language provided by the Connect FilePulse connector to access and manipulate a record’s fields.
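
For reference, here are the SCEL expressions used (or reused later) in this article, with a short description of what each one evaluates to:

{{ equals($value.artist, 'AC/DC') }}        # true when the 'artist' field equals AC/DC
{{ converts($value.release, 'INTEGER') }}   # the 'release' field converted to an integer
{{ uppercase($value.album) }}               # the 'album' field in upper case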

Then, create a new connector (so that our file can be reprocessed):

$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-02/config \
-d @./connect-config.json | jq

Finally, you can check that you get the expected result by executing:

docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-02 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .payload.artist

(output)

{
"string": "AC/DC"
}

Changing the field types

You have probably noticed that at no point have we defined the types of our fields. By default, the connector assumes that all fields are of type string, and maybe you are happy with that. But in most cases, you will want to convert a few fields. For example, we can convert the field release to be of type Integer.

To do this, you can use the AppendFilter together with a Simple Connect Expression.

As previously, let’s create our final connector configuration:

{
"connector.class": "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.listing.class": "io.streamthoughts.kafka.connect.filepulse.fs.LocalFSDirectoryListing",
"fs.listing.directory.path": "/tmp/kafka-connect/examples/",
"fs.listing.interval.ms": "10000",
"fs.listing.filters": "io.streamthoughts.kafka.connect.filepulse.fs.filter.RegexFileListFilter",
"file.filter.regex.pattern": ".*\\.csv$",
"tasks.reader.class": "io.streamthoughts.kafka.connect.filepulse.fs.reader.LocalRowFileInputReader",
"skip.headers": "1",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.fs.clean.LogCleanupPolicy",
"tasks.file.status.storage.class": "io.streamthoughts.kafka.connect.filepulse.state.KafkaFileObjectStateBackingStore",
"tasks.file.status.storage.bootstrap.servers": "kafka:29092",
"tasks.file.status.storage.topic": "connect-file-pulse-status",
"offset.attributes.string": "name",
"topic": "musics-filepulse-csv-03",
"filters":"ParseLine, KeepACDC,ReleaseToInt",
"filters.ParseLine.extract.column.name": "headers",
"filters.ParseLine.trim.column": "true",
"filters.ParseLine.separator": ";",
"filters.ParseLine.type": "io.streamthoughts.kafka.connect.filepulse.filter.CSVFilter",
"filters.KeepACDC.type":"io.streamthoughts.kafka.connect.filepulse.filter.DropFilter",
"filters.KeepACDC.if":"{{ equals($value.artist, 'AC/DC') }}",
"filters.KeepACDC.invert":"true",
"filters.ReleaseToInt.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.ReleaseToInt.field": "$value.release",
"filters.ReleaseToInt.value": "{{ converts($value.release, 'INTEGER') }}",
"filters.ReleaseToInt.overwrite": "true",
"tasks.max": 1
}

Then, create our new connector named source-csv-filepulse-03:

$ curl \
-s -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-csv-filepulse-03/config \
-d @./connect-config.json | jq

Finally, consume the topic musics-filepulse-csv-03 to verify that the release field was converted to int:

docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-03 \
-C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.release

(output)

{
"int": 1980
}

The AppendFilter is a very handy filter that allows us to quickly modify a record. For example, we could also use it to set the key of each record to be equal to the album name by adding the following configuration:

{
"filters.SetKey.type": "io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
"filters.SetKey.field": "$key",
"filters.SetKey.value": "{{ uppercase($value.album)}}"
}
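
If you added this SetKey filter to the configuration and reprocessed the file, for example into a hypothetical new topic musics-filepulse-csv-04, you could check the record keys the same way we consumed the payloads, pointing jq at the key instead (the key is read here as a plain string):

docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
-b localhost:9092 \
-t musics-filepulse-csv-04 \
-C -J -q -o-1 \
-s key=s \
-s value=avro -r http://localhost:8081 | jq .key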

Note: For this blog post, we have used the filter chain mechanism provided by the Connect File Pulse connector. But it is also possible to perform the same task with Kafka Connect Single Message Transformations (SMTs), for example using org.apache.kafka.connect.transforms.Cast.
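
For example, a rough equivalent of the ReleaseToInt filter using the standard Cast transformation would look like the following snippet in the connector configuration (a sketch only, not tested in this post; Cast$Value and the field:type spec syntax come from the built-in Kafka Connect transform):

{
"transforms": "CastRelease",
"transforms.CastRelease.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.CastRelease.spec": "release:int32"
}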

Conclusion

We have seen in this article that it is fairly easy to load a CSV file into Kafka using Kafka Connect, without writing a single line of code. Also, the Connect File Pulse connector is a powerful solution that lets you easily manipulate your data before sending it into Apache Kafka.

If you find this project valuable, please support it by sharing this article and spreading the word 📣. You can also give the GitHub repository a ⭐🙏

We also welcome contributions from the community. If you have any ideas or specific project needs, please feel free to reach out and propose them. You can also contribute directly by opening pull requests (PRs).

Thank you very much.

Follow me on Twitter/X: @fhussonnois
