MapReduce program example: hotlink statistics program and its Oozie scheduling

MapReduce program:

Description: extract the short links from the binary files, count how often each one occurs, and sort the results by count in descending order.

Approach: two chained MapReduce jobs. HotLinkCount pulls links out of the Avro-encoded statuses with a regular expression and sums the occurrences of each link; HotLinkSort then reads that output, swaps key and value, and sorts by count in descending order with a custom comparator.

HotLinkCount:

/*
 * Copyright (c) EEFUNG 2014 All Rights Reserved
 *
 * Count how often each link appears.
 *
 */
package com.eefung.hstore.mr.job.hotLink;

import cn.antvision.eagleattack.model.Status;
import com.eefung.hstore.help.HstoreConstants;
import com.eefung.hstore.help.AvroUtils;
import com.eefung.hstore.mr.input.TimeFramePathFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HotLinkCount extends Configured implements Tool {

    private static final Logger logger = LoggerFactory
            .getLogger(HotLinkCount.class);

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HotLinkCount(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            logger.warn("wrong args!");
            return 1;
        }
        String input = args[0].trim();
        String output = args[1].trim();
        int reducerNum = Integer.parseInt(args[2].trim());
        Job job = Job.getInstance(getConf());
        job.setJobName("HotLink Count");
        job.setJarByClass(HotLinkCount.class);
        job.setMapperClass(InnerMapper.class);
        job.setReducerClass(InnerReducer.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        job.setNumReduceTasks(reducerNum);
        // Only read input files that fall inside the configured time frame.
        FileInputFormat.setInputPathFilter(job, TimeFramePathFilter.class);

        // The input argument is a comma-separated list of source directories.
        String[] sources = input.split(HstoreConstants.DELIMITER_COMMA);
        for (String source : sources) {
            FileInputFormat.addInputPath(job, new Path(source));
        }

        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class InnerMapper
            extends Mapper<LongWritable, BytesWritable, Text, LongWritable>
            implements HstoreConstants {

        private final static LongWritable one = new LongWritable(1);
        private Text word = new Text();

        Pattern pattern = Pattern
                .compile("https?:\\/\\/[-A-Za-z0-9+&#%?=~_|!:,.;]+\\/[A-Za-z0-9+&#%=~_]{4,}");

        protected void map(LongWritable key, BytesWritable value, Context context) throws IOException, InterruptedException {
            Status status = null;
            try {
                status = AvroUtils.convertStatusAvroFromBytes(value.getBytes());
            } catch (Exception e) {
                logger.warn("failed to decode Avro status", e);
            }
            if (status == null) {
                return;
            }
            if (status.getContent() == null) {
                return;
            }
            // Emit (link, 1) for every link found in the status content.
            String content = status.getContent().toString();
            Matcher matcher = pattern.matcher(content);
            while (matcher.find()) {
                word.set(matcher.group(0));
                context.write(word, one);
            }
        }
    }

    public static class InnerReducer extends
            Reducer<Text, LongWritable, LongWritable, Text> implements HstoreConstants {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values,
                              Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable value : values) {
                sum += value.get();
            }
            if (key.toString().length() == 0) {
                return;
            }
            // Invert the pair: output (count, link) so the next job can sort by count.
            context.write(new LongWritable(sum), key);
        }
    }
}
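
For a quick standalone run outside Oozie, the driver can be launched with hadoop jar; it takes a comma-separated list of input directories, an output directory, and the number of reducers. The jar name and paths below are placeholders, not taken from the original project:

    # hypothetical jar name and paths, shown only to illustrate the argument order
    hadoop jar hstore-mr.jar com.eefung.hstore.mr.job.hotLink.HotLinkCount \
        /hotlink/input/sina/2015-01-20,/hotlink/input/sina/2015-01-21 \
        /hotlink/output/sina/count \
        12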

HotLinkSort:

package com.eefung.hstore.mr.job.hotLink;

import java.io.IOException;

import com.eefung.hstore.help.HstoreConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HotLinkSort extends Configured implements Tool {

    private static final Logger logger = LoggerFactory
            .getLogger(HotLinkSort.class);

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HotLinkSort(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            logger.warn("wrong args!");
            return 1;
        }
        String input = args[0].trim();
        String output = args[1].trim();
        int reducerNum = Integer.parseInt(args[2].trim());
        Job job = Job.getInstance(getConf());
        job.setJobName("HotLinkSort");
        job.setJarByClass(HotLinkSort.class);

        job.setMapperClass(InnerMapper.class);
        job.setReducerClass(InnerReducer.class);
        // Sort the counts in descending order.
        job.setSortComparatorClass(InnerComparator.class);

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(reducerNum);
        String[] path = input.split(",");
        for (int i = 0; i < path.length; i++) {
            FileInputFormat.addInputPath(job, new Path(path[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(output));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class InnerMapper
            extends Mapper<Text, Text, LongWritable, Text>
            implements HstoreConstants {

        private LongWritable keyOut = new LongWritable();

        // Input lines look like "<count>\t<link>"; re-emit them keyed by the numeric count.
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                keyOut.set(Long.parseLong(key.toString().trim()));
                context.write(keyOut, value);
            } catch (NumberFormatException e) {
                logger.warn("skipping record with non-numeric count: {}", key);
            }
        }
    }

    public static class InnerReducer
            extends Reducer<LongWritable, Text, Text, LongWritable> implements
            HstoreConstants {

        @Override
        protected void reduce(LongWritable key, Iterable<Text> values,
                              Context context) throws IOException, InterruptedException {
            // Restore the (link, count) order in the final output.
            for (Text value : values) {
                context.write(value, key);
            }
        }
    }

    // Compares the LongWritable keys in reverse so the largest counts come first.
    public static class InnerComparator extends WritableComparator {

        protected InnerComparator() {
            super(LongWritable.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            LongWritable aa = (LongWritable) a;
            LongWritable bb = (LongWritable) b;
            return -aa.compareTo(bb);
        }
    }
}
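
Because HotLinkCount writes plain text lines of the form "<count><TAB><link>", HotLinkSort can read them directly through KeyValueTextInputFormat. A hypothetical chained invocation (placeholder jar name and paths) might look like:

    hadoop jar hstore-mr.jar com.eefung.hstore.mr.job.hotLink.HotLinkSort \
        /hotlink/output/sina/count \
        /hotlink/output/sina/sort \
        1

Using a single reducer keeps the final result in one globally sorted file, which matches the mapreduce.job.reduces=1 setting in the Oozie sort action further below.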

There was originally also a step that resolved the short links to their long (redirect target) URLs:

package com.eefung.hstore.mr.job.hotLink;

import java.io.IOException;

import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.ExecutionContext;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import com.eefung.hstore.help.HstoreConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HotLinkRedirector extends Configured implements Tool {

    private static Redirector redirector = new Redirector();

    private static final Logger logger = LoggerFactory
            .getLogger(HotLinkRedirector.class);

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new HotLinkRedirector(), args));
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 3) {
            logger.warn("wrong args!");
            return 1;
        }
        int reducerNum = Integer.parseInt(args[2].trim());
        Job job = Job.getInstance(getConf());
        job.setJobName("HotLink Redirector");

        job.setJarByClass(HotLinkRedirector.class);
        job.setMapperClass(InnerMapper.class);
        // No reducer class is set, so the default identity reducer writes the mapped pairs back out.

        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        job.setNumReduceTasks(reducerNum);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class InnerMapper
            extends Mapper<Text, Text, Text, LongWritable>
            implements HstoreConstants {

        private Text textOut = new Text();
        private LongWritable valueOut = new LongWritable();

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            redirector.release();
        }

        // Input lines look like "<shortLink>\t<count>"; replace the short link with its target URL.
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String shortLink = key.toString();
            String longLink = redirector.getRedirectInfo(shortLink);
            if (longLink == null) {
                longLink = shortLink;
            }
            textOut.set(longLink);

            valueOut.set(Long.parseLong(value.toString()));
            context.write(textOut, valueOut);
        }
    }

    private static class Redirector {
        // Sends HTTP requests and receives HTTP responses from the resource identified by a URI.
        private HttpClient httpClient;

        public Redirector() {
            httpClient = new DefaultHttpClient();
            httpClient.getParams().setParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 1000);
            httpClient.getParams().setParameter(HttpConnectionParams.SO_TIMEOUT, 1000);
        }

        public void release() {
            httpClient.getConnectionManager().shutdown();
        }

        public String getRedirectInfo(String shortLink) {
            HttpResponse response = null;
            try {
                // GET the short link and let HttpClient follow the redirect chain.
                HttpGet httpGet = new HttpGet(shortLink);
                // The context records the final host and request once redirects are done.
                HttpContext httpContext = new BasicHttpContext();
                response = httpClient.execute(httpGet, httpContext);

                HttpHost targetHost = (HttpHost) httpContext
                        .getAttribute(ExecutionContext.HTTP_TARGET_HOST);
                HttpUriRequest realRequest = (HttpUriRequest) httpContext
                        .getAttribute(ExecutionContext.HTTP_REQUEST);

                return targetHost.toString() + realRequest.getURI().toString();
            } catch (Throwable e) {
                return null;
            } finally {
                try {
                    if (response != null) {
                        EntityUtils.consume(response.getEntity());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

However, this resolution step turned out to be far too slow: once the data volume got large, whichever stage it was placed in would end up getting killed, so it was dropped from the pipeline.
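
One cheaper approach (only a sketch of an alternative, not what the job above does) is to stop HttpClient from following redirects, so that just the 30x response and its Location header are read instead of fetching the whole target page. The FastRedirector class below is hypothetical; it assumes the same Apache HttpClient 4.x (DefaultHttpClient) API used by the original Redirector:

    // Sketch: resolve a single redirect hop without downloading the target page.
    import org.apache.http.Header;
    import org.apache.http.HttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.params.ClientPNames;
    import org.apache.http.impl.client.DefaultHttpClient;
    import org.apache.http.params.HttpConnectionParams;
    import org.apache.http.util.EntityUtils;

    public class FastRedirector {

        private final DefaultHttpClient httpClient = new DefaultHttpClient();

        public FastRedirector() {
            // Do not follow redirects automatically; we only want the Location header.
            httpClient.getParams().setParameter(ClientPNames.HANDLE_REDIRECTS, Boolean.FALSE);
            httpClient.getParams().setParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 1000);
            httpClient.getParams().setParameter(HttpConnectionParams.SO_TIMEOUT, 1000);
        }

        /** Returns the redirect target of a short link, or null if it does not redirect. */
        public String resolve(String shortLink) {
            HttpResponse response = null;
            try {
                response = httpClient.execute(new HttpGet(shortLink));
                int status = response.getStatusLine().getStatusCode();
                // Other 3xx codes could be handled the same way.
                if (status == 301 || status == 302) {
                    Header location = response.getFirstHeader("Location");
                    return location == null ? null : location.getValue();
                }
                return null;
            } catch (Exception e) {
                return null;
            } finally {
                if (response != null) {
                    try {
                        EntityUtils.consume(response.getEntity());
                    } catch (Exception ignore) {
                    }
                }
            }
        }
    }

This only resolves one hop per call, so chained shorteners would still need a loop, but it avoids transferring the landing page body for every link.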

Oozie scheduling:

While learning Oozie I mainly followed this blog: http://shiyanjun.cn/archives/tag/oozie

Description: run the data of the sina, twitter, sohu, tencent, and netease platforms separately (HotLinkCount runs first, and its output is used as the input of HotLinkSort).

Writing the code: create a hotlink directory under the project's workflow directory and add the following files to it:

1. coordinator.xml:

<?xml version="1.0" encoding="UTF-8"?>
<coordinator-app name="hotlink" frequency="${coord:days(1)}"
                 start="${start}" end="${end}" timezone="UTC" xmlns="uri:oozie:coordinator:0.1">
    <action>
        <workflow>
            <app-path>${root}/workflow.xml</app-path>
        </workflow>
    </action>
</coordinator-app>

2. job.properties:

nn=hdfs://hdt01:8020
rm=hdt01:8050

root=/eefung/yangzf/oozie/jobs/hotlink
start=2015-01-21T13:00Z
end=2020-01-01T00:00Z

dest=/hotlink

#oozie.wf.application.path=${nn}${root}

oozie.coord.application.path=${nn}${root}
oozie.libpath=${nn}${root}/lib

cbaseColumnFamily=content

beginDays=29
endDays=26

3. workflow.xml:

<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="hotlink">

    <start to="sina"/>

    <action name="sina">
        <sub-workflow>
            <app-path>${root}/workflow-hotlink-mr.xml</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>platform</name>
                    <value>sina</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="twitter"/>
        <error to="twitter"/>
    </action>

    <action name="twitter">
        <sub-workflow>
            <app-path>${root}/workflow-hotlink-mr.xml</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>platform</name>
                    <value>twitter</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="tencent"/>
        <error to="tencent"/>
    </action>

    <action name="tencent">
        <sub-workflow>
            <app-path>${root}/workflow-hotlink-mr.xml</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>platform</name>
                    <value>tencent</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="netease"/>
        <error to="netease"/>
    </action>

    <action name="netease">
        <sub-workflow>
            <app-path>${root}/workflow-hotlink-mr.xml</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>platform</name>
                    <value>netease</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="sohu"/>
        <error to="sohu"/>
    </action>

    <action name="sohu">
        <sub-workflow>
            <app-path>${root}/workflow-hotlink-mr.xml</app-path>
            <propagate-configuration/>
            <configuration>
                <property>
                    <name>platform</name>
                    <value>sohu</value>
                </property>
            </configuration>
        </sub-workflow>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>

    <end name="end"/>

</workflow-app>

4. workflow-hotlink-mr.xml:

<?xml version="1.0" encoding="utf-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="hotlink">

    <start to="count" />

    <action name="count">
        <map-reduce>
            <job-tracker>${rm}</job-tracker>
            <name-node>${nn}</name-node>
            <prepare>
                <delete path="${nn}${root}/output/${platform}/count"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.eefung.hstore.mr.job.hotLink.HotLinkCount$InnerReducer</value>
                </property>
                <property>
                    <name>mapreduce.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>12</value>
                </property>

                <property>
                    <name>mapred.input.pathFilter.class</name>
                    <value>com.eefung.hstore.mr.input.TimeFramePathFilter</value>
                </property>

                <property>
                    <name>source.time.frame.begin</name>
                    <value>${beginDays}</value>
                </property>
                <property>
                    <name>source.time.frame.end</name>
                    <value>${endDays}</value>
                </property>

                <property>
                    <name>mapred.input.dir</name>
                    <value>${root}/input/${platform}/*</value>
                </property>

                <property>
                    <name>mapred.output.dir</name>
                    <value>${root}/output/${platform}/count</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="sort"/>
        <error to="fail"/>
    </action>

    <action name="sort">
        <map-reduce>
            <job-tracker>${rm}</job-tracker>
            <name-node>${nn}</name-node>
            <prepare>
                <delete path="${nn}${root}/output/${platform}/sort"/>
            </prepare>
            <configuration>
                <property>
                    <name>mapred.mapper.new-api</name>
                    <value>true</value>
                </property>
                <property>
                    <name>mapred.reducer.new-api</name>
                    <value>true</value>
                </property>

                <property>
                    <name>mapred.input.dir</name>
                    <value>${root}/output/${platform}/count/*</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${root}/output/${platform}/sort</value>
                </property>

                <property>
                    <name>mapreduce.map.class</name>
                    <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerMapper</value>
                </property>
                <property>
                    <name>mapreduce.reduce.class</name>
                    <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerReducer</value>
                </property>

                <!-- Descending sort comparator. Depending on the Hadoop version, this property
                     may need to be mapreduce.job.output.key.comparator.class (new API) or
                     mapred.output.key.comparator.class instead. -->
                <property>
                    <name>mapreduce.compare.class</name>
                    <value>com.eefung.hstore.mr.job.hotLink.HotLinkSort$InnerComparator</value>
                </property>
                <property>
                    <name>mapreduce.inputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat</value>
                </property>
                <property>
                    <name>mapreduce.outputformat.class</name>
                    <value>org.apache.hadoop.mapreduce.lib.output.TextOutputFormat</value>
                </property>
                <property>
                    <name>mapred.mapoutput.key.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapred.mapoutput.value.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.key.class</name>
                    <value>org.apache.hadoop.io.Text</value>
                </property>
                <property>
                    <name>mapred.output.value.class</name>
                    <value>org.apache.hadoop.io.LongWritable</value>
                </property>
                <property>
                    <name>mapreduce.job.reduces</name>
                    <value>1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="end"/>
        <error to="fail"/>
    </action>

    <kill name="fail">
        <message>Failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Run:
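
The exact commands depend on the cluster, but a typical sequence (assuming the Oozie server is reachable at http://hdt01:11000/oozie and the job jar is named hstore-mr.jar; both are assumptions, not taken from the original post) looks roughly like this:

    # Upload the workflow definitions and the job jar to the application path on HDFS
    hdfs dfs -mkdir -p /eefung/yangzf/oozie/jobs/hotlink/lib
    hdfs dfs -put coordinator.xml workflow.xml workflow-hotlink-mr.xml /eefung/yangzf/oozie/jobs/hotlink/
    hdfs dfs -put hstore-mr.jar /eefung/yangzf/oozie/jobs/hotlink/lib/

    # Submit and start the coordinator defined by job.properties
    oozie job -oozie http://hdt01:11000/oozie -config job.properties -run

    # Check the coordinator and its materialized workflow instances later
    oozie job -oozie http://hdt01:11000/oozie -info <coordinator-job-id>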
