Big Data Battle: Streaming data approach using Apache Flume vs Apache Spark in Python (PySpark)

Teerawit Seekasamit
8 min readApr 7, 2020

--

ขอบคุณภาพจาก https://towardsdatascience.com/big-data-analysis-spark-and-hadoop-a11ba591c057

Big Data เป็นนิยามอธิบายถึงปริมาณของข้อมูลมหาศาล ลักษณะของข้อมูลจะเป็นแบบข้อมูลที่ไม่มีโครงสร้าง (unstructured data)และข้อมูลกึ่งโครงสร้าง (semi-structured data)ซึ่งเป็นข้อมูลที่เติบโตขึ้นอย่างรวดเร็วและเพิ่มขึ้นในปริมาณที่มาก ซึ่งในปัจจุบันข้อมูลได้เพิ่มมากขึ้นจากแหล่ง Social networks เช่น Twitter, Facebook และจากแหล่งอื่นๆมากมาย ซึ่งข้อมูลเหล่านี้จะต้องถูกจัดการด้วยเครื่องมือที่มีประสิทธิภาพ โดยเครื่องมือที่นิยมคือ Hadoop เป็นซอฟแวร์ที่ทำขึ้นเพื่อเป็น platform ไว้จัดเก็บข้อมูลและประมวลผล Big Data ซึ่ง Hadoop ยังมี software package มากมายเพื่อรองรับการทำงานที่สูงขึ้น ซึ่งในงานนี้เราจะกล่าวถึง Apache Flume, Apache Spark ที่มี library MLlib ใช้ในการประมวลผล Macheine Learning และ Spark Streaming ใช้สำหรับการประมวลผลแบบ Real-time ซึ่งทั้งสอง software package ช่วยให้ data engineers และ data scientists สามารถประมวลผลข้อมูลได้แบบ real-time จากหลายๆแหล่งข้อมูล อีกทั้งข้อมูลที่ถูกประมวลผลนี้ก็ยังสามารถนำมาออกมาเป็นไฟล์ฐานข้อมูล และ live dashboards ได้อีกด้วย

Apache Flume เป็นเครื่องมือไว้สำหรับ streaming ข้อมูล โดยประเภทของข้อมูลจะเป็นข้อมูลที่ไหลเข้ามาเรื่อยๆ อย่างรวดเร็ว ซึ่งรับมาจาก Data source จากหลายๆแหล่งข้อมูล ไปยัง Hadoop ecosystem
ข้อดีของ Apache Flume คือ เป็น open-source สามารถรับข้อมูลได้เยอะและมี delay น้อยมาก สามารถรองรับ data flow ได้หลายแบบ และมีเครื่องมือสำหรับ monitor
ส่วนข้อเสีย คือมีโอกาสเกิดข้อมูลซ้ำกันได้และข้อมูลในระดับ enterprise สามารถประเมินฮาร์ดแวร์ได้ยาก

PySpark คือ Python API ถูกพัฒนาขึ้นเพื่อให้ง่ายต่อการเขียน Spark programming Model ด้วยภาษา Python เพื่อสนับสนุน Apache Spark ซึ่ง Apache Spark เป็น software ที่ช่วยในการประมวลผลข้อมูลขนาดใหญ่หรือที่เรียกกันว่า Big Data โดยมีการประมวลผลที่มีความเร็วสูง โดยทำงานในรูปแบบของ MapReduce โดย PySpark ง่ายต่อการใช้งานร่วมกับภาษาอื่นๆ อีกทั้งมี Resilient Distributed Dataset (RDD) คือ Dataset ที่กระจายข้อมูลอยู่บนหลายๆ Node ของ Cluster ซึ่งมีความสามารถที่จะประมวลผลแบบ Parallel อีกทั้งยังมีความยืดหยุ่นเช่น หากมี node ใด node หนึ่งเกิดล่มขึ้นมา ขณะที่ node นั้นกำลังทำงานอยู่ spark ก็มีความสามารถที่จะกระจายงานให้กับ node อื่น ซึ่งมีข้อมูลแบบเดียวกันกับ node ที่ล่มอยู่ เราก็ยังสามารถประมวลผลต่อได้อย่างสบาย กล่าวคือมีคุณสมบัติ fault tolerance นั่นเอง ช่วยให้ Data Scientists ง่ายต่อการทำงาน มีความรวดเร็วในการประมวลผล เมื่อเทียบกับ frameworks อื่นๆ

Research: Streaming Data Analysis using Apache Cassandra and Zeppelin

จากงานวิจัยของ A. MadhaviLatha, G.Vijaya Kumar ได้ออกแบบและสร้าง Apache Flume ให้ทำงานร่วมกับ spark streaming โดยการ streaming ข้อมูลจาก Twitter ดึงข้อมูลไปเก็บที่ Apache Cassandra เมื่อเรียกข้อมูล ข้อมูลจะถูกนำไปวิเคราะห์โดยใช้แนวคิด Apache Zeppelin ผลลัพธ์ที่ได้จะแสดงบน Dashboard และวิเคราะห์ตรวจสอบความถูกต้องโดยใช้ JSON จากการศึกษา Zippelin สามารถวิเคราะห์ข้อมูล ที่มี apache flume, Cassandra และ spark streaming ช่วยในการดึงข้อมูล สามารถทำงานได้ดีกับ online applications เช่น Twitter มันจะเข้าไปดึงข้อมูล ข้อมูลจะใหลและถูกเก็บโดย Apache spark streaming และถูก dump ลงใน Cassandra data storage ข้อมูลจะถูกเก็บไว้และแสดงเป็น Dashboard ใน Hadoop 1.x name node เกิดความล้มเหลว แต่แก้ไขโดยใช้ Hadoop 2.x เมื่อข้อมูลถูกเก็บก็จะเกิดปัญหาข้อมูลสูญหายใน HDFS ซึ่งใน Cassandra จะไม่มีเกิดข้อผิดพลาดนั้น และ spark streaming มีประสิทธิภาพมากกว่า Apache Flume หลังจากการวิเคราะห์ข้อมูลในภายหลังสามารถวิเคราะห์ได้ และในอนาคตยังสามารถทำโดยใช้ multiple nodes ในรูปแบบ cluster ได้

Research: Distributed Data Streams Processing Based on Flume/Kafka/Spark

WangJun, a *, Wang Wenhao,b, Chen Renfei ได้ออกแบบและดำเนินการทำระบบ stream ข้อมูลแบบกระจายผ่าน Flume Kafka และ Spark ดึงข้อมูลและวิเคราะห์ข้อมูล และทำ data mining ด้าน business intelligence อย่างมีประสิทธิภาพ ทันเวลา และน่าเชื่อถือ โดยการใช้ Flume ทำให้สามารถดึงข้อมูลได้จากหลายแหล่งข้อมูลได้อย่างถูกต้อง คุณสมบัติของ Kafka คือสามารถประมวลผลงานได้ในปริมาณมาก รองรับการขยายตัว และ Spark Streaming มีชุดการประมวลผลข้อมูลขนาดใหญ่ที่มีประสิทธิภาพ ทนทานต่อความผิดพลาดและการดึงข้อมูลขนาดใหญ่แบบ real-time สามารถวิเคราะห์ข้อมูล Streaming Data จำนวนมากได้อย่างรวดเร็ว ในงานวิจัยนี้ผลการวิเคราะห์ข้อมูลในองค์กร สามารถปรับกลุ่มประชากรที่เป็นเป้าหมายการโฆษณาและอื่นๆ ที่เกี่ยวข้องกับระบบทำงานได้อย่างเสร็จสิ้นตามการประมวลผลข้อมูล Streaming Data ได้อย่างมีประสิทธิภาพและถูกต้อง อีกทั้งสามารถแยกข้อมูลที่เกี่ยวข้องโดยการวิเคราะห์ข้อมูลคำสั่งซื้อ ช่วยให้องค์กรปรับกลยุทธ์การตลาดและปรับปรุงประสิทธิภาพขององค์กรได้

Streaming data approach using Apache Flume

การใช้งานส่วนนี้จะเป็นการนำ Streaming Twitter Data ไว้เป็น processing data ซึ่งเป็นหน้าที่ของ Apache Flume แล้วสร้าง target output บน HDFS

ขอบคุณภาพจาก https://www.edureka.co/blog/apache-flume-tutorial/

1. Hadoop Installation with Centos 7

ก่อนที่จะใช้งาน Apache Flume ต้องติดตั้ง Hadoop ในระบบก่อน เพื่อใช้เป็น HDFS และคุณต้องมั่นใจว่า ระบบได้เปิดการใช้งานสำหรับ port ต่างๆ ตามความเหมาะสม ( 8080, 8088, 9000, 9870) ให้เรียบร้อยเสียก่อน

สามารถทำแบบ step by step ตาม GitHub ข้างล่างได้เลย พร้อมคลิป youtube พกมาประกอบด้วย

Let’s Go!

#update your system — Centos 7
yum clean all
yum install epel-release -y
yum update -y

#install java wget ant
yum install -y java-1.8.0-openjdk wget ant
java -version

#create user ‘hadoop’ and ssh-keygen, run one per cmd
useradd -m -d /home/hadoop -s /bin/bash hadoop
passwd hadoop
su hadoop
cd
whoami #check my current user (hadoop)
ssh-keygen -t rsa
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
chmod 600 ~/.ssh/authorized_keys
ssh localhost
cat ~/.ssh/authorized_keys #read output of authrized_keys

#install hadoop 3.2.1
wget https://downloads.apache.org/hadoop/common/stable/hadoop-3.2.1.tar.gz
tar
-zxvf hadoop-3.2.1.tar.gz
mv hadoop-3.2.1 hadoop

#set environment variable of user hadoop, เพิ่ม env เข้าไปในไฟล์ .bashrc
vi ~/.bashrc

.bashrc file

#exit and login with user ‘hadoop’ again to be effective from all env above
exit
su hadoop
cd
whoami

#you can check which version of Hadoop you are using.
hadoop version

#modify configuration file ‘hadoop-evn.sh’ and add env for JAVA_HOME
vi $HADOOP_HOME/etc/hadoop/hadoop-env.sh

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08–0.el7_7.x86_64/jre

#setup Hadoop Configuration Files : core-site.xml, hdfs-site.xml,
mapred-site.xml, yarn-site.xml

cd $HADOOP_HOME/etc/hadoop
vi core-site.xml

core-site.xml

vi hdfs-site.xml

hdfs-site.xml

vi mapred-site.xml

mapred-site.xml

vi yarn-site.xml

yarn-site.xml

#create NameNode and DataNode directories in /home/hadoop/
mkdir -p ~/hadoopdata/hdfs/{namenode,datanode}

#check the storage directories
hdfs namenode -format

#run service Hadoop web browser with port 9870
cd /home/hadoop/hadoop/sbin
start-dfs.sh

#open web browser : public_ip:9870

#run ResourceManager daemon and NodeManager daemon with port 8088
start-yarn.sh

#open web browser : public_ip:8088

#ลองสร้าง directory ชื่อว่า /user/ts บน HDFS
hdfs dfs -mkdir -p /user/ts

click menu Utilities -> Browse the file system and found that there is a directory named ‘user’, then click it
You will see directory named ‘ts’. That’s perfect.

Optional : ไปลองเล่นต่อก็ได้นะครับ
#test upload a file into HDFS directory
hdfs dfs -put ~/.bashrc /user/ts
#copy the files from HDFS to your local file systems
hdfs dfs -get /user/ts /tmp/
#delete the files and directories
hdfs dfs -rm -f /user/ts/text.tx
hdfs dfs -rmdir /user/ts

2. Set up Apache Flume Agent

สิ่งที่ต้องมีก่อน คือ API Key และ API Secret ของ Twitter API ซึ่งจะนำไปใช้ในไฟล์ twitter.conf บน Flume Agent เพื่อสามารถติดต่อไปยัง Twitter API พร้อมส่ง request ร้องขอขออนุญาตใช้ข้อมูลพร้อมยืนยันตัวตนด้วย Key & Token
วิธีขอสามารถเข้าเวบนี้: How to get API Keys and Tokens for Twitter

สามารถทำแบบ step by step ตาม GitHub ข้างล่างได้เลย พร้อมคลิป youtube

Let’s Go!

#make as user ‘hadoop’
#download Apache Flume 1.9.0

su hadoop
cd
wget
https://downloads.apache.org/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -zxf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin flume

#add env into .bashrc file
vi .bashrc

export FLUME_HOME=/home/hadoop/flume
export PATH=$PATH:$FLUME_HOME/bin

#exit and login user ‘hadoop’ again
exit
su hadoop
cd

#Go to dir ‘lib to check whether there are 3 jar file for twitter
#twitter4j-core-x.jar
#twitter4j-stream-x.jar
#twitter4j-media-support-x.jar

cd /home/hadoop/flume/lib
ls twitter4j-*

#สิ่งนี้สำคัญมากเพราะหากไม่ลบ จะทำให้ processing ของ streaming twitter data จะมีปัญหาได้ ฉะนั้นเพื่อหลีกเลี่ยงการเกิดขึ้นของ unexpected issue

rm guava-11.0.2.jar

#create twitter.conf file, you can copy from here.
cd /home/hadoop/flume/conf
vi twitter.conf

TwitterAgent.sources.Twitter.keywords คือ search word ที่เราสนใจ
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost:9000/user/ts/tweets คือต้องการเก็บ target output ไปไว้ directories ที่ไหนบน HDFS อย่าลืมสร้างด้วย

#Use the ‘jps’ command to see the running Hadoop daemons
jps

#create directory named ‘twitters’ under /user/ts/, where the Twitter data should be stored.
hdfs dfs -mkdir -p /user/ts/tweets #create
hdfs dfs -ls /user/ts #list

#Fetch the twitter data into the HDFS cluster
#Start Flume agent

flume-ng agent -n TwitterAgent -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=INFO,console

หากต้องการหยุด streaming data สามารถกดปุ่ม ctrl+c

#check the contents of the tweet data
hdfs dfs -ls /user/ts/tweets
hdfs dfs -cat /user/ts/tweets/FlumeData.xxxxxxxxxx

Streaming data approach using Apache Spark in Python (PySpark)

การใช้งานส่วนนี้ สามารถอ่านรายละเอียดเพิ่มเติมฉบับภาษาไทยได้ ที่นี่ ซึ่งได้อ้างอิงต้นฉบับมาจาก https://www.analyticsvidhya.com/blog/2019/12/streaming-data-pyspark-machine-learning-model/ และสามารถเยี่ยมชม คลิปวีดีโอ ที่เราได้จัดขึ้นมาเพื่อเข้าใจภาพรวมของการใช้งานส่วนนี้ให้มากขึ้น

Introduction

ขอบคุณภาพจาก https://cdn.analyticsvidhya.com/wp-content/uploads/2019/12/overview.png
  • ทุกวินาทีมีการส่ง Tweets มากกว่า 8,500 Tweets รูปภาพ มากกว่า 900 รูปภาพถูกอัพโหลดบน Instagram มีการโทร Skype มากกว่า 4,200 ครั้ง มีการโทรค้นหาโดย google มากกว่า 78,000 ครั้งและมีการส่ง email มากกว่า 2 ล้านฉบับ
  • เราจะรวบรวมข้อมูลในระดับนี้ได้อย่างไร เราจะรู้ได้อย่างไรว่า machine learning pipeline ยังคงประมวลผลลลัพธ์ทันที่มีการสร้างและรวบรวมข้อมูล สิ่งเหล่านี้เป็นงานสำคัญในอุตสาหกกรมที่ท้าทายที่ต้องเผชิญ และ เหตุใดแนวคิด Streaming data จึงได้รับความสนใจมากขึ้นระหว่างองค์กร
  • ได้เรียนรู้ว่าข้อมูลแบบสตรีมคืออะไร เข้าใจพื้นฐานของการสตรีมแบบ Spark แล้วทำงานกับชุดข้อมูลที่เกี่ยวข้องกับอุตสาหกรรมเพื่อนำข้อมูลแบบสตรีมมิ่งไปใช้โดยใช้ Spark

What is Streaming Data?

  • Streaming data จะไม่มีจุดเริ่มต้นหรือจุดสิ้นสุดแบบต่อเนื่อง ข้อมูลนี้สร้างขึ้นทุกๆ วินาที จากข้อมูลหลายแหล่งและจำเป็นต้องประมวลผลและวิเคราะห์โดยเร็วที่สุด ข้อมูลสตรีมจำนวนมากต้องได้รับการประมวลผลแบบ real-time เช่นการค้นหาของ Google

What is Spark?

  • Spark เป็นตัวที่ประมวลผลข้อมูลคล้ายกับ MapReduce แต่ spark ทำงานบน memory ซึ่งเร็วขึ้น10–100 เท่า สามารถเขียน program ด้วยภาษา python, R, scala เพื่อใช้ในการประมวลบางอย่างและมี library หลายชนิดให้เลือกใช้งาน
    เช่น Spark SQL, Spark Streaming, MLib และ GraphX

Let’s Go!

#update your system, no need if you have done already
yum clean all
yum install epel-release -y
yum update -y

#install java wget ant, no need if you have done already
yum install -y java-1.8.0-openjdk wget ant
java -version

#Disable SELinux
sudo setenforce 0

#Download Spark 1.6.0, uncompress tar, and create a symbolic link
cd /opt
wget
http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz
tar -xzf spark-1.6.0-bin-hadoop2.6.tgz
ln -s /opt/spark-1.6.0-bin-hadoop2.6 /opt/spark
#you will see new directories color green which is a symbolic link

#set environment variable into your .bash_profile
echo ‘export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.242.b08-0.el7_7.x86_64/jre’ >> ~/.bash_profile
echo ‘export SPARK_HOME=/opt/spark’ >> ~/.bash_profile
echo ‘export PATH=$PATH:$SPARK_HOME/bin’ >> ~/.bash_profile

#exit and login instance again to check your current environment.
cat ~/.bash_profile

#install nmap-ncat for being able to read and write data across networks
yum install nc -y

#python version 3+ is required
yum install -y https://centos7.iuscommunity.org/ius-release.rpm
yum update
yum install -y python36u python36u-pip

alias python=’/usr/bin/python3.6'

#install pyspark and numpy with pip3
pip3 install pyspark
pip3 install numpy

#start pyspark and open web browser : public_ip:8080
cd $SPARK_HOME
./sbin/start-master.sh

open web browser : public_ip:8080

#get twitter sentiments data
cd /opt/spark
wget
https://raw.githubusercontent.com/lakshay-arora/PySpark/master/spark_streaming/datasets/twitter_sentiments.csv

#create twitter_sentiment_analysis.py and copy from here
vi twitter_sentiment_analysis.py

อย่าลืมแก้ที่ line 42 ให้เป็น
my_data = spark.read.csv(‘/opt/spark/twitter_sentiments.csv’, schema=my_schema, header=True)

Open 2 sessions.
#first session (right)
nc –lk 4321

#second session (left)
python3 /opt/spark/sentiment_analysis_streaming.py localhost 4321

Finally, type the text in the first terminal (right) and you will get the predictions in real-time in the other terminal:

Try sending the message

TWEET_APP finally reached 200 followers on #twitch. follower hypu #stream tomorrow!

TWEET_APP @user hey, white people: you can call people ‘while’ by @user #rare #identity

ภาพเคลื่อนไหว: การแสดงผลของ spark streaming ระหว่าง 2 sessions หลังจากส่งข้อความ
Big Data Battle: Streaming data approach using Apache Flume vs PySpark

--

--

Teerawit Seekasamit

All I need is Time, A moment that is Mine, while I’m in between.