kafka|KRaft跨集群数据镜像

实验环境

主机名/IP | 角色 | 系统
server01/192.168.1.201 | controller | ubuntu22.04
server02/192.168.1.202 | broker | ubuntu22.04
server03/192.168.1.203 | controller | ubuntu22.04
server04/192.168.1.204 | broker | ubuntu22.04
server05/192.168.1.205 | mm2 | ubuntu22.04

All Nodes

安装kafka

apt install openjdk-11-jre-headless -y
wget https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz
tar -zxvf kafka_2.13-3.8.0.tgz -C /usr/local/etc

配置简单集群

server01

修改配置文件

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.201:9093
listeners=CONTROLLER://192.168.1.201:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT

server02

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.201:9093
listeners=PLAINTEXT://192.168.1.202:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

server03

修改配置文件

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.203:9093
listeners=CONTROLLER://192.168.1.203:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT

server04

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.203:9093
listeners=PLAINTEXT://192.168.1.204:9092
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT

server01

生成uuid

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh random-uuid
# 这个生成的uuid后面要用在其他节点上
viJ79TulTXaN7yG53fdp7w

server03

生成uuid

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh random-uuid
# 这个生成的uuid后面要用在其他节点上
SDSD9TulTXsdssda2fdp7w

server01 && server02

初始化集群数据目录

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t viJ79TulTXaN7yG53fdp7w -c /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

server03 && server04

初始化集群数据目录

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-storage.sh format -t SDSD9TulTXsdssda2fdp7w -c /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

测试集群

server05

查看集群状态

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.202:9092 describe --status
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.204:9092 describe --status

配置用户

server05

创建用户

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.202:9092 --alter  --entity-type users --entity-name wakamizu --add-config 'SCRAM-SHA-512=[password=wakamizu]'
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.204:9092 --alter  --entity-type users --entity-name wakamizu --add-config 'SCRAM-SHA-512=[password=wakamizu]'

查看用户列表

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.202:9092 --describe --entity-type users
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-configs.sh --bootstrap-server 192.168.1.204:9092 --describe --entity-type users

配置认证授权

server01

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
# 配置角色为控制器
process.roles=controller
# 当前节点 ID 为 1
node.id=1
# 控制器投票者,ID 为 1,地址为 192.168.1.201:9093
controller.quorum.voters=1@192.168.1.201:9093
# 控制器监听地址为 192.168.1.201:9093,监听器名称为 CONTROLLER
listeners=CONTROLLER://192.168.1.201:9093
# 控制器监听器名称为 CONTROLLER
controller.listener.names=CONTROLLER
# CONTROLLER 监听器对应 PLAINTEXT 安全协议
listener.security.protocol.map=CONTROLLER:PLAINTEXT
# 使用 org.apache.kafka.metadata.authorizer.StandardAuthorizer 作为授权类
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 如果没有找到 ACL,允许所有人访问
allow.everyone.if.no.acl.found=true

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

server02

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
# 配置角色为 broker
process.roles=broker
# 当前节点 ID 为 2
node.id=2
# 控制器投票者,ID 为 1,地址为 192.168.1.201:9093
controller.quorum.voters=1@192.168.1.201:9093
# 监听端口为 9092 和 9091,监听器名称分别为 CLIENT 和 BROKER
listeners=CLIENT://:9092,BROKER://:9091
# broker 间通信的监听器名称为 BROKER
inter.broker.listener.name=BROKER
# 对外宣告的监听地址为 192.168.1.202:9092 和 192.168.1.202:9091
advertised.listeners=CLIENT://192.168.1.202:9092,BROKER://192.168.1.202:9091
# 启用的 SASL 机制为 SCRAM-SHA-512 和 PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAINTEXT
# broker 间通信使用的 SASL 机制为 PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAINTEXT
# 超级用户为 User:wakamizu
super.users=User:wakamizu
# 如果没有找到 ACL,允许所有人访问
allow.everyone.if.no.acl.found=true
# 使用 org.apache.kafka.metadata.authorizer.StandardAuthorizer 作为授权类
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
# 控制器监听器名称为 CONTROLLER
controller.listener.names=CONTROLLER
# 安全协议映射,CONTROLLER 使用 PLAINTEXT,BROKER 使用 PLAINTEXT,CLIENT 使用 SASL_PLAINTEXT
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:SASL_PLAINTEXT
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
};
vim /usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf"

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

server03

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=controller
node.id=1
controller.quorum.voters=1@192.168.1.203:9093
listeners=CONTROLLER://192.168.1.203:9093
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
allow.everyone.if.no.acl.found=true

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

server04

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties
process.roles=broker
node.id=2
controller.quorum.voters=1@192.168.1.203:9093
listeners=CLIENT://:9092,BROKER://:9091 
inter.broker.listener.name=BROKER
advertised.listeners=CLIENT://192.168.1.204:9092,BROKER://192.168.1.204:9091
sasl.enabled.mechanisms=SCRAM-SHA-512,PLAINTEXT                
sasl.mechanism.inter.broker.protocol=PLAINTEXT      
super.users=User:wakamizu                        
allow.everyone.if.no.acl.found=true  
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT,CLIENT:SASL_PLAINTEXT
vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf
# SASL 配置,使用 SCRAM 登录模块,用户名为 wakamizu,密码为 wakamizu
KafkaServer {
    org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
};
vim /usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh
# 设置 Kafka 的 JVM 选项,指定 JAAS 配置文件的位置
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/local/etc/kafka_2.13-3.8.0/config/kraft/jaas.conf"

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-server-start.sh  /usr/local/etc/kafka_2.13-3.8.0/config/kraft/server.properties

测试集群

server05

创建客户端凭证

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf
# 安全协议为 SASL_PLAINTEXT
security.protocol=SASL_PLAINTEXT
# SASL 机制为 SCRAM-SHA-512
sasl.mechanism=SCRAM-SHA-512
# SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";

查看集群状态

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.202:9092  --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf describe --status
/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-metadata-quorum.sh --bootstrap-server 192.168.1.204:9092  --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf describe --status

配置mm2

server05

vim /usr/local/etc/kafka_2.13-3.8.0/config/kraft/mm2.properties
# 配置集群
clusters = primary,secondary
# 主集群引导服务器地址
primary.bootstrap.servers = 192.168.1.202:9092
# 次集群引导服务器地址
secondary.bootstrap.servers = 192.168.1.204:9092
# 启用从主集群到次集群的数据流
primary->secondary.enabled = true
# 启用从次集群到主集群的数据流
secondary->primary.enabled = true
# SASL_PLAINTEXT 配置
# 主集群安全协议为 SASL_PLAINTEXT
primary.security.protocol=SASL_PLAINTEXT
# 主集群 SASL 机制为 SCRAM-SHA-512
primary.sasl.mechanism=SCRAM-SHA-512
# 主集群 SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
primary.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
# 次集群安全协议为 SASL_PLAINTEXT
secondary.security.protocol=SASL_PLAINTEXT
# 次集群 SASL 机制为 SCRAM-SHA-512
secondary.sasl.mechanism=SCRAM-SHA-512
# 次集群 SASL JAAS 配置,用户名为 wakamizu,密码为 wakamizu
secondary.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="wakamizu" password="wakamizu";
primary->secondary.topics=.*
secondary->primary.topics=.*
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
replication.factor=1
refresh.topics.interval.seconds=5

启动服务

/usr/local/etc/kafka_2.13-3.8.0/bin/connect-mirror-maker.sh /usr/local/etc/kafka_2.13-3.8.0/config/kraft/mm2.properties

测试跨集群数据镜像

server05

创建主题

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-topics.sh --create --bootstrap-server 192.168.1.202:9092 --topic wakamizu --command-config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf

集群1生产数据

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-console-producer.sh --broker-list 192.168.1.202:9092 --topic wakamizu --producer.config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf

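集群2消费数据

注意:MM2 默认使用 DefaultReplicationPolicy,镜像到目标集群的主题会带上源集群别名前缀(此处即 primary.wakamizu);如果下面的命令消费不到数据,请将 --topic 改为 primary.wakamizu。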

/usr/local/etc/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.204:9092 --consumer.config /usr/local/etc/kafka_2.13-3.8.0/config/kraft/client.conf   --topic wakamizu --from-beginning
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!