一、数据准备
1、每天生成一个随机数据文本(以日期命名,如 yyyyMMdd.txt),每小时向该文本追加2次数据,每次10万条
随机数据生成:
2,32 * * * * bash /mnt/jediael/irms/signalGenerator/signalGenerator.sh >> /home/jediael/sg.log 2>&1
类:SignalGenerator
2、每天将前一天生成的数据文本导入HDFS
32 0 * * * bash /mnt/jediael/irms/signalGenerator/copySignalToHdfs.sh >>/home/jediael/sg.log 2>&1
二、数据分析
1、每天执行一次数据分析,将结果输出到hdfs文本中。
42 0 * * * bash /mnt/jediael/irms/signalparser/signalParser.sh >>/home/jediael/sg.log 2>&1
类:SignalParser
程序文件:
/mnt/jediael/irms/signalGenerator/signalGenerator.sh
#!/bin/bash
# Appends one batch of simulated signalling records to today's data file by
# running the SignalGenerator Java class.

export JAVA_HOME=/usr/java/jdk1.7.0_51
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=$CLASSPATH:/mnt/jediael/irms/signalGenerator/

# SignalGenerator writes <yyyyMMdd>.txt relative to the current working
# directory, while copySignalToHdfs.sh reads the file from
# /mnt/jediael/irms/signalGenerator/. Switch there explicitly so the file is
# created where the upload step looks for it, regardless of where the caller
# (e.g. cron) starts this script.
cd /mnt/jediael/irms/signalGenerator || exit 1

# NOTE(review): the Java source declares package com.gmcc.irms.util, but the
# class is launched by its simple name here - confirm the deployed .class file
# was built without the package declaration (or launch it by its fully
# qualified name from the matching directory layout).
java SignalGenerator
/mnt/jediael/irms/signalGenerator/copySignalToHdfs.sh
#!/bin/bash
# Uploads the previous day's generated signal file from the local disk to
# /irms/signal in HDFS.

export JAVA_HOME=/usr/java/jdk1.7.0_51
PATH=/mnt/jediael/hadoop-1.2.1/bin/:/mnt/jediael/hbase-0.94.26/bin:/mnt/jediael/tomcat-7.0.54/bin:$JAVA_HOME/bin:$PATH

# Date stamp of yesterday in yyyymmdd form; it is the base name of the file to upload.
yesterday=$(date -d "-1 day" +%Y%m%d)

# Standard output of the copy is appended to sg.log in the current working directory.
hadoop fs -copyFromLocal /mnt/jediael/irms/signalGenerator/${yesterday}.txt /irms/signal >> sg.log
/mnt/jediael/irms/signalparser/signalParser.sh
#!/bin/bash
# Runs the signal-parsing MapReduce job over the previous day's data file in
# HDFS and writes the result to a per-day output directory.

export JAVA_HOME=/usr/java/jdk1.7.0_51
PATH=/mnt/jediael/hadoop-1.2.1/bin/:/mnt/jediael/hbase-0.94.26/bin:/mnt/jediael/tomcat-7.0.54/bin:$JAVA_HOME/bin:$PATH

# Evaluate the date once so the input file and the output directory always
# refer to the same day, even if the script happens to run across midnight.
yesterday=$(date -d "-1 day" +%Y%m%d)

# NOTE(review): the jar path is relative to the current working directory and
# no main class is passed, so this relies on the caller's directory and on a
# Main-Class entry in the jar manifest - confirm both against the deployment.
hadoop jar signalgenerator.jar /irms/signal/${yesterday}.txt /irms/result/${yesterday}
SignalGenerator.java
package com.gmcc.irms.util;

import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

/**
 * Generates simulated signalling records; every run appends 100000 records to
 * a file named after the current date ({@code yyyyMMdd.txt}) in the current
 * working directory.
 *
 * <p>Each record is a single line of decimal digits laid out as:
 * business type (1 digit), calling number (11 digits), called number
 * (11 digits), duration or traffic volume (16 digits), followed by 800 filler
 * digits.
 */
public class SignalGenerator {

    /** Number of records appended per run. */
    private static final int RECORDS_PER_RUN = 100000;

    /** Number of distinct business-type codes (codes are 0 to 5). */
    private static final int ACTIVE_TYPE_COUNT = 6;

    /** Fixed leading part shared by every generated phone number. */
    private static final String NUMBER_PREFIX = "1390222";

    /** Count of random digits appended to the prefix to form a phone number. */
    private static final int NUMBER_SUFFIX_DIGITS = 4;

    /** Width of the duration / traffic field. */
    private static final int DURATION_DIGITS = 16;

    /** Count of filler digits appended after the duration field. */
    private static final int PADDING_DIGITS = 800;

    /** Total record length, used to pre-size the builder. */
    private static final int RECORD_LENGTH = 1
            + 2 * (NUMBER_PREFIX.length() + NUMBER_SUFFIX_DIGITS)
            + DURATION_DIGITS
            + PADDING_DIGITS;

    /** One generator shared by all records instead of a new instance per record. */
    private final Random rand = new Random();

    /**
     * Builds the next random record.
     *
     * @return one record line, without a line terminator
     */
    private String getNextSign() {
        StringBuilder sign = new StringBuilder(RECORD_LENGTH);
        // Business type, e.g. incoming call, outgoing call, send SMS, receive SMS, web access, WLAN.
        sign.append(rand.nextInt(ACTIVE_TYPE_COUNT));
        // Calling number.
        sign.append(NUMBER_PREFIX);
        appendDigits(sign, NUMBER_SUFFIX_DIGITS);
        // Called number.
        sign.append(NUMBER_PREFIX);
        appendDigits(sign, NUMBER_SUFFIX_DIGITS);
        // Duration, or traffic volume.
        appendDigits(sign, DURATION_DIGITS);
        // Filler that pads the record to its full length.
        appendDigits(sign, PADDING_DIGITS);
        return sign.toString();
    }

    /**
     * Appends {@code count} random decimal digits to {@code target}.
     *
     * @param target builder receiving the digits
     * @param count  number of digits to append
     */
    private void appendDigits(StringBuilder target, int count) {
        for (int i = 0; i < count; i++) {
            // The bound is exclusive, so 10 is needed to cover the digits 0 through 9.
            target.append(rand.nextInt(10));
        }
    }

    /**
     * Appends {@link #RECORDS_PER_RUN} records to today's data file, creating
     * the file when it does not exist yet.
     *
     * @param args ignored
     * @throws IOException if the file cannot be opened or written
     */
    public static void main(String[] args) throws IOException {
        String fileName = new SimpleDateFormat("yyyyMMdd").format(new Date()) + ".txt";
        SignalGenerator sg = new SignalGenerator();
        byte[] newline = System.getProperty("line.separator").getBytes(StandardCharsets.UTF_8);

        // try-with-resources flushes and closes the stream even when a write fails;
        // buffering avoids issuing one system call per record.
        try (OutputStream os = new BufferedOutputStream(new FileOutputStream(fileName, true))) {
            for (int i = 0; i < RECORDS_PER_RUN; i++) {
                os.write(sg.getNextSign().getBytes(StandardCharsets.UTF_8));
                os.write(newline);
            }
        }
    }
}
SignalParser.java
package com.gmcc.irms.signal;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver of the MapReduce job that aggregates signalling records per calling
 * number.
 */
public class SignalParser {

    /**
     * Configures and runs the job, then exits with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: SignalParser <input path> <output path>");
            System.exit(2);
        }

        Job job = new Job();
        job.setJarByClass(SignalParser.class);
        job.setJobName("signal parser");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(SignalParserMapper.class);
        job.setReducerClass(SignalParserReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

/**
 * Extracts the business type, the calling number and the duration from each
 * record line and emits {@code callingNumber -> "type,duration"}.
 */
class SignalParserMapper extends Mapper<LongWritable, Text, Text, Text> {

    /** Counter group used for skipped input lines. */
    private static final String COUNTER_GROUP = "SignalParser";

    /** Counter name used for skipped input lines. */
    private static final String MALFORMED_COUNTER = "MALFORMED_LINES";

    /** Minimum line length needed to read every field this mapper uses. */
    private static final int MIN_LINE_LENGTH = 30;

    /**
     * Parses one record line and emits it keyed by calling number. Lines that
     * are too short or whose duration field is not numeric are skipped and
     * counted instead of failing the task.
     *
     * @param key     input key supplied by the input format; not used
     * @param value   one record line
     * @param context job context receiving the output pair
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        if (line.length() < MIN_LINE_LENGTH) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        // Business type.
        String activeType = line.substring(0, 1);
        // Calling phone number.
        String customer = line.substring(1, 12);
        // Call duration / web duration / WLAN duration: only the leading 7
        // characters starting at offset 23 are read, which always fits in an int.
        int duration;
        try {
            duration = Integer.parseInt(line.substring(23, 30));
        } catch (NumberFormatException e) {
            context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
            return;
        }

        context.write(new Text(customer), new Text(activeType + "," + duration));
    }
}

/**
 * Aggregates the records of one calling number into a single tab-separated
 * line: total call duration, call count, SMS count, total web duration, web
 * session count, total WLAN duration, WLAN session count.
 */
class SignalParserReducer extends Reducer<Text, Text, Text, Text> {

    /** Business type code: outgoing call. */
    private static final int TYPE_CALL_OUT = 0;

    /** Business type code: SMS sent. */
    private static final int TYPE_SMS_SENT = 2;

    /** Business type code: web access. */
    private static final int TYPE_WEB = 4;

    /** Business type code: WLAN. */
    private static final int TYPE_WLAN = 5;

    /**
     * Sums durations and counts per business type for one calling number.
     * Business types other than the four handled here are ignored.
     *
     * @param key     calling number
     * @param values  {@code "type,duration"} strings emitted by the mapper
     * @param context job context receiving the aggregated line
     * @throws IOException          if writing the output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Totals are kept as long so that many large durations cannot overflow.
        long sumCallDuration = 0; // total call duration
        long callTimes = 0;       // number of calls
        long smsTimes = 0;        // number of SMS sent
        long sumWebDuration = 0;  // total web duration
        long webTimes = 0;        // number of web sessions
        long sumWlanDuration = 0; // total WLAN duration
        long wlanTimes = 0;       // number of WLAN sessions

        for (Text value : values) {
            String[] fields = value.toString().split(",");
            int activeType = Integer.parseInt(fields[0]);
            int duration = Integer.parseInt(fields[1]);

            switch (activeType) {
                case TYPE_CALL_OUT:
                    sumCallDuration += duration;
                    callTimes++;
                    break;
                case TYPE_SMS_SENT:
                    smsTimes++;
                    break;
                case TYPE_WEB:
                    sumWebDuration += duration;
                    webTimes++;
                    break;
                case TYPE_WLAN:
                    sumWlanDuration += duration;
                    wlanTimes++;
                    break;
                default:
                    // Other business types do not contribute to this report.
                    break;
            }
        }

        // The last two columns carry the WLAN totals.
        context.write(key, new Text(sumCallDuration + "\t" + callTimes
                + "\t" + smsTimes
                + "\t" + sumWebDuration + "\t" + webTimes
                + "\t" + sumWlanDuration + "\t" + wlanTimes));
    }
}
时间: 2024-11-11 16:47:02