搭建Kafka集群
将Kafka的安装包上传到虚拟机,并解压
1 2 3 cd /export/software/ tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/ cd /export/server/kafka_2.12-2.4.1/
修改 server.properties
1 2 3 4 5 6 7 8 cd /export/server/kafka_2.12-2.4.1/config vim server.properties # 指定broker的id broker.id=0 # 指定Kafka数据的位置 log.dirs=/export/server/kafka_2.12-2.4.1/data # 配置zk的三个节点 zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
将安装好的kafka复制到另外两台服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 cd /export/server scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD 修改另外两个节点的broker.id分别为1和2 ---------node2.itcast.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim erver.properties broker.id=1 ---------node3.itcast.cn-------------- cd /export/server/kafka_2.12-2.4.1/config vim server.properties broker.id=2
配置KAFKA_HOME环境变量
1 2 3 4 5 6 7 8 9 vim /etc/profile export KAFKA_HOME=/export/server/kafka_2.12-2.4.1 export PATH=:$PATH:${KAFKA_HOME} 分发到各个节点 scp /etc/profile node2.itcast.cn:$PWD scp /etc/profile node3.itcast.cn:$PWD 每个节点加载环境变量 source /etc/profile
启动服务器
1 2 3 4 5 6 7 # 启动ZooKeeper nohup bin/zookeeper-server-start.sh config/zookeeper.properties & # 启动Kafka cd /export/server/kafka_2.12-2.4.1 nohup bin/kafka-server-start.sh config/server.properties & # 测试Kafka集群是否启动成功 bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list
目录结构分析
目录名称
说明
bin
Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
config
Kafka的所有配置文件
libs
运行Kafka所需要的所有JAR包
logs
Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
site-docs
Kafka的网站帮助文件
Kafka一键启动/关闭脚本 为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。
在节点1中创建 /export/onekey 目录
cd /export/onekey
准备slave配置文件,用于保存要启动哪几个节点上的kafka
1 2 3 node1.itcast.cn node2.itcast.cn node3.itcast.cn
编写start-kafka.sh脚本
1 2 3 4 5 6 7 8 9 vim start-kafka.sh cat /export/onekey/slave | while read line do { echo $line ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & " }& wait done
编写stop-kafka.sh脚本
1 2 3 4 5 6 7 8 9 vim stop-kafka.sh cat /export/onekey/slave | while read line do { echo $line ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9" }& wait done
给start-kafka.sh、stop-kafka.sh配置执行权限
1 2 chmod u+x start-kafka.sh chmod u+x stop-kafka.sh
执行一键启动、一键关闭
1 2 ./start-kafka.sh ./stop-kafka.sh
基础操作
创建topic 创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。
1 2 3 4 # 创建名为test 的主题 bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test # 查看目前Kafka中的主题 bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092
生产消息到Kafka 使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。
1 bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test
从Kafka消费消息 使用下面的命令来消费 test 主题中的消息。
1 bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning
连接Kafka集群
安装Kafka Tools后启动Kafka
创建topic
安装Kafka-Eagle 开启Kafka JMX 接口 JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。
在启动Kafka的脚本前,添加:
1 2 3 cd ${KAFKA_HOME} export JMX_PORT=9988 nohup bin/kafka-server-start.sh config/server.properties &
安装Kafka-Eagle
安装JDK,并配置好JAVA_HOME。
将kafka_eagle上传,并解压到 /export/server 目录中。
1 2 3 4 5 cd cd /export/software/ tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/ cd /export/server/kafka-eagle-bin-1.4.6/ tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
配置 kafka_eagle 环境变量。
vim /etc/profile
1 2 export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6 export PATH=$PATH:$KE_HOME/bin
source /etc/profile
配置 kafka_eagle。使用vi打开conf目录下的system-config.properties
vim conf/system-config.properties
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 # 修改第4行,配置kafka集群别名 kafka.eagle.zk.cluster.alias=cluster1 # 修改第5行,配置ZK集群地址 cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181 # 注释第6行 # cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181 # 修改第32行,打开图标统计 kafka.eagle.metrics.charts=true kafka.eagle.metrics.retain=30 # 注释第69行,取消sqlite数据库连接配置 # kafka.eagle.driver=org.sqlite.JDBC # kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db # kafka.eagle.username=root # kafka.eagle.password=www.kafka-eagle.org # 修改第77行,开启mys kafka.eagle.driver=com.mysql.jdbc.Driver kafka.eagle.url=jdbc:mysql://node1.itcast.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull kafka.eagle.username=root kafka.eagle.password=123456
配置JAVA_HOME
1 2 3 4 cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin vim ke.sh # 在第24行添加JAVA_HOME环境配置 export JAVA_HOME=/export/server/jdk1.8.0_241
修改Kafka eagle可执行权限
1 2 cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin chmod +x ke.sh
启动 kafka_eagle。
访问Kafka eagle,默认用户为admin,密码为:123456
http://node1.itcast.cn:8048/ke