Ingesting Raw Data with Kafka-connect and Spark Datasets

Ronald Ángel
The Startup
Published in
6 min readOct 15, 2019
Photo by Fabio Ha on Unsplash

In this blog post, I will explain how we use Kafka-connect and spark orchestrated by platforms like Kubernetes and airflow to ingest raw data. You will get some insights about the advantages of storing your source data as it comes and how to ease data ingestion when you need early data exploration and a business case is not entirely defined.

Why do you need to define the structure of your data prior to define your business case structure?

Why ingest raw data into a data lake?

When you work on product innovation, you will encounter some situations when a product is still being defined (discovery phase) but data from multiple sources linked to the business case is being already collected. Thus, making all the collected data available to anyone either to analyze or explore comes up as a need. Also, as the business case is not defined, assumptions regarding schemas are not only problematic but unreasonable, the data should have in the future whatever schema is needed and consumers could define this depending on their own purposes.

Furthermore, for a product, currently multiple real-time sources including FTP, web sockets, rest services, and external queues are consumed in real-time, then an extra challenge exists: data should be collected, stored securely without data losses and with lowest possible latency. Consequently, this opens the space to restrict access to this data layer to only people interested in data collection and early exploration, then create a serialized more manageable structure to be exposed to the data science team.

As a result, a data lake ingestion process with two layers is created: a secure layer that keeps all the raw data as it comes used only for the data ingestion team and some selected data scientist, and a serialized data layer that is easy to query by all the people interested.

Proposed Infrastructure

Before talking about our ingestion processes is important to describe some components that make them happen. We run an architecture based on CI/CD with Kubernetes and monitor our data metrics using tools like Prometheus and Grafana. However, we will keep the metrics topic for a further post. So, the main components within our infrastructure are:

Openshift and Gitlab for CI/CD: OpenShift is our container application platform and GitLab our continuous integration and continuous delivery (CI/CD) tool. Both, allow us to improve our software release process and speed up our deployments. Furthermore, through these two platforms, we deploy containers that can run our data ingestion pipelines.

Kafka: To support any future use case we must build a reliable event-based system that handles a big volume of data with low latency, scalability, and fault-tolerance. Therefore, Kafka is the platform used to coordinate multiple producers from real-time sources and consumers that ingest data to our internal data lake.

S3: An Objects Storage is used to centralize the data stores, harmonize data definitions and ensure good governance. This platform handles scalability and provides fault-tolerance storage for our pipelines, making the data lake ingestion creation process faster.

Airflow: Our pipelines are scheduled, managed and monitored using airflow. Moreover, we control the status of our jobs and handle retry strategies automatically through this platform.

Producing Data to Kafka

Using the infrastructure described previously, we have multiple components reading from external sources: some of them are real-time sources delivering data via queues, WebSockets or rest services; others are data locations like sFTP that we read periodically to fetch information. Consequently, we deploy multiple Kafka producers each delivering data to a different topic that will contain the raw data produced by the source. However, there is a challenge at this point: uniformity, we must guarantee a homogeneous data structure and allow data ingestion processes to run transparently when we write messages to multiple Kafka raw topics. Then, all the messages are produced as .json.gzip and contain these general data fields:

raw_data: the data as it comes from the producer.

metadata: producer metadata needed to track the message source.

ingestion_timestamp: timestamp when the message was produced. This is used later on for data partitioning.

This is an empty record example:

{
"raw_data": {},
"metadata":{"thread_id":0,"host_name":"","process_start_time":""},"ingestion_timestamp":0
}

Using Kafka-connect to store raw data

As we described before the first raw data layer is written to the data lake, this layer gives us flexibility not only within for our technical processes but also for our business definitions because the information is available for analysis from the beginning. Then, since we have Kafka in place, using Kafka-connect allows us to perform this raw data layer ETL without writing a single line of code. Thus, the s3 sync is used to read data from the raw topics and produce data to S3.

Raw Data Ingestion

Kafka-connect collects information as it comes using the org.apache.kafka.connect.json.JsonConverter and and writes data using the io.confluent.connect.storage.partitioner.TimeBasedPartitioner, following this example you can understand how to configure this connector step by step. This pipeline works stably. However, some additional parameters were needed to tweak the Kafka-connect ingestión:

topics.regex: We keep a uniform name for all the raw topics with the suffix _raw. Therefore, we can have a single connector configuration template file to create the connectors for multiple topics.

"topics.regex": ".*raw$"

flush.size: Since we receive tiny messages, small files could be produced to s3, then configuring a bigger flush size will prevent this problem. Besides, the ingestion linger-timestamp should be configured as well: rotate.interval.ms and rotate.schedule.interval.ms.

Example:

"flush.size": "180000","rotate.interval.ms": "300000","rotate.schedule.interval.ms": "300000"

Kafka-connect always produce based wither on the buffer size or the rotate interval, then increasing these parameters leads to bigger message size.

path.format: We want the data to be partitioned using:

"timestamp.field": "ingestion_timestamp

then the following format is defined:

"'date'=YYYY-MM-dd".

Partitioning the data by date makes the ingestion process and future implementation of parquet partitioned queries faster.

Using spark to serialize data and expose Hive tables

As yet, we are storing raw data into s3. However, this layer is not accessible to the entire data science team. So, for the sake of security and usability multiple spark jobs orchestrated by airflow periodically read data from the s3 buckets at the year/month/day level, infer the dataset schema, serialize the data and store:

a). Snappy parquet files partitioned by year/month/day to S3.

b). An external Hive table from the file location.

spark job orchestration

The process followed by this spark job is described as following:

  1. Create a raw table
  • Reads data from a bucket using the .json read function.
  • Validates raw data schema.
  • Writes this dataset as parquet partitioned by the current ingestion timestamp year, month and day.
  • Create a Hive table from that location.

2. Creates a flatten (inferred inner schema) data table

  • Creates a data frame using only the raw_data field.
  • Infers the data schema using the spark dataset API.
  • Flats the schema format to a single level.
  • Writes this dataset as parquet partitioned by the current ingestion timestamp year, month and day.
  • Create a Hive table from that location.

Finally, for some jobs that run daily, data must be written using the spark partition dynamic. Thus, records will be overwritten only on the current day partition. For this, the following spark-hive configuration is used:

val ss = SparkSession.builder
.config(conf = conf)
.appName(appName)
.config("spark.sql.warehouse.dir", warehouseLocation)
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")
.enableHiveSupport()
.getOrCreate()

After the spark finishes, data from each source is serialized as parquet and exposed as partitioned Hive tables to the data science team.

Final words

This pipeline allows us to democratize our data access and ingestion even when a business case is not defined. This gives us flexibility in our exploration process and time to market capabilities that let us keep on building new and big products.

Finally, we understand that there are multiple options to ingest data depending on the data volume and our solution is only one of them. Hence, we wanted to present this architecture that suits our requirements and has a good performance for our data volume.

--

--