用 Java API 读取 HDFS 文件

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import bean.TableStatistic;

/**
 * Controller that reads a JSON document from a fixed HDFS directory and
 * returns it deserialized as a {@link TableStatistic}.
 *
 * <p>HDFS access is performed as a remote user created through
 * {@link UserGroupInformation#createRemoteUser(String)}. The
 * {@link UserGroupInformation} and {@link FileSystem} instances are cached per
 * user name; call {@link #destroy()} to close and forget them.
 */
@Controller
@RequestMapping("/dfview")
public class DataFrameViewController extends BaseController {

    /** File name (compared case-insensitively) of the JSON document to read. */
    private static final String HDFS_JSON_NAME = "jsonObj";

    /** User name used for the HDFS access of the endpoint and of {@link #main}. */
    private static final String DEFAULT_USER = "bi";

    /** HDFS directory that is scanned for the JSON document. */
    private static final String STATISTIC_DIR = "/user/cbt/datax/temp_transfer/zzzdes";

    /** Charset used to decode the JSON document. */
    private static final String JSON_CHARSET = "UTF-8";

    /** Size of the buffer used when copying file content into memory. */
    private static final int READ_BUFFER_SIZE = 4096;

    /** Classpath resources added to the Hadoop configuration when they are present. */
    private static final String[] HADOOP_CONF_FILES = { "core-site.xml",
            "hdfs-site.xml" };

    /** Per-thread formatter for the pattern {@code yyyy-MM-dd HH:mm:ss}. */
    public static final ThreadLocal<SimpleDateFormat> appDateFormat = new ThreadLocal<SimpleDateFormat>() {
        @Override
        public SimpleDateFormat initialValue() {
            return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        }
    };

    /** Remote-user identities, keyed by user name. */
    private final ConcurrentMap<String, UserGroupInformation> cache = new ConcurrentHashMap<String, UserGroupInformation>();

    /** File system handles, keyed by user name. */
    private final ConcurrentMap<String, FileSystem> fileSystemCache = new ConcurrentHashMap<String, FileSystem>();

    /** Base configuration; never modified after construction (per-user settings go on copies). */
    private final Configuration hadoopConf = buildHadoopConf();

    /**
     * Reads the JSON document from the statistics directory and converts it to a
     * {@link TableStatistic}.
     *
     * @param tableName request parameter; currently not used to select the file
     * @return the parsed statistics; when no matching file exists the empty
     *         string is handed to the JSON parser and its result is returned
     * @throws Exception if the file system cannot be created or the file cannot
     *                   be listed or read
     */
    @RequestMapping(value = "/getDFviewOfColumn", method = { RequestMethod.GET })
    @ResponseBody
    public TableStatistic getDFviewOfTable(String tableName)
            throws Exception {
        FileSystem fs = this.createFileSystem(DEFAULT_USER);
        String json = readStatisticJson(fs, new Path(STATISTIC_DIR));
        return JSON.parseObject(json, TableStatistic.class);
    }

    /**
     * Command-line smoke test: lists the statistics directory, prints the JSON
     * document together with its {@code colUnique} value, and prints the paths
     * of directories and symlinks.
     *
     * @param args ignored
     * @throws Exception if HDFS cannot be reached or read
     */
    public static void main(String[] args) throws Exception {
        DataFrameViewController controller = new DataFrameViewController();
        FileSystem fs = controller.createFileSystem(DEFAULT_USER);
        Path homePath = new Path(STATISTIC_DIR);
        System.out.println("***********************************");
        for (FileStatus status : fs.listStatus(homePath)) {
            if (status.isFile()) {
                // Only the JSON document is parsed; parsing the (empty) content
                // of any other file would yield null and fail on first use.
                if (!HDFS_JSON_NAME.equalsIgnoreCase(status.getPath().getName())) {
                    continue;
                }
                String content = readFully(fs, status.getPath());
                System.out.print(content);
                System.out
                        .println("************************************************");
                JSONObject jb = JSON.parseObject(content);
                if (jb != null) {
                    System.out.println("********!!!!! : " + jb.get("colUnique"));
                }
                TableStatistic ts = JSON.parseObject(content, TableStatistic.class);
                if (ts != null) {
                    System.out.println("********!!!!! : " + ts.getColUnique());
                }
            } else if (status.isDirectory()) {
                System.out.println(status.getPath().toString());
            } else if (status.isSymlink()) {
                System.out.println("&&&&&&&&" + status.getPath().toString());
            }
        }
    }

    /**
     * Returns the file system handle for {@code user}, creating and caching it
     * on first use.
     *
     * @param user name of the user on whose behalf HDFS is accessed
     * @return the cached or newly created file system
     * @throws IllegalArgumentException if {@code user} is null or empty
     * @throws Exception if the file system cannot be obtained
     */
    public FileSystem createFileSystem(String user) throws Exception {
        if (user == null || user.length() == 0) {
            throw new IllegalArgumentException("user must not be null or empty");
        }
        FileSystem cached = fileSystemCache.get(user);
        if (cached != null) {
            return cached;
        }
        // Work on a private copy so the user-specific setting never leaks into
        // the shared base configuration used for other users.
        final Configuration conf = new Configuration(loadHadoopConf());
        conf.set("hadoop.job.ugi", user);
        UserGroupInformation ugi = getProxyUser(user);
        FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
            public FileSystem run() throws Exception {
                return FileSystem.get(conf);
            }
        });
        // If another thread registered a handle first, use that one so every
        // caller sees the same instance.
        FileSystem raced = fileSystemCache.putIfAbsent(user, fs);
        return raced != null ? raced : fs;
    }

    /**
     * Returns the shared base Hadoop configuration.
     *
     * @return the base configuration; callers must not modify it
     */
    private Configuration loadHadoopConf() {
        return hadoopConf;
    }

    /**
     * Builds the base configuration and adds every entry of
     * {@link #HADOOP_CONF_FILES} that can be found on the classpath.
     */
    private static Configuration buildHadoopConf() {
        Configuration conf = new Configuration();
        ClassLoader loader = DataFrameViewController.class.getClassLoader();
        for (String fileName : HADOOP_CONF_FILES) {
            // Resources are added by URL rather than as an open stream: a URL
            // can be re-read by configuration copies, a consumed stream cannot.
            URL resource = loader.getResource(fileName);
            if (resource != null) {
                conf.addResource(resource);
            }
        }
        return conf;
    }

    /**
     * Closes the file systems of every cached user and empties both caches, so
     * that later calls to {@link #createFileSystem(String)} create fresh handles
     * instead of returning closed ones.
     */
    public void destroy() {
        for (UserGroupInformation ugi : cache.values()) {
            try {
                FileSystem.closeAllForUGI(ugi);
            } catch (IOException ignored) {
                // Best effort: a failure for one user must not prevent the
                // remaining users' file systems from being closed.
            }
        }
        fileSystemCache.clear();
        cache.clear();
    }

    /**
     * Returns the cached identity for {@code user}, creating it with
     * {@link UserGroupInformation#createRemoteUser(String)} on first use.
     *
     * @param user user name
     * @return the identity registered for {@code user}
     * @throws IOException declared for callers; not thrown by the current implementation
     */
    private UserGroupInformation getProxyUser(String user) throws IOException {
        UserGroupInformation ugi = cache.get(user);
        if (ugi == null) {
            UserGroupInformation created = UserGroupInformation.createRemoteUser(user);
            ugi = cache.putIfAbsent(user, created);
            if (ugi == null) {
                ugi = created;
            }
        }
        return ugi;
    }

    /**
     * Finds the first regular file in {@code dir} whose name matches
     * {@link #HDFS_JSON_NAME} (ignoring case) and returns its content.
     *
     * @return the file content, or the empty string when no file matches
     */
    private static String readStatisticJson(FileSystem fs, Path dir) throws IOException {
        for (FileStatus status : fs.listStatus(dir)) {
            if (status.isFile()
                    && HDFS_JSON_NAME.equalsIgnoreCase(status.getPath().getName())) {
                return readFully(fs, status.getPath());
            }
        }
        return "";
    }

    /**
     * Reads the whole file into memory and decodes it as UTF-8. All bytes are
     * collected before decoding so that multi-byte characters stay intact, and
     * the stream is closed even when reading fails.
     */
    private static String readFully(FileSystem fs, Path file) throws IOException {
        InputStream in = fs.open(file);
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toString(JSON_CHARSET);
        } finally {
            in.close();
        }
    }
}
时间: 2024-12-23 16:43:12

用 Java API 读取 HDFS 文件的相关文章

Java API 读取HDFS目录下的所有文件

/** * 获取1号店生鲜食品的分类id字符串 * @param filePath * @return */ public String getYHDSXCategoryIdStr(String filePath) { final String DELIMITER = new String(new byte[]{1}); final String INNER_DELIMITER = ","; // 遍历目录下的所有文件 BufferedReader br = null; try { F

Java API 读取HDFS的单文件

HDFS上的单文件: -bash-3.2$ hadoop fs -ls /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category Found 1 items -rw-r--r-- 2 deploy supergroup 520 2014-08-14 17:03 /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category/repeatRecCategor

记录一次读取hdfs文件时出现的问题java.net.ConnectException: Connection refused

公司的hadoop集群是之前的同事搭建的,我(小白一个)在spark shell中读取hdfs上的文件时,执行以下指令 >>> word=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt") >>> word.first() 报错:java.net.ConnectException: Call From hadoop/133.0.123.130 to localhost:9000 fail

使用java api操作Hadoop文件 Robbin

1 package cn.hadoop.fs; 2 3 import java.io.IOException; 4 import java.io.InputStream; 5 import java.net.URI; 6 import java.net.URISyntaxException; 7 8 import org.apache.hadoop.conf.Configuration; 9 import org.apache.hadoop.fs.FSDataOutputStream; 10 i

Java API操作HDFS

HDFS是存储数据的分布式文件系统,对HDFS的操作,就是对文件系统的操作,除了用HDFS的shell命令对文件系统进行操作,我们也可以利用Java API对文件系统进行操作,比如文件的创建.删除.修改权限等等,还有文件夹的创建.删除.重命名等等. 使用Java API对文件系统进行操作主要涉及以下几个类: 1.Configuration类:该类的对象封装了客户端或者服务端的配置. 2.FileSystem类:该类的对象是一个文件系统对象,可以利用该对象的一些方法来对文件进行操作,FileSys

Hadoop读书笔记(三)Java API操作HDFS

Hadoop读书笔记(一)Hadoop介绍:http://blog.csdn.net/caicongyang/article/details/39898629 Hadoop读书笔记(二)HDFS的shell操作:http://blog.csdn.net/caicongyang/article/details/41253927 JAVA URL 操作HDFS OperateByURL.java package hdfs; import java.io.InputStream; import jav

使用Java API操作hdfs

如题 我就是一个标题党  就是使用JavaApi操作HDFS,使用的是MAVEN,操作的环境是Linux 首先要配置好Maven环境,我使用的是已经有的仓库,如果你下载的jar包 速度慢,可以改变Maven 下载jar包的镜像站改为 阿里云. 贴一下  pom.xml 使用到的jar包 <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifa

五种方式让你在java中读取properties文件内容不再是难题

一.背景 最近,在项目开发的过程中,遇到需要在properties文件中定义一些自定义的变量,以供java程序动态的读取,修改变量,不再需要修改代码的问题.就借此机会把Spring+SpringMVC+Mybatis整合开发的项目中通过java程序读取properties文件内容的方式进行了梳理和分析,先和大家共享. 二.项目环境介绍 Spring 4.2.6.RELEASE SpringMvc 4.2.6.RELEASE Mybatis 3.2.8 Maven 3.3.9 Jdk 1.7 Id

java中读取特殊文件的类型

java中读取特殊文件的类型: 第一种方法(字符拼接读取): public static String getType(String s){ String s1=s.substring(s.indexOf(".")+1,s.length()); if(s1.indexOf(".")>0){ s1=getTypeName(s1); } else{ return s1; } return s1;} 第二种方法(利用java中自带的格式库): new Mimetyp