1. 日志文件每天生成一份(需要将日志文件定时上传至HDFS)
2. 分析日志文件中包含的字段:访问IP、访问时间、访问URL、访问状态、访问流量
3. 现在有"昨日"的日志文件及清洗程序logclean.jar
4. 需求指标
   a. 统计PV值
   b. 统计注册人数
   c. 统计IP数
   d. 统计跳出率
   e. 统计二跳率
1. 数据采集:使用shell脚本定时上传
2. 数据清洗:过滤字段,格式化时间等字段
3. 数据分析:使用一级分区(date)
4. 数据导出:sqoop
5. 使用到的框架有:shell脚本、HDFS、MapReduce、Hive、Sqoop、MySQL

期望结果:pv register ip jumpprob two_jumpprob
-- Step-by-step walkthrough of the daily weblog pipeline.
-- Steps 1-3 are shell commands (kept as comments; run them from a shell with
-- the variables defined in the driver script). Step 4 is HiveQL, run in the
-- Hive database that holds weblog_clean, shown for the example partition
-- date='2016_11_13'.

-- 1. Upload the log file to HDFS. Only yesterday's directory is replaced, so the
--    raw logs of earlier days are kept.
-- $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
-- $HADOOP_HOME/bin/hdfs dfs -mkdir -p $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1
-- $HADOOP_HOME/bin/hdfs dfs -put $LOG_PATH $HDFS_INPUT_PATH/$yesterday > /dev/null 2>&1

-- 2. Clean the data with MapReduce: filter out dirty lines and static-resource
--    requests, strip the request quotes, convert the timestamp.
--    Only the target partition directory is removed: $HDFS_OUTPUT_PATH is the
--    location of the external table, so deleting it would wipe every earlier
--    partition.
-- $HADOOP_HOME/bin/hdfs dfs -rm -r $HDFS_OUTPUT_PATH/date=$yesterday > /dev/null 2>&1
-- $HADOOP_HOME/bin/yarn jar $JAR_PATH $ENTRANCE $HDFS_INPUT_PATH/$yesterday $HDFS_OUTPUT_PATH/date=$yesterday

-- 3. Create the Hive database and the partitioned external table, then add the
--    cleaned directory as a partition (IF NOT EXISTS keeps reruns from failing).
-- $HIVE_HOME/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
-- $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "create external table if not exists $HIVE_TABLE(ip string, day string, url string) partitioned by (date string) row format delimited fields terminated by '\t' location '$HDFS_OUTPUT_PATH'"
-- $HIVE_HOME/bin/hive --database $HIVE_DATABASE -e "alter table $HIVE_TABLE add if not exists partition (date='$yesterday')"

-- 4. Analyse the data in Hive; the result table is exported to MySQL with Sqoop.

-- pv: number of requests (page views) in the partition
create table if not exists pv_tb(pv string)
row format delimited fields terminated by '\t';

insert overwrite table pv_tb
select count(1)
from weblog_clean
where date = '2016_11_13';

-- register: requests to the registration page
create table if not exists register_tb(register string)
row format delimited fields terminated by '\t';

insert overwrite table register_tb
select count(1)
from weblog_clean
where date = '2016_11_13'
  and instr(url, 'member.php?mod=register') > 0;

-- ip: number of distinct visitor IPs
create table if not exists ip_tb(ip string)
row format delimited fields terminated by '\t';

insert overwrite table ip_tb
select count(distinct ip)
from weblog_clean
where date = '2016_11_13';

-- jumpprob (bounce rate): IPs with exactly one request / all distinct IPs.
-- Both rates are per visitor, so the denominator is the number of IPs (one row
-- per IP in per_ip), not the page-view count; the two rates add up to 1.
create table if not exists jumpprob_tb(jump double)
row format delimited fields terminated by '\t';

insert overwrite table jumpprob_tb
select sum(case when hits = 1 then 1 else 0 end) / count(1)
from (
    select ip, count(1) as hits
    from weblog_clean
    where date = '2016_11_13'
    group by ip
) per_ip;

-- two_jumpprob: IPs with two or more requests / all distinct IPs
create table if not exists two_jumpprob_tb(jump double)
row format delimited fields terminated by '\t';

insert overwrite table two_jumpprob_tb
select sum(case when hits >= 2 then 1 else 0 end) / count(1)
from (
    select ip, count(1) as hits
    from weblog_clean
    where date = '2016_11_13'
    group by ip
) per_ip;

-- merge table: all five metrics in a single row.
-- Author's note from the original write-up: creating the per-metric tables
-- above separately was considered faster than the merged query, at the cost
-- of more storage.
create table if not exists log_result(
    pv string,
    register string,
    ip string,
    jumpprob double,
    two_jumpprob double
)
row format delimited fields terminated by '\t';

insert overwrite table log_result
select
    req.pv,
    req.register,
    vis.ip,
    vis.jumpprob,
    vis.two_jumpprob
from (
    -- request-level metrics (one scan of the partition)
    select
        count(1) as pv,
        coalesce(sum(case when instr(url, 'member.php?mod=register') > 0 then 1 else 0 end), 0) as register
    from weblog_clean
    where date = '2016_11_13'
) req
cross join (
    -- visitor-level metrics: per_ip holds one row per IP with its request count
    select
        count(1) as ip,
        sum(case when hits = 1 then 1 else 0 end) / count(1) as jumpprob,
        sum(case when hits >= 2 then 1 else 0 end) / count(1) as two_jumpprob
    from (
        select ip, count(1) as hits
        from weblog_clean
        where date = '2016_11_13'
        group by ip
    ) per_ip
) vis;
mysql> select * from weblog_result;
+--------+----------+-------+----------+--------------+
| pv     | register | ip    | jumpprob | two_jumpprob |
+--------+----------+-------+----------+--------------+
| 169857 | 28       | 10411 |     0.02 |         0.04 |
+--------+----------+-------+----------+--------------+
1 row in set (0.00 sec)

注:此结果中的 jumpprob / two_jumpprob 由原始查询计算,分母是当日全部访问记录数(count(ip),即PV),因此两者之和不为1;若以独立IP数为分母,两者之和应为1。
package org.apache.hadoop.log.project; import java.net.URI; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Locale; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class LogClean extends Configured implements Tool { public static void main(String[] args) { Configuration conf = new Configuration(); try { int res = ToolRunner.run(conf, new LogClean(), args); System.exit(res); } catch (Exception e) { e.printStackTrace(); } } public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "logclean"); // 设置为可以打包运行 job.setJarByClass(LogClean.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(LongWritable.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(NullWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); // 清理已存在的输出文件 FileSystem fs = FileSystem.get(new URI(args[0]), getConf()); Path outPath = new Path(args[1]); if (fs.exists(outPath)) { fs.delete(outPath, true); } boolean success = job.waitForCompletion(true); if(success){ System.out.println("Clean process success!"); } else{ System.out.println("Clean process failed!"); } return 0; } static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text> { 
LogParser logParser = new LogParser(); Text outputValue = new Text(); protected void map( LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text>.Context context) throws java.io.IOException, InterruptedException { final String[] parsed = logParser.parse(value.toString()); // step1.过滤掉静态资源访问请求 if (parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")) { return; } // step2.过滤掉开头的指定字符串 if (parsed[2].startsWith("GET /")) { parsed[2] = parsed[2].substring("GET /".length()); } else if (parsed[2].startsWith("POST /")) { parsed[2] = parsed[2].substring("POST /".length()); } // step3.过滤掉结尾的特定字符串 if (parsed[2].endsWith(" HTTP/1.1")) { parsed[2] = parsed[2].substring(0, parsed[2].length() - " HTTP/1.1".length()); } // step4.只写入前三个记录类型项 outputValue.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]); context.write(key, outputValue); } } static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable> { protected void reduce( LongWritable k2, java.lang.Iterable<Text> v2s, org.apache.hadoop.mapreduce.Reducer<LongWritable, Text, Text, NullWritable>.Context context) throws java.io.IOException, InterruptedException { for (Text v2 : v2s) { context.write(v2, NullWritable.get()); } }; } /* * 日志解析类 */ static class LogParser { public static final SimpleDateFormat FORMAT = new SimpleDateFormat( "d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH); public static final SimpleDateFormat dateformat1 = new SimpleDateFormat( "yyyyMMddHHmmss"); public static void main(String[] args) throws ParseException { final String S1 = " - - [30/May/2013:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127"; LogParser parser = new LogParser(); final String[] array = parser.parse(S1); System.out.println("样例数据: " + S1); System.out.format( "解析结果: ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2], array[3], array[4]); } /** * 解析英文时间字符串 * * @param string * @return * @throws ParseException */ 
private Date parseDateFormat(String string) { Date parse = null; try { parse = FORMAT.parse(string); } catch (ParseException e) { e.printStackTrace(); } return parse; } /** * 解析日志的行记录 * * @param line * @return 数组含有5个元素,分别是ip、时间、url、状态、流量 */ public String[] parse(String line) { String ip = parseIP(line); String time = parseTime(line); String url = parseURL(line); String status = parseStatus(line); String traffic = parseTraffic(line); return new String[] { ip, time, url, status, traffic }; } private String parseTraffic(String line) { final String trim = line.substring(line.lastIndexOf("\"") + 1) .trim(); String traffic = trim.split(" ")[1]; return traffic; } private String parseStatus(String line) { final String trim = line.substring(line.lastIndexOf("\"") + 1) .trim(); String status = trim.split(" ")[0]; return status; } private String parseURL(String line) { final int first = line.indexOf("\""); final int last = line.lastIndexOf("\""); String url = line.substring(first + 1, last); return url; } private String parseTime(String line) { final int first = line.indexOf("["); final int last = line.indexOf("+0800]"); String time = line.substring(first + 1, last).trim(); Date date = parseDateFormat(time); return dateformat1.format(date); } private String parseIP(String line) { String ip = line.split("- -")[0].trim(); return ip; } } }
#!/bin/bash
# Daily weblog pipeline (run once a day, e.g. from cron):
#   1. upload yesterday's access log to HDFS
#   2. clean it with the LogClean MapReduce job
#   3. register the cleaned output as a partition of the Hive table
#   4. compute pv / register / ip / jumpprob / two_jumpprob into a Hive result table
#   5. export the result table to MySQL with Sqoop

# Banner. The heredoc delimiter is quoted so the art is printed literally
# (no backslash processing or command substitution).
cat <<'eot'
#############################################################################
##########################    普 度 众 生    ###########################
                              _oo0oo_
                             088888880
                             88" . "88
                             (| -_- |)
                              0\ = /0
                           ___/'---'\___
                         .' \\|     |// '.
                        / \\|||  :  |||// \
                       /_ ||||| -:- |||||- \
                      |   | \\\  -  /// |   |
                      | \_|  ''\---/''  |_/ |
                      \  .-\__  '-'  __/-.  /
                    ___'. .'  /--.--\  '. .'___
                 ."" '<  '.___\_<|>_/___.' >'  "".
                | | :  '- \'.;'\ _ /';.'/ - ' : | |
                \  \ '_.   \_ __\ /__ _/   .-' /  /
            =====' -.____'.___ \_____/___.-'____.-'=====
                              '=---='
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
                    佛祖保佑         iii         永不出错
eot

## get yesterday's date; it names the HDFS input dir and the Hive partition (e.g. 2016_11_13)
yesterday=$(date -d '-1 day' +'%Y_%m_%d')
echo "$yesterday"

############
## define ##
############
HADOOP_HOME=/opt/cdh-5.6.3/hadoop-2.5.0-cdh5.3.6
HIVE_HOME=/opt/cdh-5.6.3/hive-0.13.1-cdh5.3.6
SQOOP_HOME=/opt/cdh-5.6.3/sqoop-1.4.5-cdh5.2.6
HIVE_DATABASE=weblog
HIVE_TABLE=weblog_clean
HIVE_RSTABLE=weblog_result
MYSQL_USERNAME=root
MYSQL_PASSWORD=root
EXPORT_DIR=/user/hive/warehouse/weblog.db/weblog_result
NUM_MAPPERS=1

#########################
## get logfile path    ##
#########################
LOG_PATH=/home/liuwl/opt/datas/weblog/access_$yesterday.log
JAR_PATH=/home/liuwl/opt/datas/logclean.jar
ENTRANCE=org.apache.hadoop.log.project.LogClean
HDFS_INPUT_PATH=/weblog/source
HDFS_OUTPUT_PATH=/weblog/clean
SQOOP_JDBC=jdbc:mysql://hadoop09-linux-01.ibeifeng.com:3306/$HIVE_DATABASE

############################
## upload logfile to hdfs ##
############################
echo "start to upload logfile"
# Upload only when yesterday's input directory is missing or empty, so reruns are safe.
HSFiles=$("$HADOOP_HOME"/bin/hdfs dfs -ls "$HDFS_INPUT_PATH/$yesterday" 2>/dev/null)
if [ -z "$HSFiles" ]; then
    "$HADOOP_HOME"/bin/hdfs dfs -mkdir -p "$HDFS_INPUT_PATH/$yesterday" > /dev/null 2>&1
    if ! "$HADOOP_HOME"/bin/hdfs dfs -put "$LOG_PATH" "$HDFS_INPUT_PATH/$yesterday" > /dev/null 2>&1; then
        echo "upload failed: $LOG_PATH" >&2
        exit 1
    fi
    echo "upload ok"
else
    echo "exists"
fi

###########################
## clean the source file ##
###########################
echo "start to clean logfile"
# Check yesterday's partition directory, not the whole clean root: the root is
# non-empty as soon as any earlier day has been cleaned.
HCFiles=$("$HADOOP_HOME"/bin/hdfs dfs -ls "$HDFS_OUTPUT_PATH/date=$yesterday" 2>/dev/null)
if [ -z "$HCFiles" ]; then
    if ! "$HADOOP_HOME"/bin/yarn jar "$JAR_PATH" "$ENTRANCE" "$HDFS_INPUT_PATH/$yesterday" "$HDFS_OUTPUT_PATH/date=$yesterday"; then
        echo "clean failed" >&2
        exit 1
    fi
    echo "clean ok"
fi

###########################
## create the hive table ##
###########################
echo "start to create the hive table"
"$HIVE_HOME"/bin/hive -e "create database if not exists $HIVE_DATABASE" > /dev/null 2>&1
"$HIVE_HOME"/bin/hive --database "$HIVE_DATABASE" -e "create external table if not exists $HIVE_TABLE(ip string, day string, url string) partitioned by (date string) row format delimited fields terminated by '\t' location '$HDFS_OUTPUT_PATH'"
echo "add patition to hive table"
# IF NOT EXISTS keeps a rerun for the same day from failing on an existing partition.
"$HIVE_HOME"/bin/hive --database "$HIVE_DATABASE" -e "alter table $HIVE_TABLE add if not exists partition (date='$yesterday')"

##################################
## create the hive reslut table ##
##################################
echo "start to create the hive reslut table"
"$HIVE_HOME"/bin/hive --database "$HIVE_DATABASE" -e "create table if not exists $HIVE_RSTABLE(pv string, register string, ip string, jumpprob double, two_jumpprob double) row format delimited fields terminated by '\t';"

#################
## insert data ##
#################
echo "start to insert data"
# Recomputed on every run: insert overwrite replaces the single result row.
# jumpprob / two_jumpprob are shares of distinct visitor IPs (per_ip has one row
# per IP), so the two rates add up to 1.
if ! "$HIVE_HOME"/bin/hive --database "$HIVE_DATABASE" -e "insert overwrite table $HIVE_RSTABLE
select req.pv, req.register, vis.ip, vis.jumpprob, vis.two_jumpprob
from (
    select count(1) as pv,
           coalesce(sum(case when instr(url, 'member.php?mod=register') > 0 then 1 else 0 end), 0) as register
    from $HIVE_TABLE
    where date = '$yesterday'
) req
cross join (
    select count(1) as ip,
           sum(case when hits = 1 then 1 else 0 end) / count(1) as jumpprob,
           sum(case when hits >= 2 then 1 else 0 end) / count(1) as two_jumpprob
    from (
        select ip, count(1) as hits
        from $HIVE_TABLE
        where date = '$yesterday'
        group by ip
    ) per_ip
) vis"; then
    echo "insert failed" >&2
    exit 1
fi

###################################
## create the mysql reslut table ##
###################################
# MySQL has no TRUNCATE ... IF EXISTS; the table is guaranteed to exist after
# the CREATE TABLE IF NOT EXISTS above it.
mysql -u"$MYSQL_USERNAME" -p"$MYSQL_PASSWORD" -e "
create database if not exists $HIVE_DATABASE DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
use $HIVE_DATABASE;
create table if not exists $HIVE_RSTABLE(pv varchar(20) not null, register varchar(20) not null, ip varchar(20) not null, jumpprob double(6,2) not null, two_jumpprob double(6,2) not null) DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
truncate table $HIVE_RSTABLE;"

#######################################
## export hive result table to mysql ##
#######################################
echo "start to export hive result table to mysql"
if ! "$SQOOP_HOME"/bin/sqoop export --connect "$SQOOP_JDBC" --username "$MYSQL_USERNAME" --password "$MYSQL_PASSWORD" --table "$HIVE_RSTABLE" --export-dir "$EXPORT_DIR" --num-mappers "$NUM_MAPPERS" --input-fields-terminated-by '\t'; then
    echo "export failed" >&2
    exit 1
fi
echo "shell finished"
时间: 2025-01-17 20:58:47