Kafka之保证可靠的数据传递

2019/04/25 Kafka

可靠性保证

Kafka的数据可靠性保证:

  1. 保证分区消息的顺序

如果使用同一个生产者往同一个分区写入消息,消息B在消息A之后写入,Kafka保证B的偏移量比A大,消费者会先读取A再读取B。

  1. ==只有当消息被写入分区的所有同步副本时(不一定要写入磁盘),它才被认为是已提交的。==

生产者可以选择接受不同类型的确认,比如在消息被完成提交时确认,或者被写入首领时确认,或者被发送时就确认。

  1. 只要还有一个副本是活跃的,已经提交的消息就不会丢失。

  2. 消费者只能读取已经提交的消息。

Kafka管理员需要权衡消息存储的可靠性和一致性的重要程度, 以及可用性、高吞吐量、低延迟的硬件成本的重要程度之间的权衡。

复制

==Kafka的复制机制和分区的多副本架构是Kafka可靠性保证的核心。==

Kafka的主题本分为多个分区,分区是基本的数据块。分区存储在单个磁盘上,Kafka保证分区里的消息是有序的。

每个分区可以有多个副本,其中一个副本是首领副本。所有消息都直接发送给首领副本,或者直接从首领副本读取消息。

其他副本只需要与首领副本同步,并及时复制最新的消息,当首领副本不可用时,其中一个同步副本将称为新的首领副本。

跟随者副成称为同步副本的条件:

  1. 与Zookeeper之间有一个活跃的会话。(在过去6s向ZooKeeper发送心跳)
  2. 在过去10s内从首领那里获取过消息。
  3. 在过去10s内从首领那里获取过最新的消息。 跟随者由于网络原因成为不同步副本,一旦重新获取最新消息后,可以重新变成同步副本。

由于Kafka的消息需要所有同步副本确认才能称为已提交,因此,一个滞后的同步副本会降低Kafka的吞吐率。而如果一个副本不再是同步的,就不再会影响Kafka的性能,但是会增大数据风险。

broker配置

复制系数

  • replication.factor : 主题级别的配置参数
  • default.replication.factor : broker级别的配置参数,配置自动创建的主题

如果复制系数是N,则每个分区总共会被N个不同的broker复制,总共有N个数据副本。

建议在要求可用性的场景里把复制系统至少设为3.

副本的分布也很重要:

  • broker.rack : 为每个broker配置所在的机架 Kafka会保证分区的副本被分布在多个机架上。

不完全的首领选举

  • unclean.leader.election=true/false 默认为true

当首领不可用时,其他副本都是不同步的,是否允许不同步副本成为新的首领。

如果不同步的副本不能成为新首领,在旧首领恢复前,Kafka不可用,降低了可用性。

如果不同步的副本可以成为新首领,因为不同步副本不包括所有旧首领的消息,可能有数据丢失的风险。

最少同步副本

  • min.insync.replicas

消息只有被写入到所有同步副本后才被认为是已提交的,该参数设定此时“所有同步副本”的最少数目。

对于一个包含3个副本的主题,如果min.insync.replicas=2,那么至少要存在两个同步副本才向分区写入数据。

有3个副本,min.insync.replicase=2,如果两个副本不可用,及“所有同步副本” < min.insync.replicase。

broker会停止接受生产者的请求,返回NotEnoughReplicasException.
消费者仍然可以继续读取已有的数据,broker变为只读。

生产者的可靠性

即使broker配置的尽可能可靠,如果生产者本身是不可靠的,数据丢失仍然会发生。

acks设置

  1. acks=0

此时生产者不管发送是否成功,很大可能会丢失消息。

  1. acks=1

首领收到消息并写入分区文件后即返回确认。

如果首领在跟随者副本还没有收到更新时崩溃,消息会丢失。

如果发送消息时,broker正在进行首领选举,生产者会收到LeaderNotAvailableException异常,生产者需要恰当的处理该异常,重发消息。

  1. acks=all

首需要等待所有同步副本都收到消息后才返回确认。

min.insync.replicas参数结合,决定在返回确认前至少有多少个副本能够收到消息。

最可靠,但是吞吐率最低。

生产者重试

当错误发生时,对于可以自动处理的错误(如,LeaderNotAvailableException),可以进行多次重试,直至消息发送成功。

但是,重试可能造成同个消息多次写入的问题,==broker会收到两个相同的消息,Kafka没法保证每个消息只被处理一次。==

对于幂等消息(如:这个账号里有100美元),重复消息不会对结果造成影响。但是对于非幂等消息(如:往账号里增加100美元),会造成结果错误。

==对于重复消息,可以在消息里加入唯一标识符,并在消费者中进行清理。==

消费者的可靠性

只有已经被写入所有==同步副本==的数据,才会被消费者读取,因此消费者得到的消息已经具备了一致性。

消费者可靠性主要是跟踪哪些消息是已经读取过的,哪些是还没读取过的,保证读取消息时不会丢失。

如果消费者提交了偏移量,却未能处理完轮询得到的消息,就可能造成消息丢失。

消费者的可靠性配置

  1. group.id

如果两个消费者具有相同的group.id,并且订阅了同一个主题,每个消费者会分到主题分区的一个子集,也就是只能读取到所有消息的一个子集。

如果希望消费者可以看到主题的所有消息,需要为它设置唯一的group.id。

  1. auto.offset.reset = earliest / latest

配置在没有偏移量可以提交时,或请求的偏移量在broker上不存在时,消费者的读取位置。

  • earliest : 从分区开始位置读取。造成重复读取,不会丢失。
  • latest : 从分区末尾开始读取。可能丢失,不会重复。
  1. enable.auto.commit

自动提交偏移量。

  1. auto.commit.interval.ms

偏移量提交时间间隔,默认为5s。

消费者的可靠性

总是在处理完事件后再提交偏移量

提交的偏移量应该是处理完成的消息偏移量,而不是读取到的偏移量。

偏移量提交频率是性能和重复消息数量之间的权衡

可以在一个循环里多次提交偏移量,也可以在多个循环只提交一次偏移量。

注意再均衡

==注意要在再均衡发生前提交偏移量。==

消费者重试

在不影响轮询读取的情况下,对处理失败的消息进行重试。

如,记录#30处理失败,#31处理成功,此时需要在不丢弃#30和不影响轮询的状态下对#30进行重试。

方法一:

  • 提交最后一个处理成功的偏移量,把处理失败的消息保存到缓冲区。
  • 调用KafkaConsumer#pause(Collection<TopicPartition> partitions)使得轮询不再返回新数据。
  • 尝试重新处理缓冲区中的消息,直至成功或到达重试上限。
  • 调用KafkaConsumer#resume(Collection<TopicPartition> partitions)使得轮询返回新数据。
KafkaConsumer:

public void pause(Collection<TopicPartition> partitions)

暂停从给定分区中获取数据,新的poll调用不会给消费者返回任何数据。

public void resume(Collection<TopicPartition> partitions)

从暂停中恢复。

==使用pauseresume方法是因为,不能跳出poll循环,也不能长时间阻塞轮询,会造成长时间没有发出心跳,Kafka broker会认为消费者宕机,造成再均衡。==

方法二:

  • 把错误写入一个独立的主题。
  • 建立一个独立的消费者群组专门负责从错误主题上读取需要重试的数据。
长时间处理

暂停轮询的时间不能超过几秒,否则客户端和broker的心跳将断开。

  • 使用线程池处理需要长时间处理的数据。
  • 调用pause(),保持轮询,等待工作线程完成处理。
  • 调用resume(),继续获取数据。
仅一次处理

消费者如果要支持仅一次处理语义(及每个消息只被写到外部系统一次,不处理重复消息)。

最简单的办法是把结果写到一个支持唯一键的系统里,如键值存储引擎、关系型数据库、ElasticSearch或其他数据引擎,可以在消息里直接包含一个唯一的键,也可以使用主题 + 分区 + 偏移量的组合创建唯一键。

本文地址:https://cheng-dp.github.io/2019/04/25/kafka-maintain-credible-data-transport/

Search

    Table of Contents