场景
前段时间遇到一个场景,有一个用户社区模块,用户可以在社区的帖子专区发布帖子,或评论帖子,帖子和评论信息需要依据产生时间依次发送给第三方,消息内容为帖子和帖子评论,接收方必须先接收帖子再接收帖子评论,且需要按照发生时间顺序依次接收。第三接收方随时可能新增或减少,新增的接收方需要获取加入时间前30天之内的历史数据。
消息中间件选择
这是一个纯粹的消息传递功能,不涉及数据的整合和流处理,但是因为新增的消费者(第三方)要能获取到前30天之内的历史数据,所以我们需要一个能够将消息持久化的消息组件,并且要能自动清理30天之前的历史消息数据,选来选取,目前最方便的就是kafka了,我们来看一下kafka的特性:
- 持久化:kafka默认就支持将消息持久化到磁盘,被消费的消息不会被删除
- 历史数据:kafka可以通过
log.retention.hours=720
指定只保存30天的数据 - 帖子顺序发送:kafka的partition是有序的
- 效率:kafka的topic可以通过多分区提升效率,为分区设置副本提升高可用
具体实现
Kafka配置
设置kafka持久化日志过期时间:log.retention.hours=720
Topic分区数量
依据帖子和评论的数量估算Topic的分区数量
伪代码
1 | public void push(T t){ |
- 先从DB中查询帖子和帖子评论
- 查询topic在kafka中的分区数
- 根据帖子ID和分区数取模,将帖子和评论均衡分配到各个分区中
- 将消息发送给对应分区,帖子和对应评论需要发送到同一分区,并且要按照时间发送,这样才能保证同一个帖子的发送顺序
消费者
每一个第三方的消费者都归属于唯一的消费者组,且必须给所有的消费者都设置一个消费者组