为什么选择Kafka-Kraft架构?
Kafka,作为分布式消息系统的长期标杆,过去一直依赖 ZooKeeper 进行元数据管理。然而,从 Kafka 2.8.0 版本开始引入的 KRaft 模式(Kafka Raft Metadata mode),彻底摆脱了对 ZooKeeper 的依赖。
Kafka Raft功能在 Kafka 2.8.0 版本开始实验性引入,在 3.x 版本逐步稳定,在 Kafka 4.0 版本成为默认模式,彻底替代ZooKeeper。
KRaft 模式的核心变革
KRaft 模式的核心在于利用 Kafka 内部的 Quorum 控制器,通过 Raft 协议 来管理所有元数据。
所有元数据都直接保存在 Kafka 控制器中,这意味着我们不再需要单独维护一套 ZooKeeper 集群,简化运维工作,我们只需要维护 Kafka 集群本身,从而有效节省运算资源。
KRaft 模式的三大显著优势
这种自管理的元数据仲裁机制,为 Kafka 带来了三大核心优势:
- 部署与运维更简化:减少了对外部组件的依赖,显著降低了系统的部署复杂度和日常运维负担。
- 元数据操作性能大幅提升:实践证明,元数据操作的延迟降低了 40%,大幅提升了 Kafka 内部的运行效率。
- 系统稳定性与故障恢复能力增强:KRaft 模式从根本上消除了 ZooKeeper 可能导致的脑裂风险,大大增强了系统的整体稳定性。同时,在出现故障时,其自管理的元数据仲裁机制也能让故障恢复过程更为迅速和可靠。
架构图
环境准备
- 系统:Ubuntu 22.04
- JDK:21
- Kafka: 3.9.1
- CPU:4核+
- 内存:8GB+(生产环境建议16GB+)
- 磁盘:SSD/NVMe,预留2倍消息存储空间
ip | 主机名 | 角色 | node id | Broker端口 | Controller端口 |
---|---|---|---|---|---|
192.168.1.111 | kafka01 | Broker,Controller | 1 | 9092 | 9093 |
192.168.1.112 | kafka02 | Broker,Controller | 2 | 9092 | 9093 |
192.168.1.113 | kafka03 | Broker,Controller | 3 | 9092 | 9093 |
- 所有节点既是 Broker 又是 Controller,但进程分离部署。
- 使用 Kafka 内置的 KRaft 模式,不需要 ZooKeeper。
- 所有 Controller 运行 Controller-only 模式。
- 所有 Broker 运行 Broker-only 模式。
- 所有节点必须能互相访问。
-
安装JDK
JDK版本17或21
apt install -y openjdk-21-jdk
root@kafka01:~# java -version
openjdk version "21.0.7" 2025-04-15
OpenJDK Runtime Environment (build 21.0.7+6-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 21.0.7+6-Ubuntu-0ubuntu122.04, mixed mode, sharing)
部署Kafka
安装Kafka
wget https://dlcdn.apache.org/kafka/3.9.1/kafka_2.13-3.9.1.tgz
tar xf kafka_2.13-3.9.1.tgz
mv kafka_2.13-3.9.1 /usr/local/kafka
mkdir /data/kafka
导入环境变量
cat << \EOF > /etc/profile.d/kafka.sh
export PATH=/usr/local/kafka/bin:$PATH
EOF
source /etc/profile
创建 kafka 数据目录
mkdir -p /data/kafka
配置KRaft
kafka01 配置
root@kafka01:~# vim /usr/local/kafka/config/kraft/server.properties
# 节点ID,要求集群内唯一
node.id=1
# controller集群成员列表,集群内部选举和同步元数据
controller.quorum.voters=1@192.168.1.111:9093,2@192.168.1.112:9093,3@192.168.1.113:9093
# 侦听器名称、主机名和代理将向客户端公布的端口(broker对外地址)
advertised.listeners=PLAINTEXT://192.168.1.111:9092,CONTROLLER://192.168.1.111:9093
# kafka数据目录
log.dirs=/data/kafka
kafka02 配置
root@kafka02:~# vim /usr/local/kafka/config/kraft/server.properties
# 节点ID,要求集群内唯一
node.id=2
# 集群地址信息
controller.quorum.voters=1@192.168.1.111:9093,2@192.168.1.112:9093,3@192.168.1.113:9093
# 侦听器名称、主机名和代理将向客户端公布的端口(broker对外地址)
advertised.listeners=PLAINTEXT://192.168.1.112:9092,CONTROLLER://192.168.1.112:9093
# kafka数据目录
log.dirs=/data/kafka
kafka03 配置
root@kafka03:~# vim /usr/local/kafka/config/kraft/server.properties
# 节点ID,要求集群内唯一
node.id=3
# 集群地址信息
controller.quorum.voters=1@192.168.1.111:9093,2@192.168.1.112:9093,3@192.168.1.113:9093
# 侦听器名称、主机名和代理将向客户端公布的端口(broker对外地址)
advertised.listeners=PLAINTEXT://92.168.1.113:9092,CONTROLLER://92.168.1.113:9093
# kafka数据目录
log.dirs=/data/kafka
初始化集群
生成存储目录唯一ID
root@kafka01:~# kafka-storage.sh random-uuid
oc4WpGixSBKdjz6TukLrUg
格式化 kafka 存储目录(每个节点都要执行)
root@kafka01:~# /usr/local/kafka/bin/kafka-storage.sh format -t oc4WpGixSBKdjz6TukLrUg -c /usr/local/kafka/config/kraft/server.properties
Formatting metadata directory /data/kafka with metadata.version 3.9-IV0.
启动集群
每个节点都执行启动服务命令
root@kafka01:~# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/kraft/server.properties
查看服务日志
root@kafka01:~# tail -f /usr/local/kafka/logs/server.log
集群验证
查看 kafka 节点状态
root@kafka01:~# kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
查看 topic 信息
root@kafka01:~# kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
systemd 管理服务
创建服务文件
root@kafka01:~# vim /etc/systemd/system/kafka.service
[Unit]
Description=Apache Kafka server (KRaft mode)
Documentation=https://kafka.apache.org/documentation/
After=network.target
[Service]
Type=simple
User=root
Group=root
ExecStart=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/kraft/server.properties
ExecStop=/usr/local/kafka/bin/kafka-server-stop.sh
Restart=on-failure
# 设置 JMX 配置(可选)
Environment=KAFKA_OPTS="-Djava.rmi.server.hostname=192.168.1.111 -Dcom.sun.management.jmxremote.port=9997 -Dcom.sun.management.jmxremote.rmi.port=9997 -Dcom.sun.management.jmxremote. -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"
# 设置 Kafka 日志输出(可选)
StandardOutput=syslog
StandardError=syslog
SyslogIdentifier=kafka
[Install]
WantedBy=multi-user.target
重新启动 kafka 服务
root@kafka01:~# kafka-server-stop.sh
root@kafka01:~# systemctl enable kafka
root@kafka01:~# systemctl restart kafka
root@kafka01:~# systemctl status kafka
部署Kafka-ui
Kraft 模式的 kafka 集群管理工具推荐使用 kafka-ui。
下载地址:https://github.com/provectus/kafka-ui/releases
root@kafka01:~# mkdir /usr/local/kafka-ui
root@kafka01:~# cd /usr/local/kafka-ui/
root@kafka01:/usr/local/kafka-ui# wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar
创建配置文件
root@kafka01:/usr/local/kafka-ui# cat > config.yml << EOF
kafka:
clusters:
- name: kafka-cluster
bootstrapServers: 192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
metrics:
port: 9997
type: JMX
EOF
启动测试
root@kafka01:/usr/local/kafka-ui# java -Dspring.config.additional-location=/usr/local/kafka-ui/config.yml -jar /usr/local/kafka-ui/kafka-ui-api-v0.7.2.jar
访问验证
使用 systemd 管理服务
root@kafka01:/usr/local/kafka-ui# vim /etc/systemd/system/kafka-ui.service
[Unit]
Description=Kafka UI Service
After=network.target
[Service]
Environment=JAVA_HOME=/usr/lib/jvm/java-21-openjdk-amd64 # 根据实际环境设置 JAVA_HOME
Environment=PATH=$PATH:$JAVA_HOME/bin
ExecStart=/usr/lib/jvm/java-21-openjdk-amd64/bin/java -Dspring.config.additional-location=/usr/local/kafka-ui/config.yml -jar /usr/local/kafka-ui/kafka-ui-api-v0.7.2.jar
User=root
Group=root
WorkingDirectory=/usr/local/kafka-ui
Restart=always
[Install]
WantedBy=multi-user.target
systemctl daemon-reload
systemctl enable kafka-ui
systemctl restart kafka-ui
systemctl status kafka-ui
Kafka基本使用
查看Broker情况
root@kafka01:/opt/kafka# kafka-broker-api-versions.sh --bootstrap-server 127.0.0.1:9092
192.168.1.112:9092 (id: 2 rack: null) -> (
...
)
192.168.1.113:9092 (id: 3 rack: null) -> (
...
)
192.168.1.111:9092 (id: 1 rack: null) -> (
...
)
创建topic
root@kafka01:/opt/kafka# kafka-topics.sh --create --topic test --partitions 3 --replication-factor 2 --bootstrap-server 127.0.0.1:9092
Created topic test.
查看topic
root@kafka01:~# kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092
Topic: test TopicId: E_oGGl_LTi-MZSin0boIHA PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 2 Replicas: 2,3 Isr: 2,3 Elr: LastKnownElr:
Topic: test Partition: 1 Leader: 3 Replicas: 3,1 Isr: 3,1 Elr: LastKnownElr:
Topic: test Partition: 2 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
生产者发送消息
root@kafka01:~# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test
>hello world
消费者消费消息
要让消费者接收在它启动之前就已经发送的消息,需要添加
--from-beginning
参数root@kafka02:~# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning hello world
删除 topic
root@kafka01:~# kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic test