企业级数仓项目6
flume消费kafka数据
流程
规划:
服务器hadoop102 | 服务器hadoop103 | 服务器hadoop104 | |
---|---|---|---|
Flume(消费Kafka) | Flume |
我们的进度来到了使用flume消费kafka数据了。
由图可知,我们现在要创建两组flume控件
实现
根据集群规划来看,这个采集任务要交给node03来执行,/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件
vim kafka-flume-hdfs.conf
1 | ## 组件 |
配置解释
batchSize 是 指定文件读取数量
batchDurationMillis 是 读取数据的延迟
启动读取kafka的flume
再node03上启动
1 | /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka_flume_hdfs.conf - |
我预期的效果是flume会从kafka集群中读取所有数据,将压缩为lzo的格式的数据提交到hdfs上,并且生成两个文件夹,分别是 topic_event、topic_start 但是我只有一个topic_event生成了。我查看日志:
1 | vim /opt/module/flume/log.txt |
发现日志中的报错为:
Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}
表示无法识别kafka hostname ??经过百度,发现需要修改的是kafka的配置,没错,我上一篇kafka配置不够全面。解决办法:
打开kafka的server.properties
1 | vim /opt/module/kafka_2.11-0.11.0.2/config/server.properties |
修改一个监听配置,让服务器之间对于kafka的监听端口能够识别得到
1 | # Hostname and port the broker will advertise to producers and consumers. If not set, |
之后,要先关闭 采集kafka的flume!!,再关闭采集本地数据传递到kafka集群的flume,最后重启kafka,然后重启两个采集服务
性能优化
- 问题描述:如果启动消费Flume抛出如下异常
ERROR hdfs.HDFSEventSink: process failed
java.lang.OutOfMemoryError: GC overhead limit exceeded
- 解决方案步骤:
在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置
export JAVA_OPTS=”-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote”
- Flume内存参数设置及优化
JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)
-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
flume组件
- FileChannel和MemoryChannel区别
MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。
FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。
- FileChannel优化
通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
官方说明如下:
Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据
- Sink:HDFS Sink
(1)HDFS存入大量小文件,有什么影响?
元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命
计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。
(2)HDFS小文件处理
官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount
基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0,hdfs.roundValue=10,hdfs.roundUnit= second几个参数综合作用,这几个参数,只要有一个达到了,就会产生效果,效果如下:
(1)hdfs.rollSize 意为 tmp文件在达到128M时会滚动生成正式文件
(2)hdfs.rollInterval 单位是秒 意为 tmp文件创建超3600秒时会滚动生成正式文件
举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:
/atguigu/20180101/atguigu.201801010520.tmp
即使文件内容没有达到128M,也会在05:33时滚动生成正式文件
经验
如果没有启动kafka就启动了 flume向hdfs插入数据,会怎么样?
flume启动后疯狂寻找kafka中的数据,但是因为kafka未启动,所以flume沉寂下去了,之后就不会再找了。这时候只能重启flume。为了避免这样的麻烦出现,我们对于程序的功能要做到了如指掌,这样才不会出现程序顺序问题导致的异常