Kafka中的消息是否会丢失和重复消费

在解释Kafka中的消息是否会丢失和重复消费这两个问题前,让我们先回顾下kafka的基本概念。

Kafka简介

Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标如下:

为何使用消息系统

解耦

在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。扩展就像调大电力按钮一样简单。

灵活性 & 峰值处理能力

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见;如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。

缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列通过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽可能的快速。该缓冲有助于控制和优化数据流经过系统的速度。

异步通信

很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

名词解释

Broker   Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic   每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition   Parition是物理上的概念,每个Topic包含一个或多个Partition.

Producer   负责发布消息到Kafka broker

Consumer   消息消费者,向Kafka broker读取消息的客户端。

Consumer Group   每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。


好了,在回顾了kafka的基本概念后,在此基础上我们基本搞清楚了Kafka的机制及如何运用。那我们重回标题的问题:Kafka中的消息会不会丢失或重复消费呢?为什么呢?

要确定Kafka的消息是否丢失或重复,可以从两个方面分析入手:消息发送消息消费

1、消息发送

Kafka消息发送有两种方式:同步(sync)和异步(async),默认是同步方式,可通过producer.type属性进行配置。Kafka通过配置request.required.acks属性来确认消息的生产:

综上所述,有6种消息生产的情况,下面分情况来分析消息丢失的场景:

2、消息消费

Kafka消息消费有两个consumer接口,Low-level API和High-level API:

如果使用高级接口High-level API,可能存在一个问题就是当消息消费者从集群中把消息取出来、并提交了新的消息offset值后,还没来得及消费就挂掉了,那么下次再消费时之前没消费成功的消息就“诡异”的消失了;

解决办法:

针对消息丢失:同步模式下,确认机制设置为-1,即让消息写入Leader和Follower之后再确认消息发送成功;异步模式下,为防止缓冲区满,可以在配置文件设置不限制阻塞超时时间,当缓冲区满时让生产者一直处于阻塞状态;

针对消息重复:将消息的唯一标识保存到外部介质中,每次消费时判断是否处理过即可。


Kafka的Leader选举机制

Kafka将每个Topic进行分区Patition,以提高消息的并行处理,同时为保证高可用性,每个分区都有一定数量的副本 Replica,这样当部分服务器不可用时副本所在服务器就可以接替上来,保证系统可用性。在Leader上负责读写,Follower负责数据的同步。当一个Leader发生故障如何从Follower中选择新Leader呢?

Kafka在Zookeeper上针对每个Topic都维护了一个ISR(in-sync replica—已同步的副本)的集合,集合的增减Kafka都会更新该记录。如果某分区的Leader不可用,Kafka就从ISR集合中选择一个副本作为新的Leader。这样就可以容忍的失败数比较高,假如某Topic有N+1个副本,则可以容忍N个服务器不可用。

如果ISR中副本都不可用,有两种处理方法:

  1. 等待ISR集合中副本复活后选择一个可用的副本;
  2. 选择集群中其他可用副本;

关于kafka的高可用,具体可参考:http://www.jasongj.com/2015/04/24/KafkaColumn2/


[返回首页]