简
在kafka中把产生消息的一方称为生产者(Producer),尽管消息的产生非常简单,但是消息的发送过程比较复杂
发送消息从创建一个ProducerRecord对象开始,此类是kafka中的一个核心类,表示kafka需要发送的K-V键值对,记录了要发送的topic、partition、key、value、timestamp等
1 | public class ProducerRecord<K, V> { |
在发送ProducerRecord的时候需要将对象序列化为字节数组,便于在网络上传输,之后消息达到分区器,若发送过程中指定了分区号,也就是partition,则在发送消息的时候将使用指定的分区,若发送过程中未制定分区,则根据topic和cluster中的partition数量顺序选择一个分区进行发送,分区选择器由接接口org.apache.kafka.clients.producer.Partitioner
的实现类指定。
org.apache.kafka.clients.producer.KafkaProducer
1 | private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) { |
org.apache.kafka.clients.producer.internals.DefaultPartitioner
1 | // 选取分区 |
ProducerRecord内关联的时间戳timestamp,如果用户未指定,则使用KafkaProducer内的time的时间作为时间戳,但是kafka最终使用的时间戳取决于topic配置的时间戳类型:
- topic为CreateTime,则消息记录中的时间戳由broker使用
- topic为LogAppendTime,则消息记录中的时间戳会在追加到日志中时由broker重写
消息被放在一个记录批次里ProducerBatch
,这个批次的所有消息都会被发送到相同的topic和partition上,由一个FutureRecordMetadata负责发送。
broker收到消息后会返回一个响应,如果发送正常的话,会返回一个RecordAppendResult
对象,包含了topic、partition、offset、时间戳等信息,发送失败则会将失败的消息记录下来,然后后续重试发送。
org.apache.kafka.clients.producer.KafkaProducer
1 | private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) { |
org.apache.kafka.clients.producer.internals.RecordAccumulator
1 | public RecordAppendResult append(TopicPartition tp, |
消息发送类型
-
简单发送
kafka最简单的消息发送是只指定topic和key及value,分区及时间戳均使用默认值,send()方法会返回一个
Future<RecordMetadata>
对象,如果不需要关心返回值,则可以忽略这个返回值,否则必须关注此值,方法返回的异常信息可能有InterruptedException(发送线程中断异常)
,BufferExhaustedException(缓冲区已满)
,SerializationException(序列化异常)
1
2
3ProducerRecord<String,String> record =
new ProducerRecord<>("cc_test","cc","chuanchuan");
producer.send(record); -
同步发送
第一种简单发送方式的前提是我们不在意发送的结果,但是我们在正常的情况下都会等待broker的反馈。我们从发送的源码中看到send()方法返回的
Future<RecordMetadata>
对象,我们可以调用Future的get()方法阻塞主线程等待broker的响应,如果返回错误,则我们调用get()方法的时候会抛出异常,如果没发生异常,则顺利获取到RecordMetadata
对象,使用该对象查看消息的详细信息:topic、key和value的序列化后的大小、offset、partition。生产者发送过程中一般会出现两类错误:一类可以通过重试解决,一类无法通过重试解决。比如连接错误、无Leader错误等都可以通过重试来实现,而消息过大这类错误KafkaProducer会直接抛出异常,不会重试,因为不管重试多少次都是消息过大。
1
2
3
4
5
6
7ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
try{
RecordMetadata rm = producer.send(record).get();
System.out.println(rm.offset());
} catch(Exception e) {
log.error("occur error", e);
} -
异步发送
消息同步发送会造成同一时间只能有一条消息在发送中,在其有返回之前,其他的消息都需要一直等待,这样会造成消息堵塞滞后,无法让kafka发挥更大的效益,若一个消息发送需要20ms,发送五十条消息就需要1s,如果我们使用异步这种方式,那么发送五十条可能只需要30ms,甚至更少。异步发送的原理是在我们调用send()方法时传入一个接口
org.apache.kafka.clients.producer.Callback
的实现类的对象,由ProducerBatch的私有方法completeFutureAndFireCallbacks
完成回调1
2
3
4
5
6
7
8
9
10ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
producer.send(record, );
class CcProducerCallback implements Callback {
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception != null){
exception.printStackTrace();
}
}
}org.apache.kafka.clients.producer.internals.ProducerBatch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
produceFuture.set(baseOffset, logAppendTime, exception);
// execute callbacks
for (Thunk thunk : thunks) {
try {
// 发生异常
if (exception == null) {
RecordMetadata metadata = thunk.future.value();
if (thunk.callback != null)
thunk.callback.onCompletion(metadata, null);
} else {
// 正常
if (thunk.callback != null)
thunk.callback.onCompletion(null, exception);
}
} catch (Exception e) {
log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
}
}
produceFuture.done();
}
分区机制
kafka对于数据的读写是以partition为粒度的,partition可以分布在不同的broker上,每个节点都可以独立的实现消息的读写,并且能够通过新增新的broker来提升kafka集群的吞吐量,partition部署在多个broker来实现负载均衡。
kafka的分区策略其实指的就是Producer将消息发送到哪个分区的算法,kafka提供了默认的分区策略,同时也支持我们自定义分区策略,所有的策略都实现于接口org.apache.kafka.clients.producer.Partitioner
1 | public interface Partitioner extends Configurable, Closeable { |
消息发送到哪一个partition上涉及到分区选择机制,主要有顺序、随机、按key分配、自定义分配等方式,具体的实现方法就是public int partition()
。
-
顺序轮询
顺序分配就是消息均匀的发送给每一个partition,每个partition存储一次消息,kafka的默认策略。
-
随机策略
随机策略可以先计算出topic的总的partition数,然后使用
ThreadLocalRandom.current().nextInt()
方法来获取一个小于分区总数的随机值,随机策略会导致消息分布不均匀。虽然是随机的,但是单个分区内也是有序的。策略代码
1
2List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size()); -
key分配策略
这个策略也叫做 key-ordering策略,kafka中每条消息都会有自己的key,一旦消息被定义了 key,那么你就可以保证同一个key的所有消息都进入到相同的partition里面,因为每个partition下的消息处理都是有顺序的,所以这个策略也被称为按消息键保序策略
策略代码
1
2
3List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Math.abs()的原因是hashCode可能是负数
return Math.abs(key.hashCode()) % partitions.size(); -
自定义分配策略
自由发挥吧,只要实现Partitioner接口就成了
application.properties
1
2# org.apache.kafka.clients.producer.ProducerConfig类中定义了各类参数配置信息
cc.kevinlu.springboot.kafka.partitioners.CcPartitioner =CcPartitioner
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28package cc.kevinlu.springboot.kafka.partitioners;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import lombok.extern.slf4j.Slf4j;
4j
public class CcPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if (log.isDebugEnabled()) {
log.debug("{}------------{}", topic, cluster.availablePartitionsForTopic(topic).size());
}
// 永远都打到partition 0上
return 0;
}
public void close() {
}
public void configure(Map<String, ?> configs) {
}
}
Producer Property
- retries:消息重试次数,若消息发送过程中出现错误,但是可通过重新发送来弥补错误,比如Leader缺失,则生产者会不断的重发消息,直到重发次数达到此参数指定的值后放弃重试并返回错误,默认情况下每次重试间隔100ms,通过参数
retry.backoff.ms
指定 - acks:指定要有多少个partition副本接收消息,生产者才认为消息是成功写入,acks能够控制消息丢失概率。
- acks=0:表示生产者只管发不管服务器是否接收了,非常容易丢消息
- acks=1:只要集群的Leader收到了消息就立刻反馈给生产者,消息可能会丢失
- acks=all:只有当所有的参与复制的节点都接收到消息时,broker才会反馈给生产者,能够保证消息绝不丢失,但是延迟更高
- key.serializer:key的序列化类,需是接口
org.apache.kafka.common.serialization.Serializer
的实现类 - value.serializer:value的序列化类,需是接口
org.apache.kafka.common.serialization.Serializer
的实现类 - compression.type:消息压缩类型,默认为none, 可选值有none、gzip、snappy、lz4、zstd