Kafka TLS加密及SASL_SSL配置

2025-07-26 64 0

TLS加密

准备证书

root@kafka01:~# mkdir /usr/local/kafka/pki
root@kafka01:~# cd /usr/local/kafka/pki/

# 生成 CA 证书
root@kafka01:/usr/local/kafka/pki# openssl req -x509 -nodes -days 3650 -newkey rsa:4096 -keyout ca.key -out ca.crt -subj "/CN=Kafka-CA"

# 生成私钥
root@kafka01:/usr/local/kafka/pki# openssl genrsa -out kafka.key 4096

# 生成证书签名请求 (CSR)
root@kafka01:/usr/local/kafka/pki# openssl req -new -key kafka.key -out kafka.csr -subj "/CN=kafka-cluster"

# 创建包含所有节点的SAN 配置文件
root@kafka01:/usr/local/kafka/pki# cat > san.cnf << EOF
[ req ]
distinguished_name = req_distinguished_name
req_extensions = req_ext
prompt = no

[ req_distinguished_name ]
CN = kafka-cluster

[ req_ext ]
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = serverAuth, clientAuth
subjectAltName = @alt_names

[ alt_names ]
# 节点主机名与ip
DNS.1 = kafka01
DNS.2 = kafka02
DNS.3 = kafka03
IP.1 = 192.168.1.111
IP.2 = 192.168.1.112
IP.3 = 192.168.1.113
EOF

# 签署证书
root@kafka01:/usr/local/kafka/pki# openssl x509 -req -in kafka.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out kafka.crt -days 3650 -extfile san.cnf -extensions req_ext

# 验证证书
root@kafka01:/usr/local/kafka/pki# openssl x509 -in kafka.crt -text -noout | grep -A 1 "Subject Alternative Name"
            X509v3 Subject Alternative Name: 
                DNS:kafka01, DNS:kafka02, DNS:kafka03, IP Address:192.168.1.111, IP Address:192.168.1.112, IP Address:192.168.1.113

root@kafka01:/usr/local/kafka/pki# ls -l
total 24
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root  423 Jul 26 16:03 san.cnf

创建 Keystore

将证书和私钥转换为 PKCS12 文件

root@kafka01:/usr/local/kafka/pki# openssl pkcs12 -export -in kafka.crt -inkey kafka.key -out kafka.p12 -name kafka-cert -CAfile ca.crt -caname root -passout pass:sundayhk.com

使用 keytoolkafka.p12 文件导入到 Keystore:

root@kafka01:/usr/local/kafka/pki# keytool -importkeystore \
  -deststorepass sundayhk.com \
  -destkeypass sundayhk.com \
  -destkeystore kafka.keystore.jks \
  -srckeystore kafka.p12 \
  -srcstoretype PKCS12 \
  -srcstorepass sundayhk.com \
  -alias kafka-cert

root@kafka01:/usr/local/kafka/pki# ls -l
total 40
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root 4400 Jul 26 16:05 kafka.keystore.jks
-rw------- 1 root root 4360 Jul 26 16:05 kafka.p12
-rw-r--r-- 1 root root  423 Jul 26 16:03 san.cnf

创建 Truststore

使用 keytool 创建 Truststore 并导入 CA 证书:

root@kafka01:/usr/local/kafka/pki# keytool -import \
  -file ca.crt \
  -keystore kafka.truststore.jks \
  -storepass sundayhk.com \
  -alias root

Trust this certificate? [no]:  yes
Certificate was added to keystore

root@kafka01:/usr/local/kafka/pki# ls -l
total 44
-rw-r--r-- 1 root root 1805 Jul 26 16:01 ca.crt
-rw------- 1 root root 3272 Jul 26 16:01 ca.key
-rw-r--r-- 1 root root 1931 Jul 26 16:04 kafka.crt
-rw-r--r-- 1 root root 1590 Jul 26 16:02 kafka.csr
-rw------- 1 root root 3272 Jul 26 16:01 kafka.key
-rw-r--r-- 1 root root 4400 Jul 26 16:05 kafka.keystore.jks
-rw------- 1 root root 4360 Jul 26 16:05 kafka.p12
-rw-r--r-- 1 root root 1654 Jul 26 16:06 kafka.truststore.jks
-rw-r--r-- 1 root root  423 Jul 26 16:03 san.cnf

分发文件

将kafka.truststore.jks 和kafka.keystore.jks 文件分发到其他 kafka 节点

root@kafka01:/usr/local/kafka/pki# scp kafka.keystore.jks kafka.truststore.jks  192.168.1.112:/usr/local/kafka/pki/
root@kafka01:/usr/local/kafka/pki# scp kafka.keystore.jks kafka.truststore.jks  192.168.1.113:/usr/local/kafka/pki/

Kafka服务端配置 TLS

在 Kafka KRaft 模式下的 server.properties 文件中,添加以下配置:

root@kafka01:~# vim /usr/local/kafka/config/kraft/server.properties
# 修改SSL配置
listeners=SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SSL
advertised.listeners=SSL://192.168.1.111:9092,CONTROLLER://192.168.1.111:9093

# 新增Keystore配置
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.key.password=sundayhk.com
# 新增Truststore配置
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
# 客户端连接时启用ssl
ssl.client.auth=required

重启 kafka

root@kafka01:~# systemctl restart kafka

客户端配置 TLS

所有节点操作
创建客户端配置文件,指定证书信息 admin.properties文件内容:

root@kafka01:~# cat > /usr/local/kafka/config/admin.properties << EOF
security.protocol=SSL
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
ssl.endpoint.identification.algorithm=
ssl.key.password=sundayhk.com
EOF

连接 kafka 集群测试

# 查看节点信息
root@kafka01:~# kafka-broker-api-versions.sh --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties

192.168.1.112:9092 (id: 2 rack: null) -> (
……
)
192.168.1.111:9092 (id: 1 rack: null) -> (
……
)
192.168.1.113:9092 (id: 3 rack: null) -> (
……
)

# 查看topic信息
root@kafka01:~# kafka-topics.sh --describe --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties
Topic: test     TopicId: RhzPTC_eRL-etwrn3p5z-g PartitionCount: 3       ReplicationFactor: 2    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 3       Replicas: 3,1   Isr: 3,1        Elr:    LastKnownElr: 
        Topic: test     Partition: 1    Leader: 1       Replicas: 1,2   Isr: 1,2        Elr:    LastKnownElr:
        Topic: test     Partition: 2    Leader: 2       Replicas: 2,3   Isr: 3,2        Elr:    LastKnownElr:

生产消费消息测试

# 生产者发送消息
root@kafka02:~# kafka-console-producer.sh --bootstrap-server 192.168.1.111:9092 --topic test --producer.config /usr/local/kafka/config/admin.properties
>hello ssl

# 消费者接收消息
root@kafka03:~# kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --from-beginning --consumer.config /usr/local/kafka/config/admin.properties
hello ssl

kafka-ui 配置 TLS

修改 kafka-ui 配置文件

root@kafka01:~# cat > /usr/local/kafka-ui/config.yml << EOF
kafka:
  clusters:
    -
      name: kafka-cluster
      bootstrapServers: 192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
      metrics:
        port: 9997
        type: JMX
      properties:
        security:
          protocol: SSL
        ssl:
          keystore:
            location: /usr/local/kafka/pki/kafka.keystore.jks
            password: sundayhk.com
        ssl_endpoint_identification_algorithm: ''
      ssl:
        truststorelocation: /usr/local/kafka/pki/kafka.truststore.jks
        truststorepassword: sundayhk.com
EOF

重启 kafka-ui 并验证

root@kafka01:~# systemctl restart kafka-ui

PLAIN认证

在Kafka中,SASL(Simple Authentication and Security Layer)机制包括三种常见的身份验证方式:

  1. SASL/PLAIN认证:含义是简单身份验证和授权层应用程序接口,PLAIN认证是其中一种最简单的用户名、密码认证方式,生产环境使用维护简单易用。可用于Kafka和其他应用程序之间的认证。
  2. SASL/SCRAM认证:SCRAM-SHA-256、SCRAM-SHA-512方式认证,本认证需要客户端、服务器共同协同完成认证过程,使用和维护上较为复杂。优势是可动态增加用户,而不必重启kafka组件服务端。
  3. SASL/GSSAPI 认证:Kerberos认证,本认证适用于大型公司企业生产环境,通常结合Kerberos协议使用。使用Kerberos认证可集成目录服务,比如AD。通过本认证机制可实现优秀的安全性和良好的用户体验。

创建Kraft账号密码认证文件

以下操作在所有节点执行。

创建两个用户,分别为admin、test(此处仅用于演示,实际生产环境建议按业务创建多个不同的账号,并配置对指定 topic 的读写权限)

cat << EOF > /usr/local/kafka/config/kafka_server_jaas.conf
KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret"
    user_admin="admin-secret"
    user_test="test-secret";
    user_alice="alice-secret";
};
EOF

该配置通过org.apache.kafka.common.security.plain.PlainLoginModule由指定采用PLAIN机制,定义了用户。

usemame和password指定该代理与集群其他代理初始化连接的用户名和密码

user_admin="admin-secret", 这个表示一个用户名为admin用户,密码是admin-secret,这个必须要有一个,且要这一个跟上面的username和password保持一致。

user_test="test-secret" 是第二个用户,表示的是用户名为test的账户,密码为test-secret。

修改 kafka 配置文件

Kafka broker 的 server.properties 配置文件,来启用 SASL/PLAIN 认证。以下是需要配置的参数

root@kafka01:~# vim /usr/local/kafka/config/kraft/server.properties
# 修改以下配置
listeners=SASL_SSL://:9092,CONTROLLER://:9093
inter.broker.listener.name=SASL_SSL
advertised.listeners=SASL_SSL://192.168.1.111:9092,CONTROLLER://192.168.1.111:9093
# 节点间CONTROLLER映射为SASL_PLAINTEXT认证
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# 新增以下配置
# 设置 SASL 认证机制
sasl.enabled.mechanisms=PLAIN
# 集群间认证时用的认证方式
sasl.mechanism.inter.broker.protocol=PLAIN
# 指定Kafka 客户端与 Broker 之间使用的 SASL 认证机制
sasl.mechanism=PLAIN
# 指定控制器通信时使用的认证机制
sasl.mechanism.controller.protocol=PLAIN
# 配置 SASL 认证存储方式为文件
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 设置必须授权才能用
allow.everyone.if.no.acl.found=false
# 配置超级用户
super.users=User:admin

修改启动脚本

root@kafka01:~# vim /usr/local/kafka/bin/kafka-server-start.sh
# 新增-Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf参数
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G -Djava.security.auth.login.config=/usr/local/kafka/config/kafka_server_jaas.conf"
fi

重启 kafka 集群

root@kafka01:~# systemctl restart kafka

客户端使用账户密码认证

修改客户端配置文件,新增认证信息

cat > /usr/local/kafka/config/admin.properties << EOF
bootstrap.servers=192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
ssl.keystore.location=/usr/local/kafka/pki/kafka.keystore.jks
ssl.keystore.password=sundayhk.com
ssl.truststore.location=/usr/local/kafka/pki/kafka.truststore.jks
ssl.truststore.password=sundayhk.com
ssl.endpoint.identification.algorithm=
ssl.key.password=sundayhk.com
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";
EOF

查看 boorker 信息

root@kafka01:~# kafka-broker-api-versions.sh --bootstrap-server 192.168.1.111:9092 --command-config /usr/local/kafka/config/admin.properties

192.168.1.112:9092 (id: 2 rack: null) -> (
        ……
)
192.168.1.113:9092 (id: 3 rack: null) -> (
        ……
)
192.168.1.111:9092 (id: 1 rack: null) -> (
        ……
)

查看 topic 信息

root@kafka01:~# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.111:9092 --list --command-config /usr/local/kafka/config/admin.properties

__consumer_offsets
test

创建客户端认证文件

cat > /usr/local/kafka/config/client_jaas.conf << EOF
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin-secret";
};
EOF

修改生产者和消费者脚本,添加-Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf

root@kafka01:~# vim /usr/local/kafka/bin/kafka-console-producer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf kafka.tools.ConsoleProducer "$@"

root@kafka01:~# vim /usr/local/kafka/bin/kafka-console-consumer.sh
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=/usr/local/kafka/config/client_jaas.conf org.apache.kafka.tools.consumer.ConsoleConsumer "$@"

生产者发送消息

root@kafka01:~# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.1.111:9092 --topic test --producer.config /usr/local/kafka/config/admin.properties
> hello kafka

消费者消费消息

root@kafka01:~# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.111:9092 --topic test --consumer.config /usr/local/kafka/config/admin.properties --from-beginning
hello kafka

kafka-ui 使用账号密码认证

更新 kafka-ui 配置文件

root@kafka01:~# vim/usr/local/kafka-ui/config.yml
kafka:
  clusters:
    - name: kafka-cluster
      bootstrapServers: 192.168.1.111:9092,192.168.1.112:9092,192.168.1.113:9092
      metrics:
        port: 9997
        type: JMX
      properties:
        security.protocol: SASL_SSL
        sasl.mechanism: PLAIN
        sasl.jaas.config: 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-secret";'
        ssl.keystore.location: /usr/local/kafka/pki/kafka.keystore.jks
        ssl.keystore.password: sundayhk.com
        # ssl.key.password: sundayhk.com
        ssl.truststore.location: /usr/local/kafka/pki/kafka.truststore.jks
        ssl.truststore.password: sundayhk.com
        ssl.endpoint.identification.algorithm: ''

重启 kafka-ui 验证

root@kafka01:~# systemctl restart kafka-ui

ACL权限

在Kafka中,ACL(Access Control List)是用来控制谁可以访问Kafka资源(如主题、消费者组等)的权限机制。ACL配置基于Kafka的kafka-acls.sh工具,能够管理对资源的读取、写入等操作权限。

Kafka ACL的基本概念

Kafka的ACL是基于以下几个方面的:

  • 资源类型(Resource Type): Kafka支持多种资源类型,包括主题(Topic)、消费者组(Consumer Group)、Kafka集群本身(Cluster)等。
  • 操作类型(Operation Type): 如Read(读取)、Write(写入)、Create(创建)、Describe(描述)、Alter(修改)等。
  • 权限类型(Permission Type): Allow表示允许访问,Deny表示拒绝访问。
  • 主体(Principal): 访问Kafka的用户或客户端。Kafka支持通过SASL认证系统中的用户来定义主体,通常是User:<username>的形式。

添加ACL

给用户User:test添加对test主题的读取权限:

root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --add --allow-principal User:test --operation Read --topic test --command-config /usr/local/kafka/config/admin.properties
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
        (principal=User:test, host=*, operation=READ, permissionType=ALLOW)
  • --allow-principal: 允许访问的用户主体。
  • --operation: 操作类型,如ReadWrite等。
  • --topic top 名称。

通过 kafka-ui 查看验证

image.png

查看现有ACL

root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --list --command-config /usr/local/kafka/config/admin.properties

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`: 
    (principal=User:test, host=*, operation=READ, permissionType=ALLOW) 

删除ACL

删除User:testtest主题的读取权限:

root@kafka01:~# kafka-acls.sh --bootstrap-server 192.168.1.111:9092 --remove --allow-principal User:test --operation Read --topic test --command-config /usr/local/kafka/config/admin.properties

Are you sure you want to remove ACLs: 
        (principal=User:test, host=*, operation=READ, permissionType=ALLOW) 
 from resource filter `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`? (y/n)
y

相关文章

Kafka集群部署(Kraft模式)

发布评论