Log Processing With Storm

Books that come with code are the best kind: once you've read a chapter you can run the demo yourself, which is very satisfying!

Scenario

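The goal is to analyze Apache access logs and compute statistics. Examples include the client IP, the country or region it comes from, the user's OS and browser, and popular search keywords. A sample log line looks like this: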

24.25.135.19 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"

To make sure the keyword field extracted later isn't empty, the query parameter name=qq was appended to the referrer:

180.183.50.208 - - [1-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"

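The fields involved are:

ip: the client IP, e.g. 180.183.50.208
dateTime: the access time as captured by the regex below, e.g. 1-01-2011:06:20:31 -0500
request: the request line, e.g. GET / HTTP/1.1
response: the response status code, e.g. 200
bytesSent: the response size in bytes, e.g. 864
referrer: e.g. http://www.adeveloper.com/resource.html?name=qq
useragent: the full user-agent string from the sample line (Mozilla/5.0 ...)
country: derived from the IP
browser: derived from the user agent, here Firefox (UserAgentTools reports it as Gecko(Firefox))
os: derived from the user agent, here Windows (UserAgentTools reports WinXP for Windows NT 5.1)
keyword: derived from the referrer's query string, here qq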

Overall flow

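The book uses a file -> Kafka -> Storm -> MySQL pipeline.

Here, I don't have a producer read the log file straight into Kafka. Instead, I feed the data in through Flume, which gives the classic chain:

Flume -> Kafka -> Storm -> MySQL

Required components: ZooKeeper, Kafka, MySQL, Flume and Storm. To run the demo in local mode you don't actually need a Storm installation, because the topology runs inside a LocalCluster in the same JVM.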

apache-flume-1.4.0-bin.tar.gz
kafka_2.8.0-0.8.0.tar.gz
zookeeper-3.4.5-cdh4.4.0.tar.gz
Storm

First, create the database table:

create table apachelog(
      id INT NOT NULL AUTO_INCREMENT,
      ip VARCHAR(100) NOT NULL,
      dateTime VARCHAR(200) NOT NULL,
      request VARCHAR(100) NOT NULL,
      response VARCHAR(200) NOT NULL,
      bytesSent VARCHAR(200) NOT NULL,
      referrer VARCHAR(500) NOT NULL,
      useragent VARCHAR(500) NOT NULL,
      country VARCHAR(200) NOT NULL,
      browser VARCHAR(200) NOT NULL,
      os VARCHAR(200) NOT NULL,
      keyword VARCHAR(200) NOT NULL,
      PRIMARY KEY (id)
);
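The last bolt in the topology, PersistenceBolt, writes each record into this table. Here is a rough idea of what that involves: a minimal JDBC sketch, assuming a MySQL JDBC driver (e.g. MySQL Connector/J) on the classpath. The class ApacheLogWriter, its method names, and the choice to store nulls as "NA" are hypothetical and are not the book's PersistenceBolt. The column order follows the CREATE TABLE statement above, minus the AUTO_INCREMENT id.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not the book's PersistenceBolt: inserts one record into apachelog.
class ApacheLogWriter {

    // Column order of the apachelog table; id is AUTO_INCREMENT and therefore omitted.
    static final List<String> COLUMNS = Arrays.asList("ip", "dateTime", "request", "response",
            "bytesSent", "referrer", "useragent", "country", "browser", "os", "keyword");

    // Builds "INSERT INTO apachelog (ip, ..., keyword) VALUES (?, ..., ?)".
    static String insertSql() {
        String placeholders = String.join(", ", Collections.nCopies(COLUMNS.size(), "?"));
        return "INSERT INTO apachelog (" + String.join(", ", COLUMNS) + ") VALUES (" + placeholders + ")";
    }

    // Binds values positionally; a PreparedStatement avoids quoting problems in referrers and user agents.
    static void insert(Connection connection, List<?> values) throws SQLException {
        if (values.size() != COLUMNS.size()) {
            throw new IllegalArgumentException("expected " + COLUMNS.size() + " values, got " + values.size());
        }
        try (PreparedStatement statement = connection.prepareStatement(insertSql())) {
            for (int i = 0; i < values.size(); i++) {
                // All columns are NOT NULL, so this sketch substitutes "NA" (the value
                // ipToCountry returns for unknown IPs) for nulls.
                Object value = values.get(i);
                statement.setString(i + 1, value == null ? "NA" : value.toString());
            }
            statement.executeUpdate();
        }
    }
}
```

Inside a bolt, insert could be called from execute() with input.getValues(). The Connection would be opened once in prepare(), for example via DriverManager.getConnection("jdbc:mysql://<host>:3306/<db>", user, password), rather than once per tuple.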

Next, the Flume Kafka producer configuration (cat conf/producer2.properties):

#agent section
producer.sources = s
producer.channels = c
producer.sinks = r

#source section
producer.sources.s.type = exec
producer.sources.s.command = tail -F /data/apache.log
producer.sources.s.channels = c

# Each sink's type must be defined
producer.sinks.r.type = org.apache.flume.plugins.KafkaSink
producer.sinks.r.metadata.broker.list=127.0.0.1:9092
producer.sinks.r.partition.key=0
producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition
producer.sinks.r.serializer.class=kafka.serializer.StringEncoder
producer.sinks.r.request.required.acks=0
producer.sinks.r.max.message.size=1000000
producer.sinks.r.producer.type=sync
producer.sinks.r.custom.encoding=UTF-8
producer.sinks.r.custom.topic.name=kafkaToptic

#Specify the channel the sink should use
producer.sinks.r.channel = c

# Each channel's type is defined.
producer.channels.c.type = memory
producer.channels.c.capacity = 1000

For test data, see apache_test.log in the kafka-producer project. To give the keyword field something to extract, ?name=qq was deliberately appended to the referrer:

echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"' >> /data/apache.log

With the data ready, start ZooKeeper, then start Kafka:

cd kafka_2.8.0-0.8.0
bin/kafka-server-start.sh config/server.properties

Start Flume:

bin/flume-ng agent --conf conf  --conf-file conf/producer2.properties  --name producer -Dflume.root.logger=INFO,console

Once Flume is running, you can generate more data yourself:

echo '202.27.9.1 - - [2-01-2011:06:20:31 -0500] "GET / HTTP/1.1" 200 864 "http://www.adeveloper.com/resource.html?name=qq" "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7"' >> /data/apache.log
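Typing echo commands by hand gets tedious. Below is a minimal sketch that appends a batch of lines in the same format with random client IPs. It assumes the /data/apache.log path that the Flume exec source tails. The class name SampleLogWriter, the random-IP scheme, and the fixed timestamp, referrer and user agent are illustrative choices, not part of the book's code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

// Hypothetical helper: appends sample Apache access-log lines for the Flume exec source to pick up.
class SampleLogWriter {

    // Same referrer and user agent as the echo line; "?name=qq" keeps the keyword non-empty.
    static final String REFERRER = "http://www.adeveloper.com/resource.html?name=qq";
    static final String USER_AGENT =
            "Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7";

    // Builds one log line in the format used throughout this post.
    static String logLine(String ip) {
        return ip + " - - [2-01-2011:06:20:31 -0500] \"GET / HTTP/1.1\" 200 864 \""
                + REFERRER + "\" \"" + USER_AGENT + "\"";
    }

    // Random IPv4 address; the first octet is kept in 1..223 to skip 0.x and multicast/reserved ranges.
    static String randomIp(Random random) {
        return (1 + random.nextInt(223)) + "." + random.nextInt(256) + "."
                + random.nextInt(256) + "." + random.nextInt(256);
    }

    // Usage: java SampleLogWriter [logFile] [lineCount]
    public static void main(String[] args) throws IOException {
        Path log = Paths.get(args.length > 0 ? args[0] : "/data/apache.log");
        int count = args.length > 1 ? Integer.parseInt(args[1]) : 10;
        Random random = new Random();
        List<String> lines = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            lines.add(logLine(randomIp(random)));
        }
        // CREATE + APPEND mirrors ">>", so tail -F only sees the newly added lines.
        Files.write(log, lines, StandardCharsets.UTF_8,
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
    }
}
```

Varying the IPs gives the country lookup more than one input to work with.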

That completes the data preparation. Next comes the topology's main class, LogProcessingTopology:

import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;

public class LogProcessingTopology {
    public static void main(String[] args) throws Exception {

        ZkHosts zkHosts = new ZkHosts("192.168.137.10:2181");
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, "kafkaToptic", "","id");

        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        // Always start from the beginning of the topic, ignoring stored offsets (handy for repeated demo runs)
        kafkaConfig.forceFromStart = true;
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("KafkaSpout", new KafkaSpout(kafkaConfig), 1);

        builder.setBolt("LogSplitter", new ApacheLogSplitterBolt(), 1)
                .globalGrouping("KafkaSpout");
        builder.setBolt("IpToCountry",new UserInformationGetterBolt(args[0]), 1)
                .globalGrouping("LogSplitter");
        builder.setBolt("Keyword", new KeyWordIdentifierBolt(), 1)
                .globalGrouping("IpToCountry");
        builder.setBolt("PersistenceBolt",new PersistenceBolt(args[1], args[2], args[3], args[4]),
                1).globalGrouping("Keyword");
        if (args.length == 6) {
            // Run the topology on remote cluster.
            Config conf = new Config();
            conf.setNumWorkers(4);
            try {
                // args[1..4] go to PersistenceBolt, so the topology name is the sixth argument
                StormSubmitter.submitTopology(args[5], conf,
                        builder.createTopology());
            } catch (AlreadyAliveException alreadyAliveException) {
                System.out.println(alreadyAliveException);
            } catch (InvalidTopologyException invalidTopologyException) {
                System.out.println(invalidTopologyException);
            }
        } else {
            // in local mode.
            LocalCluster cluster = new LocalCluster();
            Config conf = new Config();
            cluster.submitTopology("KafkaToplogy", conf,builder.createTopology());
            try {
                System.out.println("**********************Waiting to consume from kafka");
                Thread.sleep(10000);
            } catch (Exception exception) {
                System.out.println("******************Thread interrupted exception : "+ exception);
            }
            cluster.killTopology("KafkaToplogy");

            cluster.shutdown();

        }

    }
}

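Besides KafkaSpout, the topology uses ApacheLogSplitterBolt, UserInformationGetterBolt, KeyWordIdentifierBolt and PersistenceBolt, chained one after another with globalGrouping. The class names pretty much say what each one does:

- KafkaSpout reads data from Kafka.
- ApacheLogSplitterBolt splits each log line into fields.
- UserInformationGetterBolt derives user information (country, browser, OS).
- KeyWordIdentifierBolt extracts the search keyword.
- PersistenceBolt, the last in the chain, writes the data into MySQL.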

Now let's look at how each class is implemented.

ApacheLogSplitterBolt uses a regular expression to match the fields we need out of each log line. The real work is done by ApacheLogSplitter.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class ApacheLogSplitterBolt extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    private static final ApacheLogSplitter apacheLogSplitter = new ApacheLogSplitter();
    private static final List<String> LOG_ELEMENTS = new ArrayList<String>();
    static {
        LOG_ELEMENTS.add("ip");
        LOG_ELEMENTS.add("dateTime");
        LOG_ELEMENTS.add("request");
        LOG_ELEMENTS.add("response");
        LOG_ELEMENTS.add("bytesSent");
        LOG_ELEMENTS.add("referrer");
        LOG_ELEMENTS.add("useragent");
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String log = input.getString(0);

        if (StringUtils.isBlank(log)||log.equals("xxxx")) {
            return;
        }
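        Map<String, Object> logMap = apacheLogSplitter.logSplitter(log);
        // A line that doesn't match the pattern yields an empty map; skip it instead of emitting nulls
        if (logMap.isEmpty()) {
            return;
        }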
        List<Object> logdata = new ArrayList<Object>();
        for (String element : LOG_ELEMENTS) {
            logdata.add(logMap.get(element));
        }
        collector.emit(logdata);

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response","bytesSent", "referrer", "useragent"));
    }
}

The ApacheLogSplitter class is responsible for splitting a log line into its fields:

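import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ApacheLogSplitter {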

    public Map<String, Object> logSplitter(String apacheLog) {

        String logEntryLine = apacheLog;
        String logEntryPattern = "^([\\d.]+) (\\S+) (\\S+) \\[([\\w-:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"";

        Pattern p = Pattern.compile(logEntryPattern);
        Matcher matcher = p.matcher(logEntryLine);
        Map<String, Object> logMap = new HashMap<String, Object>();
        if (!matcher.matches() || 9 != matcher.groupCount()) {
            System.err.println("Bad log entry (or problem with RE?):");
            System.err.println(logEntryLine);
            return logMap;
        }
        // set the ip, dateTime, request, etc into map.
        logMap.put("ip", matcher.group(1));
        logMap.put("dateTime", matcher.group(4));
        logMap.put("request", matcher.group(5));
        logMap.put("response", matcher.group(6));
        logMap.put("bytesSent", matcher.group(7));
        logMap.put("referrer", matcher.group(8));
        System.out.println("#######"+matcher.group(8));
        logMap.put("useragent", matcher.group(9));
        return logMap;
    }
}
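You can sanity-check the regular expression without starting Kafka or Storm. The minimal standalone sketch below applies the same pattern to the sample line from the beginning of this post and prints the captured groups. The class name LogPatternCheck and the labels for the two unused groups (identity and user, the "-" placeholders in the log) are my own.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical standalone check of the pattern used by ApacheLogSplitter.
class LogPatternCheck {

    // Same expression as in ApacheLogSplitter, compiled once instead of on every call.
    static final Pattern LOG_PATTERN = Pattern.compile(
            "^([\\d.]+) (\\S+) (\\S+) \\[([\\w-:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) (\\d+) \"([^\"]+)\" \"([^\"]+)\"");

    // Labels for groups 1..9; groups 2 and 3 are not used by ApacheLogSplitter.
    static final String[] NAMES = {"ip", "identity", "user", "dateTime", "request",
            "response", "bytesSent", "referrer", "useragent"};

    // Returns the 9 captured groups, or null if the line does not match.
    static String[] parse(String line) {
        Matcher matcher = LOG_PATTERN.matcher(line);
        if (!matcher.matches()) {
            return null;
        }
        String[] groups = new String[matcher.groupCount()];
        for (int i = 0; i < groups.length; i++) {
            groups[i] = matcher.group(i + 1);
        }
        return groups;
    }

    public static void main(String[] args) {
        String line = "180.183.50.208 - - [1-01-2011:06:20:31 -0500] \"GET / HTTP/1.1\" 200 864 "
                + "\"http://www.adeveloper.com/resource.html?name=qq\" "
                + "\"Mozilla/5.0 (Windows; U; Windows NT 5.1; hu-HU; rv:1.7.12) Gecko/20050919 Firefox/1.0.7\"";
        String[] groups = parse(line);
        if (groups == null) {
            System.out.println("no match");
            return;
        }
        for (int i = 0; i < groups.length; i++) {
            System.out.println((i + 1) + " " + NAMES[i] + ": " + groups[i]);
        }
    }
}
```

Running it shows that group 4, which is stored as dateTime, keeps the day-month prefix and the timezone offset (1-01-2011:06:20:31 -0500). This is why the table column is a plain VARCHAR.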

UserInformationGetterBolt does a bit more work. It maps the IP to a country and identifies the OS and browser from the user agent:

package com.learningstorm.stormlogprocessing;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This class use the IpToCountryConverter and UserAgentTools class to calculate
 * the country, os and browser from log line.
 *
 */
public class UserInformationGetterBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private IpToCountryConverter ipToCountryConverter = null;
    private UserAgentTools userAgentTools = null;
    public OutputCollector collector;
    private String pathTOGeoLiteCityFile;

    public UserInformationGetterBolt(String pathTOGeoLiteCityFile) {
        this.pathTOGeoLiteCityFile = pathTOGeoLiteCityFile;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response",
                "bytesSent", "referrer", "useragent", "country", "browser",
                "os"));
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.ipToCountryConverter = new IpToCountryConverter(this.pathTOGeoLiteCityFile);
        this.userAgentTools = new UserAgentTools();

    }

    public void execute(Tuple input) {

        String ip = input.getStringByField("ip").toString();

        // calculate the country from ip
        Object country = ipToCountryConverter.ipToCountry(ip);
        // calculate the browser from useragent.
        Object browser = userAgentTools.getBrowser(input.getStringByField(
                "useragent").toString())[1];
        // calculate the os from useragent.
        Object os = userAgentTools.getOS(input.getStringByField("useragent")
                .toString())[1];
        collector.emit(new Values(input.getString(0), input.getString(1), input
                .getString(2), input.getString(3), input.getString(4), input
                .getString(5), input.getString(6), country, browser, os));

    }
}

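The IP-to-country lookup uses LookupService from MaxMind's GeoIP library. It loads the GeoLiteCity.dat file whose path is passed in as args[0]: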

package com.learningstorm.stormlogprocessing;

import com.maxmind.geoip.Location;
import com.maxmind.geoip.LookupService;

/**
 * This class contains logic to calculate the country name from IP address
 *
 */
public class IpToCountryConverter {

    private static LookupService cl = null;

    /**
     * A parameterised constructor that takes the location of the
     * GeoLiteCity.dat file as input.
     *
     * @param pathTOGeoLiteCityFile
     */
    public IpToCountryConverter(String pathTOGeoLiteCityFile) {
        try {
            cl = new LookupService(pathTOGeoLiteCityFile,
                    LookupService.GEOIP_MEMORY_CACHE);
        } catch (Exception e) {
            throw new RuntimeException(
                    "Error occurred while initializing IpToCountryConverter class : "+e.getMessage());
        }
    }

    /**
     * This method takes an IP address as input and converts it into a country name.
     *
     * @param ip
     * @return
     */
    public String ipToCountry(String ip) {
        Location location = cl.getLocation(ip);
        if (location == null) {
            return "NA";
        }
        if (location.countryName == null) {
            return "NA";
        }
        return location.countryName;
    }
}

The user-agent parsing (browser and OS detection) is long and tedious to follow, so read through it at your own pace:

package com.learningstorm.stormlogprocessing;

public class UserAgentTools {

      public  String getFirstVersionNumber(String a_userAgent, int a_position, int numDigits) {
        String ver = getVersionNumber(a_userAgent, a_position);
        if (ver==null) return "";
        int i = 0;
        String res="";
        while (i<ver.length() && i<numDigits) {
          res+=String.valueOf(ver.charAt(i));
          i++;
        }
        return res;
      }
      public  String getVersionNumber(String a_userAgent, int a_position) {
          if (a_position<0) return "";
          StringBuffer res = new StringBuffer();
          int status = 0;

          while (a_position < a_userAgent.length()) {
              char c = a_userAgent.charAt(a_position);
              switch (status) {
                case 0: // No valid digits encountered yet
                  if (c == ' ' || c=='/') break;
                  if (c == ';' || c==')') return "";
                  status = 1;
                  // fall through
                case 1: // Version number in progress
                  if (c == ';' || c=='/' || c==')' || c=='(' || c=='[') return res.toString().trim();
                  if (c == ' ') status = 2;
                  res.append(c);
                  break;
                case 2: // Space encountered - might need to end the parsing
                  if ((Character.isLetter(c) &&
                       Character.isLowerCase(c)) ||
                      Character.isDigit(c)) {
                      res.append(c);
                      status=1;
                  } else
                      return res.toString().trim();
                  break;
              }
              a_position++;
          }
          return res.toString().trim();
      }

      public  String[]getArray(String a, String b, String c) {
        String[]res = new String[3];
        res[0]=a;
        res[1]=b;
        res[2]=c;
        return res;
      }

      public  String[] getBotName(String userAgent) {
        userAgent = userAgent.toLowerCase();
        int pos=0;
        String res=null;
        if ((pos=userAgent.indexOf("help.yahoo.com/"))>-1) {
            res= "Yahoo";
            pos+=7;
        } else
        if ((pos=userAgent.indexOf("google/"))>-1) {
            res= "Google";
            pos+=7;
        } else
        if ((pos=userAgent.indexOf("msnbot/"))>-1) {
            res= "MSNBot";
            pos+=7;
        } else
        if ((pos=userAgent.indexOf("googlebot/"))>-1) {
            res= "Google";
            pos+=10;
        } else
        if ((pos=userAgent.indexOf("webcrawler/"))>-1) {
            res= "WebCrawler";
            pos+=11;
        } else
        // The following two bots don't have any version number in their User-Agent strings.
        if ((pos=userAgent.indexOf("inktomi"))>-1) {
            res= "Inktomi";
            pos=-1;
        } else
        if ((pos=userAgent.indexOf("teoma"))>-1) {
            res= "Teoma";
            pos=-1;
        }
        if (res==null) return null;
        return getArray(res,res,res + getVersionNumber(userAgent,pos));
      }

      public  String[] getOS(String userAgent) {
        if (getBotName(userAgent)!=null) return getArray("Bot","Bot","Bot");
        String[]res = null;
        int pos;
        if ((pos=userAgent.indexOf("Windows-NT"))>-1) {
            res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+8));
        } else
        if (userAgent.indexOf("Windows NT")>-1) {
            // The different versions of Windows NT are decoded in the verbosity level 2
            // i.e. Windows NT 5.1 = Windows XP
            if ((pos=userAgent.indexOf("Windows NT 5.1"))>-1) {
                res = getArray("Win","WinXP","Win"+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT 6.0"))>-1) {
                res = getArray("Win","Vista","Vista"+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT 6.1"))>-1) {
                res = getArray("Win","Seven","Seven "+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT 5.0"))>-1) {
                res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT 5.2"))>-1) {
                res = getArray("Win","Win2003","Win"+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT 4.0"))>-1) {
                res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+7));
            } else
            if ((pos=userAgent.indexOf("Windows NT)"))>-1) {
                res = getArray("Win","WinNT","WinNT");
            } else
            if ((pos=userAgent.indexOf("Windows NT;"))>-1) {
                res = getArray("Win","WinNT","WinNT");
            } else
            res = getArray("Win","<B>WinNT?</B>","<B>WinNT?</B>");
        } else
        if (userAgent.indexOf("Win")>-1) {
            if (userAgent.indexOf("Windows")>-1) {
                if ((pos=userAgent.indexOf("Windows 98"))>-1) {
                    res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+7));
                } else
                if ((pos=userAgent.indexOf("Windows_98"))>-1) {
                    res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+8));
                } else
                if ((pos=userAgent.indexOf("Windows 2000"))>-1) {
                    res = getArray("Win","Win2000","Win"+getVersionNumber(userAgent,pos+7));
                } else
                if ((pos=userAgent.indexOf("Windows 95"))>-1) {
                    res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+7));
                } else
                if ((pos=userAgent.indexOf("Windows 9x"))>-1) {
                    res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+7));
                } else
                if ((pos=userAgent.indexOf("Windows ME"))>-1) {
                    res = getArray("Win","WinME","Win"+getVersionNumber(userAgent,pos+7));
                } else
                if ((pos=userAgent.indexOf("Windows CE;"))>-1) {
                    res = getArray("Win","WinCE","WinCE");
                } else
                if ((pos=userAgent.indexOf("Windows 3.1"))>-1) {
                    res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+7));
                }

            }
            if (res == null) {
                if ((pos=userAgent.indexOf("Win98"))>-1) {
                    res = getArray("Win","Win98","Win"+getVersionNumber(userAgent,pos+3));
                } else
                if ((pos=userAgent.indexOf("Win31"))>-1) {
                    res = getArray("Win","Win31","Win"+getVersionNumber(userAgent,pos+3));
                } else
                if ((pos=userAgent.indexOf("Win95"))>-1) {
                    res = getArray("Win","Win95","Win"+getVersionNumber(userAgent,pos+3));
                } else
                if ((pos=userAgent.indexOf("Win 9x"))>-1) {
                    res = getArray("Win","Win9x","Win"+getVersionNumber(userAgent,pos+3));
                } else
                if ((pos=userAgent.indexOf("WinNT4.0"))>-1) {
                    res = getArray("Win","WinNT4","Win"+getVersionNumber(userAgent,pos+3));
                } else
                if ((pos=userAgent.indexOf("WinNT"))>-1) {
                    res = getArray("Win","WinNT","Win"+getVersionNumber(userAgent,pos+3));
                }
            }
            if (res == null) {
                if ((pos=userAgent.indexOf("Windows"))>-1) {
                  res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+7)+"</B>");
                } else
                if ((pos=userAgent.indexOf("Win"))>-1) {
                  res = getArray("Win","<B>Win?</B>","<B>Win?"+getVersionNumber(userAgent,pos+3)+"</B>");
                } else
                  res = getArray("Win","<B>Win?</B>","<B>Win?</B>");
            }
        } else
        if ((pos=userAgent.indexOf("Mac OS X"))>-1) {
            if ((userAgent.indexOf("iPhone"))>-1) {
                pos = userAgent.indexOf("iPhone OS");
                if ((userAgent.indexOf("iPod"))>-1) {
                    res = getArray("iOS","iOS-iPod","iOS-iPod "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
                } else {
                    res = getArray("iOS","iOS-iPhone","iOS-iPhone "+((pos<0)?"":getVersionNumber(userAgent,pos+9)));
                }
            } else
            if ((userAgent.indexOf("iPad"))>-1) {
                pos = userAgent.indexOf("CPU OS");
                res = getArray("iOS","iOS-iPad","iOS-iPad "+((pos<0)?"":getVersionNumber(userAgent,pos+6)));
            } else
                res = getArray("Mac","MacOSX","MacOS "+getVersionNumber(userAgent,pos+8));
        } else
        if ((pos=userAgent.indexOf("Android"))>-1) {
            res = getArray("Linux","Android","Android "+getVersionNumber(userAgent,pos+8));
        } else
        if ((pos=userAgent.indexOf("Mac_PowerPC"))>-1) {
            res = getArray("Mac","MacPPC","MacOS "+getVersionNumber(userAgent,pos+3));
        } else
        if ((pos=userAgent.indexOf("Macintosh"))>-1) {
            if (userAgent.indexOf("PPC")>-1)
                res = getArray("Mac","MacPPC","Mac PPC");
            else
                res = getArray("Mac?","Mac?","MacOS?");
        } else
        if ((pos=userAgent.indexOf("FreeBSD"))>-1) {
            res = getArray("*BSD","*BSD FreeBSD","FreeBSD "+getVersionNumber(userAgent,pos+7));
        } else
        if ((pos=userAgent.indexOf("OpenBSD"))>-1) {
            res = getArray("*BSD","*BSD OpenBSD","OpenBSD "+getVersionNumber(userAgent,pos+7));
        } else
        if ((pos=userAgent.indexOf("Linux"))>-1) {
            String detail = "Linux "+getVersionNumber(userAgent,pos+5);
            String med = "Linux";
            if ((pos=userAgent.indexOf("Ubuntu/"))>-1) {
                detail = "Ubuntu "+getVersionNumber(userAgent,pos+7);
                med+=" Ubuntu";
            }
            res = getArray("Linux",med,detail);
        } else
        if ((pos=userAgent.indexOf("CentOS"))>-1) {
            res = getArray("Linux","Linux CentOS","CentOS");
        } else
        if ((pos=userAgent.indexOf("NetBSD"))>-1) {
            res = getArray("*BSD","*BSD NetBSD","NetBSD "+getVersionNumber(userAgent,pos+6));
        } else
        if ((pos=userAgent.indexOf("Unix"))>-1) {
            res = getArray("Linux","Linux","Linux "+getVersionNumber(userAgent,pos+4));
        } else
        if ((pos=userAgent.indexOf("SunOS"))>-1) {
            res = getArray("Unix","SunOS","SunOS"+getVersionNumber(userAgent,pos+5));
        } else
        if ((pos=userAgent.indexOf("IRIX"))>-1) {
            res = getArray("Unix","IRIX","IRIX"+getVersionNumber(userAgent,pos+4));
        } else
        if ((pos=userAgent.indexOf("SonyEricsson"))>-1) {
            res = getArray("SonyEricsson","SonyEricsson","SonyEricsson"+getVersionNumber(userAgent,pos+12));
        } else
        if ((pos=userAgent.indexOf("Nokia"))>-1) {
            res = getArray("Nokia","Nokia","Nokia"+getVersionNumber(userAgent,pos+5));
        } else
        if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
            res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+10));
        } else
        if ((pos=userAgent.indexOf("SymbianOS"))>-1) {
            res = getArray("SymbianOS","SymbianOS","SymbianOS"+getVersionNumber(userAgent,pos+10));
        } else
        if ((pos=userAgent.indexOf("BeOS"))>-1) {
            res = getArray("BeOS","BeOS","BeOS");
        } else
        if ((pos=userAgent.indexOf("Nintendo Wii"))>-1) {
            res = getArray("Nintendo Wii","Nintendo Wii","Nintendo Wii"+getVersionNumber(userAgent,pos+10));
        } else
        if ((pos=userAgent.indexOf("J2ME/MIDP"))>-1) {
            res = getArray("Java","J2ME","J2ME/MIDP");
        } else
        res = getArray("<b>?</b>","<b>?</b>","<b>?</b>");
        return res;
      }

      public  String []getBrowser(String userAgent) {
        String []botName;
        if ((botName=getBotName(userAgent))!=null) return botName;
        String[]res = null;
        int pos;
        if ((pos=userAgent.indexOf("Lotus-Notes/"))>-1) {
            res = getArray("LotusNotes","LotusNotes","LotusNotes"+getVersionNumber(userAgent,pos+12));
        } else
        if ((pos=userAgent.indexOf("Opera"))>-1) {
            String ver = getVersionNumber(userAgent,pos+5);
            res = getArray("Opera","Opera"+getFirstVersionNumber(userAgent,pos+5,1),"Opera"+ver);
            if ((pos=userAgent.indexOf("Opera Mini/"))>-1) {
                String ver2 = getVersionNumber(userAgent,pos+11);
                res = getArray("Opera","Opera Mini","Opera Mini "+ver2);
            } else
            if ((pos=userAgent.indexOf("Opera Mobi/"))>-1) {
                String ver2 = getVersionNumber(userAgent,pos+11);
                res = getArray("Opera","Opera Mobi","Opera Mobi "+ver2);
            }
        } else
        if (userAgent.indexOf("MSIE")>-1) {
            if ((pos=userAgent.indexOf("MSIE 6.0"))>-1) {
                res = getArray("MSIE","MSIE6","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 5.0"))>-1) {
                res = getArray("MSIE","MSIE5","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 5.5"))>-1) {
                res = getArray("MSIE","MSIE5.5","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 5."))>-1) {
                res = getArray("MSIE","MSIE5.x","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 4"))>-1) {
                res = getArray("MSIE","MSIE4","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 7"))>-1 && userAgent.indexOf("Trident/4.0")<0) {
                res = getArray("MSIE","MSIE7","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 8"))>-1 || userAgent.indexOf("Trident/4.0")>-1) {
                res = getArray("MSIE","MSIE8","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            if ((pos=userAgent.indexOf("MSIE 9"))>-1 || userAgent.indexOf("Trident/4.0")>-1) {
                res = getArray("MSIE","MSIE9","MSIE"+getVersionNumber(userAgent,pos+4));
            } else
            res = getArray("MSIE","<B>MSIE?</B>","<B>MSIE?"+getVersionNumber(userAgent,userAgent.indexOf("MSIE")+4)+"</B>");
        } else
        if ((pos=userAgent.indexOf("Gecko/"))>-1) {
            res = getArray("Gecko","Gecko","Gecko"+getFirstVersionNumber(userAgent,pos+5,4));
            if ((pos=userAgent.indexOf("Camino/"))>-1) {
                res[1]+="(Camino)";
                res[2]+="(Camino"+getVersionNumber(userAgent,pos+7)+")";
            } else
            if ((pos=userAgent.indexOf("Chimera/"))>-1) {
                res[1]+="(Chimera)";
                res[2]+="(Chimera"+getVersionNumber(userAgent,pos+8)+")";
            } else
            if ((pos=userAgent.indexOf("Firebird/"))>-1) {
                res[1]+="(Firebird)";
                res[2]+="(Firebird"+getVersionNumber(userAgent,pos+9)+")";
            } else
            if ((pos=userAgent.indexOf("Phoenix/"))>-1) {
                res[1]+="(Phoenix)";
                res[2]+="(Phoenix"+getVersionNumber(userAgent,pos+8)+")";
            } else
            if ((pos=userAgent.indexOf("Galeon/"))>-1) {
                res[1]+="(Galeon)";
                res[2]+="(Galeon"+getVersionNumber(userAgent,pos+7)+")";
            } else
            if ((pos=userAgent.indexOf("Firefox/"))>-1) {
                res[1]+="(Firefox)";
                res[2]+="(Firefox"+getVersionNumber(userAgent,pos+8)+")";
            } else
            if ((pos=userAgent.indexOf("Netscape/"))>-1) {
                if ((pos=userAgent.indexOf("Netscape/6"))>-1) {
                    res[1]+="(NS6)";
                    res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
                } else
                if ((pos=userAgent.indexOf("Netscape/7"))>-1) {
                    res[1]+="(NS7)";
                    res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
                } else
                if ((pos=userAgent.indexOf("Netscape/8"))>-1) {
                    res[1]+="(NS8)";
                    res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
                } else
                if ((pos=userAgent.indexOf("Netscape/9"))>-1) {
                    res[1]+="(NS9)";
                    res[2]+="(NS"+getVersionNumber(userAgent,pos+9)+")";
                } else {
                    res[1]+="(NS?)";
                    res[2]+="(NS?"+getVersionNumber(userAgent,userAgent.indexOf("Netscape/")+9)+")";
                }
            }
        } else
        if ((pos=userAgent.indexOf("Netscape/"))>-1) {
            if ((pos=userAgent.indexOf("Netscape/4"))>-1) {
                res = getArray("NS","NS4","NS"+getVersionNumber(userAgent,pos+9));
            } else
                res = getArray("NS","NS?","NS?"+getVersionNumber(userAgent,pos+9));
        } else
        if ((pos=userAgent.indexOf("Chrome/"))>-1) {
            res = getArray("KHTML","KHTML(Chrome)","KHTML(Chrome"+getVersionNumber(userAgent,pos+6)+")");
        } else
        if ((pos=userAgent.indexOf("Safari/"))>-1) {
            res = getArray("KHTML","KHTML(Safari)","KHTML(Safari"+getVersionNumber(userAgent,pos+6)+")");
        } else
        if ((pos=userAgent.indexOf("Konqueror/"))>-1) {
            res = getArray("KHTML","KHTML(Konqueror)","KHTML(Konqueror"+getVersionNumber(userAgent,pos+9)+")");
        } else
        if ((pos=userAgent.indexOf("KHTML"))>-1) {
            res = getArray("KHTML","KHTML?","KHTML?("+getVersionNumber(userAgent,pos+5)+")");
        } else
        if ((pos=userAgent.indexOf("NetFront"))>-1) {
            res = getArray("NetFront","NetFront","NetFront "+getVersionNumber(userAgent,pos+8));
        } else
        if ((pos=userAgent.indexOf("BlackBerry"))>-1) {
            pos=userAgent.indexOf("/",pos+2);
            res = getArray("BlackBerry","BlackBerry","BlackBerry"+getVersionNumber(userAgent,pos+1));
        } else
        // We will interpret Mozilla/4.x as Netscape Communicator if and only if x
        // is not 0 or 5
        if (userAgent.indexOf("Mozilla/4.")==0 &&
            userAgent.indexOf("Mozilla/4.0")<0 &&
            userAgent.indexOf("Mozilla/4.5 ")<0) {
            res = getArray("Communicator","Communicator","Communicator"+getVersionNumber(userAgent,pos+8));
        } else
        return getArray("<B>?</B>","<B>?</B>","<B>?</B>");
        return res;
      }
    }
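The if/else chain above is a first-match search over substrings, so the order of the checks matters. A typical Chrome user agent such as `Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36` also contains `Safari/`, which is why the `Chrome/` test comes before the `Safari/` test. The following is a minimal sketch of that first-match idea. The class `BrowserOrderSketch` and its `detect` method are hypothetical simplifications, not part of the UserAgentTools code:

```java
public class BrowserOrderSketch {

    // Hypothetical simplified detector: returns the first token (without its
    // trailing '/') that occurs in the user agent, testing tokens in the given order.
    public static String detect(String userAgent, String... orderedTokens) {
        for (String token : orderedTokens) {
            if (userAgent.indexOf(token) > -1) {
                return token.substring(0, token.length() - 1);
            }
        }
        return "?";
    }

    public static void main(String[] args) {
        String chrome = "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 "
                + "(KHTML, like Gecko) Chrome/35.0.1916.153 Safari/537.36";
        // Same order as the chain above: Chrome is tested before Safari.
        System.out.println(detect(chrome, "Chrome/", "Safari/")); // Chrome
        // Reversed order: the Chrome user agent is misreported as Safari.
        System.out.println(detect(chrome, "Safari/", "Chrome/")); // Safari
    }
}
```

Put the more specific token first whenever one browser's user agent string embeds another browser's token.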

Next comes keyword extraction:

package com.learningstorm.stormlogprocessing;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

/**
 * This class uses the KeywordGenerator class to extract the search keyword from
 * the referrer URL.
 *
 */
public class KeyWordIdentifierBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;
    private KeywordGenerator keywordGenerator = null;
    public OutputCollector collector;

    public KeyWordIdentifierBolt() {

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("ip", "dateTime", "request", "response",
                "bytesSent", "referrer", "useragent", "country", "browser",
                "os", "keyword"));
    }

    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
        this.keywordGenerator = new KeywordGenerator();

    }

    public void execute(Tuple input) {

        String referrer = input.getStringByField("referrer");
        // call getKeyword(String referrer) of the KeywordGenerator class to
        // extract the search keyword.
        String keyword = keywordGenerator.getKeyword(referrer);
        // emit all the fields emitted by the previous bolt + keyword, anchored
        // to the input tuple so that a downstream failure triggers a replay
        collector.emit(input, new Values(input.getString(0), input.getString(1),
                input.getString(2), input.getString(3), input.getString(4),
                input.getString(5), input.getString(6), input.getString(7),
                input.getString(8), input.getString(9), keyword));
        // BaseRichBolt does not ack automatically; without this the spout
        // would time the tuple out and replay it
        collector.ack(input);
    }
}

Here, too, the real work is done by KeywordGenerator, which matches query parameters in the referrer URL (name=, then p=, then query=):


package com.learningstorm.stormlogprocessing;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Extracts the search keyword from a referrer URL by looking for the query
 * parameters name=, p= and query=, in that order. Returns "NA" if none is found.
 */
public class KeywordGenerator {

    // Compiled once instead of on every call
    private static final Pattern[] PATTERNS = {
            Pattern.compile("[?&#]name=([^&]+)"),
            Pattern.compile("[?&#]p=([^&]+)"),
            Pattern.compile("[?&#]query=([^&]+)") };

    public String getKeyword(String referer) {
        if (referer == null) {
            return "NA";
        }
        for (Pattern pat : PATTERNS) {
            Matcher m = pat.matcher(referer);
            if (m.find()) {
                // '+' stands for a space in URL query strings
                String[] temp = m.group(1).split("\\+");
                StringBuilder searchTerm = new StringBuilder();
                for (int i = 0; i < temp.length; i++) {
                    if (i > 0) {
                        searchTerm.append(' ');
                    }
                    searchTerm.append(temp[i]);
                }
                return searchTerm.toString();
            }
        }
        return "NA";
    }
}
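KeywordGenerator only turns '+' into spaces, so a keyword that was percent-encoded in the referrer (a Chinese search term, for example) would be stored as raw `%E6%97%A5...` text. The following is a minimal sketch of a variant that also decodes percent escapes with the JDK's `java.net.URLDecoder`. It assumes the referrer was encoded as UTF-8. The class name `DecodingKeywordSketch` is hypothetical, and it keeps the same parameter names and order as KeywordGenerator:

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DecodingKeywordSketch {

    // Same parameter names and order as KeywordGenerator: name=, p=, query=
    private static final Pattern[] PATTERNS = {
            Pattern.compile("[?&#]name=([^&]+)"),
            Pattern.compile("[?&#]p=([^&]+)"),
            Pattern.compile("[?&#]query=([^&]+)") };

    public static String getKeyword(String referrer) {
        for (Pattern pattern : PATTERNS) {
            Matcher m = pattern.matcher(referrer);
            if (m.find()) {
                String raw = m.group(1);
                try {
                    // URLDecoder turns '+' into ' ' and decodes %XX sequences (assumed UTF-8)
                    return URLDecoder.decode(raw, "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    throw new IllegalStateException(e); // UTF-8 is always supported
                } catch (IllegalArgumentException e) {
                    return raw; // malformed %-escape: fall back to the raw value
                }
            }
        }
        return "NA";
    }

    public static void main(String[] args) {
        System.out.println(getKeyword("http://www.adeveloper.com/resource.html?name=qq")); // qq
        System.out.println(getKeyword("http://example.com/s?query=storm+kafka"));        // storm kafka
        System.out.println(getKeyword("http://example.com/s?p=c%2B%2B"));                // c++
        System.out.println(getKeyword("http://www.adeveloper.com/resource.html"));       // NA
    }
}
```

Because URLDecoder decodes '+' and %XX in a single pass, an encoded plus sign (`%2B`) survives as a literal '+'. Splitting on '+' first and decoding afterwards would lose that distinction.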

Finally, PersistenceBolt writes the final results into MySQL:

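package com.learningstorm.stormlogprocessing;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

/**
 * Terminal bolt: writes every enriched tuple into the MySQL table apachelog.
 */
public class PersistenceBolt implements IBasicBolt {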

    private MySQLDump mySQLDump = null;
    private static final long serialVersionUID = 1L;
    private String database;
    private String user;
    private String ip;
    private String password;

    public PersistenceBolt(String ip, String database, String user,
            String password) {
        this.ip = ip;
        this.database = database;
        this.user = user;
        this.password = password;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void prepare(Map stormConf, TopologyContext context) {
        mySQLDump = new MySQLDump(ip, database, user, password);
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        System.out.println("Input tuple : " + input);
        mySQLDump.persistRecord(input);
    }

    public void cleanup() {
        mySQLDump.close();
    }

}

The actual database operations are in MySQLDump:

package com.learningstorm.stormlogprocessing;

import java.sql.Connection;
import java.sql.PreparedStatement;

import backtype.storm.tuple.Tuple;
/**
 * This class contains the logic to persist records into the MySQL database.
 *
 */
public class MySQLDump {
    private String database;
    private String user;
    private String ip;
    private String password;

    public MySQLDump(String ip, String database, String user, String password) {
        this.ip = ip;
        this.database = database;
        this.user = user;
        this.password = password;
        // Connect with the parameters passed in by PersistenceBolt
        this.connect = MySQLConnection.getMySQLConnection(ip, database, user, password);
    }

    private Connection connect;

    private PreparedStatement preparedStatement = null;
    public void persistRecord(Tuple tuple) {
        try {

            // id is AUTO_INCREMENT, hence "default"; the 11 placeholders follow the column order
            preparedStatement = connect
                    .prepareStatement("insert into apachelog values (default, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");

            preparedStatement.setString(1, tuple.getStringByField("ip"));
            preparedStatement.setString(2, tuple.getStringByField("dateTime"));
            preparedStatement.setString(3, tuple.getStringByField("request"));
            preparedStatement.setString(4, tuple.getStringByField("response"));
            preparedStatement.setString(5, tuple.getStringByField("bytesSent"));
            preparedStatement.setString(6, tuple.getStringByField("referrer"));
            preparedStatement.setString(7, tuple.getStringByField("useragent"));
            preparedStatement.setString(8, tuple.getStringByField("country"));
            preparedStatement.setString(9, tuple.getStringByField("browser"));
            preparedStatement.setString(10, tuple.getStringByField("os"));
            preparedStatement.setString(11, tuple.getStringByField("keyword"));

            preparedStatement.executeUpdate();

        } catch (Exception e) {
            // Keep the original exception as the cause so the real SQL error is not lost
            throw new RuntimeException(
                    "Error occurred while persisting records in mysql", e);
        } finally {
            if (preparedStatement != null) {
                try {
                    preparedStatement.close();
                } catch (Exception exception) {
                    System.out
                            .println("Error occurred while closing PreparedStatement : ");
                }
            }
        }

    }

    public void close() {
        try {
            connect.close();
        } catch (Exception exception) {
            System.out.println("Error occurred while closing the connection");
        }
    }

}
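The insert statement in MySQLDump relies on the physical column order of apachelog. The following is a minimal sketch of a way to make the mapping explicit: build the column list from the same field names the bolts declare. The class `InsertSqlSketch`, its `FIELDS` array and `buildInsert` method are hypothetical. The field names are the ones declared in KeyWordIdentifierBolt, and they match the column names in the `create table` statement:

```java
public class InsertSqlSketch {

    // Field names declared by KeyWordIdentifierBolt; also the column names of apachelog
    public static final String[] FIELDS = { "ip", "dateTime", "request", "response",
            "bytesSent", "referrer", "useragent", "country", "browser", "os", "keyword" };

    // Builds "insert into <table> (c1, c2, ...) values (?, ?, ...)"
    public static String buildInsert(String table, String... columns) {
        StringBuilder cols = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < columns.length; i++) {
            if (i > 0) {
                cols.append(", ");
                marks.append(", ");
            }
            cols.append(columns[i]);
            marks.append('?');
        }
        return "insert into " + table + " (" + cols + ") values (" + marks + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildInsert("apachelog", FIELDS));
    }
}
```

With this statement, persistRecord could bind parameter `i + 1` to `tuple.getStringByField(FIELDS[i])` in a loop. The AUTO_INCREMENT `id` column is left out of the list, so MySQL fills it in.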

The database connection class, MySQLConnection:

package com.learningstorm.stormlogprocessing;

import java.sql.Connection;
import java.sql.DriverManager;

public class MySQLConnection {

    public static Connection getMySQLConnection(String ip, String database, String user, String password) {
        try {
            // Load the MySQL Connector/J driver class
            Class.forName("com.mysql.jdbc.Driver");
            String url = "jdbc:mysql://" + ip + "/" + database + "?"
                    + "user=" + user + "&password=" + password;
            return DriverManager.getConnection(url);
        } catch (Exception e) {
            throw new RuntimeException("Error occurred while getting mysql connection : " + e.getMessage(), e);
        }
    }
}
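MySQLConnection puts the user name and password into the URL query string, so a password that contains characters such as `&` or `#` could break the URL. The following is a minimal sketch of an alternative that passes the credentials through the standard JDBC overload `DriverManager.getConnection(String url, Properties info)`. It assumes MySQL Connector/J is on the classpath when `open` is actually called. The class `MySQLConnectionSketch` and its helpers are hypothetical:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;

public class MySQLConnectionSketch {

    // The URL carries only host and database
    public static String buildUrl(String ip, String database) {
        return "jdbc:mysql://" + ip + "/" + database;
    }

    // Credentials travel as connection properties instead of URL parameters,
    // so special characters in the password need no escaping
    public static Properties buildProperties(String user, String password) {
        Properties info = new Properties();
        info.setProperty("user", user);
        info.setProperty("password", password);
        return info;
    }

    // Needs a reachable MySQL server and Connector/J at runtime
    public static Connection open(String ip, String database, String user, String password)
            throws SQLException {
        return DriverManager.getConnection(buildUrl(ip, database), buildProperties(user, password));
    }

    public static void main(String[] args) {
        System.out.println(buildUrl("192.168.137.10", "test"));
        System.out.println(buildProperties("root", "p&ss#1").getProperty("password"));
    }
}
```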
Time: 2024-08-10 19:06:04
