利用mapreduce清洗日志

package com.libc;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Process {

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, Text> {
		private Text word = new Text();

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {

			// TODO Auto-generated method stub
			String datas = "";
			try {
				datas = new String(value.getBytes(), 0, value.getLength(),
						"GBK");
			} catch (UnsupportedEncodingException e1) {
				// TODO Auto-generated catch block
				e1.printStackTrace();
			}
			// datas = value.toString();
			try {

				String[] split = datas.split(" time=");

				// 处理头中包含空格的字段
				Pattern p = Pattern.compile("phonemodel=\"(.*?)\"");
				String pm = getIndex(split[0], p);
				split[0] = split[0].replaceAll(pm, pm.replace(" ", ""));
				Pattern p1 = Pattern.compile("networktype=\"(.*?)\"");
				String nt = getIndex(split[0], p1);
				split[0] = split[0].replaceAll(nt, nt.replace(" ", ""));
				for (int i = 1; i < split.length; i++) {
					String[] codes = split[i].split(" ", 4);
					int headLen = split[0].split(" ").length;
					if (headLen != 20) {
						// 丢掉错误日志
						continue;
					}
					// 处理旧版本日志判别标准:|
					if (codes[2].equals("code=\"100\"")){
						if(codes[3].indexOf("contact_name")>-1){
							codes[3] = process100(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);

					}else if(codes[2].equals("code=\"101\"") ){
						if(codes[3].indexOf("message_to_")>-1){
							codes[3] = process101(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);
					}
					else if(codes[2].equals("code=\"102\"")){
						if(codes[3].indexOf("caller_n")>-1||codes[3].indexOf("caller_d")>-1){
							codes[3] = process102(codes[3]);
						}
							codes[3] = codes[3].replace(‘ ‘, ‘#‘);

					}else{
						codes[3] = codes[3].replace("  ", " ");
					}

					String collect = split[0] + " time=" + codes[0] + " "
							+ codes[1] + " " + codes[2] + " " + codes[3];
					word.set(collect);

					context.write(word, new Text(""));
				}

			} catch (Exception e) {
				// TODO Auto-generated catch block
			}
		}
	}

	public static String process100(String code) throws Exception{
		String[] codes = code.split(" ");
		HashMap<String, Contact> hs = new HashMap<String, Process.Contact>();
		Pattern p0 = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)\"");
		for (int i = 0; i < codes.length; i++) {
			if (codes[i].equals(""))
				continue;
			String index = getIndex(codes[i], p0);
			if (index == null)
				continue;
			String value = getIndex(codes[i], p1);
			Contact contact = null;
			if (hs.containsKey(index)) {
				contact = hs.get(index);
			} else {
				contact = new Contact();
			}
			if (codes[i].startsWith("contact_name_")) {
				contact.contactName = value;
			} else if (codes[i].startsWith("contact_num_")) {
				contact.contactNum = value;
			}
			contact.index = index;
			hs.put(index, contact);
		}

		return printToString(hs);
	}

	public static String process101(String code) throws Exception{
		String[] codes = code.split("\"  ");
		HashMap<String, Message> hs = new HashMap<String, Process.Message>();
		Pattern p = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)");
		for (int i = 0; i < codes.length; i++) {
			String index = getIndex(codes[i], p);
			String value = getIndex(codes[i], p1);
			if (index == null)
				continue;
			Message message = null;
			if (hs.containsKey(index)) {
				message = hs.get(index);
			} else {
				message = new Message();
			}
			if (codes[i].startsWith("message_time_")) {
				message.messageTime = value;
			} else if (codes[i].startsWith("message_to_")) {
				message.messageTo = value;
			}
			message.index = index;
			hs.put(index, message);
		}

		return printToString(hs);
	}

	public static String process102(String code) throws Exception{
		String[] codes = code.split("\"  ");
		HashMap<String, CallLog> hs = new HashMap<String, Process.CallLog>();
		Pattern p = Pattern.compile("_(\\d*)=");
		Pattern p1 = Pattern.compile("\"(.*)");
		for (int i = 0; i < codes.length; i++) {
			String index = getIndex(codes[i], p);
			if (index == null)
				continue;
			String value = getIndex(codes[i], p1);
			CallLog callLog = null;
			if (hs.containsKey(index)) {
				callLog = hs.get(index);
			} else {
				callLog = new CallLog();
			}
			if (codes[i].startsWith("caller_date_")) {
				callLog.callerDate = value;
			} else if (codes[i].startsWith("caller_duration_")) {
				callLog.callerDuration = value;
			} else if (codes[i].startsWith("caller_name_")) {
				callLog.callerName = value;
			} else if (codes[i].startsWith("caller_num_")) {
				callLog.callerNum = value;
			}
			callLog.index = index;
			hs.put(index, callLog);
		}

		return printToString(hs);
	}

	public static String printToString(Map hs) {
		Set set = hs.keySet();
		Iterator<String> it = set.iterator();
		String result = "";
		while (it.hasNext()) {
			result = result + hs.get(it.next()).toString() + "|";
		}
		return result;
	}

	public static String getIndex(String code, Pattern p) {
		String index = null;

		Matcher matcher = p.matcher(code);
		if (matcher.find()) {
			index = matcher.group(1);
		}
		return index;
	}

	public static class IntSumReducer extends Reducer<Text, Text, Text, Text> {

		public void reduce(Text key, Text rr, Context context)
				throws IOException, InterruptedException {
			context.write(key, new Text(""));
		}
	}

	public static class Contact {

		public String index;
		public String contactName;
		public String contactNum;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "contact_" + index + "=" + this.contactName + ";"
					+ this.contactNum;
		}
	}

	public static class Message {
		public String index;
		public String messageTime;
		public String messageTo;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "message_" + this.index + "=" + this.messageTo + ";"
					+ this.messageTime;
		}
	}

	public static class CallLog {
		public String index;
		public String callerDuration;
		public String callerNum;
		public String callerName;
		public String callerDate;

		@Override
		public String toString() {
			// TODO Auto-generated method stub
			return "callLog_" + this.index + "=" + this.callerName + ";"
					+ this.callerNum + ";" + this.callerDate + ";"
					+ this.callerDuration;
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length != 2) {
			System.err.println("Usage: process <in> <out>");
			System.exit(2);
		}
		Job job = new Job(conf, "process");
		job.setJarByClass(Process.class);
		job.setMapperClass(TokenizerMapper.class);
		job.setCombinerClass(IntSumReducer.class);
		job.setReducerClass(IntSumReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
		FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

  此版本为第一版,运行几天后服务器日志量暴增,导致堆栈溢出错误,

因此修改为第二版后可以对jvm内存自定义配置

时间: 2025-01-04 21:56:07

利用mapreduce清洗日志的相关文章

Hadoop 中利用 mapreduce 读写 mysql 数据

Hadoop 中利用 mapreduce 读写 mysql 数据 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv.uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方. 好了言归正传,简单的说说背景.原理以及需要注意的地方: 1.为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInp

Hadoop阅读笔记(二)——利用MapReduce求平均数和去重

前言:圣诞节来了,我怎么能虚度光阴呢?!依稀记得,那一年,大家互赠贺卡,短短几行字,字字融化在心里:那一年,大家在水果市场,寻找那些最能代表自己心意的苹果香蕉梨,摸着冰冷的水果外皮,内心早已滚烫.这一年……我在博客园-_-#,希望用dt的代码燃烧脑细胞,温暖小心窝. 上篇<Hadoop阅读笔记(一)——强大的MapReduce>主要介绍了MapReduce的在大数据集上处理的优势以及运行机制,通过专利数据编写Demo加深了对于MapReduce中输入输出数据结构的细节理解.有了理论上的指导,仍

利用TraceSource写日志

利用TraceSource写日志 从微软推出第一个版本的.NET Framework的时候,就在“System.Diagnostics”命名空间中提供了Debug和Trace两个类帮助我们完成针对调试和跟踪信息的日志记录.在.NET Framework 2.0中,微软引入了TraceSource并对跟踪日志系统进行了优化,优化后的跟踪日志系统在.NET Core中又经过了相应的简化..NET Core的日志模型借助TraceSourceLoggerProvider实现对TraceSource的整

hadoop笔记之MapReduce的应用案例(利用MapReduce进行排序)

MapReduce的应用案例(利用MapReduce进行排序) MapReduce的应用案例(利用MapReduce进行排序) 思路: Reduce之后直接进行结果合并 具体样例: 程序名:Sort.java import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; impo

[cocos2dx]利用NDK崩溃日志查找BUG

摘要: 在android上开发c++应用, crash日志都是汇编码, 很难对应到c++代码中去. 通过此文, 你可以定位到程序崩溃时的C++代码, 精确查找问题. 博客: http://www.cnblogs.com/jhzhu 邮箱: [email protected] 作者: 知明所以 时间: 2014-06-20 背景介绍 本文主要内容: 利用android的crash log来对c++开发的android应用进行错误定位. 容易稳定复现的BUG, 一般可以通过断点调试来解决. 如果测试

利用MapReduce实现倒排索引

这里来学习的是利用MapReduce的分布式编程模型来实现简单的倒排索引. 首先什么是倒排索引? 倒排索引是文档检索中最常用的数据结构,被广泛地应用于全文搜索引擎. 它主要是用来存储某个单词(或词组)在一个文档或一组文档中存储位置的映射,即可以通过内容来查找文档: 而不是通过文档来确定文档所包含的内容,因而被称作倒排索引(Inverted Index). 倒排索引的基本原理和建立过程可以用图来说明. 各种类型的文件经过解析后变成纯文本,再经过中文分词,并与对应的文档号进行组合, 就形成了最简单的

asp.net利用log4net写入日志到SqlServer数据库

asp.net利用log4net写入日志到SqlServer数据库 作者: Kein  来源: 博客园  发布时间: 2010-10-14 07:19  阅读: 6427 次  推荐: 6   原文链接   [收藏] 摘要:Log4net是一个开源的错误日志记录项目,asp.net利用log4net写入日志到SqlServer数据库.下面就我的安装部署log4net到MS sql server的经验与大家分享. asp.net利用log4net写入日志到SqlServer数据库,Log4net是

利用MapReduce实现数据去重

数据去重主要是为了利用并行化的思想对数据进行有意义的筛选. 统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重. 示例文件内容: 此处应有示例文件 设计思路 数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次. 自然就想到将同一数据的所有记录都交给一台reduce机器,无路这个数据出现多少次,只要在最终结果中输出一次就可以了. 具体就是reduce的输入应该以数据作为key,而对value-list没有要求. 当reduce收到一个

利用vim查看日志,快速定位问题

起因 在一般的情况下,如果开发过程中测试报告了一个问题,我一般会这么做: 1.在自己的开发环境下重试一下测试的操作,看看能不能重现问题.不行转2 2.数据库连接池改成测试库的地址,在自己的开发环境下重试一下测试的操作,看看能不能重现问题.不行转3 3.去测试环境查看日志.分析问题. 快速定位问题 去测试环境查看日志,分析问题的时候.因为我Linux命令也不是很熟悉.所以查看日志内容比较累..我以前的做法是下载日志以后在windows环境下利用文本编辑器定位问题. 但是有时候日志比较大,有几百MB