Kafka 实时消息队列

Kafka

Kafka 是什么?

Kafka 是分布式实时,基于发布/订阅的消息队列系统。该系统的设计目标
1. 高吞吐率~ 从生成者发布消息,到消费者处理消息,每秒可以处理至少100K 的数据。
2. 数据持久性 ~ 保证大数据持久的访问时间。
3. 支持消息分区和发布式消费 ~ 保证消息可以顺序完成处理。
4. 实时完成数据处理 ~ 从生产到消费低延迟的处理时间。

为什么使用?

  1. 系统解耦 ~ war 应用负责显示产品和下单功能,生产消息并推送到broker,子系统从broker获取消息并处理。
  2. 避免数据丢失 ~ 从插入- 获取- 删除,消息队列的机制规定系统需要在处理过程中表达数据已经完成处理,消息才会完全移除。
  3. 扩展性 ~ 轻松添加多个broker 服务器,以提供更高的数据处理能力。
  4. 可恢复性 ~ Cluster里其中一个broker挂了,也不会影响整个系统。

Kafka 名词概念

  1. Broker ~ 由一或多个服务器组织成Kafka 集群,这些服务器是broker。功能包括管理topic,接受生产者消息,保存数据并让消费者处理数据。
  2. Topic ~ 每个消息都有一个标志,这个topic 就是标志如 “orderTopic”,”logTopic” 作为分别消息数据功能。
  3. Partition ~ 是一个物理的概念,每个topic 可以指定在一个或者多个partition数量。
  4. Offset ~ 保存在partition里的数据sequence ID。
  5. Producer ~ 负责生产消息数据。
  6. Consumer ~ 负责处理消费数据。

Kafka 架构

架构图

Producer 生成者使用Push的模式把消息发送到broker,而Consumer 消费者使用Pull的模式从broker获取消息并处理。Zookeeper 主要是管理相关的配置文件,如客户端端口地址,Replication数量,客户端连接数等等功能。

安装测试

  1. http://kafka.apache.org/downloads 下载应用包。你的机器也需要安装scala 程序包。
  2. 下载完成后,解压并cd 到目录里。
  3. 你会看到bin,config,libs,和logs 这几个主要文件夹。bin 文件夹里有调用kafka broker 程序命令, config 文件夹里有各中配置文件,libs 主要是所有kafka程序库jar 包,logs 文件夹保存相关服务器日志。
  4. cd 到config 里,我们主要会设置调试这两个文件,server.properties 和zookeeper.properties。
  5. 通过editor 或者cat zookeeper.properties ,可以看到如以下
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181 // 客户端连接端口
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0 // 限制连接数

6. 启动zookeeper 服务。

bin/zookeeper-server-start.sh config/zookeeper.properties

7. 启动kafka broker 服务之前,先看看config/server.properties 文件里主要的配置。

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0 //代表broker 独立的ID,如设置多个broker, 需要设置不同的独立
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092 //自定义运行端口是9092,多个broker  需要不同的端口
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1 // 可以设置多个保存消息空间

8. 启动kafka broker server 。

bin/kafka-server-start.sh config/server.properties

9.启动成功后,我们尝试发一个topic

bin/kafka-topics.sh --create -zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

10. 查看创建好的topic. 结果如下。

Lucass-MacBook-Pro:kafka_2.11-0.10.2.0 lucaschen$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic
Topic:testTopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testTopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

代码示范

  1. 创建一个Maven 项目,以JAVA 编写基本实现程序。
  2. 在pom.xml 的<dependencies /> 里加载kafka client 程序库依赖,maven会自动连接联系互联网并下载相关的库到你本地项目里。可以参考https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients
  3. 创建分别四个文件,分别为
    Order.java ~ 定义Order 的实体类,包含id和product properties.
    OrderConsumer.java ~ 负责处理消费消息。
    OrderProducer.java ~ 负责生产消息。
    OrderSerializer.java ~ 负责解析和序列化Order消息。
项目结构
Order.java
public class Order implements Serializable {
private String orderId;
private String productName;
//省略 constructor , getter 和setter
}
OrderProducer.java
public class OrderProducer {

public static void main(String[] args) {

String topic = "orderTopic";

Properties properties = getKafkaProducerProperties();

KafkaProducer producer = new KafkaProducer(properties);

List<Order> dummyOrderData = getDummyOrderData();

dummyOrderData.forEach(order -> {
ProducerRecord producerRecord = new ProducerRecord(topic, order.getOrderId(), order);
producer.send(producerRecord);
});

producer.close();
}

private static Properties getKafkaProducerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");
properties.put("acks", "all");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "com.lucas.Kafka.OrderSerializer");
return properties;
}

private static List<Order> getDummyOrderData() {
return Arrays.asList(
new Order("1", "product1"),
new Order("2", "product2"),
new Order("3", "product3"),
new Order("4", "product4"),
new Order("5", "product5")
);
}
}
OrderConsumer.java
public class OrderConsumer {
public static void main(String[] args) {
String topic = "orderTopic";
Properties properties = getKafkaConsumerProperties();
Consumer consumer = new KafkaConsumer(properties);
consumer.subscribe(Arrays.asList(topic));

while (true) {
ConsumerRecords<String, Order> consumerRecords = consumer.poll(1000);
for (ConsumerRecord<String, Order> consumerRecord : consumerRecords) {
String printMessage = "Position = %d, key = %s , product = %s";
System.out.printf(printMessage,
consumerRecord.offset(), consumerRecord.key(), consumerRecord.value().getProductName());
System.out.println();
}
  private static Properties getKafkaConsumerProperties() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092,localhost:9093,localhost:9094");//我启动多个broker
properties.put("group.id", "consumer");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "com.lucas.Kafka.OrderSerializer");
return properties;
}
}
OrderSerializer.java
public class OrderSerializer implements Serializer, Deserializer {

@Override
public void
configure(Map map, boolean b) {

}

@Override
public byte
[] serialize(String s, Object o) {

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
try {
ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
objectOutputStream.writeObject(o);
objectOutputStream.close();
return byteArrayOutputStream.toByteArray();
} catch (IOException e) {
e.printStackTrace();
}
return new byte[0];
}

@Override
public
Object deserialize(String s, byte[] bytes) {

ByteArrayInputStream in = new ByteArrayInputStream(bytes);
ObjectInputStream is;
try {
is = new ObjectInputStream(in);
return is.readObject();
} catch (IOException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
return null;
}

@Override
public void
close() {

}
}

4. OrderSerializer 是custom serializer 和deserializer class ,你可以在定义Producer和Consumer 对象的时候把配置properties 改用 org.apache.kafka.common.serialization.StringDeserializer
org.apache.kafka.common.serialization.StringSerializer,
并序列化Object 为JSON String. Properties 部分还有其他配置,可以参考http://kafka.apache.org/documentation.html#config

总结

还有其他开源如activeMQ, zeroMQ 等等消息队列系统。这些框架提高了现今互联网系统高并发处理能力并同时兼容稳定性,保证业务不受影响。Alibaba双 十一 0% 掉单也是使用自身团队开发的rocketMQ。