搭建Kafka集群

  1. 将Kafka的安装包上传到虚拟机,并解压
1
2
3
cd /export/software/
tar -xvzf kafka_2.12-2.4.1.tgz -C ../server/
cd /export/server/kafka_2.12-2.4.1/
  1. 修改 server.properties
1
2
3
4
5
6
7
8
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
# 指定broker的id
broker.id=0
# 指定Kafka数据的位置
log.dirs=/export/server/kafka_2.12-2.4.1/data
# 配置zk的三个节点
zookeeper.connect=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
  1. 将安装好的kafka复制到另外两台服务器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
cd /export/server
scp -r kafka_2.12-2.4.1/ node2.itcast.cn:$PWD
scp -r kafka_2.12-2.4.1/ node3.itcast.cn:$PWD

修改另外两个节点的broker.id分别为1和2
---------node2.itcast.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim erver.properties
broker.id=1

---------node3.itcast.cn--------------
cd /export/server/kafka_2.12-2.4.1/config
vim server.properties
broker.id=2
  1. 配置KAFKA_HOME环境变量
1
2
3
4
5
6
7
8
9
vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
export PATH=:$PATH:${KAFKA_HOME}

分发到各个节点
scp /etc/profile node2.itcast.cn:$PWD
scp /etc/profile node3.itcast.cn:$PWD
每个节点加载环境变量
source /etc/profile
  1. 启动服务器
1
2
3
4
5
6
7
# 启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
# 测试Kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.itcast.cn:9092 --list

目录结构分析

目录名称 说明
bin Kafka的所有执行脚本都在这里。例如:启动Kafka服务器、创建Topic、生产者、消费者程序等等
config Kafka的所有配置文件
libs 运行Kafka所需要的所有JAR包
logs Kafka的所有日志文件,如果Kafka出现一些问题,需要到该目录中去查看异常信息
site-docs Kafka的网站帮助文件

Kafka一键启动/关闭脚本

为了方便将来进行一键启动、关闭Kafka,我们可以编写一个shell脚本来操作。将来只要执行一次该脚本就可以快速启动/关闭Kafka。

  1. 在节点1中创建 /export/onekey 目录

cd /export/onekey

  1. 准备slave配置文件,用于保存要启动哪几个节点上的kafka
1
2
3
node1.itcast.cn
node2.itcast.cn
node3.itcast.cn
  1. 编写start-kafka.sh脚本
1
2
3
4
5
6
7
8
9
vim start-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;export JMX_PORT=9988;nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >/dev/nul* 2>&1 & "
}&
wait
done
  1. 编写stop-kafka.sh脚本
1
2
3
4
5
6
7
8
9
vim stop-kafka.sh
cat /export/onekey/slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -d' ' -f1 |xargs kill -s 9"
}&
wait
done
  1. 给start-kafka.sh、stop-kafka.sh配置执行权限
1
2
chmod u+x start-kafka.sh
chmod u+x stop-kafka.sh
  1. 执行一键启动、一键关闭
1
2
./start-kafka.sh
./stop-kafka.sh

基础操作

创建topic

创建一个topic(主题)。Kafka中所有的消息都是保存在主题中,要生产消息到Kafka,首先必须要有一个确定的主题。

1
2
3
4
# 创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.itcast.cn:9092 --topic test
# 查看目前Kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.itcast.cn:9092

生产消息到Kafka

使用Kafka内置的测试程序,生产一些消息到Kafka的test主题中。

1
bin/kafka-console-producer.sh --broker-list node1.itcast.cn:9092 --topic test

从Kafka消费消息

使用下面的命令来消费 test 主题中的消息。

1
bin/kafka-console-consumer.sh --bootstrap-server node1.itcast.cn:9092 --topic test --from-beginning

使用Kafka Tools操作Kafka

连接Kafka集群

安装Kafka Tools后启动Kafka

创建topic

安装Kafka-Eagle

开启Kafka JMX 接口

JMX(Java Management Extensions)是一个为应用程序植入管理功能的框架。JMX是一套标准的代理和服务,实际上,用户可以在任何Java应用程序中使用这些代理和服务实现管理。很多的一些软件都提供了JMX接口,来实现一些管理、监控功能。

在启动Kafka的脚本前,添加:

1
2
3
cd ${KAFKA_HOME}
export JMX_PORT=9988
nohup bin/kafka-server-start.sh config/server.properties &

安装Kafka-Eagle

  1. 安装JDK,并配置好JAVA_HOME。
  2. 将kafka_eagle上传,并解压到 /export/server 目录中。
1
2
3
4
5
cd cd /export/software/
tar -xvzf kafka-eagle-bin-1.4.6.tar.gz -C ../server/
cd /export/server/kafka-eagle-bin-1.4.6/
tar -xvzf kafka-eagle-web-1.4.6-bin.tar.gz
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
  1. 配置 kafka_eagle 环境变量。

vim /etc/profile

1
2
export KE_HOME=/export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6
export PATH=$PATH:$KE_HOME/bin

source /etc/profile

  1. 配置 kafka_eagle。使用vi打开conf目录下的system-config.properties

vim conf/system-config.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 修改第4行,配置kafka集群别名
kafka.eagle.zk.cluster.alias=cluster1
# 修改第5行,配置ZK集群地址
cluster1.zk.list=node1.itcast.cn:2181,node2.itcast.cn:2181,node3.itcast.cn:2181
# 注释第6行
#cluster2.zk.list=xdn10:2181,xdn11:2181,xdn12:2181

# 修改第32行,打开图标统计
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=30

# 注释第69行,取消sqlite数据库连接配置
#kafka.eagle.driver=org.sqlite.JDBC
#kafka.eagle.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
#kafka.eagle.username=root
#kafka.eagle.password=www.kafka-eagle.org

# 修改第77行,开启mys
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node1.itcast.cn:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  1. 配置JAVA_HOME
1
2
3
4
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
vim ke.sh
# 在第24行添加JAVA_HOME环境配置
export JAVA_HOME=/export/server/jdk1.8.0_241
  1. 修改Kafka eagle可执行权限
1
2
cd /export/server/kafka-eagle-bin-1.4.6/kafka-eagle-web-1.4.6/bin
chmod +x ke.sh
  1. 启动 kafka_eagle。
1
./ke.sh start
  1. 访问Kafka eagle,默认用户为admin,密码为:123456

http://node1.itcast.cn:8048/ke