06月13日, 2017 2,329 次查看次
实现对MapReduce日志文件的不同级别统计个数 LogCount
作业要求:
- 自行实现一个读取MapReduce日志文件的
Spout
- 按照不同日志级别计数(类似WordCount)
作业完成步骤:
在学习了Storm的基本概念后,首先对作业要求进行分析。实际操作上,我使用Spout
读取HDFS
上存储的文件,并在Bolt
中对每一行进行处理,使用正则表达式判断日志级别,统计各级别日志个数,并使用sjf4j
写入日志,便于检查结果。
首先编写拓扑部分。对于本次作业的要求来说,只需要一个Spout
从HDFS
中读取数据,输出到一个Bolt
中进行统计足矣。也考虑到了可以根据不同的日志级别分为4个Bolt
,不过由于不同的日志级别数据量差异极大,也就没有考虑这种形式,而是采用最简单的一个Spout
到一个Bolt
的形式处理。为了清晰地观察实验结果,两者的并行度都设置为1。流分组也采取最基本的随机分组即可。此部分的代码如下所示。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 | public class LogCountTopology{ public static void main(String[] args){ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("hdfs", new LogCountSpout(), 1); builder .setBolt("counter", new LogCountBolt(), 1) .shuffleGrouping("hdfs"); Config conf = new Config(); conf.setDebug(true); if(args != null && args.length > 0){ conf.setNumWorkers(3); try{ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); }catch(Exception e){ e.printStackTrace(); } }else{ LocalCluster cluster = new LocalCluster(); cluster.submitTopology("DDTest", conf, builder.createTopology()); Utils.sleep(100000); cluster.killTopology("DDTest"); cluster.shutdown(); } } } |
接下来是编写Spout
部分。在这一部分需要读取HDFS
上的文件,在这里可以在open
函数中实现打开HDFS
上的文件的字节流,并使用BufferedReader
包装以方便通过readLine
方法按行读取。接下来在最关键的nextTuple
方法中,不断地按行读取日志内容,并使用emit
方法发送出数据。显然,输出的是包含一个字段的元组,所以在declareOutputFields
方法中声明的元组只需要包含一项即可,这里命名为LogEvent
。
以下是Spout
这一部分的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | public static class LogCountSpout extends BaseRichSpout{ private static final Logger LOG = LoggerFactory.getLogger(LogCountSpout.class); private BufferedReader reader; private SpoutOutputCollector collector; public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector){ try{ Path pt = new Path("/data/DuanYukai/test.log"); FileSystem fs = FileSystem.get(new URI("hdfs://hxs-vm01:9000"), new Configuration()); reader = new BufferedReader(new InputStreamReader(fs.open(pt))); this.collector = collector; }catch(IOException e){ LOG.error("HDFS error", e); }catch(URISyntaxException e){ LOG.error("URI error", e); } } public void nextTuple(){ String line; try{ if((line = reader.readLine()) != null) collector.emit(new Values(line)); else Utils.sleep(1000); }catch(IOException e){ collector.reportError(e); LOG.error("Reader error", e); } } public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("LogEvent")); } } |
接下来编写Bolt
部分。这一部分是对不同的日志级别分别进行统计。在prepare
函数中,对计数的Map
进行初始化。在execute
函数中,使用正则表达式匹配日志级别并计数即可。每次通过slf4j
输出日志信息,以便我们查看日志追踪结果。此部分代码如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 | public static class LogCountBolt extends BaseRichBolt{ private static final Logger LOG = LoggerFactory.getLogger(LogCountBolt.class); private OutputCollector collector; private Map<String, Integer> counts; public void prepare(Map map, TopologyContext topologyContext, OutputCollector collector){ this.collector = collector; counts = new HashMap<>(4); counts.put("ERROR", 0); counts.put("WARN", 0); counts.put("INFO", 0); counts.put("DEBUG", 0); } public void execute(Tuple tuple){ String logStr = tuple.getString(0); Matcher matcher = Pattern .compile("\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}\\S*? (INFO|DEBUG|WARN|ERROR) [^\\r\\n]*") .matcher(logStr); if(matcher.find()){ String level = matcher.group(1); counts.put(level, counts.get(level) + 1); LOG.info("GOT one" + level + " log, current count: " + counts.get(level) + ". Log content: " + logStr); } collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields()); } } |
接下来先在本地执行该程序查看效果。下面是输出日志中的几条关于同一条读取到的日志信息内容,可以看到发送、传输、接受、自定义的输出、处理完毕时的情况。
1 2 3 4 5 6 7 | 6715 [Thread-20-hdfs-executor[3 3]] INFO o.a.s.d.task - Emitting: hdfs default [2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard()] 6715 [Thread-20-hdfs-executor[3 3]] INFO o.a.s.d.executor - TRANSFERING tuple [dest: 2 tuple: source: hdfs:3, stream: default, id: {}, [2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard()]] 6722 [Thread-18-counter-executor[2 2]] INFO o.a.s.d.executor - Processing received message FOR 2 TUPLE: source: hdfs:3, stream: default, id: {}, [2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard()] 6722 [Thread-18-counter-executor[2 2]] INFO s.d.s.LogCountTopology$LogCountBolt - GOT oneINFO log, current count: 17. Log content: 2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard() 6722 [Thread-18-counter-executor[2 2]] INFO o.a.s.d.executor - BOLT ack TASK: 2 TIME: TUPLE: source: hdfs:3, stream: default, id: {}, [2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard()] 6722 [Thread-18-counter-executor[2 2]] INFO o.a.s.d.executor - Execute done TUPLE source: hdfs:3, stream: default, id: {}, [2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard()] TASK: 2 DELTA: |
可以从日志信息中看到GOT one INFO log, current count: 17.
、GOT one ERROR log, current count: 4.
、GOT one DEBUG log, current count: 28.
、GOT one WARN log, current count: 2.
也就是INFO
17条、ERROR
4条、DEBUG
28条、WARN
2条。可以看到结果正确。
本地运行正确后,接下来提交到集群上运行。在这里需要对maven
进行一些配置。在这里重复强调一下原本配置的依赖等。依赖需要storm-core
、hadoop-client
、hadoop-hdfs
三个依赖,重点需要排除slf4j-log4j12
,以免造成无法找到主类的问题。接下来还需要配置shade插件,来打包包含依赖的jar包。还需强调的是需要通过设置<scope>provided</scope>
来避免将storm-core
打包到jar包内。最后,maven部分的总体配置如下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 | <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sh.duan.storm</groupId> <artifactId>logcount</artifactId> <version>0.0.1</version> <dependencies> <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.7.3</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> <version>2.4</version> <configuration> <archive> <manifest> <mainClass>sh.duan.storm.LogCountTopology</mainClass> </manifest> </archive> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.0.0</version> <configuration> <createDependencyReducedPom>true</createDependencyReducedPom> <!--<finalName>sh.duan.storm:log-count:0.0.1</finalName>--> <finalName>LogCount</finalName> </configuration> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>sh.duan.storm.LogCountTopology</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project> |
执行clean compile shade:shade
后,便可以在target
文件夹下得到jar包。使用sftp传输到集群中,执行storm jar LogCount.jar sh.duan.storm.LogCountTopology DDTest
即可提交拓扑。
这里补充一点是,在集群的每台服务器上,使用命令storm logviewer
命令即可开启默认端口为8000的日志查看web端功能,方便在web端查看输出结果。
Web UI拓扑如下。
Worker日志截图如下。
搜索Worker日志如下。
与在本地的结果类似,下面是日志中的摘录。
1 2 3 | .668 s.d.s.LogCountTopology$LogCountBolt Thread-9-counter-executor[4 4] [INFO] GOT one INFO log, current count: 17. Log content: 2016-07-28 15:52:35-[Analysis-WC] INFO [http-8080-8] FrameController.dashboard(45) | Start FrameController --> dashboard() 2017-05-19 04:18:53.668 |
下面对过程中遇到的一些问题进行总结。
+ 由于有hdfs
依赖,最初打包无依赖的程序上传到集群无法运行。搜索后首先用assembly
插件进行打包为包含依赖的jar包。
+ 上传集群后提示Exception in thread "main" java.lang.ExceptionInInitializerError
,查找后发现需要为storm-core
添加<scope>provided</scope>
,来避免打包该依赖,以免与集群中冲突。
+ 上传集群后提示变为Error: Could not find or load main class sh.duan.storm.LogCountTopology
,苦苦查询网络后发现,虽然hadoop-client
中已经有hadoop-hdfs
依赖,但是仍需要单独添加hadoop-hdfs
依赖。另外的问题是必须将slf4j
的实现排除在jar包外,以免与集群冲突。具体参考maven配置中的exclusion
部分。
+ 在运行后仍然提示配置出现问题后,搜索后得知需要使用shade
插件来替换assembly
,以便对依赖中的配置文件进行整合,如上文中所述设置后,一切正常。
+ 最让人感到意外的一个问题是,在出现找不到主类的错误时,另一个原因是真的没有找到jar包。起初jar包名称中有冒号,虽有在意并试了转义,但始终找不到问题所在后,尝试tar xvf xxx:xxx.tar.gz
时发现提示找不到压缩文件。查询网络后得知
If the archive file name includes a colon (‘:’), then it is assumed to be a file on another machine[…]
所以冒号前的部分会被识别为其他的主机,实际上想一想,和我们使用scp
命令很像。在修改包名后,运行一切正常。