EXPEDIA GROUP TECHNOLOGY — DATA

Apache Spark Structured Streaming — Output Sinks (3 of 6)

Straight Outta Spark Streaming

Neeraj Bhadani
Expedia Group Technology

--

Photo by Matthew Feeney on Unsplash

In previous posts of this series, we discussed an overview as well as input sources for Apache Spark™️ Streaming. This time, we discuss the available range of output sinks.

You may also be interested in my other posts on Apache Spark.

Output Sinks

In Spark Streaming, output sinks store results into external storage.

  • Console sink: Displays the contents of the DataFrame on the console. In this series we have used only the console sink so far; refer to the previous posts for details.
  • File sink: Stores the contents of a DataFrame in files within a directory. Supported file formats are csv, json, orc, and parquet.
  • Kafka sink: Publishes data to a Kafka® topic.
  • Foreach sink: Applies custom write logic to each row of a DataFrame.
  • ForeachBatch sink: Applies custom write logic to each micro-batch of a DataFrame.

It’s time to get our hands dirty; let’s discuss each output sink in detail.

Setup

We use the same csv data files as in the input sources post of this series to create a streaming DataFrame from a file source. Please refer to this blog for more details on the code below.
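
As a rough sketch (assuming the stock-price csv files and the data/stream folder used later in this post; the extra price and volume columns in the schema are assumptions), initDF can be created along these lines:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("OutputSinks")
  .getOrCreate()

// Schema of the stock csv files; columns beyond Name, Date, Open, and Close are illustrative.
val schema = StructType(Seq(
  StructField("Date", StringType, nullable = true),
  StructField("Open", DoubleType, nullable = true),
  StructField("High", DoubleType, nullable = true),
  StructField("Low", DoubleType, nullable = true),
  StructField("Close", DoubleType, nullable = true),
  StructField("AdjClose", DoubleType, nullable = true),
  StructField("Volume", DoubleType, nullable = true),
  StructField("Name", StringType, nullable = true)
))

// Streaming DataFrame that picks up csv files dropped into data/stream.
val initDF = spark.readStream
  .format("csv")
  .option("header", "true")
  .schema(schema)
  .load("data/stream")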

Output sink — Console

We covered the console output sink multiple times in a previous post.

Output sink — File

The file sink stores the contents of a streaming DataFrame in a specified directory and format. We use initDF (created above) and apply a simple transformation before storing it to the file system.

Transformation

Perform a very simple transformation by selecting a few columns from initDF:

val resultDf = initDF.select("Name", "Date", "Open", "Close")

Output to file sink

Note: File sink only supports append output mode.

Store the content of the DataFrame resultDf using the file sink:
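
A minimal sketch of that query; the output path matches the folder we check below, while the checkpoint location is illustrative:

resultDf.writeStream
  .outputMode("append")                 // the file sink supports append mode only
  .format("csv")
  .option("path", "output/filesink_output")
  .option("checkpointLocation", "checkpoint/filesink_checkpoint")
  .start()
  .awaitTermination()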

We store the results in csv files. We also use a checkpoint, which we discuss in a separate blog post. Execute the code above and check the output folder output/filesink_output.

In the left window, we can see the output CSV and data files. In the right window, we can see the content of a CSV file.

The output of our resultDf DataFrame is stored in the directory output/filesink_output. Check out the selected columns (Name, Date, Open, Close) in the csv file on the right side of the screenshot.

Now change the file format to json and review the output. The content of our resultDf DataFrame is stored in a json file.
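
Only the format (and, to keep runs separate, the output and checkpoint folders) needs to change; the folder names below are illustrative:

resultDf.writeStream
  .outputMode("append")
  .format("json")                       // switch the output format from csv to json
  .option("path", "output/jsonsink_output")
  .option("checkpointLocation", "checkpoint/jsonsink_checkpoint")
  .start()
  .awaitTermination()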

In the left window, we can see the output JSON and data files. In the right window, we can see the content of a JSON file.

You can try orc and parquet formats as well. For easy reference, you can find the complete code on GitHub.

Output sink — Kafka

With the Kafka sink, we publish the content of our streaming DataFrame to a Kafka topic, using the initDF DataFrame defined in the Setup section above. You can refer to this guide to install Kafka and create a Kafka topic.

Transformation

Kafka expects data in a column named value, so let's concatenate all the column values and store them in a single value column.
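
A sketch of that transformation, using concat_ws to build a pipe-separated value column (the pipe separator matches the output shown in the screenshot below; the exact expression in the original code may differ):

import org.apache.spark.sql.functions.{col, concat_ws}

// Combine all columns into a single pipe-separated "value" column, as Kafka expects.
val resultDf = initDF
  .withColumn("value", concat_ws("|", initDF.columns.map(col): _*))
  .select("value")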

Output to Kafka sink

We pass the value "kafka" to format() and specify the other necessary Kafka settings, like "kafka.bootstrap.servers" and the "topic" to which we publish.
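
A minimal sketch, reusing the broker address and topic from the consumer command below; the checkpoint location is illustrative:

resultDf.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "testConsumer")
  .option("checkpointLocation", "checkpoint/kafkasink_checkpoint")
  .start()
  .awaitTermination()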

Execute our streaming application

We use the command below to consume the data which our streaming application published to Kafka.

kafka-console-consumer --bootstrap-server localhost:9092 --topic testConsumer
In the right window, we can see the data files. In the left window, we can see the data published to the Kafka topic.

In the screenshot above, as soon as we drop a file into data/stream (the folder in the right window), we see our transformed, pipe (|) separated output in Kafka's console in the left window. For easy reference, you can find the complete code on GitHub.

Output sink — foreachBatch

So far we have discussed sinks where the output system was already defined, like file, kafka, or console. What if we would like to store data in arbitrary storage such as a NoSQL DB (for example, MongoDB) or a relational DB (like MySQL)? Using foreach and foreachBatch, we can write custom logic to store data: foreach applies custom write logic to each row, and foreachBatch applies custom write logic to each micro-batch.

Using foreachBatch, we write each micro-batch to storage defined in our custom logic. In this case, we store the output of our streaming application to MySQL DB.

Transformation

Select a few columns from initDF
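
For example, reusing the same selection as in the file sink section:

val resultDf = initDF.select("Name", "Date", "Open", "Close")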

Output to foreachBatch sink

foreachBatch takes a function that expects two parameters: first, the micro-batch as a DataFrame or Dataset, and second, a unique ID for each batch. First, create a function with custom write logic to save a micro-batch to MySQL.
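
A sketch of such a function, based on the connection details mentioned below (database training, table test, user root); the JDBC URL, driver class, and password placeholder are assumptions for a local MySQL instance:

import java.util.Properties
import org.apache.spark.sql.DataFrame

// Write one micro-batch to the MySQL table training.test via JDBC.
def saveToMySql(df: DataFrame, batchId: Long): Unit = {
  val props = new Properties()
  props.put("user", "root")
  props.put("password", "<password>")              // placeholder, not a real credential
  props.put("driver", "com.mysql.cj.jdbc.Driver")  // assumes MySQL Connector/J on the classpath

  df.write
    .mode("append")
    .jdbc("jdbc:mysql://localhost:3306/training", "test", props)
}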

We specify all the necessary MySQL details, like the URL, DB name (training), table name (test), user name (root), and password.

Use the saveToMySql() function to save our streaming DataFrame to MySQL.
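
A sketch of wiring the function into the sink (the checkpoint location is illustrative):

resultDf.writeStream
  .outputMode("append")
  .foreachBatch(saveToMySql _)
  .option("checkpointLocation", "checkpoint/foreachbatch_checkpoint")
  .start()
  .awaitTermination()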

We use the foreachBatch output sink and pass the saveToMySql() function to store data in MySQL. Start the application, ingest the data, and check the result on MySQL.

In the right window, we can see the data files. In the left window, we can see the data ingested into the MySQL DB.

In the screenshot above, we drop the MSFT_2017.csv file into data/stream in the right window and see the number of records for Microsoft in the training.test MySQL table in the left window.

In the right window, we can see the data files. In the left window, we can see the data ingested into the MySQL DB.

Here we drop the GOOGL_2017.csv file into data/stream in the right window and see the number of records for Google in the training.test MySQL table in the left window. For easy reference, you can find the complete code on GitHub.

Output sink — foreach

If you have made it this far, great! Now we discuss the last sink for this blog post. The foreach output sink applies custom write logic to each record in a streaming DataFrame. If foreachBatch is not an option, e.g. in continuous processing mode or if a batch data writer does not exist, then we can use the foreach sink for custom write logic.

To use foreach we need to implement 3 methods (open, process, and close).

  • open: function to open the connection
  • process: function to write data to the specified connection
  • close: function to close the connection

Output to foreach sink

We use initDF to read data from a file source. Then we create an instance of the ForeachWriter class and implement the open(), process(), and close() methods. We simply print records to the console, so we use println() statements in the process() function.
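
A sketch of such a writer; a real implementation would open and close a connection to an external system instead of just printing:

import org.apache.spark.sql.{ForeachWriter, Row}

val customWriter = new ForeachWriter[Row] {

  // Called once per partition and epoch; return true to go ahead and process the rows.
  override def open(partitionId: Long, epochId: Long): Boolean = {
    println(s"Open connection for partition $partitionId, epoch $epochId")
    true
  }

  // Called for every record in the partition.
  override def process(record: Row): Unit = {
    println(record.mkString(" | "))
  }

  // Called when the partition is done; errorOrNull is non-null if the write failed.
  override def close(errorOrNull: Throwable): Unit = {
    println("Close connection")
  }
}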

Use the ForeachWriter instance defined above to write data using the foreach sink.
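
For example (the checkpoint location is illustrative):

initDF.writeStream
  .outputMode("append")
  .foreach(customWriter)
  .option("checkpointLocation", "checkpoint/foreach_checkpoint")
  .start()
  .awaitTermination()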

We use the foreach output sink and provide the customWriter instance defined above. Execute the streaming application and check the result.

In the right window, we can see the data files. In the left window, we can see the data printed to the output console.

As soon as we drop the file MSFT_2017.csv into the folder data/stream in the right-hand window, we see the output defined by the process() method of the foreach sink in the left window. For easy reference, you can find the complete code on GitHub.

The next post in this series covers Checkpoints and Triggers.

Here are the other blogs in the Apache Spark Structured Streaming series.

I hope you enjoyed learning about output sinks used to output data from Spark Streaming!


This blog post is provided for educational purposes only. Certain scenarios may utilize fictitious names and test data for illustrative purposes. The content is intended as a contribution to the technological community at large and may not necessarily reflect any technology currently being implemented or planned for implementation at Expedia Group, Inc.
