Kafka消费者

消息由生产者产出,产出后push到partition中,但是既然有生产了,那肯定就要有消费,不然我们生产出来的消息岂不成了垃圾数据,所以在kafka中有一个与生产者对应的玩意儿:消费者。生产者是往partition中push数据,而消费者是从partition中pull消息,有些MQ中是由服务端push消息给消费者,并且是阅后即焚的模式,但是kafka是支持将消息持久化到磁盘上,并且并不会因为消息被消费了而删除,每一个消费者都可以自由的消费partition中的历史消息,即使是一个新加入的Consumer,也可以通过指定offset将partition中所有的历史消息都从头消费一次,而服务端只负责为消息设定offset,Consumer从哪一条消息开始消费完全由自己决定,每一个Consumer都会在本地维护自己的offset,Consumer之间的offset互不干扰。

每一个消费者都归属于一个消费者群组,一个partition只能被同一个消费者群组内的一个消费者实例消费,但可以被不同消费者群组同时消费,每个群组内的所有消费者订阅的都是同一个topic,每个消费者接收一个topic中一部分partition的消息,若消费者数量多于partition的数量,则会出现闲置的消费者,而若消费者数量小于partition的数量,则会出现某个消费者消费多个partition中的数据。

消费者群组(Consumer Group)

消费者群组是由一个或多个消费者实例组成的群组,具有可扩展性和容错性。每一个消费者群组都拥有全局唯一的group_id,群组内的所有的消费者实例共享这个group_id,群组内的所有消费者订阅同一个topic,组内的一个消费者实例只能消费topic中的一个Partition的消息。

消费类型:

  1. 点对点:一个topic对应一个消费者群组
  2. 广播(发布-订阅):一个topic对应多个消费者群组

消费者和分区

  1. Consumer数量 < Partition数量

    当Partition的数量多于Consumer的时候,一个Consumer会消费来自多个Partition的消息,此种情形下会加大Consumer的压力,单Consumer实例的吞吐量决定了消息消费的效率,当消息井喷的时候,会造成大多数消息都积压在Partition中,严重的时候会影响服务性能。若想提升消息消费效率,可以通过增加Consumer实例解决。

    image-20200417151054543
  2. Consumer数量 = Partition数量

    当Partition和Consumer数量相等的时候属于是最平衡的状态,一个Partition对应一个Consumer,消息的消费效率完全取决于Consumer实例的吞吐量

    image-20200417151007055
  3. Consumer数量 > Partition数量

    因为一个Partition只能被同一个消费者组内的一个Consumer实例消费,所以当Consumer的数量大于Partition的数量时,将会有多个Consumer实例空闲着,无消息可消费。比如下图中,Topic0有4个Partition,而Consumer Group中有5个消费者实例,由消费关系来看,Consumer4这个实例会一直空闲着。

    image-20200417150940246
  4. 多元化消费消息

    一个Partition只能被同一个消费者群组内的一个Consumer实例消费,但是我们在实际生产中可能会遇到需要将一条消息发送给多端的情况,比如订单生成佣金的时候,不同的项目组都有各自的佣金策略,这种情况可以给每一个项目组分配一个group的形式让所有的项目组共享消息

    image-20200417165151251

消费者重平衡(ReBalance)

当消费者群组内的消费者实例发生变化时,消息的消费情况会是什么样子?用户群组初始有两个Consumer实例,每一个都要负责消费两个Partition的消息,然后群组内新增了两个Consumer实例,这样Consumer和Partition的数量一致了,新增加的消费者会均衡的替原有消费者分摊处理Partition的消息,最终达到一对一的平衡状态,这个过程就叫作ReBalance。

image-20200417172313104

kafka通过消费者群组的ReBalance实现高可用和伸缩性,在群组做ReBalance期间,整个群组的消费者都无法消费消息,所以一般情况下我们都会预先估算好消费者群组内的消费者数量,估算的依据是消息量和topic下的Partition数。且当一个Partition重新分配给另一个消费者实例时,会造成当前正在消费的消息状态丢失。

消费者群组内的消费者实例需要定期向broker发送心跳来维护自己在群组内的地位,比如消费者进行poll和commit的同时,会发送一次心跳。如果broker长时间未接收到来自Consumer的心跳请求,则认为该Consumer实例已宕机,自动将其从群组中移除,并做一次ReBalance,Consumer之前对应的Partition的消息会暂时不再被消费。

偏移量

偏移量可以分为生产者偏移量和消费者偏移量,生产者偏移量实际上就是每一个Partition的偏移量,Partition之间是相互隔离的,也可以理解为Partition中的每一个消息都一个offset属性,每一个Producer针对topic-Partition都在自身维护一个offset;同时也可以理解为生产者偏移量只是一个虚拟的offset,实际上就是消息队列的size。消费者偏移量是切切实实和消费者相关的一个最重要的元素,使用偏移量,Consumer才能知道要从哪条消息开始读取,偏移量主要和topic的Partition相关,尤其是在消费者群组发生ReBalance的时候,调整后的Consumer从新的Partition接收消息之前需要知道这个Partition有多少消息已经被消费了,当前的消费者要从哪一条消息开始消费,否则就会出现重复消费或者遗漏消息的情况。

kafka中默认一个topic:__consumer_offsets,消费者每消费一次就会像该topic发送一次消息,消息中包含每一个Topic-Partition的offset,在发生ReBalance之后,消费者开始负责另外一个Partition的消息,这个时候会向__consumer_offsets获取该Partition最后一次被消费的偏移量,然后从该偏移量开始继续消费数据。

通过topic、partition、offset三个元素可以定位一条消息。

消费者在获取一批消息之后,需要将最后一条消息的偏移量提交给服务器,也就是将偏移量commit给topic__consumer_offsets,提交方式有自动提交和手动提交两种,通过参数enable.auto.commit来控制,默认值为true,也就是默认自动提交;而将参数值改为false,就是手动提交。

  • 自动提交
    1. enable.auto.commit=true,设置为自动提交
    2. auto.commit.interval.ms=3000,设置自动提交时间间隔,单位:毫秒,默认5秒
  • 手动提交
    1. enable.auto.commit=false,设置为手动提交
    2. 同步提交:consumer.commitSync()
    3. 异步提交:consumer.commitAsync()

配置

  • fetch.min.bytes:指定消费者每次获取记录的最小字节数,默认为1字节,若本次poll时数据量大小不满足条件,则会等到有足够的数据时再拉取。当消息不用非常实时的时候,可以将消费者的此值设置略大一些,以此降低消费者和broker的工作负载。
  • fetch.max.bytes:指定消费者每次获取记录的最大字节数,默认为50M
  • max.partition.fetch.bytes:指定每个分区每次拉取消息的最大字节数,默认为1M
  • auto.offset.reset:指定消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的处理方式,默认值是latest,也就是从最新的记录开始读取,也可以设置为earliest,让消费者从第一条消息开始读取
  • enable.auto.commit:设置消费者是否自动提交偏移量,默认为true
  • heartbeat.interval.ms:消费者每次心跳间隔,单位:毫秒,默认为3s
  • bootstrap.servers:指定消费者连接的kafka集群地址,通过逗号分隔,格式:host1:port1,host2:port2,...
  • group.id:消费者所属群组id,默认为默认分组
  • key.deserializer:key反序列化方式
  • value.deserializer:value反序列化方式

案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public void receiver() {
Properties properties = new Properties();
// kafka集群地址
properties.put("bootstrap.servers", "127.0.0.1:9092");
// 消费者群组id
properties.put("group.id", "cc_consumer");
// key反序列化
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value反序列化
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 创建消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// 指定消费者监听的topic
consumer.subscribe(Arrays.asList("my_topics"));
try {
while (true) {
// 拉取topic
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.toString());
}
}
} finally {
consumer.close(Duration.ofMillis(2000));
}
}