实验环境
主机名/IP | 角色 | 系统 |
server01/192.168.1.201 | controller | ubuntu22.04 |
server02/192.168.1.202 | broker | ubuntu22.04 |
server03/192.168.1.203 | controller | ubuntu22.04 |
server04/192.168.1.204 | broker | ubuntu22.04 |
server05/192.168.1.205 | mm2 | ubuntu22.04 |
All Nodes
安装kafka
apt install openjdk-11-jre-headless -y
wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -zxvf kafka_2.13-3.8.0.tgz -C /usr/local/etc
配置简单集群
server01
修改配置文件
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.201:9093
listeners=CONTROLLER://192.168.1.201:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
server02
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.201:9093
listeners=PLAINTEXT://192.168.1.202:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
server03
修改配置文件
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.203:9093
listeners=CONTROLLER://192.168.1.203:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
server04
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.203:9093
listeners=PLAINTEXT://192.168.1.204:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
server01
生成uuid
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh random-uuid
# 这个生成的uuid后面要用在其他节点上
viJ79TulTXaN7yG53fdp7w
server03
生成uuid
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh random-uuid
# 这个生成的uuid后面要用在其他节点上
SDSD9TulTXsdssda2fdp7w
server01 && server02
初始化集群数据目录
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t viJ79TulTXaN7yG53fdp7w -c /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
server03 && server04
初始化集群数据目录
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t SDSD9TulTXsdssda2fdp7w -c /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
测试集群
server05
查看集群状态
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.202:9092 describe --status
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.204:9092 describe --status
配置用户
server05
创建用户
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.202:9092 --alter --entity-type users --entity-name wakamizu --add-config 'SCRAM-SHA-512=[password=wakamizu]'
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.204:9092 --alter --entity-type users --entity-name wakamizu --add-config 'SCRAM-SHA-512=[password=wakamizu]'
查看用户列表
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.202:9092 --describe --entity-type users
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.204:9092 --describe --entity-type users
配置认证授权
server01
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
# 配置角色为控制器
process.roles=controller
# 当前节点 ID 为 1
node.id=1
# 控制器投票者,ID 为 1,地址为 192.168.1.201:9093
controller.quorum.voters=1@192.168.1.201:9093
# 控制器监听地址为 192.168.1.201:9093,监听器名称为 CONTROLLER
listeners=CONTROLLER://192.168.1.201:9093
# 控制器监听器名称为 CONTROLLER
controller.listener.names=CONTROLLER
# CONTROLLER 监听器对应 PLAINTEXT 安全协议
listener.security.protocol.map=CONTROLLER:PLAINTEXT
# 使用 org.apache.kafka.metadata.authorizer.StandardAuthorizer 作为授权类
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 如果资源上没有找到匹配的 ACL,则允许所有人访问(设为 false 时才会拒绝)
allow.everyone.if.no.acl.found=true
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
server02
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
# 配置角色为 broker
process.roles=broker
# 当前节点 ID 为 2
node.id=2
# 控制器投票者,ID 为 1,地址为 192.168.1.201:9093
controller.quorum.voters=1@192.168.1.201:9093
# 监听端口为 9092 和 9091,对应的监听器名称分别为 CLIENT 和 BROKER
listeners=CLIENT://:9092,BROKER://:9091
# broker 间通信的监听器名称为 BROKER
inter.broker.listener.name=BROKER
# 对外宣告的监听地址为 192.168.1.202:9092 和 192.168.1.202:9091
advertised.listeners=CLIENT://192.168.1.202:9092,BROKER://192.168.1.202:9091
# 启用的 SASL 机制列表(注意: 标准的 SASL 机制名是 PLAIN,PLAINTEXT 是安全协议名;客户端实际使用的是 SCRAM-SHA-512)
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAINTEXT
# broker 间通信使用的 SASL 机制为 PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAINTEXT
# 超级用户为 User:wakamizu
super.users=User:wakamizu
# 如果资源上没有找到匹配的 ACL,则允许所有人访问(设为 false 时才会拒绝)
allow.everyone.if.no.acl.found=true
# 使用 org.apache.kafka.metadata.authorizer.StandardAuthorizer 作为授权类
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 控制器监听器名称为 CONTROLLER
controller.listener.names=CONTROLLER
# 安全协议映射,CONTROLLER 使用 PLAINTEXT,BROKER 使用 PLAINTEXT,CLIENT 使用 SASL_PLAINTEXT
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:SASL_PLAINTEXT
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
};
vim /usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf"
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
server03
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.203:9093
listeners=CONTROLLER://192.168.1.203:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
server04
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.203:9093
listeners=CLIENT://:9092,BROKER://:9091
inter.broker.listener.name=BROKER
advertised.listeners=CLIENT://192.168.1.204:9092,BROKER://192.168.1.204:9091
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAINTEXT
super.users=User:wakamizu
allow.everyone.if.no.acl.found=true
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:SASL_PLAINTEXT
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf
# SASL 配置,使用 SCRAM 登录模块,用户名为 wakamizu,密码为 wakamizu
KafkaServer {
org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
};
vim /usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh
# 设置 Kafka 的 JVM 选项,指定 JAAS 配置文件的位置
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf"
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
测试集群
server05
创建客户端凭证
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf
# 安全协议为 SASL_PLAINTEXT
security.protocol=SASL_PLAINTEXT
# SASL 机制为 SCRAM-SHA-512
sasl.mechanism=SCRAM-SHA-512
# SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
查看集群状态
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.202:9092 --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf describe --status
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.204:9092 --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf describe --status
配置mm2
server05
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/mm2.properties
# 配置集群
clusters = primary,secondary
# 主集群引导服务器地址
primary.bootstrap.servers = 192.168.1.202:9092
# 次集群引导服务器地址
secondary.bootstrap.servers = 192.168.1.204:9092
# 启用从主集群到次集群的数据流
primary->secondary.enabled = true
# 启用从次集群到主集群的数据流
secondary->primary.enabled = true
# SASL_PLAINTEXT 配置
# 主集群安全协议为 SASL_PLAINTEXT
primary.security.protocol=SASL_PLAINTEXT
# 主集群 SASL 机制为 SCRAM-SHA-512
primary.sasl.mechanism=SCRAM-SHA-512
# 主集群 SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
primary.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
# 次集群安全协议为 SASL_PLAINTEXT
secondary.security.protocol=SASL_PLAINTEXT
# 次集群 SASL 机制为 SCRAM-SHA-512
secondary.sasl.mechanism=SCRAM-SHA-512
# 次集群 SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
secondary.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
primary->secondary.topics=.*
secondary->primary.topics=.*
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
replication.factor=1
refresh.topics.interval.seconds=5
启动服务
/usr/local/etc/kafka_2.13-3.8.0/bin/connect-mirror-maker.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/mm2.properties
测试跨集群数据镜像
server05
创建主题
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.202:9092 --topic wakamizu --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf
集群1生产数据
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-console-producer.sh --broker-list 192.168.1.202:9092 --topic wakamizu --producer.config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf
集群2消费数据(MM2 默认使用 DefaultReplicationPolicy,镜像到集群2的主题会加上源集群别名前缀,即 primary.wakamizu)
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.204:9092 --consumer.config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf --topic primary.wakamizu --from-beginning