Apache Flume과 Cloud Hadoop사용하기

Suewoon Ryu
NAVER CLOUD PLATFORM
16 min readSep 23, 2020

이 페이지에서는 Flume에대해 간략히 설명하고, 서버의 로그를 Cloud Hadoop의 HDFS에 저장하는 Flume Topology를 구성해봅니다. Cloud Hadoop 1.2 클러스터가 이미생성되어 있다고 가정합니다.

Cloud Hadoop 생성하러 가기

개요

Figure1. Flume Agent

공식 페이지에 따르면 Flume은 아래와 같이 정의되어있습니다.

distributed, reliable, available system for collecting, aggregating and moving large amount of log data from many different sources to a centralized data store

성격

  • distributed: topology를 어떻게 구성하느냐에 따라 달라질 수 있지만, 여러개의flume agent끼리pipe를만들수있습니다. 보통, agent끼리 연결할 때에는 Avro 타입의 sink, source(next hop)를 사용합니다.
  • reliable: flume agent 안에서 이벤트는 source, channel, sink 라는 컴포넌트를 따라서 이동합니다. 이벤트가 sink로 전달되기 전까지는 channel에서 이벤트가 사라지지 않고, 이를 보장하기위해 flume은 transactional approach를 도입했습니다.
  • available: channel로 disk-backed 시스템을 사용한다면 agent에 fault가 발생해도 source에서 channel로 전달된 데이터를 복구할 수 있습니다.

용도

  • from many ~ to a centralized: flume agent는 여러개의노드에서 로그를가져와서 최종적으로는 중앙화된 저장소에 보관 할 수 있습니다.
  • collecting, aggregating, moving: 로그를 수집하여 합칠 수 있습니다. 그리고 그 과정중에 selector, interceptor를활용하여 이벤트의 형태를 바꿀 수 있습니다. 수집한 이벤트는 다음agent로전달하거나 최종 sink에 저장 할 수 있습니다.

컴포넌트

Event

flume agent에 의해 옮겨지는 데이터의 기본 단위입니다. 선택적으로 이벤트에는 header값을 줄 수 있으며, 보통 header는 이벤트의 내용을 확인하고 변경하기 위해 사용합니다.

Flume Agent

JVM 프로세스로 source, channel, sink 컴포넌트를 호스트 합니다. 이벤트는 Agent를 통해서 외부의source로부터next-hop의목적지로 흘러갈 수 있습니다.

  • source: client로 부터 전달받은 이벤트를 소비합니다. source가 이벤트를 받으면 1개이상의 채널에 넘기게 됩니다.
  • channel: 이벤트의 임시 저장소입니다. source와 sink를 이어주는 역할을하며 이벤트 흐름(flow)의durability를 보장하는데 중요한 역할을 합니다.
  • sink: 채널로부터이벤트를제거하고 flow의 next-hop에 전달합니다.

Cloud Hadoop과 Flume agent 사용하기

Topology

간단하게 아래처럼 flume topology를 구성해 볼 수 있습니다. 여기서는 각 서버의 vmstat 결과를 수집하여 Cloud Hadoop HDFS에 Sink해보도록 하겠습니다.

Figure2. Flume Topology 예시

Flume 설치

Server 상품을 이용하여 로그를 수집 할 서버를 3대생성 했습니다. 주의할 점은 각 서버를 Cloud Hadoop이포함된ACG에생성해야 한다는 것입니다.

  • log-gen-001 / centos-7.8–64 / 2vCPU, 8G Mem
  • log-gen-002 / centos-7.8–64 / 2vCPU, 8G Mem
  • log-gen-003 / centos-7.8–64 / 2vCPU, 8G Mem

Flume 패키지를 다운받아서 ~/apps경로에 압축을 풀면 설치는 끝입니다.

mkdir ~/downloads ~/appscd ~/downloads
wget http://mirror.navercorp.com/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
tar -xvf apache-flume-1.9.0-bin.tar.gz
mv apache-flume-1.9.0-bin ~/apps/
cd ~/apps
ln -s apache-flume-1.9.0-bin flume

사전작업

Cloud Hadoop을 HDFS Sink로 쓰기위해서 아래와 같은 사전작업이 필요합니다.

1. 웹서버와 HDFS 통신을 위해 …

각 로그 서버가 HDFS 네임노드 호스트와 통신하기 위해서는 각 호스트의 IP, 이름을 /etc/hosts 에 등록해야 합니다. 해당 정보는 Cloud Hadoop의edge 노드 (e.g. e-001-xxx )의 /etc/hosts 에서확인할수있습니다.

2. HDFS Sink를 사용하기위해…

이 topology에서는HDFS sink를 사용합니다. 따라서 Flume Agent가실행되는노드에서 hadoop common jar 라이브러리가 필요하게됩니다. 그리고 namenode HA를 위해서 네임서비스를 사용하려면 hdfs-site.xml , core-site.xml 같은 구성파일도 필요합니다.

2–1. hadoop-client 패키지 다운로드

우선 .repo 를 구성합니다. vi /etc/yum.repos.d/ambari-hdp-1.repo

[HDP-2.6-repo-1]
name=HDP-2.6-repo-1
baseurl=http://public-repo-1.hortonworks.com/HDP/centos7/2.x/updates/2.6.5.0

path=/
enabled=1
gpgcheck=0
[HDP-UTILS-1.1.0.22-repo-1]
name=HDP-UTILS-1.1.0.22-repo-1
baseurl=http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.22/repos/centos7

path=/
enabled=1
gpgcheck=0

hadoop-client 를 설치하면 /usr/hdp/current/hadoop-client/lib 아래에서필요한 .jar 라이브러리를 사용할 수 있습니다.

$ yum clean all 
$ yum install hadoop-client

2–2. hadoop config 설정

$FLUME_CLASS_PATH/conf 아래 hadoop config 파일을 다운받습니다.

$ cd ~/apps/flume/conf
$ curl -u $AMBARI_ID:$AMBARI_PASS -H "X-Requested-By: ambari" -X GET http://$AMBARI_URI:8080/api/v1/clusters/$CLUSTER_NAME/services/HDFS/components/HDFS_CLIENT\\?format=client_config_tar> -o hdfs_client_conf.tar.gz
$ tar -xvf hdfs_client_conf.tar.gz
$ rm -f hdfs_client_conf.gz

Flume 구성변경

각 flume agent에서 아래처럼 구성값을생성합니다.

cd ~/apps/flume/conf
cp flume-conf.properties.template flume.conf
cp flume-env.sh.template flume-env.sh

flume.conf

여기서 agent의 이름, 각 컴포넌트를 정의 합니다. fooAgent 라고 명명했습니다. HDFS의 sink path에는 네임서비스가 들어간 경로를 사용하면 됩니다. Cloud Hadoop에서는 클러스터명이 네임서비스가 됩니다. hdfs-site.xml 에 노드 정보가 포함되어 있으므로 어느 네임노드가 Active상태인지 명시하지 않아도됩니다.

fooAgent.sources = Exec
fooAgent.channels = MemChannel
fooAgent.sinks = HDFS

fooAgent.sources.Exec.type = exec
fooAgent.sources.Exec.command = /usr/bin/vmstat 1
fooAgent.sources.Exec.channels = MemChannel

fooAgent.channels.MemChannel.type = memory
fooAgent.channels.MemChannel.capacity = 10000
fooAgent.channels.MemChannel.transactionCapacity = 1000

fooAgent.sinks.HDFS.channel = MemChannel
fooAgent.sinks.HDFS.type = hdfs
fooAgent.sinks.HDFS.hdfs.path = hdfs://$CLUSTER_NAME/user/hduser/flume/events/
fooAgent.sinks.HDFS.hdfs.fileType = DataStream
fooAgent.sinks.HDFS.hdfs.writeFormat = Text
fooAgent.sinks.HDFS.hdfs.batchSize = 1000
fooAgent.sinks.HDFS.hdfs.rollSize = 0
fooAgent.sinks.HDFS.hdfs.rollCount = 10000

flume-env.sh

사전작업에서 설치한 hadoop client의경로를 FLUME_CLASSPATH 에 추가합니다.

export JAVA_HOME="/usr/lib/jvm/java"
export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
FLUME_CLASSPATH="/usr/hdp/current/hadoop-client/lib"

프로세스 시작

각 flume agent를시작합니다.

cd ~/apps/flume/.bin/flume-ng agent --conf ./conf/ -f conf/flume.conf -Dflume.root.logger=DEBUG,console -n  fooAgent....20/09/09 12:32:22 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false20/09/09 12:32:22 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp20/09/09 12:32:52 INFO hdfs.HDFSEventSink: Writer callback called.20/09/09 12:32:52 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp20/09/09 12:32:52 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622342911.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.159962234291120/09/09 12:32:55 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false20/09/09 12:32:55 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp20/09/09 12:33:25 INFO hdfs.HDFSEventSink: Writer callback called.20/09/09 12:33:25 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp20/09/09 12:33:25 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622375913.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.159962237591320/09/09 12:33:28 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false20/09/09 12:33:28 INFO hdfs.BucketWriter: Creating hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp20/09/09 12:33:58 INFO hdfs.HDFSEventSink: Writer callback called.20/09/09 12:33:58 INFO hdfs.BucketWriter: Closing hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp20/09/09 12:33:58 INFO hdfs.BucketWriter: Renaming hdfs://xxxxx/user/hduser/flume/events/FlumeData.1599622408915.tmp to hdfs://xxxxx/user/hduser/flume/events/FlumeData.159962240891520/09/09 12:34:01 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false20/09/09 12:34:01 INFO hdfs.BucketWriter: Creating

아래와 같이 HDFS에서확인할수있습니다.

$ hadoop fs -ls /user/hduser/flume/events/Found 17 items-rw-r--r--   2 root hdfs       3089 2020-09-09 12:25 /user/hduser/flume/events/FlumeData.1599621914876-rw-r--r--   2 root hdfs       3093 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621946882-rw-r--r--   2 root hdfs       2931 2020-09-09 12:26 /user/hduser/flume/events/FlumeData.1599621979885-rw-r--r--   2 root hdfs       3091 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622012888-rw-r--r--   2 root hdfs       2931 2020-09-09 12:27 /user/hduser/flume/events/FlumeData.1599622045890-rw-r--r--   2 root hdfs       3091 2020-09-09 12:28 /user/hduser/flume/events/FlumeData.1599622078893-rw-r--r--   2 root hdfs       2930 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622111895-rw-r--r--   2 root hdfs       3093 2020-09-09 12:29 /user/hduser/flume/events/FlumeData.1599622144897-rw-r--r--   2 root hdfs       3092 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622177899-rw-r--r--   2 root hdfs       2931 2020-09-09 12:30 /user/hduser/flume/events/FlumeData.1599622210902-rw-r--r--   2 root hdfs       3093 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622243904-rw-r--r--   2 root hdfs       2932 2020-09-09 12:31 /user/hduser/flume/events/FlumeData.1599622276906

마치며

이번 페이지에서는 아주 간단한 Flume Topology를 구성해 봤습니다. 실제 운영 환경에서는 2개 이상의 flume agent를 pipe하면서 interceptor를 활용하여 이벤트를 변환합니다. source, channel, sink에는 다양한 타입이 존재하며 kafka를 channel, sink로 많이 사용 하기도 합니다. HDFS를 최종 저장소(terminal sink)로 사용하는경우 위와 같이 설정 하면 됩니다.

--

--