Apache Apex实战系列(二)---’’Hello World”实战

本博文作为Apache Apex实战系列第二篇,本文将教你构建一个简单的Apache Apex流式应用.本文将以大数据处理中最经典的示例--”Hello,world”开篇,进行”抛砖引玉”.

在动手构建经典“Hello,world” (好吧,或许你已经猜到了,大数据处理最经典示例不是WordCount吗?对的,堪称Big Data入门级别的“Hello,World”)之前,先简单了解下Apache Apex On YARN的一些基本原理,如提交的Apex应用是如何运行于Hadoop集群上等.


Apache Apex On YARN 原理简介

Apache Apex On YARN 简单示意图如下:

从图中可以看到Apex通过Checkpint机制保证容错

一个Apex应用从客户端提交至在Hadoop集群中运行,可简单概括为以下三点:

  1. 首先,Application Master从Yarn获取worker container,以便从物理上运行Apex操作符;
  2. Worker Containers 通过Pub/Sub机制发送数据
  3. Workers 定期发送心跳信息至Apex AM(Application Master)

首先Apex CLI客户端向Resource Manager发起请求,Resource Manager收到请求后,寻找合适的节点作为运行Apex应用程序的Application Master.整个过程中,该Application Master是第一个运行的Yarn Container,负责支撑整个Apex应用,如请求额外资源,必要时运行其它Containers等.

当Apex AM运行,AM决定所需运行的Worker Containers数量,并基于具体配置,将DAG逻辑计划翻译成物理计划.物理计划可视为执行层蓝图,结合并行度、数据本地性等,负责分配对应操作符Operators至Containers中.物理计划创建完后,AM为执行层从 YARN Resource Manager处请求资源.Apex在运行时可增强物理计划,因而可动态进行资源分配(在需要时可向YARN RM请求和释放Container,而不必在提交时静态设置).

当Apex AM获取所有请求的容器Container后,开启worker进程,每个worker进程通过心跳协议周期性与Apex AM进行通讯,并允许Apex AM追踪、监控每个worker的健康状况.Apex AM并不参与实际流式数据处理,流式数据处理去中心化,异步、分布式于Containers中处理,各个Worker Container通过发布-订阅方式进行数据传输.


Apache Apex Operator模型初探

了解完Apache Apex On YARN原理后,再来简单理解下Apex操作符Operator模型在整个Apex应用运行过程中的生命周期.

Apex Operator生命周期示意图

从图中可以看到,Operator一般从setup阶段开始,及组件开启阶段,当Operator部署时即调用,进行初始化如缓存创建,建立外部连接等.注意该方法并不会执行实际数据处理工作,仅为准备工作.

beginWindow 标记着流式窗口开始,为一循环反复回调,因为此时Operator处于处理执行模式.该过程中,可在输出端口触发数据或写入数据至外部系统,可在回调内触发元组如回放日志中已被处理过的元组,最典型的如在窗口时间周期内重置临时状态等.通过ActivationListener激活后进入beginWindow阶段.

emitTuples(input operator) 方法由引擎周期性调用,如Kafka连接器从消费者API接受数据并触发元组至输出端口.Apex引擎在流式窗口时间范围内将多次调用该方法,因而建议操作符处理逻辑非阻塞,并尽早返回控制权至Apex引擎.实际上,多数从外部系统读取数据的连接器运行在不同的IO线程,传输并将高可用数据放置于Buffer中.

InputPort.process 阶段,引擎将传递元组进行处理,在流式窗口之间周期内可重复多次,元组类型为Stream或用户给定Schema.可以是Java对象(POJO,Map等).与emitTuples类似,操作符逻辑不应阻塞,尽快返回控制权至Apex引擎,而不干涉整个DAG处理.

endWindow 阶段,当超出流式窗口时间周期,Apex引擎将调用该方法.执行数据开销较大操作,如数据库提交,刷写文件等.也可用于触发聚合数据结果,如在每个窗口内进行Count,或更新操作符内一些指标属性等.由ActivationListener将Operator重置为deactivate状态并进入teardown阶段.

teardow 与setup相反,teardown非处理数据流的一部分,并不应用于任意影响相关状态的操作中.如在该方法内进行数据库提交是非法的,并不能保证该方法被调用.当进程意外终止时,Apex引擎并不能调用该回调并执行操作符中的给定逻辑.


Apache Apex 代码实战--WordCount

Apache Apex相关原理暂时梳理以上几点,下面进行Apex流式处理代码实战,以经典的WordCount为例.

首先,定义下本次WordCount实例中Apex DAG的大概流向,如图:

WordCount 实例Apex DAG示意图 (本图 input~>linesConsole 仅为Debug)

Apex流式应用WordCount实例核心代码如下

其中Input对应文件读取LineByLineFileInputOperator, Output对应GenericFileOutputOperator的输出端口.整个DAG中Operator除了文本行解析器LineSplitter需要手动编写代码外,其余Operator无需手动实现,如文本读取LineByLineFileInputOperator、计数器UniqueCounter及文本输出GenericFileOutputOperator,可直接调用Apex API.注意在文本输出时需要指定字符转化方法,本例通过实现Converter<Object,?>接口设置字符串转化方法.


Apex运行WordCount实例效果

由于整个示例相对简单,下面我们来看下具体运行效果.在运行Apex实例代码前,我们先得造份测试数据,可以Apache License为例进行WordCount,具体步骤如下:

首先,上传测试数据至HDFS上,然后开启Apex CLI提交实例应用,通过Yarn命令行查看实例应用已运行

此处贴上Hadoop集群Web UI任务运行示意图

通过Web UI 界面查询运行日志

WordCount实例运行结果,如图


Are we done? Definitely we’re.

好吧,以免你疑惑,最后贴上LineSplitter代码

截图省去nonWordStr对应的POJO代码

PS:如果你对Apache Apex还有很多疑惑,或对本例中Apex应用配置文件配置项,如properties-sandbox.xml具体配置等存在疑惑,具体请参考Apache Apex官方文档:

Like what you read? Give Myles Ryan a round of applause.

From a quick cheer to a standing ovation, clap to show how much you enjoyed this story.