kafka部署

解压

1
tar -zxvf  安装包     -C /opt/module/

在/opt/module/kafka目录下创建logs文件夹

配置文件

在kafka的 config文件夹下

1
2
cd config/
vi server.properties

需要修改的:

1
2
3
4
5
6
7
8
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#配置连接Zookeeper集群地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
1
mkdir -p /opt/module/kafka/logs

尝试启动kafka

启动kafka的前提的启动zookeeper,jps 看看自己启动了没有。

进入到kafka的安装目录下执行:

1
./bin/kafka-server-start.sh config/server.properties

编写启动脚本

vim kaf.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/bin/bash

case $1 in
"start"){
for i in node01 node02 node03
do
echo " --------启动 $i Kafka-------"
ssh $i "export JMX_PORT=9988 && /opt/module/kafka_2.11-0.11.0.2/bin/kafka-server-start.sh -daemon /opt/module
/kafka_2.11-0.11.0.2/config/server.properties"
done
};;

"stop"){
for i in node01 node02 node03
do
echo " --------停止 $i Kafka-------"
ssh $i "/opt/module/kafka_2.11-0.11.0.2/bin/kafka-server-stop.sh stop"
done
};;
esac

说明:启动Kafka时要先开启JMX端口,是用于后续KafkaManager监控。

给脚本运行权限: chmod 777 kaf.sh

将脚本移动到 /usr/local/bin 可以在任意位置调用此脚本

冲突预见

软件关闭顺序

在关闭kafka的时候,会发现kafka的关闭速度不是很快的,这是因为kafka中的数据状态需要在zookeeper中保存,那如果我们在kafka向zookeeper中保存数据的时候关闭了zookeeper,会发生什么?

答:kafka会面临无法关闭的问题,因为他还在向zookeeper中写入数据。

kafka中的幽灵topic

当你将此前所有的软件都启动了之后,执行指令,查看kafka的topic会发现,多出了个topic:

1598333119755

我们只是安装了kafka,还没有创建topic为什么出现了topic?

答:当你启动了flume后,flume会自动向kafka中发送数据,发送数据的过程中发现kakfa中没有topic,就会自动创建topic,并且你想要删除topic也无法删除,因为你一删除flume就会再创建一个。要删除topic必须先关闭flume,停止flume对数据的读取传输

验证kafka

开启消费者

1
[root@node01 module]# kafka-console-consumer.sh --zookeeper node01:2181 --from-beginning --topic topic_start

因为从开始消费数据,所以,如果此时你的flume监控的目录下已经存在数据的话,就会从头开始消费数据,那么就会将之前flume生成的数据全部消费一遍

1598333638144

开启生产者

1
[root@node02 ~]# kafka-console-producer.sh --broker-list node01:9092 --topic topic_start

开启生产者之后,可以正常的输入数据给消费者消费

1598333588174

kafka的管理者安装

管理者不需要分发,只装在一个节点上就行了

启动命令

1
nohup ./bin/kafka-manager -Dhttp.port=7456  >/opt/module/kafka-manager-1.3.3.22/start.log 2>&1 & 

启动之后,打开浏览器,输入 http://node01:7456/ 看到这个窗口,意味着你成功了

1598334565956

具体的使用流程呢?

https://blog.csdn.net/u011089412/article/details/87895652

启动脚本

由于这个也是我们最终全部项目启动的一部分,所以也给这个做一个启动脚本。

启动脚本因为只有一个节点上有kafka管理,所以不需要分发。

1
2
3
4
5
6
7
8
9
10
11
12
13
#! /bin/bash

case $1 in
"start"){
echo " -------- 启动 KafkaManager -------"
nohup /opt/module/kafka-manager-1.3.3.22/bin/kafka-manager -Dhttp.port=7456 >/opt/module/kafka-manager-1.3.3.22/sta
rt.log 2>&1 &
};;
"stop"){
echo " -------- 停止 KafkaManager -------"
ps -ef | grep ProdServerStart | grep -v grep |awk '{print $2}' | xargs kill
};;
esac

kafka压力测试

Kafka压测

用Kafka官方自带的脚本,对Kafka进行压测。Kafka压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO达到瓶颈。

kafka-consumer-perf-test.sh

kafka-producer-perf-test.sh

Kafka Producer压力测试

(1)在/opt/module/kafka/bin目录下面有这两个文件。我们来测试一下

[atguigu@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh –topic test –record-size 100 –num-records 100000 –throughput 1000 –producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

说明:record-size是一条信息有多大,单位是字节。num-records是总共发送多少条信息。throughput 是每秒多少条信息。

(2)Kafka会打印下面的信息

5000 records sent, 999.4 records/sec (0.10 MB/sec), 1.9 ms avg latency, 254.0 max latency.

5002 records sent, 1000.4 records/sec (0.10 MB/sec), 0.7 ms avg latency, 12.0 max latency.

5001 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 4.0 max latency.

5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.7 ms avg latency, 3.0 max latency.

5000 records sent, 1000.0 records/sec (0.10 MB/sec), 0.8 ms avg latency, 5.0 max latency.

参数解析:本例中一共写入10w条消息,每秒向Kafka写入了0.10****MB的数据,平均是1000条消息/秒,每次写入的平均延迟为0.8毫秒,最大的延迟为254毫秒。

Kafka Consumer压力测试

Consumer的测试,如果这四个指标(IO,CPU,内存,网络)都不能改变,考虑增加分区数来提升性能。

[atguigu@hadoop102 kafka]$

bin/kafka-consumer-perf-test.sh –zookeeper hadoop102:2181 –topic test –fetch-size 10000 –messages 10000000 –threads 1

参数说明:

–zookeeper 指定zookeeper的链接信息

–topic 指定topic的名称

–fetch-size 指定每次fetch的数据的大小

–messages 总共要消费的消息个数

测试结果说明:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg**, nMsg.sec**

2019-02-19 20:29:07:566, 2019-02-19 20:29:12:170, 9.5368, 2.0714, 100010, 21722.4153

开始测试时间,测试结束数据,最大吞吐9.5368MB/s,平均每秒消费2.0714MB/s最大每秒消费100010条,平均每秒消费21722.4153****条。

kafka集群数计算

Kafka机器数量(经验公式)=2(峰值生产速度副本数/100)+1

先要预估一天大概产生多少数据,然后用Kafka自带的生产压测(只测试Kafka的写入速度,保证数据不积压),计算出峰值生产速度。再根据设定的副本数,就能预估出需要部署Kafka的数量。

比如我们采用压力测试测出写入的速度是10M/s一台,峰值的业务数据的速度是50M/s。副本数为2。

Kafka机器数量=2(502/100)+ 1=3台

注意,一定是先要用压力测试计算出