TLS加密
准备证书
root@kafka01:~# mkdir /usr/local/kafka/pki
root@kafka01:~# cd /usr/local/kafka/pki/
# 生成 CA 证书
root@kafka01:/usr/local/kafka/pki# openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -keyout ca.key -out ca.crt -subj "/CN=Kafka-CA"
# 生成私钥
root@kafka01:/usr/local/kafka/pki# openssl genrsa -out kafka.key 4096
# 生成证书签名请求 (CSR)
root@kafka01:/usr/local/kafka/pki# openssl req -new -key kafka.key -out kafka.csr -subj "/CN=kafka-cluster"
# 创建包含所有节点的SAN 配置文件
root@kafka01:/usr/local/kafka/pki# cat > san.cnf << EOF
[ req ]
distinguished_name = req_distinguished_name
req_extensions = req_ext
prompt = no
[ req_distinguished_name ]
CN = kafka-cluster
[ req_ext ]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names
[ alt_names ]
# 节点主机名与ip
DNS.1 = kafka01
DNS.2 = kafka02
DNS.3 = kafka03
IP.1 = 192.168.1.111
IP.2 = 192.168.1.112
IP.3 = 192.168.1.113
EOF
# 签署证书
root@kafka01:/usr/local/kafka/pki# openssl x509 -req -in kafka.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out kafka.crt -days 3650 -extfile san.cnf -extensions req_ext
# 验证证书
root@kafka01:/usr/local/kafka/pki# openssl x509 -in kafka.crt -text -noout | grep -A 1 "Subject Alternative Name"
X509v3 Subject Alternative Name:
DNS:kafka01, DNS:kafka02, DNS:kafka03, IP Address:192.168.1.111, IP Address:192.168.1.112, IP Address:192.168.1.113
root@kafka01:/usr/local/kafka/pki# ls -l
total 24
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root 423 Jul 26 16:03 san.cnf
创建 Keystore
将证书和私钥转换为 PKCS12 文件
root@kafka01:/usr/local/kafka/pki# openssl pkcs12 -export -in kafka.crt -inkey kafka.key -out kafka.p12 -name kafka-cert -CAfile ca.crt -caname root -passout pass:sundayhk.com
使用 keytool
将 kafka.p12
文件导入到 Keystore:
root@kafka01:/usr/local/kafka/pki# keytool -importkeystore \
-deststorepass sundayhk.com \
-destkeypass sundayhk.com \
-destkeystore kafka.keystore.jks \
-srckeystore kafka.p12 \
-srcstoretype PKCS12 \
-srcstorepass sundayhk.com \
-alias kafka-cert
root@kafka01:/usr/local/kafka/pki# ls -l
total 40
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root 4400 Jul 26 16:05 kafka.keystore.jks
-rw------- 1 root root 4360 Jul 26 16:05 kafka.p12
-rw-r--r-- 1 root root 423 Jul 26 16:03 san.cnf
创建 Truststore
使用 keytool
创建 Truststore 并导入 CA 证书:
root@kafka01:/usr/local/kafka/pki# keytool -import \
-file ca.crt \
-keystore kafka.truststore.jks \
-storepass sundayhk.com \
-alias root
Trust this certificate? [no]: yes
Certificate was added to keystore
root@kafka01:/usr/local/kafka/pki# ls -l
total 44
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root 4400 Jul 26 16:05 kafka.keystore.jks
-rw------- 1 root root 4360 Jul 26 16:05 kafka.p12
-rw-r--r-- 1 root root 1654 Jul 26 16:06 kafka.truststore.jks
-rw-r--r-- 1 root root 423 Jul 26 16:03 san.cnf
分发文件
将kafka.truststore.jks 和kafka.keystore.jks 文件分发到其他 kafka 节点
root@kafka01:/usr/local/kafka/pki# scp kafka.keystore.jks kafka.truststore.jks 192.168.1.112:/usr/local/kafka/pki/
root@kafka01:/usr/local/kafka/pki# scp kafka.keystore.jks kafka.truststore.jks 192.168.1.113:/usr/local/kafka/pki/
Kafka服务端配置 TLS
在 Kafka KRaft 模式下的 server.properties
文件中,添加以下配置:
root@kafka01:~# vim /usr/local/kafka/config/kraft/server.properties
# 修改SSL配置
listeners=SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SSL
advertised.listeners=SSL://192.168.1.111:9092,CONTROLLER://192.168.1.111:9093
# 新增Keystore配置
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.key.password=sundayhk.com
# 新增Truststore配置
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
# 客户端连接时启用ssl
ssl.client.auth=required
重启 kafka
root@kafka01:~# systemctl restart kafka
客户端配置 TLS
所有节点操作
创建客户端配置文件,指定证书信息 admin.properties文件内容:
root@kafka01:~# cat > /usr/local/kafka/config/admin.properties << EOF
security.protocol=SSL
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
ssl.endpoint.identification.algorithm=
ssl.key.password=sundayhk.com
EOF
连接 kafka 集群测试
# 查看节点信息
root@kafka01:~# kafka-broker-api-versions.sh --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties
192.168.1.112:9092 (id: 2 rack: null) -> (
……
)
192.168.1.111:9092 (id: 1 rack: null) -> (
……
)
192.168.1.113:9092 (id: 3 rack: null) -> (
……
)
# 查看topic信息
root@kafka01:~# kafka-topics.sh --describe --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties
Topic: test TopicId: RhzPTC_eRL-etwrn3p5z-g PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: test Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1 Elr: LastKnownElr:
Topic: test Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2 Elr: LastKnownElr:
Topic: test Partition: 2 Leader: 2 Replicas: 2,3 Isr: 3,2 Elr: LastKnownElr:
生产消费消息测试
# 生产者发送消息
root@kafka02:~# kafka-console-producer.sh --bootstrap-server 192.168.1.111:9092 --topic test --producer.config /usr/local/kafka/config/admin.properties
>hello ssl
# 消费者接收消息
root@kafka03:~# kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --from-beginning --consumer.config /usr/local/kafka/config/admin.properties
hello ssl
kafka-ui 配置 TLS
修改 kafka-ui 配置文件
root@kafka01:~# cat > /usr/local/kafka-ui/config.yml << EOF
kafka:
clusters:
-
name: kafka-cluster
bootstrapServers: 192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
metrics:
port: 9997
type: JMX
properties:
security:
protocol: SSL
ssl:
keystore:
location: /usr/local/kafka/pki/kafka.keystore.jks
password: sundayhk.com
ssl_endpoint_identification_algorithm: ''
ssl:
truststorelocation: /usr/local/kafka/pki/kafka.truststore.jks
truststorepassword: sundayhk.com
EOF
重启 kafka-ui 并验证
root@kafka01:~# systemctl restart kafka-ui
PLAIN认证
在Kafka中,SASL(Simple Authentication and Security Layer)机制包括三种常见的身份验证方式:
- SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
- SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
- SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。
创建Kraft账号密码认证文件
以下操作在所有节点执行。
创建两个用户,分别为admin、test(此处仅用于演示,实际生产环境建议按业务创建多个不同的账号,并配置对指定 topic 的读写权限)
cat << EOF > /usr/local/kafka/config/kafka_server_jaas.conf
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret"
user_test="test-secret";
user_alice="alice-secret";
};
EOF
该配置通过org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。
usemame和password指定该代理与集群其他代理初始化连接的用户名和密码
user_admin="admin-secret", 这个表示一个用户名为admin用户,密码是admin-secret,这个必须要有一个,且要这一个跟上面的username和password保持一致。
user_test="test-secret" 是第二个用户,表示的是用户名为test的账户,密码为test-secret。
修改 kafka 配置文件
Kafka broker 的 server.properties
配置文件,来启用 SASL/PLAIN 认证。以下是需要配置的参数
root@kafka01:~# vim /usr/local/kafka/config/kraft/server.properties
# 修改以下配置
listeners=SASL_SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_SSL
advertised.listeners=SASL_SSL://192.168.1.111:9092,CONTROLLER://192.168.1.111:9093
# 节点间CONTROLLER映射为SASL_PLAINTEXT认证
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# 新增以下配置
# 设置 SASL 认证机制
sasl.enabled.mechanisms=PLAIN
# 集群间认证时用的认证方式
sasl.mechanism.inter.broker.protocol=PLAIN
# 指定Kafka 客户端与 Broker 之间使用的 SASL 认证机制
sasl.mechanism=PLAIN
# 指定控制器通信时使用的认证机制
sasl.mechanism.controller.protocol=PLAIN
# 配置 SASL 认证存储方式为文件
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 设置必须授权才能用
allow.everyone.if.no.acl.found=false
# 配置超级用户
super.users=User:admin
修改启动脚本
root@kafka01:~# vim /usr/local/kafka/bin/kafka-server-start.sh
# 新增-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf参数
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
fi
重启 kafka 集群
root@kafka01:~# systemctl restart kafka
客户端使用账户密码认证
修改客户端配置文件,新增认证信息
cat > /usr/local/kafka/config/admin.properties << EOF
bootstrap.servers=192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
ssl.endpoint.identification.algorithm=
ssl.key.password=sundayhk.com
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF
查看 boorker 信息
root@kafka01:~# kafka-broker-api-versions.sh --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties
192.168.1.112:9092 (id: 2 rack: null) -> (
……
)
192.168.1.113:9092 (id: 3 rack: null) -> (
……
)
192.168.1.111:9092 (id: 1 rack: null) -> (
……
)
查看 topic 信息
root@kafka01:~# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.111:9092 --list --command-config /usr/local/kafka/config/admin.properties
__consumer_offsets
test
创建客户端认证文件
cat > /usr/local/kafka/config/client_jaas.conf << EOF
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};
EOF
修改生产者和消费者脚本,添加-Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf
root@kafka01:~# vim /usr/local/kafka/bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf kafka.tools.ConsoleProducer "$@"
root@kafka01:~# vim /usr/local/kafka/bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf org.apache.kafka.tools.consumer.ConsoleConsumer "$@"
生产者发送消息
root@kafka01:~# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic test --producer.config /usr/local/kafka/config/admin.properties
> hello kafka
消费者消费消息
root@kafka01:~# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --consumer.config /usr/local/kafka/config/admin.properties --from-beginning
hello kafka
kafka-ui 使用账号密码认证
更新 kafka-ui 配置文件
root@kafka01:~# vim/usr/local/kafka-ui/config.yml
kafka:
clusters:
- name: kafka-cluster
bootstrapServers: 192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
metrics:
port: 9997
type: JMX
properties:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
ssl.keystore.location: /usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password: sundayhk.com
# ssl.key.password: sundayhk.com
ssl.truststore.location: /usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password: sundayhk.com
ssl.endpoint.identification.algorithm: ''
重启 kafka-ui 验证
root@kafka01:~# systemctl restart kafka-ui
ACL权限
在Kafka中,ACL(Access Control List)是用来控制谁可以访问Kafka资源(如主题、消费者组等)的权限机制。ACL配置基于Kafka的kafka-acls.sh
工具,能够管理对资源的读取、写入等操作权限。
Kafka ACL的基本概念
Kafka的ACL是基于以下几个方面的:
- 资源类型(Resource Type): Kafka支持多种资源类型,包括主题(Topic)、消费者组(Consumer Group)、Kafka集群本身(Cluster)等。
- 操作类型(Operation Type): 如
Read
(读取)、Write
(写入)、Create
(创建)、Describe
(描述)、Alter
(修改)等。 - 权限类型(Permission Type):
Allow
表示允许访问,Deny
表示拒绝访问。 - 主体(Principal): 访问Kafka的用户或客户端。Kafka支持通过SASL认证系统中的用户来定义主体,通常是
User:<username>
的形式。
添加ACL
给用户User:test
添加对test
主题的读取权限:
root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --add --allow-principal User:test --operation Read --topic test --command-config /usr/local/kafka/config/admin.properties
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:test, host=*, operation=READ, permissionType=ALLOW)
- --allow-principal: 允许访问的用户主体。
- --operation: 操作类型,如
Read
、Write
等。 - --topic top 名称。
通过 kafka-ui 查看验证
查看现有ACL
root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --list --command-config /usr/local/kafka/config/admin.properties
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
(principal=User:test, host=*, operation=READ, permissionType=ALLOW)
删除ACL
删除User:test
对test
主题的读取权限:
root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --remove --allow-principal User:test --operation Read --topic test --command-config /usr/local/kafka/config/admin.properties
Are you sure you want to remove ACLs:
(principal=User:test, host=*, operation=READ, permissionType=ALLOW)
from resource filter `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`? (y/n)
y