Mapreuduce实现网络数据包的清洗工作

处理后的数据可直接放到hive或者mapreduce程序来统计网络数据流的信息,比如当前实现的是比较简单的http的Get请求的统计

第一个mapreduce:将时间、十六进制包头信息提取出来,并放在一行(这里涉及到mapreduce的键值对的对多行的特殊处理,是个值得注意的地方)

主要遇到两个问题:

  一个数据包包含时间,包头的简单信息,包头的详细信息,初衷是想要把一个数据包的时间、包十六进制详细信息(存在于很多行里)按照顺序放置到一行,在java里面按行读取,很好实现。

针对mapreduce的键值对处理的特性,原来想到有两种方式解决:

(1)以时间的key值为准,一个包的信息key值与其相同

但MR的map每次只处理一行信息,而reduce只对键相同的行做处理,而且从map阶段到reduce的过程中有一个shuffle、sort阶段(估计是这个原因,也可能是因为离reduce近的机器处理完直接发给reduce,先到先处理),相同的key的value是乱序的。

(2)所有的key值递增

这样就没有相同的key值,无法放置到一行

最后的解决办法:

(3)以时间的key值为准,同一个包的信息的key值与其相同,但在十六进制行里加一个递增的id,放置到一行,虽然是乱序的,但自带ID,就重新排一下就好啦,妙!

第二个mapreduce: 对十六进制信息进行排序,是第一个mapreduce的补充,至此,清洗工作完毕,可以统计任意位置的十六进制来分析数据

第三个mapreduce:统计http发送的GET请求个数

static int id=1;
	static int hexId=1;
  public static class TokenizerMapper
       extends Mapper<Object, Text, IntWritable, Text>
 {
    private final static IntWritable one = new IntWritable(2);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException
    {
    	//匹配时间
	 	String regexTime = "([0-2][0-4]):([0-5][0-9]):([0-5][0-9]).[0-9]{6}";// 11:08:56.149361
		Pattern patternTime = Pattern.compile(regexTime);
		Matcher matchTime = patternTime.matcher(value.toString());
		while (matchTime.find()) {
			String time ="time: " + matchTime.group()+" ";
			id=id+1;
			word.set(time);
			one.set(id);
			context.write(one, word);
		}
		//匹配十六进制
//		String regexHex = "0x[0-9]{4}:  ([A-Za-z0-9]{4} )+";
		String regexHex = " ([A-Za-z0-9]{4} )+";
		Pattern patternHex = Pattern.compile(regexHex);
		Matcher matchHex = patternHex.matcher(value.toString());
		while (matchHex.find()) {
			String hex = " "+ matchHex.group();
			hexId=hexId+1;
			hex="id:"+String.valueOf(hexId)+" "+hex;
			word.set(hex);
			one.set(id);
			context.write(one, word);
		}
    }
  }

  public static class IntSumReducer
       extends Reducer<IntWritable,Text,IntWritable,Text>
{
    private Text result = new Text();

    public void reduce(IntWritable key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException
  {
      String sum = "";
      for (Text val : values)
        {
          sum += val.toString();
         }
      result.set(sum);
      context.write(key, result);
    }
  }

  

public static class TokenizerMapper
       extends Mapper<Object, Text, Text, Text>
 {
    private final static Text one = new Text();
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException
    {
    	//匹配时间
	 	String regexTime = "time: ([0-2][0-4]):([0-5][0-9]):([0-5][0-9]).[0-9]{6}";// 11:08:56.149361
		Pattern patternTime = Pattern.compile(regexTime);
		Matcher matchTime = patternTime.matcher(value.toString());
		while (matchTime.find()) {
//			String time ="time: " + matchTime.group()+" ";
			String temptime =matchTime.group();
			String time =temptime.substring(6, temptime.length()-1);
			one.set(time);
		}

		//排序十六进制
//		String regexHex = "0x[0-9]{4}:  ([A-Za-z0-9]{4} )+";
		List<Bar> list = new ArrayList<Bar>();
		String regexHex = "id:([0-9])+   ([A-Za-z0-9]{4} )+";
		Pattern patternHex = Pattern.compile(regexHex);
		Matcher matchHex = patternHex.matcher(value.toString());
		while (matchHex.find()) {
			Bar bar = new Bar();
			String hexline = matchHex.group();
			String regexHex2 ="id:([0-9])+"; //一行十六进制的序号
			Pattern patternHex2 = Pattern.compile(regexHex2);
			Matcher matchHex2 = patternHex2.matcher(hexline);
			while (matchHex2.find()) {
				String lineId=matchHex2.group().toString().substring(3);
				bar.setId(lineId);
			}
			String regexHex3 ="([A-Za-z0-9]{4} )+"; //一行十六进制
			Pattern patternHex3 = Pattern.compile(regexHex3);
			Matcher matchHex3 = patternHex3.matcher(hexline);
			while (matchHex3.find()) {
				String lineHex= matchHex3.group().toString();
				bar.setHexValue(lineHex);
			}
			list.add(bar);
		}

		StringBuffer buffer = new StringBuffer("");
		 Collections.sort(list);
		for(int i=0;i<list.size();i++){
			Bar bar=list.get(i);
			String lineHex=bar.getHexValue();
			buffer.append(lineHex);
		}
		String hexOne= buffer.toString();

		word.set(hexOne);
		context.write(one, word);
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,Text,Text,Text>
{
    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values,
                       Context context
                       ) throws IOException, InterruptedException
  {
      String sum = "";
      for (Text val : values)
        {
    	  context.write(key, val);
         }
    }
  }

  

	public static class TokenizerMapper extends
			Mapper<Object, Text, Text, IntWritable> {
		private final static IntWritable one = new IntWritable(1);
		private Text word = new Text("sumGet");

		public void map(Object key, Text value, Context context)
				throws IOException, InterruptedException {
			int timelen=15;
			int getlen=20*5+timelen;
			String strline=value.toString();
			if (strline.length() > getlen) {// ||hexValue[20].equals("4854")
				String getPos=strline.substring(timelen+20*5,timelen+21*5-1);
				 if(getPos.equals("4745")){
					 context.write(word, one);
				 }
			}
		}
	}

	public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
		private IntWritable result = new IntWritable();

		public void reduce(Text key, Iterable<IntWritable> values, Context context)
				throws IOException, InterruptedException {
			int sum =0;
			for (IntWritable val : values) {
				sum+=val.get();
			}
			result.set(sum);
			context.write(key, result);
		}
	}

  

时间: 2024-10-11 12:21:26

Mapreuduce实现网络数据包的清洗工作的相关文章

网络数据包分析 网卡Offload

http://blog.nsfocus.net/network-packets-analysis-nic-offload/ 对于网络安全来说,网络传输数据包的捕获和分析是个基础工作,绿盟科技研究员在日常工作中,经常会捕获到一些大小远大于MTU值的数据包,经过分析这些大包的特性,发现和网卡的offload特性有关,本文对网卡Offload技术做简要描述. 文章目录 网络分片技术 网卡offload机制 发送模式 接收模式 网卡offload模式的设置 Linux windows 网卡Offload

Linux内核中网络数据包的接收-第一部分 概念和框架

与网络数据包的发送不同,网络收包是异步的的,因为你不确定谁会在什么时候突然发一个网络包给你,因此这个网络收包逻辑其实包含两件事:1.数据包到来后的通知2.收到通知并从数据包中获取数据这两件事发生在协议栈的两端,即网卡/协议栈边界以及协议栈/应用边界:网卡/协议栈边界:网卡通知数据包到来,中断协议栈收包:协议栈栈/应用边界:协议栈将数据包填充socket队列,通知应用程序有数据可读,应用程序负责接收数据.本文就来介绍一下关于这两个边界的这两件事是怎么一个细节,关乎网卡中断,NAPI,网卡poll,

用C++实现网络编程,抓取网络数据包的实现方法和介绍

做过网管或协议分析的人一般都熟悉sniffer这个工具,它可以捕捉流经本地网卡的所有数据包.抓取网络数据包进行分析有很多用处,如分析网络是否有网络病毒等异常数据,通信协议的分析(数据链路层协议.IP.UDP.TCP.甚至各种应用层协议),敏感数据的捕捉等.下面我们就来看看在windows下如何实现数据包的捕获. 下面先对网络嗅探器的原理做简单介绍. 嗅探器设计原理 嗅探器作为一种网络通讯程序,也是通过对网卡的编程来实现网络通讯的,对网卡的编程也是使用通常的套接字(socket)方式来进行.但是,

Linux程序设计学习笔记----网络编程之网络数据包拆封包与字节顺序大小端

网络数据包的封包与拆包 过程如下: 将数据从一台计算机通过一定的路径发送到另一台计算机.应用层数据通过协议栈发到网络上时,每层协议都要加上一个数据首部(header),称为封装(Encapsulation),如下图所示: 不同的协议层对数据包有不同的称谓,在传输层叫做段(segment),在网络层叫做数据包(packet),在链路层叫做帧(frame).数据封装成帧后发到传输介质上,到达目的主机后每层协议再剥掉相应的首部,最后将应用层数据交给应用程序处理. 上图对应两台计算机在同一网段中的情况,

替代WinPcap的新型Windows网络数据包截获软件——NPcap

NPcap是致力于采用Microsoft Light-Weight Filter (NDIS 6)技术对当前最流行的WinPcap工具包进行改进的一个项目.NPcap项目是最初2013年由Nmap网络扫描器项目(创始人Gordon Lyon)和北京大学罗杨博士发起,由Google公司赞助的一个开源项目,遵循MIT协议(与WinPcap一致).NPcap基于WinPcap 4.1.3源码基础上开发,支持32位和64位架构,在Windows Vista以上版本的系统中,采用NDIS 6技术的NPca

LINUX下的远端主机登入 校园网络注册 网络数据包转发和捕获

第一部分:LINUX 下的远端主机登入和校园网注册 校园网内目的主机远程管理登入程序 本程序为校园网内远程登入,管理功能,该程序分服务器端和客户端两部分:服务器端为remote_server_udp.py 客户端分为单播客户端和广播客户端: 单播客户端client_unicast.py 广播客户端client_broadcast.py 1.单播客户端为根据net.info文件中的网络记录遍历目标网段中的所有IP,向其发送UDP封包. net.info中记录了目标网络中的一个样例IP和目标网段的子

CORE网络数据包接收传递过程分析

能够接收实际网络流量是CORE的一个显著优点,这使得已有的系统能方便地接入虚拟网络进行模拟.CORE对网络设备的虚拟是通过LXC技术来实现的,而对网络的虚拟则是通过虚拟网卡(veth).网桥(Bridge).Quagga来实现的.本文档主要通过分析CORE中网络数据传递过程,来理解CORE网络模拟. 拓扑结构 为了方便描述,以如图1所示拓扑结构为例子,分析数据流从网卡eth0到虚拟节点n2的过程. 图1 示例拓扑 虚拟网络创建由CORE后台根据前台的拓扑结构和配置,执行相应的命令进行实现,如下:

一个最简单的通过WireShark破解SSL加密网络数据包的方法

原文地址: http://article.yeeyan.org/view/530101/444688 一般来说,我们用WireShark来抓取包进行分析是没有多大问题的.但这里有个问题是,如果你碰到的是用SSL/TLS等加密手段加密过的网络数据的时候,往往我们只能束手无策.在过去的话,如果我们拥有的该传输会话的私钥的话我们还是可以将它提供给WireShark来让其对这些加密数据包进行解密的 1. 简介 相信能访问到这篇文章的同行基本上都会用过流行的网络抓包工具WireShark,用它来抓取相应的

发现新大陆:一个最简单的破解SSL加密网络数据包的方法

1. 简介 相信能访问到这篇文章的同行基本上都会用过流行的网络抓包工具WireShark,用它来抓取相应的网络数据包来进行问题分析或者其他你懂的之类的事情. 一般来说,我们用WireShark来抓取包进行分析是没有多大问题的.但这里有个问题是,如果你碰到的是用SSL/TLS等加密手段加密过的网络数据的时候,往往我们只能束手无策.在过去的话,如果我们拥有的该传输会话的私钥的话我们还是可以将它提供给WireShark来让其对这些加密数据包进行解密的,但这已经是想当年还用RSA进行网络数据加密的年代的