Kafka初探

基本名词

  • 消息:kafka中的数据单元称为消息,也可以叫记录,相当于MySQL表中的一条记录
  • 批次:为了提高效率,kafka可以一次性写入一批数据(消息),批次指的就是一组消息
  • 主题:相当于MySQL的表,一个主题(Topic)代表着一类消息,kafka使用主题对消息分类
  • 分区:分区(partition)归属于主题,一个主题可以划分为若干个分区,分区可以分布在不同的broker上,也可以在同一个broker上,使用分区来实现kafka的伸缩性。主题的单个分区上的消息是有序的,但是不同分区上的消息无法保证有序。
  • 生产者:向主题发布消息的客户端称为生产者,生产者用于不断的向主题发送消息
  • 消费者:订阅主题消息的客户端称为消费者,消费者用于处理生产者生产的消息
  • 消费者群组:生产者与消费者的关系是一对多,比如一个客服对应多个咨询者,消费者群组就是由一批消费者组成的
  • 偏移量:偏移量(Consumer Offset)是一种源数据,是一个单向递增的整数标识,用于记录消费者发生重平衡时的位置,以便用来恢复数据
  • broker:一个独立的服务器被称为broker,broker接收来自生产者的消息,并为消息设置偏移量,并提交消息持久化到磁盘
  • broker集群:多个broker组成一个集群,保证kafka的高可用,每个集群中都有一个broker充当集群Leader的角色
  • 副本:kafka中消息的备份又称为副本(Replica),副本的数量是可配置的,类型有Leader和Follower两种,Leader对外提供服务,Follower辅助
  • 重(chong)平衡(ReBalance):若消费者组内某个消费者宕了,其他存活的消费者自动重新分配订阅主题分区,kafka高可用的必备能力

关系介绍

  • Topic&Partition

    1. Topic是kafka中给消息分类的标记,一个消息必定属于一个Topic,一个Topic可以包括一个或多个Partition,Partition又可以有多个副本,副本又可以分配在不同的broker上。

    2. Partition内部是有序的,Partition之间是无序的

      img
    3. 内部存储是以append-log的方式不断进行log文件尾部追加,文件读写是在磁盘上是顺序的,效率极高,媲美内存操作,每一条log对应一个offset,可以把offset理解为一个数组的下标,通过这个下标就可以读取对应的消息数据,Partition只负责为消息分配offset,消费者具体由哪个offset开始消费消息完全由消费者自己控制,也就是kafka服务端只负责提供数据,消费者自己控制消息消费进度。

      img
    4. kafka虽然是可以持久化消息,并且不删除已经被消费过的消息,但消息也不是被永久存储在磁盘上的,为了防止磁盘长期被消息写入数据日积月累,kafka提供两种旧数据淘汰策略:

    • 开启数据清理:log.cleaner.enable=true,默认关闭状态

    • 基于时间:log.retention.hours=168,单位:小时;log.retention.ms=100,单位:毫秒;log.retention.minutes,单位:分钟

    • 基于文件大小:log.retention.bytes=1073741824,单位:字节

  • Consumer&Consumer Group

    1. 一个消费者组由一个或多个消费者组合而成,每一条消息只会被同一个group中的一个消费者消费,但是不同group中的消费者可以同时消费同一条消息,保证了 消息队列中的消息只被消费一次;kafka是发布订阅模式的消息队列,这里订阅的是消费者组,而不是特定的一个消费者实例。

      img

      kafka支持离线处理和实时处理,所以我们可以使用Hadoop进行离线处理,也可以使用Storm这种实时流处理系统进行实时处理,还可以将数据实时的同步到其他的数据中心,前提是这些消费者处于不同的消费者组。

      可以测试一下:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      # 1. 创建一个topic
      kafka-topics --create --zookeeper localhost:2181 --replication-factor 3 --partition 1 --topic cc_topic

      # 2. 启动一个Producer
      kafka-console-producer --broker-list localhost:9092 --topic cc_topic

      # 3. 启动五个Consumer,1~3号放cc_group_1,4~5放cc_group_2
      kafka-console-consumer --bootstrap-server localhost:9092 --group cc_group_1 --from-beginning --topic cc_topic

      kafka-console-consumer --bootstrap-server localhost:9092 --group cc_group_1 --from-beginning --topic cc_topic

      kafka-console-consumer --bootstrap-server localhost:9092 --group cc_group_1 --from-beginning --topic cc_topic

      kafka-console-consumer --bootstrap-server localhost:9092 --group cc_group_2 --from-beginning --topic cc_topic

      kafka-console-consumer --bootstrap-server localhost:9092 --group cc_group_2 --from-beginning --topic cc_topic

      # 4. 发送一条消息
      123

      # 5. 查看消费者
      cc_group_1和cc_group_2各自收到123这条消息一次
  • Consumer ReBalance

    Consumer ReBalance是通过Zookeeper实现,kafka保证了同一个消费者组中只能有一个消费者消费某条消息,其实kafka保证的是在稳定状态下每一个消费者都只会消费一个或多个Partition的消息,而某一Partition的消息仅会被一个消费者消费,这样设计的优势是每个消费者不用跟所有的broker进行通信,减少了通信开销,劣势是同一个消费组内的消费者不能均匀消费,而且单个Partition内部的数据是有序的,所以对于单个消费者来说,其消费的消息是有序的。

    1. Consumer < Partition:会出现某些Consumer消费多个Partition的数据
    2. Consumer > Partition:会出现某些Consumer没有可消费的Partition
    3. Consumer = Partition:一个Consumer消费一个Partition,均匀

特性

  • 高吞吐、低延迟:kafka处理消息的速度非常快,每秒几乎可以处理几十万条消息,并且最低延迟只有几毫秒
  • 高伸缩性:每个topic都能有多个partition,每个partition又可以分布在不同的broker上
  • 高并发:能够同时支持数千个客户端进行读写
  • 容错性:允许集群中的某些节点失败,某个节点宕机,kafka仍然可用继续提供服务
  • 持久性、可靠性:kafka的消息存储是基于Zookeeper的,Zookeeper是可以将消息持久化到磁盘上,并且支持数据备份,所以kafka是一个非常可靠的可持久化消息中间件
  • 速度快:kafka采用零拷贝的模式实现数据的快速移动,避免了内核空间和用户空间的频繁切换,kafka可以批量发送数据,从生产者到文件系统到消费者;数据压缩可以通过有效的数据压缩减少IO次数,并且采用顺序读写的方式避免寻址造成的消耗。总结起来就是零拷贝、顺序读写、数据压缩、分批发送。

消息队列

  • 点对点(一对一):一个生产者所生产的消息只会被一个消费者进行消费,不会同时被多个消费者消费

    image-20200416173858298
  • 发布订阅(一对多、多对多):一个或多个生产者所生产的消息会被多个消费者同时消费

    image-20200416173919621

架构体系

image-20200416220029587

一个kafka集群包含若干个Producer、若干Consumer group、若干broker和Zookeeper集群组成,kafka通过Zookeeper管理Partition,选举Leader,以及在Consumer发生变化时通过Zookeeper进行ReBalance。Producer将消息push到Partition,Consumer通过pull将消息从Partition拉取到本地。

API

kafka目前提供五类常用的API,主要有Producer、Consumer、Stream、Connect、Admin API:

  • Producer API:允许App作为Producer将消息发送到kafka集群的一个或多个topic上
  • Consumer API:允许App作为Consumer从kafka集群上的一个或多个topic拉取消息
  • Stream API:允许App作为流处理器,从一个或多个topic中消费输入流并转化为输出流
  • Connector API:允许将现有的应用程序或存储系统连接到kafka的topic,充当Producer或Consumer
  • Admin API:允许管理和检查topic、broker和kafka的其他内容
img

重要配置参数

kafka的参数配置文件是server.properties

  • broker.id:每个broker都有一个唯一标识,就像是MySQL表中的主键ID,默认值是0,这个值在kafka集群中必须是唯一不可重复的,值随意设置。
  • port:kafka broker的默认端口是9092,若未指定port参数,则就是9092,修改port参数可以是任意端口,但是最好不要低于1024,不然就需要管理员权限启动了
  • zookeeper.connect:设置broker源数据的Zookeeper地址,参数的值可以设置一个或多个,多个zk通过逗号分隔,比如zk1:port,zk2:port,zk3:port,不同的kafka集群可以使用同一个zk集群,可以通过指定zk的具体path来区分每个kafka的使用,比如kafka cluster1使用zk:port/path1,kafka cluster2使用zk:port/path2
  • zookeeper.connection.timeout.ms:设置broker连接Zookeeper的超时时间,单位是毫秒
  • log.dirs:kafka把所有的消息都保存在本地磁盘上,保存的日志地址通过该参数指定,可以指定多个存储目录,通过逗号分隔,例如/home/kafka/1,/home/kafka/2,/home/kafka/3
  • auto.create.topic.enable:默认为true,允许随意的创建topic,参数为true时,使用Producer往一个不存在的topic发送消息时会自动创建topic、使用Consumer从一个不存在的topic拉取消息时自动创建topic、主动创建topic、当任意一个客户端向topic发送元数据请求时。此值建议在生产上设置为false,topic由人工进行分配,防止生产环境出现各种乱七八糟的topic。
  • topic相关参数
    • num.partitions:主题拥有的Partition数量,若在创建topic的时候未指定分区数量,则使用该参数的值,默认为1。在运行过程中,分区数量可以增加不能减少,在创建时可以通过--partition指定个数
    • default.replication.factor:kafka消息的默认副本数,默认为1,只有在自动创建topic的时候才有效,在创建时可以通过--replication-factor指定
    • log.cleaner.enable:是否开启日志清理功能,默认为true,清理方式有时间和日志文件大小两种方式
    • log.retention.hours:设置kafka消息保存的时间,默认为168个小时,还可以通过log.retention.mslog.retention.minutes来设置清理时间的毫秒和分钟时间
    • log.retention.bytes:设置topic的每个Partition所能保存的数据量,比如若一个topic有10个Partition,此参数的值为1G,那么该topic的最大存储容量为8G,topic的容量随着Partition的增加而增加。
    • log.segment.bytes:设置日志文件的最大的容量大小。当消息到达broker时,会被追加到日志文件中,但是如果日志片段的当前大小加上新接收消息的打小后超过了该参数设置的值,则将新消息和后续的消息写入到一个新的日志文件中。该参数的值越小,分割的文件就越多,磁盘的写入效率就越低。
    • log.segment.ms:除了待日志文件大小超值后重新分配新文件之外,还可以通过日志创建时间来控制消息日志文件的生命周期,可以和log.segment.bytes同时设置,哪一个先达标使用哪一个策略,比如bytes设置为1G,ms设置为1小时,若30分钟内文件容量已达1G,则后续消息写入到新的日志文件中,若1小时内日志文件尚未达到1G,则也分配新的日志文件记录后续的消息。
    • log.retention.check.interval.ms:检查日志段以查看是否可以根据保留策略删除它们的时间间隔,单位:毫秒
    • message.max.bytes:该参数限定broker可接收的单个消息的大小,默认是1MB,如果Producer发送的消息大于此值,则broker会直接拒绝并返回错误。该参数指定的是压缩后的消息大小,消息的实际大小可能大于此值。