# kafka 的事务机制

Kafka 事务与数据库的事务定义基本类似,主要是一个原子性:多个操作要么全部成功,要么全部失败。Kafka 中的事务可以使应用程序将消费消息、生产消息、提交消费位移当作原子操作来处理。

  • KAFKA的事务机制,在底层依赖于幂等生产者,幂等生产者是 kafka 事务的必要不充分条件;
  • 事实上,开启 kafka事务时,kafka 会自动开启幂等生产者。

# kafka的幂等性

当 kafka producer 向 broker 中的 topic发送数据时,可能会因为网络抖动等各种原因,造成 producer 收不到 broker 的 ack 确认信息。kafka幂等性就会保证在生产者内部逻辑问题引起的消息重复消费的时候,只有一个数据可以被正确的发送。

注意事项

如果使用try/catch捕获,用send手动发送,则会被视为不同的消息**

原理

  • 在 producer 端,每个 producer 都被 broker 自动分配了一个 Producer Id (PID), producer 向 broker 发送的每条消息,在内部都附带着该 pid 和一个递增的 sequence number;
  • 在 broker 端,broker 为每个 topic 的每个 partition 都维护了一个当前写成功的消息的最大 PID-Sequence Number 元组;
  • 当 broker 收到一个比当前最大 PID-Sequence Number 元组小的 sequence number 消息时,就会丢弃该消息,以避免造成数据重复存储;
  • 当 broker 失败重新选举新的 leader 时, 以上去重机制仍然有效:因为 broker 的 topic 中存储的消息体中附带了 PID-sequence number 信息,且 leader 的所有消息都会被复制到 followers 中。当某个原来的 follower 被选举为新的 leader 时,它内部的消息中已经存储了PID-sequence number 信息,也就可以执行消息去重了。

# kafka事务基本流程

  • initTransactions:方法用来初始化事务,这个方法能够执行的前提是配置了transactionalId,如果没有则会报出IllegalStateException:
  • beginTransaction:方法用来开启事务;
  • sendOffsetsToTransaction:方法为消费者提供在事务内的位移提交的操作;将偏移量提交到事务中,仅当整个交易(消费和生产)成功时,它才会提交。
  • commitTransaction:方法用来提交事务;
  • abortTransaction:方法用来中止事务,类似于事务回滚。
producer.initTransactions();
try {
    producer.beginTransaction();
    for (ProducerRecord<String, String> record : payload) {
        producer.send(record);
    }

    Map<TopicPartition, OffsetAndMetadata> groupCommit = new HashMap<TopicPartition, OffsetAndMetadata>() {
        {
            put(new TopicPartition(TOPIC, 0), new OffsetAndMetadata(42L, null));
        }
    };
    producer.sendOffsetsToTransaction(groupCommit, "groupId");
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    producer.close();
} catch (KafkaException e) {
    producer.abortTransaction();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 事务基本流程

  1. 存储对应关系,通过请求增加分区
    • Producer 在向新分区发送数据之前,首先向 TransactionalCoordinator 发送请求,使 TransactionalCoordinator 存储对应关系 (transactionalId, TopicPartition) 到主题 __transaction_state 中。
  2. 生产者发送消息
    • 基本与普通的发送消息相同,生产者调用 producer.send() 方法,发送数据到分区;
    • 发送的请求中,包含 pid, epoch, sequence number 字段;
  3. 增加消费 offset 到事务
    • 生产者通过 producer.senOffsetsToTransaction() 接口,发送分区的 Offset 信息到事务协调者,协调者将分区信息增加到事务中;
  4. 事务提交位移
    • 在前面生产者调用事务提交 offset 接口后,会发送一个 TxnOffsetCommitRequest 请求到消费组协调者,消费组协调者会把 offset 存储到 Kafka 内部主题 __consumer_offsets 中。协调者会根据请求的 pid 与 epoch 验证生产者是否允许发起这个请求。
    • 只有当事务提交之后,offset 才会对外可见。
  5. 提交或回滚事务
    • 用户调用 producer.commitTransaction()abortTransaction() 方法,提交或回滚事务;
    • 生产者完成事务之后,客户端需要显式调用结束事务,或者回滚事务。前者使消息对消费者可见,后者使消息标记为 abort 状态,对消费者不可见。无论提交或者回滚,都会发送一个 EndTxnRequest 请求到事务协调者,同时写入 PREPARE_COMMIT 或者 PREPARE_ABORT 信息到事务记录日志中。

需要注意的是

如果事务性生产者(Transactional Producer)发送的消息没有被提交,消费者是不会读取该消息之后的数据的。

Last Updated: 4/9/2024