Apache Spark Streaming from CSV Files

Nitin Gupta
2 min read · Nov 30, 2018


Install Spark with Hadoop; refer to the online installation instructions. I use "spark-2.3.2-bin-hadoop2.7" in this demo.
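If you do not have Spark yet, a minimal setup can look like the following. The download URL assumes the Apache release archive, and the target folder matches the path used in the next step; adjust both to your environment.

$ wget https://archive.apache.org/dist/spark/spark-2.3.2/spark-2.3.2-bin-hadoop2.7.tgz
$ tar -xzf spark-2.3.2-bin-hadoop2.7.tgz
$ sudo mv spark-2.3.2-bin-hadoop2.7 /usr/lib/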

Edit your .bashrc file as well to include the Spark bin folder in PATH, and start a fresh console for the further steps.

export PATH=$PATH:/usr/lib/spark-2.3.2-bin-hadoop2.7/bin
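To confirm that the PATH change took effect, open a fresh console and print the Spark version:

$ spark-submit --version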

Create a source file "Spark-Streaming-file.py" with the source code below. The code is self-explanatory with comments. We open a read stream that actively monitors the "/tmp/text" directory for CSV files.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()

# Define the schema of the CSV: name;salary
userSchema = StructType().add("name", "string").add("salary", "integer")

# Open a read stream on the watched path; new CSV files are picked up as they arrive
dfCSV = spark.readStream.option("sep", ";").option("header", "false").schema(userSchema).csv("/tmp/text")

# Compute the total salary per name. Note that this is a streaming DataFrame
# which represents the running sum over the stream.
dfCSV.createOrReplaceTempView("salary")
totalSalary = spark.sql("select name, sum(salary) from salary group by name")

# Equivalent DataFrame API:
# totalSalary = dfCSV.groupBy("name").sum("salary")

# All that is left is to actually start receiving data and computing the sums.
# outputMode("complete") prints the full result table to the console every time
# it is updated; start() kicks off the streaming computation.
query = totalSalary.writeStream.outputMode("complete").format("console").start()

query.awaitTermination()

Start the Spark streaming process. Execute the command below in the folder where the Spark-Streaming-file.py file is present.

$ spark-submit Spark-Streaming-file.py

Now our Spark stream is waiting for CSV files to be pushed to the "/tmp/text" folder. Let's move a file "1.csv" into "/tmp/text" with the content below.

nitin;10000
ram;20000
nitin;5000
mahesh;2000
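One way to create and deliver this file (the paths here are just an example) is to write it somewhere else first and then move it into the watched folder:

$ mkdir -p /tmp/text
$ cat > /tmp/1.csv << EOF
nitin;10000
ram;20000
nitin;5000
mahesh;2000
EOF
$ mv /tmp/1.csv /tmp/text/

Moving the file, rather than writing it in place, matters: the file source treats a file as complete as soon as it appears in the directory, so a file still being written could be picked up half-finished.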

Here is the result:

18/11/30 19:22:18 WARN Utils: Your hostname, nitin-Satellite-C850 resolves to a loopback address: 127.0.1.1; using 192.168.0.7 instead (on interface wlp2s0)
18/11/30 19:22:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
18/11/30 19:22:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----------+
|  name|sum(salary)|
+------+-----------+
| nitin|      15000|
|   ram|      20000|
|mahesh|       2000|
+------+-----------+

Conclusion

Hence our Spark session actively monitors the folder for files being added there and computes the total salary per name according to the logic we defined on the streaming DataFrame; for example, nitin's two rows (10000 and 5000) are summed to 15000. Each new file that lands in "/tmp/text" triggers a new batch with updated totals.
