1,代码
package mr; import java.io.IOException; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * 使用ArrayWritable */ public class TrafficApp4 { public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job job = Job.getInstance(conf , TrafficApp4.class.getSimpleName()); job.setJarByClass(TrafficApp4.class); FileInputFormat.setInputPaths(job, args[0]); job.setMapperClass(TrafficMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongArrayWritable.class); job.setReducerClass(TrafficReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongArrayWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } public static class TrafficMapper extends Mapper<LongWritable, Text, Text, LongArrayWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongArrayWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] splited = line.split("\t"); String phonenumber = splited[1]; String upPackNum = splited[6]; String downPackNum = splited[7]; String upPayLoad = splited[8]; String downPayLoad = splited[9]; Text k2 = new Text(phonenumber); LongArrayWritable v2 = new LongArrayWritable(upPackNum, downPackNum, upPayLoad, downPayLoad); context.write(k2, v2); } } public static class TrafficReducer extends Reducer<Text, LongArrayWritable, Text, LongArrayWritable>{ @Override protected void reduce(Text k2, Iterable<LongArrayWritable> v2s, Reducer<Text, LongArrayWritable, Text, LongArrayWritable>.Context context) throws IOException, InterruptedException { long upPackNum = 0L; long downPackNum = 0L; long upPayLoad = 0L; long downPayLoad = 0L; for (LongArrayWritable v2 : v2s) { Writable[] values = v2.get(); upPackNum += ((LongWritable)values[0]).get(); downPackNum += ((LongWritable)values[1]).get(); upPayLoad += ((LongWritable)values[2]).get(); downPayLoad += ((LongWritable)values[3]).get(); } LongArrayWritable v3 = new LongArrayWritable(upPackNum, downPackNum, upPayLoad, downPayLoad); context.write(k2, v3); } } public static class LongArrayWritable extends ArrayWritable{ /** * 在调用的时候,首先调用该方法,然后调用set(Writable[]) */ public LongArrayWritable() { super(LongWritable.class); } /** * 直接调用该方法即可 * @param values */ public LongArrayWritable(LongWritable[] values) { super(LongWritable.class, values); } /** * 直接调用该方法即可 * @param upPackNum * @param downPackNum * @param upPayLoad * @param downPayLoad */ public LongArrayWritable(Long upPackNum, Long downPackNum, Long upPayLoad, Long downPayLoad) { super(LongWritable.class); LongWritable[] values = new LongWritable[4]; values[0] = new LongWritable(upPackNum); values[1] = new LongWritable(downPackNum); values[2] = new LongWritable(upPayLoad); values[3] = new LongWritable(downPayLoad); super.set(values); } /** * 直接调用该方法即可 * @param upPackNum * @param downPackNum * @param upPayLoad * @param downPayLoad */ public LongArrayWritable(String upPackNum, String downPackNum, String upPayLoad, String downPayLoad) { super(LongWritable.class); LongWritable[] values = new LongWritable[4]; values[0] = new LongWritable(Long.parseLong(upPackNum)); values[1] = new LongWritable(Long.parseLong(downPackNum)); values[2] = new LongWritable(Long.parseLong(upPayLoad)); values[3] = new LongWritable(Long.parseLong(downPayLoad)); super.set(values); } @Override public String toString() { String[] array = super.toStrings(); return StringUtils.join(array, "\t"); } } }
2,ArrayWritable的API
org.apache.hadoop.io
Class ArrayWritable
java.lang.Object
org.apache.hadoop.io.ArrayWritable
- 已实现的接口:
- Writable
public class ArrayWritableextends Objectimplements Writable
A Writable for arrays containing instances of a class. The elements of this writable must all be instances of the same class. If this writable will be the input for a Reducer, you will need to create a subclass that sets the value to be of the proper type. For example: public class IntArrayWritable extends ArrayWritable { public IntArrayWritable() { super(IntWritable.class); } }
构造方法摘要 | |
---|---|
ArrayWritable(Class<? extends Writable> valueClass) |
|
ArrayWritable(Class<? extends Writable> valueClass, Writable[] values) |
|
ArrayWritable(String[] strings) |
方法摘要 | |
---|---|
Writable[] |
get() |
Class |
getValueClass() |
void |
readFields(DataInput in) Deserialize the fields of this object from in . |
void |
set(Writable[] values) |
Object |
toArray() |
String[] |
toStrings() |
void |
write(DataOutput out) Serialize the fields of this object to out . |
Methods inherited from class java.lang.Object |
---|
clone, equals, finalize, getClass, hashCode, notify, notifyAll, toString, wait, wait, wait |
构造方法详细信息 |
---|
ArrayWritable
public ArrayWritable(Class<? extends Writable> valueClass)
ArrayWritable
public ArrayWritable(Class<? extends Writable> valueClass, Writable[] values)
ArrayWritable
public ArrayWritable(String[] strings)
方法详细信息 |
---|
getValueClass
public Class getValueClass()
toStrings
public String[] toStrings()
toArray
public Object toArray()
set
public void set(Writable[] values)
get
public Writable[] get()
readFields
public void readFields(DataInput in) throws IOException
- Description copied from interface:
Writable
- Deserialize the fields of this object from
in
.For efficiency, implementations should attempt to re-use storage in the existing object where possible.
-
- Specified by:
readFields
in interfaceWritable
-
- Parameters:
in
-DataInput
to deseriablize this object from.- Throws:
IOException
write
public void write(DataOutput out) throws IOException
- Description copied from interface:
Writable
- Serialize the fields of this object to
out
. -
- Specified by:
write
in interfaceWritable
-
- Parameters:
out
-DataOuput
to serialize this object into.- Throws:
IOException
时间: 2024-10-05 20:34:30