客户端用java api 远程操作HDFS以及远程提交MR任务(源码和异常处理)

两个类,一个HDFS文件操作类,一个是wordcount 词数统计类,都是从网上看来的。上代码:

package mapreduce;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
/**
 * file operation on HDFS
 * @author liuxingjiaofu
 *
 */
public class HDFS_File {
    //read the file from HDFS
    public void ReadFile(Configuration conf, String FileName){
      try{
            FileSystem hdfs = FileSystem.get(conf);
            FSDataInputStream dis = hdfs.open(new Path(FileName));
            IOUtils.copyBytes(dis, System.out, 4096, false);
             dis.close();
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //copy the file from HDFS to local
    public void GetFile(Configuration conf, String srcFile, String dstFile){
        try {
              FileSystem hdfs = FileSystem.get(conf);
              Path srcPath = new Path(srcFile);
              Path dstPath = new Path(dstFile);
              hdfs.copyToLocalFile(true,srcPath, dstPath);
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //copy the local file to HDFS
    public void PutFile(Configuration conf, String srcFile, String dstFile){
    try {
          FileSystem hdfs = FileSystem.get(conf);
          Path srcPath = new Path(srcFile);
          Path dstPath = new Path(dstFile);
          hdfs.copyFromLocalFile(srcPath, dstPath);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    //create the new file
    public FSDataOutputStream CreateFile(Configuration conf, String FileName){
    try {
          FileSystem hdfs = FileSystem.get(conf);
          Path path = new Path(FileName);
          FSDataOutputStream outputStream = hdfs.create(path);
          return outputStream;
        } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
        }
        return null;
    }
    //rename the file name
    public boolean ReNameFile(Configuration conf, String srcName, String dstName){
    try {
            Configuration config = new Configuration();
            FileSystem hdfs = FileSystem.get(config);
            Path fromPath = new Path(srcName);
            Path toPath = new Path(dstName);
            boolean isRenamed = hdfs.rename(fromPath, toPath);
            return isRenamed;
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return false;
    }
    //delete the file
    // tyep = true, delete the directory
    // type = false, delete the file
    public boolean DelFile(Configuration conf, String FileName, boolean type){
        try {
              FileSystem hdfs = FileSystem.get(conf);
              Path path = new Path(FileName);
              boolean isDeleted = hdfs.delete(path, type);
              return isDeleted;
        }catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return false;
    }
    //Get HDFS file last modification time
    public long GetFileModTime(Configuration conf, String FileName){
    try{
              FileSystem hdfs = FileSystem.get(conf);
              Path path = new Path(FileName);
              FileStatus fileStatus = hdfs.getFileStatus(path);
              long modificationTime = fileStatus.getModificationTime();
              return modificationTime;
        }catch(IOException e){
            e.printStackTrace();
        }
        return 0;
    }
    //check if a file  exists in HDFS
    public boolean CheckFileExist(Configuration conf, String FileName){
    try{
              FileSystem hdfs = FileSystem.get(conf);
              Path path = new Path(FileName);
              boolean isExists = hdfs.exists(path);
              return isExists;
        }catch(IOException e){
            e.printStackTrace();
        }
        return false;
    }
    //Get the locations of a file in the HDFS cluster
    public List<String []> GetFileBolckHost(Configuration conf, String FileName){
        try{
              List<String []> list = new ArrayList<String []>();
              FileSystem hdfs = FileSystem.get(conf);
              Path path = new Path(FileName);
              FileStatus fileStatus = hdfs.getFileStatus(path);

              BlockLocation[] blkLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen());

              int blkCount = blkLocations.length;
              for (int i=0; i < blkCount; i++) {
                String[] hosts = blkLocations[i].getHosts();
                list.add(hosts);
               }
              return list;
            }catch(IOException e){
                e.printStackTrace();
            }
            return null;
    }
    //Get a list of all the nodes host names in the HDFS cluster
    // have no authorization to do this operation
    public String[] GetAllNodeName(Configuration conf){
        try{
              FileSystem fs = FileSystem.get(conf);
              DistributedFileSystem hdfs = (DistributedFileSystem) fs;
              DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
              String[] names = new String[dataNodeStats.length];
              for (int i = 0; i < dataNodeStats.length; i++) {
                  names[i] = dataNodeStats[i].getHostName();
              }
              return names;
        }catch(IOException e){
            System.out.println("error!!!!");
            e.printStackTrace();
        }
        return null;
    }
}

wordcount.java:

package mapreduce;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class mywordcount {
    public static  class wordcountMapper extends
        Mapper<LongWritable, Text, Text, IntWritable>{
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException{
            String line = value.toString();
            StringTokenizer itr = new StringTokenizer(line);
            while(itr.hasMoreElements()){
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }
    public static  class wordcountReducer extends
        Reducer<Text, IntWritable, Text, IntWritable>{
        public void reduce(Text key, Iterable<IntWritable>values, Context context)throws IOException, InterruptedException{
            int sum = 0;
            for (IntWritable str : values){
                sum += str.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }
    /**
     * 2 args, the file you want to count words from and the directory you want to save the result
     * @param args /home/hadooper/testmp/testtext /home/hadooper/testmp/testresult
     * @throws Exception
     */
    public static  void main(String args[])throws Exception{
        //首先定义两个临时文件夹,这里可以使用随机函数+文件名,这样重名的几率就很小。
        String dstFile = "temp_src";
        String srcFile = "temp_dst";
        //这里生成文件操作对象。
        HDFS_File file = new HDFS_File();

        Configuration conf = new Configuration();
        // must!!!  config the fs.default.name be the same to the value in core-site.xml
        conf.set("fs.default.name","hdfs://node1");
        conf.set("mapred.job.tracker","node1:54311");

        //从本地上传文件到HDFS,可以是文件也可以是目录
        file.PutFile(conf, args[0], dstFile);

        System.out.println("up ok");
        Job job = new Job(conf, "mywordcount");
        job.setJarByClass(mywordcount.class);

        job.setInputFormatClass(TextInputFormat.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(wordcountMapper.class);
        job.setReducerClass(wordcountReducer.class);
        job.setCombinerClass(wordcountReducer.class);
        //注意这里的输入输出都应该是在HDFS下的文件或目录
        FileInputFormat.setInputPaths(job, new Path(dstFile));
        FileOutputFormat.setOutputPath(job, new Path(srcFile));
//开始运行
        job.waitForCompletion(true);
        //从HDFS取回文件保存至本地
        file.GetFile(conf, srcFile, args[1]);
        System.out.println("down the result ok!");
//删除临时文件或目录
        file.DelFile(conf, dstFile, true);
        file.DelFile(conf, srcFile, true);
        System.out.println("delete file on hdfs ok!");
    }
}

期间,遇到几个错误:

1.HDFS版本问题--Call to node1/172.*.*.*:8020 failed on local exception: java.io.EOFException

main() {…… 
  Configuration conf = new Configuration();
  conf.set("fs.default.name","hdfs://node1");//与conf/core-site里的值对应,必须
  HDFS_File file = new HDFS_File();
  //print all the node name
  String[] host_name = file.GetAllNodeName(conf); 
……}
public String[] GetAllNodeName(Configuration conf){
  try{
    // Configuration config = new Configuration();
     FileSystem fs = FileSystem.get(conf);
     DistributedFileSystem hdfs = (DistributedFileSystem) fs;
     DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();
     String[] names = new String[dataNodeStats.length];
     for (int i = 0; i < dataNodeStats.length; i++) {
         names[i] = dataNodeStats[i].getHostName();
     }
     return names;
  }catch(IOException e){
   System.out.println("eeeeeeeeeeeeeeeeeeeerror!!!!");
   e.printStackTrace();
  }
  return null;
 }
异常:
eeeeeeeeeeeeeeeeeeeerror!!!!
java.io.IOException: Call to node1/172.10.39.250:8020 failed on local exception: java.io.EOFException
 at org.apache.hadoop.ipc.Client.wrapException(Client.java:775)
 at org.apache.hadoop.ipc.Client.call(Client.java:743)
 at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:220)
 at $Proxy0.getProtocolVersion(Unknown Source)
 at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:359)
 at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:112)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:213)
 at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:176)
 at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82)
 at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1378)
 at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66)
 at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1390)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:196)
 at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:95)
 at mapreduce.HDFS_File.GetAllNodeName(HDFS_File.java:151)
 at mapreduce.File_Operation.main(File_Operation.java:15)
Caused by: java.io.EOFException
 at java.io.DataInputStream.readInt(DataInputStream.java:392)
 at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:501)
 at org.apache.hadoop.ipc.Client$Connection.run(Client.java:446)
Exception in thread "main" java.lang.NullPointerException
 at mapreduce.File_Operation.main(File_Operation.java:16)
原因:版本问题,确保java中的jar包跟hadoop集群的jar包是相同版本的
2.HDFS
权限问题

org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security.AccessControlException: Permission denied: user=hadooper, access=WRITE, inode="/user":root:supergroup:drwxr-xr-x

解决方案之
(1 added this entry to conf/hdfs-site.xml
<property>
<name>dfs.permissions</name>
<value>false</value>
</property> 
(2.放开 要写入目录 hadoop 目录的权限 , 命令如下 :$ hadoop fs -chmod 777 /user/
我用的是第2种方案

3.HDFS 2011-12-20 17:00:32 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

在Hadoop的配置文件core-site.xml中可以设置是否使用本地库: 
<property>
  <name>hadoop.native.lib</name>
  <value>true</value>
  <description>Should native hadoop libraries, if present, be used.</description>
</property>

Hadoop默认的配置为启用本地库。 
另外,可以在环境变量中设置使用本地库的位置:
export JAVA_LIBRARY_PATH=/path/to/hadoop-native-libs
有的时候也会发现Hadoop自带的本地库无法使用,这种情况下就需要自己去编译本地库了。在$HADOOP_HOME目录下,使用如下命令即可:
ant compile-native
编译完成后,可以在$HADOOP_HOME/build/native目录下找到相应的文件,然后指定文件的路径或者移动编译好的文件到默认目录下即可
我试了下,那个是64位的,我电脑是32位的,没有源代码,编译不了,那只好一段段程序的试,找出哪段代码出了这个警告,我的是
  try {
     FileSystem hdfs = FileSystem.get(conf);
     Path srcPath = new Path(srcFile);
     Path dstPath = new Path(dstFile);
     hdfs.copyToLocalFile(true,srcPath, dstPath);//定位到此句
  }catch (IOException e) {
 到了此步,便只能如此了,为什么呢,java不是跨平台的吗

4.MR-jar包缺失

ClassNotFoundException: org.codehaus.jackson.map.JsonMappingException
NoClassDefFoundError: org/apache/commons/httpclient/HttpMethod

添加jar包到java工程中

jackson-core-asl-1.5.2.jar
jackson-mapper-asl-1.5.2.jar

commons-httpclient-3.0.1.jar

我是不习惯将所有Jar包都加到工程里,觉得这样很容易便加多了,浪费时空。
完成第一次mapreduce,不错!

5.远程的JOB挂掉了,居然还能运行成功,发现是mapred.job.tracker属性没设,默认在local下运行,其值在namenode的mapred-site.xml中看

conf.set("mapred.job.tracker","node1:54311");

配置完了,运行可以初始化,但是找不到mapper类:

信息: Task Id : attempt_201112221123_0010_m_000000_0, Status : FAILED
java.lang.RuntimeException: java.lang.ClassNotFoundException: mapreduce.mywordcount$wordcountMapper
 at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:996)
 at org.apache.hadoop.mapreduce.JobContext.getMapperClass(JobContext.java:212)
 at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:611)
 at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
 at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:396)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
 at org.apache.hadoop.mapred.Child.main(Child.java:264)

将程序打成jar包放到hadoop集群的jobtracker上可用,正常,结果也正确,但是在客户端运行却报上述错误,暂时还没解决。

总结

1.远程操作HDFS文件以及远程提交MR任务,必须配置的两项(其他暂时还没发现):

conf.set("fs.default.name","hdfs://node1");//与conf/core-site.xml里的值对应,必须 
conf.set("mapred.job.tracker","node1:54311");//mapred-site.xml

2.耐心分析问题,解决问题

时间: 2024-12-24 03:05:18

客户端用java api 远程操作HDFS以及远程提交MR任务(源码和异常处理)的相关文章

java画图程序_图片用字母画出来_源码发布_版本二

在上一个版本:java画图程序_图片用字母画出来_源码发布 基础上,增加了图片同比例缩放,使得大像素图片可以很好地显示画在Notepad++中. 项目结构: 运行效果1: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_result1.png 运行效果2: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_res

Java学习-025-类名或方法名应用之一 -- 调试源码

上文讲述了如何获取类名和方法名,敬请参阅: Java学习-024-获取当前类名或方法名二三文 . 通常在应用开发中,调试或查看是哪个文件中的方法调用了当前文件的此方法,因而在实际的应用中需要获取相应的包名.类名.方法名.行数,从而快速定位,及统计方法被调用的次数,生成类方法关系链. 相信爱钻研的小主们,通过上篇文章,已经懂得了,如何获取主调方法.从调方法.那我直接上码了,敬请各位小主参阅,若有不足之处,敬请各位大神指正,不胜感激! GetClassMethodName.java 源码内容如下所示

java画图程序_图片用字母画出来_源码发布

在之前写了一篇blog:java画图程序_图片用字母画出来 主要是把一些调试的截图发布出来,现在程序调试我认为可以了(当然,你如果还想调试的话,也可以下载源码自己调试). 就把源码发布出来. 项目结构: 资源文件: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356471/o_imagehandler_resource.png 运行效果: 原图:http://images.cnblogs.com/cnblogs_com/hongten/356

【小白的java成长系列】——String类的深入分析(基于源码)

接着前面面向对象来说吧~今天来说说String类..其实String类也包含很多面向对象的知识的~ 首先来问一个问题:我们在开发过程中,如果要使用一个类的话,就要创建对象,这句话没什么问题吧~在实际开发的时候确实是这样的,只有创建了对象才能真正的去使用一个普通的类,我们一般创建对象,几乎所有的类创建对象都是要通过new关键字来创建的~ 问题就来了..为什么我们的String可以直接写成String str = "abc";这样子呢? 当然String类也可以通过new来创建对象的...

Java开源生鲜电商平台-用户表的设计(源码可下载)

Java开源生鲜电商平台-用户表的设计(源码可下载) 说明:由于该系统属于B2B平台,不设计到B2C的架构. 角色分析:买家与卖家. 由于买家与卖家所填写的资料都不一样,需要建立两站表进行维护,比如:buyer,seller. 这样进行数据库的解耦,任何一方的变动都互不影响,但是我想集中式管理,以及一些业务个性化要求,我就增加了一个users表.表结构如下: 账号唯一键,所以做了唯一键索引, 账号的准确性采用手机短信验证. 根据类型区分买家与卖家,登陆的时候,采用的就是users这种表进行维护

Java开源生鲜电商平台-销售管理设计与架构(源码可下载)

Java开源生鲜电商平台-销售管理设计与架构(源码可下载) 说明:在Java开源生鲜电商平台中,销售人员我们称为跟餐饮店老板沟通与下载APP的一类地推人员.(所谓地推指的就是一个一个上门拜访.) 由于销售人员有以下几类特性: 1. 时间随意性,他们并不类似技术或者性质人员,需要天天呆在办公室,他们是需要去外面,时间上具有随意性. 2. 行动随意性 ,他们的行动过于随意,每天也不用来打卡,每天就是按照计划去拜访客户,然后推销生鲜电商APP,让客户来进行下单,那么行为很随意,站在公司的角度 我们是没

HDFS Java API 常用操作

package com.luogankun.hadoop.hdfs.api; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.

【最新】最流行的java后台框架 springmvc mybaits 集代码生成器 SSM SSH 项目源码

获取[下载地址]   QQ: 313596790   [免费支持更新]A 代码生成器(开发利器);全部是源码     增删改查的处理类,service层,mybatis的xml,SQL( mysql   和oracle)脚本,   jsp页面 都生成   就不用写搬砖的代码了,生成的放到项目里,可以直接运行B 阿里巴巴数据库连接池druid;  数据库连接池  阿里巴巴的 druid.Druid在监控.可扩展性.稳定性和性能方面都有明显的优势C 安全权限框架shiro ;  Shiro 是一个用

最流行的java后台框架 springmvc mybaits 集代码生成器 SSM SSH 项目源码

获取[下载地址]   QQ: 313596790   [免费支持更新]A 代码生成器(开发利器);全部是源码     增删改查的处理类,service层,mybatis的xml,SQL( mysql   和oracle)脚本,   jsp页面 都生成   就不用写搬砖的代码了,生成的放到项目里,可以直接运行B 阿里巴巴数据库连接池druid;  数据库连接池  阿里巴巴的 druid.Druid在监控.可扩展性.稳定性和性能方面都有明显的优势C 安全权限框架shiro ;  Shiro 是一个用