zookeeper安装

启动脚本

集群中的启动脚本可以让你省去大部分无意义的服务启动时间,极大提高工作效率,下面的代码分别可以传入三种参数

start、stop、status,对应了三种zookeeper的命令,脚本设置权限777,放到/usr/local/bin 目录下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/bin/bash

case $1 in
"start"){
for i in node01 node02 node03
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start"
done
};;

"stop"){
for i in node01 node02 node03
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop"
done
};;

"status"){
for i in node01 node02 node03
do
ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status"
done
};;
esac

规划

在node01、node02和node03三个节点上部署Zookeeper。

安装

1
2
3
4
(1)解压Zookeeper安装包到/opt/module/目录下
[root@hadoop102 software]# tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
(2)同步/opt/module/zookeeper-3.4.10目录内容到node02、node03
[root@hadoop102 module]# xsync zookeeper-3.4.10/

服务器编号

就是给zookeeper的服务器起个名字,在zookeeper集群中作为唯一表示存在

1
2
3
4
5
6
7
8
9
10
(1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData
[root@hadoop102 zookeeper-3.4.10]# mkdir zkData
(2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件
[root@hadoop102 zkData]# touch myid
(3)编辑myid文件
[root@hadoop102 zkData]# vi myid
对应的集群添加数字
1
2
3

修改配置文件

起了名字,但还是要通过ip地址找到主机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
(1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg
[root@hadoop102 conf]# mv zoo_sample.cfg zoo.cfg
(2)打开zoo.cfg文件
[root@hadoop102 conf]# vim zoo.cfg
修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.4.10/zkData
增加如下配置
#######################cluster##########################
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888

(3)配置参数解读
server.A=B:C:D。
A是一个数字,表示这个是第几号服务器;
集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。
B是这个服务器的ip地址;
C是这个服务器与集群中的Leader服务器交换信息的端口;
D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

分发

1
[root@node01 module]# xsync zookeeper-3.4.10/

这时候,我们刚开始配置的一键启动脚本就会有用了

日志数据生成

还记得我们之前在idea上的代码吗,现在用到了,首先将那个idea的项目jar包上传到服务器中,我这里只上传到node01、node02,执行代码

1
java -classpath /opt/software/bd01-1.0-SNAPSHOT-jar-with-dependencies.jar com.ls.appclient.AppMain $1 $2 >/opt/module/test.log

-classpath 指定jar的位置

$1 $2 传递进jar的参数

最后指定一个打印日志生成的位置,而不是我们数据的位置,数据的位置在你的项目resource下的xml中有配置,如果你没改,那就是 /tmp/logs/ 目录下

但是这样的指令要输入两个集群实在是太麻烦了,所以我们来整合一个脚本来执行

1
2
3
4
5
6
#! /bin/bash
for i in node01 node02
do
ssh $i "java -classpath /opt/software/bd01-1.0-SNAPSHOT-jar-with-dependencie
s.jar com.ls.appclient.AppMain $1 $2 >/opt/module/test.log &"
done

数据生成完毕

集群日志打印

我们集群很多,如果想要查看集群的服务是否开启怎么办?要一个一个的jps、date、|grep 吗?我们可以用一个脚本,将一个命令分给集群的其他机器执行,达到可以在一台机器上查看所有进程的目的。

1
2
3
4
5
6
#! /bin/bash
for i in node01 node02 node03
do
echo --------- $i ----------
ssh $i "$*"
done

便利三个节点,打印出节点名称

通过ssh 连接遍历出的每个节点

$* 代表这个脚本/程序的所有参数

效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[root@node01 ~]# sout.sh jps
--------- node01 ----------
24359 Jps
--------- node02 ----------
2785 Jps
--------- node03 ----------
2653 Jps
[root@node01 ~]# sout.sh
--------- node01 ----------
Last login: Sun Aug 23 14:26:18 2020 from node01
[root@node01 ~]# sout.sh date
--------- node01 ----------
2020年 08月 23日 星期日 14:27:59 CST
--------- node02 ----------
2020年 08月 23日 星期日 14:27:59 CST
--------- node03 ----------
2020年 08月 23日 星期日 14:27:59 CST
[root@node01 ~]#

flume安装

解压

将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下
解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下

1
tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/

修改

修改apache-flume-1.7.0-bin的名称为flume

1
2
3
4
5
6

mv apache-flume-1.7.0-bin flume
将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件
mv flume-env.sh.template flume-env.sh
vi flume-env.sh
export JAVA_HOME=/opt/module/jdk1.8.0_144

这个参数用来确定集群内存的浮动,xms与xmx一般要设置一致,并且取消注解设置大小为4g,学习环境就不配置了

1
2
# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
# export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

分发

1
xsync flume/

插件功能比较

Source

Taildir Source相比Exec Source、Spooling Directory Source的优势

TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。

Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。

Spooling Directory Source监控目录,不支持断点续传。

batchSize大小如何设置?

答:Event 1K左右时,500-1000合适(默认为100)

Channel

采用Kafka Channel,省去了Sink,提高了效率。

flume需求分析:

img

  1. flume数据源为刚刚生成的日志信息,flume数据的目标位置是 kafaka集群
  2. 数据传输过程中,为了不影响效率,flume只对数据进行简单的etl筛选,而不是挨个字段的检测判空
  3. 数据要通过多个channel分发给不同的topic,用类型区分将channel分开发放

作到哪一步都要做到心中有谱

文件读取配置

在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件

vim file-flume-kafka.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
a1.sources=r1
a1.channels=c1 c2

# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /tmp/logs/app.+
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2

#interceptor
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = com.ls.intereceptor.LogETLInterceptor$Builder
a1.sources.r1.interceptors.i2.type = com.ls.intereceptor.LogTypeInter$Builder

a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = topic
a1.sources.r1.selector.mapping.topic_start = c1
a1.sources.r1.selector.mapping.topic_event = c2

# configure channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.channels.c1.kafka.topic = topic_start
a1.channels.c1.parseAsFlumeEvent = false
a1.channels.c1.kafka.consumer.group.id = flume-consumer

a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092
a1.channels.c2.kafka.topic = topic_event
a1.channels.c2.parseAsFlumeEvent = false
a1.channels.c2.kafka.consumer.group.id = flume-consumer

这里有两个默认的参数配置,在interceptor中,也就是需要我们自己实现的拦截器功能,这些方法都需要在idea中定义出来

根据配置重写拦截器

本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。

ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志

日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。

创建工程

maven工程名字为bd02

创建包名 com.ls

pom文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.7.0</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

创建包,书写三个类

包:com.ls.intereceptor

1598246738472

etl处理类:LogETLInterceptor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.ls.intereceptor;

import org.apache.commons.lang.CharSet;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

public class LogETLInterceptor implements Interceptor {
@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
//获取数据
byte[] body = event.getBody();
String log = new String(body, Charset.forName("UTF-8"));
if (log.contains("start")) {
if (LogUtils.valueStart(log)) {
return event;
}
} else {
if (LogUtils.valueEnent(log)) {
return event;
}
}
return null;
}

@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> events = new ArrayList<>();

for (Event event : list) {
Event intercept = intercept(event);

if (intercept != null) {
events.add(intercept);
}
}

return events;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
return new LogETLInterceptor();
}

@Override
public void configure(Context context) {

}
}
}

拦截类:LogTypeInter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.ls.intereceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class LogTypeInter implements Interceptor {
@Override
public void initialize() {

}

@Override
public Event intercept(Event event) {
byte[] body = event.getBody();


String log = new String(body, Charset.forName("UTF-8"));

//获取header
Map<String, String> headers = event.getHeaders();
//向header里添加日志
if (log.contains("start")) {
headers.put("topic", "topic_start");
} else {
headers.put("topic", "topic_event");
}
return event;
}

@Override
public List<Event> intercept(List<Event> list) {
ArrayList<Event> events = new ArrayList<>();

for (Event event : list) {
Event intercept = intercept(event);
events.add(intercept);
}

return events;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
return new LogTypeInter();
}

@Override
public void configure(Context context) {

}
}
}

日志完整性简单判断工具类:LogUtils

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ls.intereceptor;

import org.apache.commons.lang.math.NumberUtils;

public class LogUtils {
public static boolean valueStart(String log) {
//json
if (log == null) {
return false;
}

//不要 {json json}
if (!log.trim().startsWith("{") || !log.trim().endsWith("}")) {
return false;
}

return true;
}

public static boolean valueEnent(String log) {
if (log == null) {
return false;
}

// 时间|json
String[] split = log.split("\\|");

//不为2说明不完整
if (split.length != 2) {
return false;
}

//时间的字符长度为13 是否都是数字
if (split[0].length() != 13 || !NumberUtils.isDigits(split[0])) {
return false;
}

//判断是否是正确的json格式
return split[1].trim().startsWith("{") && split[1].trim().endsWith("}");
}
}

启动脚本

flume的启动与关闭事实上有点麻烦的,所以比任何启动脚本都要优先布置出来,

单独启动一个flume:

1
/opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file_flume_kafka.conf  --name a1

单独关闭一个flume:

xargs的意思是将管道传递过来的数,作为右边定义的命令的参数,所以直到xargs之前的命令都是为了查询出flume运行的端口号

1
ps -ef | grep file_flume_kafka | grep -v grep |awk '{print \$2}' | xargs kill

可以看到单是一个集群的开启关闭就需要这么多命令了,必须优化!!:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#! /bin/bash

case $1 in
"start"){
for i in node01 node02
do
echo " --------启动 $i 采集flume-------"
ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file_flume_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &"
done
};;
"stop"){
for i in node01 node02
do
echo " --------停止 $i 采集flume-------"
ssh $i "ps -ef | grep file_flume_kafka | grep -v grep |awk '{print \$2}' | xargs kill"
done

};;
esac

说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。

说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。

2>&1 的意思就是将标准错误重定向到标准输出。这里标准输出已经重定向到了 /dev/null。那么标准错误也会输出到/dev/null

标准输入0:从键盘获得输入 /proc/self/fd/0

标准输出1:输出到屏幕(即控制台) /proc/self/fd/1

错误输出2:输出到屏幕(即控制台) /proc/self/fd/2

-Dflume.root.logger=INFO,LOGFILE 是将日志结果输出到控制台,然后扔到黑洞中,也就是清空日志

脚本后续处理

  1. 设置权限 777 chmod 777 fl.sh
  2. 将脚本移动到 /usr/local/bin 下,这样就可以在任何位置调用脚本了