flume消费kafka数据

流程

规划:

服务器hadoop102 服务器hadoop103 服务器hadoop104
Flume(消费Kafka) Flume

1598402226405

我们的进度来到了使用flume消费kafka数据了。

1598402242099

由图可知,我们现在要创建两组flume控件

实现

根据集群规划来看,这个采集任务要交给node03来执行,/opt/module/flume/conf目录下创建kafka-flume-hdfs.conf文件

vim kafka-flume-hdfs.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2

## source1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sources.r1.kafka.topics=topic_start

## source2
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.sources.r2.kafka.topics=topic_event

## channel1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6

## channel2
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6

## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second

##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.round = true
a1.sinks.k2.hdfs.roundValue = 10
a1.sinks.k2.hdfs.roundUnit = second

## 不要产生大量小文件
a1.sinks.k1.hdfs.rollInterval = 10
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0

## 控制输出文件是原生文件。
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop

## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1

a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

配置解释

1598406194849

batchSize 是 指定文件读取数量

batchDurationMillis 是 读取数据的延迟

启动读取kafka的flume

再node03上启动

1
2
/opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/kafka_flume_hdfs.conf -
-name a1

我预期的效果是flume会从kafka集群中读取所有数据,将压缩为lzo的格式的数据提交到hdfs上,并且生成两个文件夹,分别是 topic_event、topic_start 但是我只有一个topic_event生成了。我查看日志:

1
vim /opt/module/flume/log.txt 

发现日志中的报错为:

Error while fetching metadata with correlation id : {LEADER_NOT_AVAILABLE}

表示无法识别kafka hostname ??经过百度,发现需要修改的是kafka的配置,没错,我上一篇kafka配置不够全面。解决办法:

打开kafka的server.properties

1
vim /opt/module/kafka_2.11-0.11.0.2/config/server.properties 

修改一个监听配置,让服务器之间对于kafka的监听端口能够识别得到

1
2
3
4
# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=PLAINTEXT://node01:9092

之后,要先关闭 采集kafka的flume!!,再关闭采集本地数据传递到kafka集群的flume,最后重启kafka,然后重启两个采集服务

性能优化

  • 问题描述:如果启动消费Flume抛出如下异常

ERROR hdfs.HDFSEventSink: process failed

java.lang.OutOfMemoryError: GC overhead limit exceeded

  • 解决方案步骤:

在hadoop102服务器的/opt/module/flume/conf/flume-env.sh文件中增加如下配置

export JAVA_OPTS=”-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote”

  • Flume内存参数设置及优化

JVM heap一般设置为4G或更高,部署在单独的服务器上(4核8线程16G内存)

-Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。

flume组件

  • FileChannel和MemoryChannel区别

MemoryChannel传输数据速度更快,但因为数据保存在JVM的堆内存中,Agent进程挂掉会导致数据丢失,适用于对数据质量要求不高的需求。

FileChannel传输速度相对于Memory慢,但数据安全保障高,Agent进程挂掉也可以从失败中恢复数据。

  • FileChannel优化

通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。

官方说明如下:

Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir和backupCheckpointDir也尽量配置在不同硬盘对应的目录中,保证checkpoint坏掉后,可以快速使用backupCheckpointDir恢复数据

  • Sink:HDFS Sink

(1)HDFS存入大量小文件,有什么影响?

元数据层面:每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属组,权限,创建时间等,这些信息都保存在Namenode内存中。所以小文件过多,会占用Namenode服务器大量内存,影响Namenode性能和使用寿命

计算层面:默认情况下MR会对每个小文件启用一个Map任务计算,非常影响计算性能。同时也影响磁盘寻址时间。

(2)HDFS小文件处理

官方默认的这三个参数配置写入HDFS后会产生小文件,hdfs.rollInterval、hdfs.rollSize、hdfs.rollCount

基于以上hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0,hdfs.roundValue=10,hdfs.roundUnit= second几个参数综合作用,这几个参数,只要有一个达到了,就会产生效果,效果如下:

(1)hdfs.rollSize 意为 tmp文件在达到128M时会滚动生成正式文件

(2)hdfs.rollInterval 单位是秒 意为 tmp文件创建超3600秒时会滚动生成正式文件

举例:在2018-01-01 05:23的时侯sink接收到数据,那会产生如下tmp文件:

/atguigu/20180101/atguigu.201801010520.tmp

即使文件内容没有达到128M,也会在05:33时滚动生成正式文件

经验

如果没有启动kafka就启动了 flume向hdfs插入数据,会怎么样?

flume启动后疯狂寻找kafka中的数据,但是因为kafka未启动,所以flume沉寂下去了,之后就不会再找了。这时候只能重启flume。为了避免这样的麻烦出现,我们对于程序的功能要做到了如指掌,这样才不会出现程序顺序问题导致的异常