实现对MapReduce日志文件的不同级别统计个数 LogCount

作业要求:

  • 自行实现一个读取MapReduce日志文件的Spout
  • 按照不同日志级别计数(类似WordCount)

作业完成步骤:

在学习了Storm的基本概念后,首先对作业要求进行分析。实际操作上,我使用Spout读取HDFS上存储的文件,并在Bolt中对每一行进行处理,使用正则表达式判断日志级别,统计各级别日志个数,并使用sjf4j写入日志,便于检查结果。

首先编写拓扑部分。对于本次作业的要求来说,只需要一个SpoutHDFS中读取数据,输出到一个Bolt中进行统计足矣。也考虑到了可以根据不同的日志级别分为4个Bolt,不过由于不同的日志级别数据量差异极大,也就没有考虑这种形式,而是采用最简单的一个Spout到一个Bolt的形式处理。为了清晰地观察实验结果,两者的并行度都设置为1。流分组也采取最基本的随机分组即可。此部分的代码如下所示。

接下来是编写Spout部分。在这一部分需要读取HDFS上的文件,在这里可以在open函数中实现打开HDFS上的文件的字节流,并使用BufferedReader包装以方便通过readLine方法按行读取。接下来在最关键的nextTuple方法中,不断地按行读取日志内容,并使用emit方法发送出数据。显然,输出的是包含一个字段的元组,所以在declareOutputFields方法中声明的元组只需要包含一项即可,这里命名为LogEvent

以下是Spout这一部分的代码。

接下来编写Bolt部分。这一部分是对不同的日志级别分别进行统计。在prepare函数中,对计数的Map进行初始化。在execute函数中,使用正则表达式匹配日志级别并计数即可。每次通过slf4j输出日志信息,以便我们查看日志追踪结果。此部分代码如下。

接下来先在本地执行该程序查看效果。下面是输出日志中的几条关于同一条读取到的日志信息内容,可以看到发送、传输、接受、自定义的输出、处理完毕时的情况。

可以从日志信息中看到GOT one INFO log, current count: 17.GOT one ERROR log, current count: 4.GOT one DEBUG log, current count: 28.GOT one WARN log, current count: 2.也就是INFO17条、ERROR4条、DEBUG28条、WARN2条。可以看到结果正确。

本地运行正确后,接下来提交到集群上运行。在这里需要对maven进行一些配置。在这里重复强调一下原本配置的依赖等。依赖需要storm-corehadoop-clienthadoop-hdfs三个依赖,重点需要排除slf4j-log4j12,以免造成无法找到主类的问题。接下来还需要配置shade插件,来打包包含依赖的jar包。还需强调的是需要通过设置<scope>provided</scope>来避免将storm-core打包到jar包内。最后,maven部分的总体配置如下。

执行clean compile shade:shade后,便可以在target文件夹下得到jar包。使用sftp传输到集群中,执行storm jar LogCount.jar sh.duan.storm.LogCountTopology DDTest即可提交拓扑。
这里补充一点是,在集群的每台服务器上,使用命令storm logviewer命令即可开启默认端口为8000的日志查看web端功能,方便在web端查看输出结果。

Web UI拓扑如下。

Worker日志截图如下。

搜索Worker日志如下。

与在本地的结果类似,下面是日志中的摘录。

下面对过程中遇到的一些问题进行总结。
+ 由于有hdfs依赖,最初打包无依赖的程序上传到集群无法运行。搜索后首先用assembly插件进行打包为包含依赖的jar包。
+ 上传集群后提示Exception in thread "main" java.lang.ExceptionInInitializerError,查找后发现需要为storm-core添加<scope>provided</scope>,来避免打包该依赖,以免与集群中冲突。
+ 上传集群后提示变为Error: Could not find or load main class sh.duan.storm.LogCountTopology,苦苦查询网络后发现,虽然hadoop-client中已经有hadoop-hdfs依赖,但是仍需要单独添加hadoop-hdfs依赖。另外的问题是必须将slf4j的实现排除在jar包外,以免与集群冲突。具体参考maven配置中的exclusion部分。
+ 在运行后仍然提示配置出现问题后,搜索后得知需要使用shade插件来替换assembly,以便对依赖中的配置文件进行整合,如上文中所述设置后,一切正常。
+ 最让人感到意外的一个问题是,在出现找不到主类的错误时,另一个原因是真的没有找到jar包。起初jar包名称中有冒号,虽有在意并试了转义,但始终找不到问题所在后,尝试tar xvf xxx:xxx.tar.gz时发现提示找不到压缩文件。查询网络后得知
If the archive file name includes a colon (‘:’), then it is assumed to be a file on another machine[…]
所以冒号前的部分会被识别为其他的主机,实际上想一想,和我们使用scp命令很像。在修改包名后,运行一切正常。

欢迎留言