zookeeper安装 启动脚本 集群中的启动脚本可以让你省去大部分无意义的服务启动时间,极大提高工作效率,下面的代码分别可以传入三种参数
start、stop、status,对应了三种zookeeper的命令,脚本设置权限777,放到/usr/local/bin 目录下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 #!/bin/bash case $1 in "start" ){ for i in node01 node02 node03 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh start" done };; "stop" ){ for i in node01 node02 node03 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop" done };; "status" ){ for i in node01 node02 node03 do ssh $i "/opt/module/zookeeper-3.4.10/bin/zkServer.sh status" done };; esac
规划 在node01、node02和node03三个节点上部署Zookeeper。
安装 1 2 3 4 (1)解压Zookeeper安装包到/opt/module/目录下 [root@hadoop102 software] (2)同步/opt/module/zookeeper-3.4.10目录内容到node02、node03 [root@hadoop102 module]
服务器编号 就是给zookeeper的服务器起个名字,在zookeeper集群中作为唯一表示存在
1 2 3 4 5 6 7 8 9 10 (1)在/opt/module/zookeeper-3.4.10/这个目录下创建zkData [root@hadoop102 zookeeper-3.4.10] (2)在/opt/module/zookeeper-3.4.10/zkData目录下创建一个myid的文件 [root@hadoop102 zkData] (3)编辑myid文件 [root@hadoop102 zkData] 对应的集群添加数字 1 2 3
修改配置文件 起了名字,但还是要通过ip地址找到主机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 (1)重命名/opt/module/zookeeper-3.4.10/conf这个目录下的zoo_sample.cfg为zoo.cfg [root@hadoop102 conf] (2)打开zoo.cfg文件 [root@hadoop102 conf] 修改数据存储路径配置 dataDir=/opt/module/zookeeper-3.4.10/zkData 增加如下配置 server.1=node01:2888:3888 server.2=node02:2888:3888 server.3=node03:2888:3888 (3)配置参数解读 server.A=B:C:D。 A是一个数字,表示这个是第几号服务器; 集群模式下配置一个文件myid,这个文件在dataDir目录下,这个文件里面有一个数据就是A的值,Zookeeper启动时读取此文件,拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。 B是这个服务器的ip地址; C是这个服务器与集群中的Leader服务器交换信息的端口; D是万一集群中的Leader服务器挂了,需要一个端口来重新进行选举,选出一个新的Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
分发
这时候,我们刚开始配置的一键启动脚本就会有用了
日志数据生成 还记得我们之前在idea上的代码吗,现在用到了,首先将那个idea的项目jar包上传到服务器中,我这里只上传到node01、node02,执行代码
1 java -classpath /opt/software/bd01-1.0-SNAPSHOT-jar-with-dependencies.jar com.ls.appclient.AppMain $1 $2 >/opt/module/test.log
-classpath 指定jar的位置
$1 $2 传递进jar的参数
最后指定一个打印日志生成的位置,而不是我们数据的位置,数据的位置在你的项目resource下的xml中有配置,如果你没改,那就是 /tmp/logs/ 目录下
但是这样的指令要输入两个集群实在是太麻烦了,所以我们来整合一个脚本来执行
1 2 3 4 5 6 #! /bin/bash for i in node01 node02 do ssh $i "java -classpath /opt/software/bd01-1.0-SNAPSHOT-jar-with-dependencie s.jar com.ls.appclient.AppMain $1 $2 >/opt/module/test.log &" done
数据生成完毕
集群日志打印 我们集群很多,如果想要查看集群的服务是否开启怎么办?要一个一个的jps、date、|grep 吗?我们可以用一个脚本,将一个命令分给集群的其他机器执行,达到可以在一台机器上查看所有进程的目的。
1 2 3 4 5 6 #! /bin/bash for i in node01 node02 node03do echo --------- $i ---------- ssh $i "$*" done
便利三个节点,打印出节点名称
通过ssh 连接遍历出的每个节点
$* 代表这个脚本/程序的所有参数
效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 [root@node01 ~] --------- node01 ---------- 24359 Jps --------- node02 ---------- 2785 Jps --------- node03 ---------- 2653 Jps [root@node01 ~] --------- node01 ---------- Last login: Sun Aug 23 14:26:18 2020 from node01 [root@node01 ~] --------- node01 ---------- 2020年 08月 23日 星期日 14:27:59 CST --------- node02 ---------- 2020年 08月 23日 星期日 14:27:59 CST --------- node03 ---------- 2020年 08月 23日 星期日 14:27:59 CST [root@node01 ~]
flume安装 解压 将apache-flume-1.7.0-bin.tar.gz上传到linux的/opt/software目录下 解压apache-flume-1.7.0-bin.tar.gz到/opt/module/目录下
1 tar -zxf apache-flume-1.7.0-bin.tar.gz -C /opt/module/
修改 修改apache-flume-1.7.0-bin的名称为flume
1 2 3 4 5 6 mv apache-flume-1.7.0-bin flume 将flume/conf下的flume-env.sh.template文件修改为flume-env.sh,并配置flume-env.sh文件 mv flume-env.sh.template flume-env.sh vi flume-env.sh export JAVA_HOME=/opt/module/jdk1.8.0_144
这个参数用来确定集群内存的浮动,xms与xmx一般要设置一致,并且取消注解设置大小为4g,学习环境就不配置了
1 2 # Give Flume more memory and pre-allocate, enable remote monitoring via JMX # export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"
分发
插件功能比较
Source
Taildir Source相比Exec Source、Spooling Directory Source的优势
TailDir Source:断点续传、多目录。Flume1.6以前需要自己自定义Source记录每次读取文件位置,实现断点续传。
Exec Source可以实时搜集数据,但是在Flume不运行或者Shell命令出错的情况下,数据将会丢失。
Spooling Directory Source监控目录,不支持断点续传。
batchSize大小如何设置?
答:Event 1K左右时,500-1000合适(默认为100)
Channel
采用Kafka Channel,省去了Sink,提高了效率。
flume需求分析:
flume数据源为刚刚生成的日志信息,flume数据的目标位置是 kafaka集群
数据传输过程中,为了不影响效率,flume只对数据进行简单的etl筛选,而不是挨个字段的检测判空
数据要通过多个channel分发给不同的topic,用类型区分将channel分开发放
作到哪一步都要做到心中有谱
文件读取配置 在/opt/module/flume/conf目录下创建file-flume-kafka.conf文件
vim file-flume-kafka.conf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 a1.sources=r1 a1.channels=c1 c2 # configure source a1.sources.r1.type = TAILDIR a1.sources.r1.positionFile = /opt/module/flume/test/log_position.json a1.sources.r1.filegroups = f1 a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ a1.sources.r1.fileHeader = true a1.sources.r1.channels = c1 c2 #interceptor a1.sources.r1.interceptors = i1 i2 a1.sources.r1.interceptors.i1.type = com.ls.intereceptor.LogETLInterceptor$Builder a1.sources.r1.interceptors.i2.type = com.ls.intereceptor.LogTypeInter$Builder a1.sources.r1.selector.type = multiplexing a1.sources.r1.selector.header = topic a1.sources.r1.selector.mapping.topic_start = c1 a1.sources.r1.selector.mapping.topic_event = c2 # configure channel a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092 a1.channels.c1.kafka.topic = topic_start a1.channels.c1.parseAsFlumeEvent = false a1.channels.c1.kafka.consumer.group.id = flume-consumer a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel a1.channels.c2.kafka.bootstrap.servers = node01:9092,node02:9092,node03:9092 a1.channels.c2.kafka.topic = topic_event a1.channels.c2.parseAsFlumeEvent = false a1.channels.c2.kafka.consumer.group.id = flume-consumer
这里有两个默认的参数配置,在interceptor中,也就是需要我们自己实现的拦截器功能,这些方法都需要在idea中定义出来
根据配置重写拦截器 本项目中自定义了两个拦截器,分别是:ETL拦截器、日志类型区分拦截器。
ETL拦截器主要用于,过滤时间戳不合法和Json数据不完整的日志
日志类型区分拦截器主要用于,将启动日志和事件日志区分开来,方便发往Kafka的不同Topic。
创建工程 maven工程名字为bd02
创建包名 com.ls
pom文件 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 <dependencies > <dependency > <groupId > org.apache.flume</groupId > <artifactId > flume-ng-core</artifactId > <version > 1.7.0</version > </dependency > </dependencies > <build > <plugins > <plugin > <artifactId > maven-compiler-plugin</artifactId > <version > 2.3.2</version > <configuration > <source > 1.8</source > <target > 1.8</target > </configuration > </plugin > <plugin > <artifactId > maven-assembly-plugin</artifactId > <configuration > <descriptorRefs > <descriptorRef > jar-with-dependencies</descriptorRef > </descriptorRefs > </configuration > <executions > <execution > <id > make-assembly</id > <phase > package</phase > <goals > <goal > single</goal > </goals > </execution > </executions > </plugin > </plugins > </build >
创建包,书写三个类 包:com.ls.intereceptor
etl处理类:LogETLInterceptor
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 package com.ls.intereceptor;import org.apache.commons.lang.CharSet;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;public class LogETLInterceptor implements Interceptor { @Override public void initialize () { } @Override public Event intercept (Event event) { byte [] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8" )); if (log.contains("start" )) { if (LogUtils.valueStart(log)) { return event; } } else { if (LogUtils.valueEnent(log)) { return event; } } return null ; } @Override public List<Event> intercept (List<Event> list) { ArrayList<Event> events = new ArrayList<>(); for (Event event : list) { Event intercept = intercept(event); if (intercept != null ) { events.add(intercept); } } return events; } @Override public void close () { } public static class Builder implements Interceptor .Builder { @Override public Interceptor build () { return new LogETLInterceptor(); } @Override public void configure (Context context) { } } }
拦截类:LogTypeInter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 package com.ls.intereceptor;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.interceptor.Interceptor;import java.nio.charset.Charset;import java.util.ArrayList;import java.util.List;import java.util.Map;public class LogTypeInter implements Interceptor { @Override public void initialize () { } @Override public Event intercept (Event event) { byte [] body = event.getBody(); String log = new String(body, Charset.forName("UTF-8" )); Map<String, String> headers = event.getHeaders(); if (log.contains("start" )) { headers.put("topic" , "topic_start" ); } else { headers.put("topic" , "topic_event" ); } return event; } @Override public List<Event> intercept (List<Event> list) { ArrayList<Event> events = new ArrayList<>(); for (Event event : list) { Event intercept = intercept(event); events.add(intercept); } return events; } @Override public void close () { } public static class Builder implements Interceptor .Builder { @Override public Interceptor build () { return new LogTypeInter(); } @Override public void configure (Context context) { } } }
日志完整性简单判断工具类:LogUtils
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.ls.intereceptor;import org.apache.commons.lang.math.NumberUtils;public class LogUtils { public static boolean valueStart (String log) { if (log == null ) { return false ; } if (!log.trim().startsWith("{" ) || !log.trim().endsWith("}" )) { return false ; } return true ; } public static boolean valueEnent (String log) { if (log == null ) { return false ; } String[] split = log.split("\\|" ); if (split.length != 2 ) { return false ; } if (split[0 ].length() != 13 || !NumberUtils.isDigits(split[0 ])) { return false ; } return split[1 ].trim().startsWith("{" ) && split[1 ].trim().endsWith("}" ); } }
启动脚本 flume的启动与关闭事实上有点麻烦的,所以比任何启动脚本都要优先布置出来,
单独启动一个flume:
1 /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file_flume_kafka.conf --name a1
单独关闭一个flume:
xargs的意思是将管道传递过来的数,作为右边定义的命令的参数,所以直到xargs之前的命令都是为了查询出flume运行的端口号
1 ps -ef | grep file_flume_kafka | grep -v grep |awk '{print \$2}' | xargs kill
可以看到单是一个集群的开启关闭就需要这么多命令了,必须优化!!:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 #! /bin/bash case $1 in "start" ){ for i in node01 node02 do echo " --------启动 $i 采集flume-------" ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file_flume_kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/dev/null 2>&1 &" done };; "stop" ){ for i in node01 node02 do echo " --------停止 $i 采集flume-------" ssh $i "ps -ef | grep file_flume_kafka | grep -v grep |awk '{print \$2}' | xargs kill" done };; esac
说明1:nohup,该命令可以在你退出帐户/关闭终端之后继续运行相应的进程。nohup就是不挂起的意思,不挂断地运行命令。
说明2:/dev/null代表linux的空设备文件,所有往这个文件里面写入的内容都会丢失,俗称“黑洞”。
2>&1 的意思就是将标准错误重定向到标准输出。这里标准输出已经重定向到了 /dev/null。那么标准错误也会输出到/dev/null
标准输入0:从键盘获得输入 /proc/self/fd/0
标准输出1:输出到屏幕(即控制台) /proc/self/fd/1
错误输出2:输出到屏幕(即控制台) /proc/self/fd/2
-Dflume.root.logger=INFO,LOGFILE 是将日志结果输出到控制台,然后扔到黑洞中,也就是清空日志
脚本后续处理
设置权限 777 chmod 777 fl.sh
将脚本移动到 /usr/local/bin 下,这样就可以在任何位置调用脚本了