Kafka Producer

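In Kafka, the party that produces messages is called the producer (Producer). Creating a message is simple, but the process of sending it is fairly involved.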


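Sending a message begins with creating a ProducerRecord object. ProducerRecord is one of Kafka's core classes: it represents a key-value record to be sent to Kafka and holds the target topic, partition, headers, key, value, timestamp, and so on.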

```java
public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
```

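Before a ProducerRecord is sent, its key and value are serialized into byte arrays so they can travel over the network. The record then reaches the partitioner. If a partition number (partition) was set on the record, that partition is used as-is. Otherwise the configured partitioner, an implementation of the interface org.apache.kafka.clients.producer.Partitioner, chooses one. In the client version shown below, the default partitioner hashes the serialized key when there is one, and otherwise cycles through the topic's available partitions in turn (Kafka 2.4 and later use a "sticky" strategy for records without a key).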

org.apache.kafka.clients.producer.KafkaProducer

```java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    Integer partition = record.partition();
    return partition != null ?
            partition :
            partitioner.partition(
                    record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
```

org.apache.kafka.clients.producer.internals.DefaultPartitioner

```java
// choose a partition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    int numPartitions = partitions.size();
    if (keyBytes == null) {
        // per-topic counter that increases on every call (round-robin)
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (availablePartitions.size() > 0) {
            // modulo over the partitions that currently have a leader
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            // no partitions are available, give a non-available partition
            return Utils.toPositive(nextValue) % numPartitions;
        }
    } else {
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}
```
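The two branches above are easier to see in isolation. The following dependency-free Java sketch mirrors this logic under a few assumptions: partitions are plain integer ids, Arrays.hashCode stands in for Kafka's murmur2 hash, and the helper nextValue(topic), which the excerpt does not show, is modeled as a per-topic AtomicInteger with a random starting value. The class name DefaultPartitionerSketch is hypothetical; toPositive masks the sign bit in the same way as Kafka's Utils.toPositive.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;

class DefaultPartitionerSketch {
    // one counter per topic; the random starting value is an assumption about nextValue(topic)
    private final ConcurrentMap<String, AtomicInteger> counters = new ConcurrentHashMap<>();

    // clears the sign bit so the result of % is never negative (same idea as Utils.toPositive)
    static int toPositive(int n) {
        return n & 0x7fffffff;
    }

    // returns the counter's current value and increments it; it wraps to negative values on overflow
    private int nextValue(String topic) {
        AtomicInteger counter = counters.computeIfAbsent(
                topic, t -> new AtomicInteger(ThreadLocalRandom.current().nextInt()));
        return counter.getAndIncrement();
    }

    // allPartitions: every partition id of the topic; availablePartitions: ids that currently have a leader
    int partition(String topic, byte[] keyBytes, List<Integer> allPartitions, List<Integer> availablePartitions) {
        int numPartitions = allPartitions.size();
        if (keyBytes == null) {
            int next = nextValue(topic);
            if (!availablePartitions.isEmpty()) {
                // round-robin over the partitions that can accept writes right now
                return availablePartitions.get(toPositive(next) % availablePartitions.size());
            }
            // nothing available: still return some partition id
            return allPartitions.get(toPositive(next) % numPartitions);
        }
        // keyed record: Arrays.hashCode stands in for murmur2, so equal keys map to the same partition
        return allPartitions.get(toPositive(Arrays.hashCode(keyBytes)) % numPartitions);
    }
}
```

Masking the sign bit rather than calling Math.abs matters here because the counter eventually overflows into negative values, and Math.abs(Integer.MIN_VALUE) is still negative.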

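If the user does not set a timestamp on the ProducerRecord, the producer uses its current time (time.milliseconds() inside KafkaProducer). The timestamp Kafka finally stores, however, depends on the timestamp type configured for the topic (message.timestamp.type):

  • CreateTime: the broker keeps the timestamp the producer wrote into the record
  • LogAppendTime: the broker overwrites the record's timestamp with its own local time when it appends the record to the log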
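The two-step rule (the producer fills in a missing timestamp, then the topic's timestamp type decides what is stored) fits in a few lines. This is a plain-Java model, not Kafka code: TimestampSketch and its methods are hypothetical names, and the enum only mirrors the two values of message.timestamp.type.

```java
class TimestampSketch {
    // mirrors the two values of the topic setting message.timestamp.type
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // producer side, as in doSend(): keep the record's timestamp, else use the producer's clock
    static long producerTimestamp(Long recordTimestamp, long producerNowMs) {
        return recordTimestamp == null ? producerNowMs : recordTimestamp;
    }

    // broker side: CreateTime keeps the producer's value, LogAppendTime replaces it with the broker's clock
    static long storedTimestamp(TimestampType type, long producerTimestampMs, long brokerNowMs) {
        return type == TimestampType.LOG_APPEND_TIME ? brokerNowMs : producerTimestampMs;
    }
}
```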


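Records are collected into record batches (ProducerBatch) inside the RecordAccumulator; all records in one batch are bound for the same topic and partition. The batches are sent to the broker by a separate background Sender thread, while send() hands a FutureRecordMetadata back to the caller without waiting for the broker.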

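After handling the request, the broker returns a response. On success, the caller's future (and callback, if any) receives a RecordMetadata object containing the topic, partition, offset, timestamp, and similar information; the RecordAppendResult seen in the code below is only the local result of appending to the accumulator. On failure, errors that can be fixed by resending are retried according to the retries setting, and all other errors are reported to the caller.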
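The retry behaviour just described can be modeled as a loop. This is a simplified sketch under stated assumptions: SendError and its retriable flag are hypothetical stand-ins for Kafka's error hierarchy (where retriable errors extend RetriableException), the retries and backoffMs parameters play the roles of the retries and retry.backoff.ms settings, and newer producers additionally stop once delivery.timeout.ms has elapsed, which is not modeled here.

```java
import java.util.function.Supplier;

class RetrySketch {
    // hypothetical error type; in Kafka, retriable errors extend RetriableException
    static class SendError extends RuntimeException {
        final boolean retriable;

        SendError(String message, boolean retriable) {
            super(message);
            this.retriable = retriable;
        }
    }

    // runs attempt until it succeeds; returns how many attempts were made (1 initial + up to `retries` retries)
    static int sendWithRetries(Supplier<Long> attempt, int retries, long backoffMs) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                attempt.get(); // on success the broker would return the record's offset
                return attempts;
            } catch (SendError e) {
                // non-retriable (e.g. record too large) or retries used up: report the error
                if (!e.retriable || attempts > retries) {
                    throw e;
                }
                try {
                    Thread.sleep(backoffMs); // plays the role of retry.backoff.ms
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }
}
```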

org.apache.kafka.clients.producer.KafkaProducer

```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        // serialize the key
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        // serialize the value
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        // choose the target partition
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        // make the headers read-only from here on
        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        // record timestamp: the one set on the record, or the producer's current time
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        // transactions: register the partition with the ongoing transaction
        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        // append to the RecordAccumulator (see RecordAccumulator.append below);
        // the Sender thread performs the actual network send
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        // record the error in the producer's error metrics
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}
```

org.apache.kafka.clients.producer.internals.RecordAccumulator

```java
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // one deque of batches per topic-partition
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");
            // first try to append to the last open batch
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // allocate a buffer from the producer's memory pool (may block up to maxTimeToBlock)
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            if (closed)
                throw new KafkaException("Producer closed while send in progress");

            // another thread may have created a batch while we were allocating; try again
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                return appendResult;
            }

            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            // create a new batch backed by the buffer and append the record to it
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            incomplete.add(batch);

            // the batch now owns the buffer; null it so the finally block does not return it to the pool
            buffer = null;
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}
```
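Stripped of buffer pooling, compression, and the double-checked locking, the core of append() is: try the last batch in the partition's deque, and if it has no room, start a new batch. Below is a toy Java model of that decision. AccumulatorSketch, Batch, and AppendResult are hypothetical names; batch capacity is counted in raw bytes, a single synchronized method replaces the per-deque locks, and nothing drains the deques (in Kafka the Sender thread does that). The two flags are computed as in the return statement above, and doSend() wakes the Sender when either is true.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AccumulatorSketch {
    // a batch of serialized records for one topic-partition, with a byte budget
    static class Batch {
        final List<byte[]> records = new ArrayList<>();
        final int capacity;
        int usedBytes;

        Batch(int capacity) {
            this.capacity = capacity;
        }

        // an empty batch always takes the record; otherwise only if it still fits
        boolean tryAppend(byte[] record) {
            if (!records.isEmpty() && usedBytes + record.length > capacity) {
                return false;
            }
            records.add(record);
            usedBytes += record.length;
            return true;
        }

        boolean isFull() {
            return usedBytes >= capacity;
        }
    }

    // the two flags doSend() checks before waking the Sender
    static class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchSize;
    private final Map<String, Deque<Batch>> batches = new HashMap<>(); // key: "topic-partition"

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    synchronized AppendResult append(String topicPartition, byte[] record) {
        Deque<Batch> dq = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        // 1) try the open batch at the tail of the deque
        Batch last = dq.peekLast();
        if (last != null && last.tryAppend(record)) {
            return new AppendResult(dq.size() > 1 || last.isFull(), false);
        }
        // 2) no room: start a new batch of max(batchSize, record size) bytes
        Batch batch = new Batch(Math.max(batchSize, record.length));
        batch.tryAppend(record);
        dq.addLast(batch);
        return new AppendResult(dq.size() > 1 || batch.isFull(), true);
    }

    synchronized int batchCount(String topicPartition) {
        Deque<Batch> dq = batches.get(topicPartition);
        return dq == null ? 0 : dq.size();
    }
}
```

With a 10-byte batch size, three 4-byte records for the same partition fill the first batch with two records and open a second batch for the third, which is the point where the real producer would wake the Sender.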

Ways to send messages

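  1. Simple send (fire-and-forget)

    The simplest way to send is to specify only the topic, key, and value, leaving the partition and timestamp at their defaults. send() returns a Future<RecordMetadata>; if you do not care about the result you can ignore it, otherwise you need to check it. Exceptions that may surface include InterruptException (the sending thread was interrupted; it wraps an InterruptedException), BufferExhaustedException (the producer's buffer is full), and SerializationException (the key or value could not be serialized).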

    ```java
    ProducerRecord<String, String> record =
            new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    producer.send(record);
    ```
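  2. Synchronous send

    The simple send above assumes we do not care about the result, but normally we want to wait for the broker's acknowledgement. As the source code shows, send() returns a Future<RecordMetadata>; calling its get() method blocks the calling thread until the broker responds. If the send failed, get() throws an exception (an ExecutionException wrapping the cause); otherwise it returns a RecordMetadata object with details about the message: topic, partition, offset, timestamp, and the serialized sizes of the key and value.

    A producer typically runs into two kinds of errors: those that retrying can fix and those it cannot. Connection errors and "no leader" errors (for example while a partition leader is being elected) are retriable, so the producer retries them. A "message too large" error (RecordTooLargeException), by contrast, is thrown immediately without any retry, because the message will still be too large no matter how many times it is resent.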

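    ```java
    ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    try {
        // get() blocks the calling thread until the broker responds or the send fails
        RecordMetadata rm = producer.send(record).get();
        System.out.println(rm.offset());
    } catch (ExecutionException e) {
        // the actual send failure is available via getCause()
        log.error("occur error", e.getCause());
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    ```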
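  3. Asynchronous send

    With synchronous sends only one message is in flight at a time, and every other message waits until it returns, so messages back up and Kafka's throughput goes unused. If sending one message takes 20 ms, fifty messages take 50 × 20 ms = 1 s synchronously; sent asynchronously, the same fifty might need only about 30 ms or even less. To send asynchronously, pass an implementation of the interface org.apache.kafka.clients.producer.Callback to send(). When the batch completes, the private ProducerBatch method completeFutureAndFireCallbacks invokes the callback. Callbacks run on the producer's I/O (Sender) thread, so they should return quickly.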

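    ```java
    ProducerRecord<String, String> record = new ProducerRecord<>("cc_test", "cc", "chuanchuan");
    // send() does not wait for the broker; the callback runs when the response arrives
    producer.send(record, new CcProducerCallback());

    class CcProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            // exception is non-null if the send failed
            if (exception != null) {
                exception.printStackTrace();
            }
        }
    }
    ```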

    org.apache.kafka.clients.producer.internals.ProducerBatch

    ```java
    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
        // record the result (offset or exception) for every record in the batch
        produceFuture.set(baseOffset, logAppendTime, exception);

        // execute callbacks
        for (Thunk thunk : thunks) {
            try {
                if (exception == null) {
                    // success: hand the record's metadata to the callback
                    RecordMetadata metadata = thunk.future.value();
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(metadata, null);
                } else {
                    // failure: pass the exception, metadata is null
                    if (thunk.callback != null)
                        thunk.callback.onCompletion(null, exception);
                }
            } catch (Exception e) {
                log.error("Error executing user-provided callback on message for topic-partition '{}'", topicPartition, e);
            }
        }

        // release threads blocked in Future.get()
        produceFuture.done();
    }
    ```
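The difference between waiting on the future and registering a callback, and the order of operations in completeFutureAndFireCallbacks, can be reproduced without a broker. This is a toy model: ToyBatch, Thunk, and Metadata are hypothetical names, a CompletableFuture stands in for FutureRecordMetadata, and offsets are assigned as base offset plus position in the batch. As in the method above, callbacks run first, an exception thrown by one callback does not stop the others, and waiting threads are released only afterwards.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

class ToyBatch {
    // stand-in for RecordMetadata
    static final class Metadata {
        final String topic;
        final int partition;
        final long offset;

        Metadata(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    // one entry per appended record: the future handed back to the caller plus an optional callback
    private static final class Thunk {
        final CompletableFuture<Metadata> future = new CompletableFuture<>();
        final BiConsumer<Metadata, Exception> callback;

        Thunk(BiConsumer<Metadata, Exception> callback) {
            this.callback = callback;
        }
    }

    private final String topic;
    private final int partition;
    private final List<Thunk> thunks = new ArrayList<>();

    ToyBatch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    // like send(): returns a future (sync path: block on it) and registers a callback (async path)
    CompletableFuture<Metadata> append(BiConsumer<Metadata, Exception> callback) {
        Thunk thunk = new Thunk(callback);
        thunks.add(thunk);
        return thunk.future;
    }

    // called once the broker has answered for the whole batch
    void complete(long baseOffset, RuntimeException exception) {
        List<Metadata> results = new ArrayList<>();
        for (int i = 0; i < thunks.size(); i++) {
            // each record's offset is the batch's base offset plus its position in the batch
            results.add(exception == null ? new Metadata(topic, partition, baseOffset + i) : null);
        }
        // 1) fire callbacks; a throwing callback must not prevent the others from running
        for (int i = 0; i < thunks.size(); i++) {
            Thunk thunk = thunks.get(i);
            try {
                if (thunk.callback != null) {
                    thunk.callback.accept(results.get(i), exception);
                }
            } catch (Exception e) {
                System.err.println("Error executing user-provided callback: " + e);
            }
        }
        // 2) only then release threads waiting on the futures
        for (int i = 0; i < thunks.size(); i++) {
            if (exception == null) {
                thunks.get(i).future.complete(results.get(i));
            } else {
                thunks.get(i).future.completeExceptionally(exception);
            }
        }
    }
}
```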

Partitioning

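Kafka reads and writes data at partition granularity. Partitions can be spread across different brokers, each node serves reads and writes for its partitions independently, and the cluster's throughput can be raised by adding brokers. Spreading partitions over multiple brokers is how Kafka balances load.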

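Kafka's partitioning strategy is simply the algorithm the producer uses to decide which partition a message goes to. Kafka ships with a default strategy and also lets us define our own; every strategy implements the interface org.apache.kafka.clients.producer.Partitioner.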

```java
public interface Partitioner extends Configurable, Closeable {

    /**
     * Compute the partition for the given record.
     *
     * @param topic      the topic name
     * @param key        the key to partition on (or null if no key)
     * @param keyBytes   the serialized key (or null if no key)
     * @param value      the value (or null)
     * @param valueBytes the serialized value (or null)
     * @param cluster    the current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * Called when the partitioner is closed.
     */
    public void close();

}
```

Which partition a message lands in is decided by the partitioning strategy. The common ones are round-robin, random, by key, and custom; each is implemented in the partition() method.

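  1. Round-robin

    Round-robin sends messages to the partitions in turn, so each partition receives one message per round and the load is spread evenly. In the older client code shown above, this is what the default partitioner does for records without a key.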

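  2. Random

    The random strategy first looks up the topic's total number of partitions and then uses ThreadLocalRandom.current().nextInt() to pick a random value smaller than that count. Over short runs this spreads messages less evenly than round-robin. Although the partition choice is random, messages within a single partition are still ordered.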


    Strategy code:

    ```java
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    return ThreadLocalRandom.current().nextInt(partitions.size());
    ```
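  3. Key-based

    This strategy is also called key-ordering. Every Kafka message can carry a key; once messages have a key, all messages with the same key are guaranteed to land in the same partition, as long as the number of partitions does not change. Because messages within a partition are processed in order, this strategy is also known as the per-key ordering strategy.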


    Strategy code:

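    ```java
    List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
    // hashCode() can be negative, so clear the sign bit before taking the modulo;
    // Math.abs() is not enough because Math.abs(Integer.MIN_VALUE) is still negative
    // (assumes the key is non-null)
    return (key.hashCode() & 0x7fffffff) % partitions.size();
    ```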
  4. Custom

    Anything goes here: just implement the Partitioner interface and register the class through the partitioner.class property.

    application.properties

    ```properties
    # all producer configuration keys are defined in org.apache.kafka.clients.producer.ProducerConfig
    spring.kafka.properties.partitioner.class=cc.kevinlu.springboot.kafka.partitioners.CcPartitioner
    ```

    CcPartitioner

    ```java
    package cc.kevinlu.springboot.kafka.partitioners;

    import java.util.Map;

    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;

    import lombok.extern.slf4j.Slf4j;

    @Slf4j
    public class CcPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            if (log.isDebugEnabled()) {
                log.debug("{}------------{}", topic, cluster.availablePartitionsForTopic(topic).size());
            }
            // always route to partition 0
            return 0;
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
        }
    }
    ```
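To compare the first three strategies, the sketch below applies each one to a stream of message indexes and counts how many messages each partition receives. StrategySketch and its methods are hypothetical and work on plain integers rather than a Cluster object; the key strategy masks the sign bit instead of calling Math.abs so that a hash code of Integer.MIN_VALUE cannot produce a negative partition.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntUnaryOperator;

class StrategySketch {
    // round-robin: message n goes to partition n mod numPartitions
    static int roundRobin(int messageIndex, int numPartitions) {
        return (messageIndex & 0x7fffffff) % numPartitions;
    }

    // random: nextInt(bound) returns a value in [0, bound)
    static int random(int numPartitions) {
        return ThreadLocalRandom.current().nextInt(numPartitions);
    }

    // key-ordering: equal keys have equal hash codes, so they always map to the same partition
    static int byKey(Object key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // counts how many of `messages` messages each partition receives under a strategy
    static int[] distribute(int messages, int numPartitions, IntUnaryOperator strategy) {
        int[] counts = new int[numPartitions];
        for (int i = 0; i < messages; i++) {
            counts[strategy.applyAsInt(i)]++;
        }
        return counts;
    }
}
```

With 999 messages and 3 partitions, distribute(999, 3, i -> roundRobin(i, 3)) puts exactly 333 messages in each partition, while the counts for random(3) change from run to run; byKey always returns the same partition for the same key.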

Producer properties

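  1. retries: the number of times a message is retried. If an error occurs that resending can fix, such as a missing leader, the producer keeps resending until it has retried this many times, then gives up and returns the error. By default the producer waits 100 ms between retries, configurable with retry.backoff.ms. (Since Kafka 2.1 the default for retries is Integer.MAX_VALUE, and the total time spent is bounded by delivery.timeout.ms instead.)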
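  2. acks: how many partition replicas must receive the message before the producer considers the write successful; acks controls how likely messages are to be lost.
    • acks=0: the producer sends without waiting for any acknowledgement from the server; messages are lost very easily
    • acks=1: the partition leader acknowledges as soon as it has written the message; the message can still be lost if the leader fails before the followers copy it
    • acks=all: the broker acknowledges only after all in-sync replicas have received the message. This gives the strongest durability guarantee (especially together with min.insync.replicas), at the cost of higher latency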
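  3. key.serializer: the serializer class for the key; it must implement the interface org.apache.kafka.common.serialization.Serializer
  4. value.serializer: the serializer class for the value; it must implement the interface org.apache.kafka.common.serialization.Serializer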
  5. compression.type: the compression codec for messages. The default is none; the options are none, gzip, snappy, lz4, and zstd
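As an example, the properties above can be collected into a java.util.Properties object. The values below are illustrative, not recommendations; bootstrap.servers (the broker address list every producer needs) is not discussed above, and the address localhost:9092 is an assumption. The class and method names are hypothetical.

```java
import java.util.Properties;

class ProducerConfigSketch {
    // builds producer settings; all values here are illustrative
    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // wait for all in-sync replicas before a write counts as successful
        props.setProperty("acks", "all");
        // retry retriable errors up to 3 times, 100 ms apart
        props.setProperty("retries", "3");
        props.setProperty("retry.backoff.ms", "100");
        props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("compression.type", "lz4");
        return props;
    }
}
```

With the kafka-clients library on the classpath, such a Properties object can be passed to new KafkaProducer<String, String>(props).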