Java 读取HDFS文件系统

最近有个需求,计算用户画像。

系统大概有800W的用户量,算每个用户的一些数据。

数据量比较大,算用hive还是毫无压力的,但是写的oracle,在给出数据给前端,就比较难受了。

然后换了种解决方法:

  1.hive计算,写的HDFS

  2.API读出来,写到hbase(hdfs和hbase的版本不匹配,没办法用sqoop 直接导)

然后问题就来了。

需要写个API,读HDFS上的文件。

主要类:ReadHDFS 

public class ReadHDFS {

    public static void main(String[]args){

        long startLong = System.currentTimeMillis();
        HDFSReadLog.writeLog("start read file");
        String path;
        if (args.length > 1) {
//            path = args[0];
            Constant.init(args[0],args[1]);
        }
        HDFSReadLog.writeLog(Constant.PATH);

        try {
            getFile(Constant.URI + Constant.PATH);
        } catch (IOException e) {
            e.printStackTrace();
        }

        long endLong = System.currentTimeMillis();
        HDFSReadLog.writeLog("cost " + (endLong -startLong)/1000 + " seconds");
        HDFSReadLog.writeLog("cost " + (endLong -startLong)/1000/60 + " minute");
    }

    private static void getFile(String filePath) throws IOException {

        FileSystem fs = FileSystem.get(URI.create(filePath), HDFSConf.getConf());
        Path path = new Path(filePath);
        if (fs.exists(path) && fs.isDirectory(path)) {

            FileStatus[] stats = fs.listStatus(path);
            FSDataInputStream is;
            FileStatus stat;
            byte[] buffer;
            int index;
            StringBuilder lastStr = new StringBuilder();
            for(FileStatus file : stats){
                try{
                    HDFSReadLog.writeLog("start read : " + file.getPath());
                    is = fs.open(file.getPath());
                    stat = fs.getFileStatus(path);
                    int sum  = is.available();
                    if(sum == 0){
                        HDFSReadLog.writeLog("have no data : " + file.getPath() );
                        continue;
                    }
                    HDFSReadLog.writeLog("there have  : " + sum + " bytes" );
                    buffer = new byte[sum];            // 注意一点,如果文件太大了,可能会内存不够用。在本机测得时候,读一个100多M的文件,导致内存不够。
                    is.readFully(0,buffer);
                    String result = Bytes.toString(buffer);
                    // 写到 hbase
                    WriteHBase.writeHbase(result);

                    is.close();
                    HDFSReadLog.writeLog("read : " + file.getPath() + " end");
                }catch (IOException e){
                    e.printStackTrace();
                    HDFSReadLog.writeLog("read " + file.getPath() +" error");
                    HDFSReadLog.writeLog(e.getMessage());
                }
            }
            HDFSReadLog.writeLog("Read End");
            fs.close();

        }else {
            HDFSReadLog.writeLog(path + " is not exists");
        }

    }
}

配置类:HDFSConfie(赶紧没什么用,url和path配好了,不需要配置就可以读)

public class HDFSConf {

    public static Configuration conf = null;
    public static Configuration getConf(){
        if (conf == null){
            conf = new Configuration();
            String path  = Constant.getSysEnv("HADOOP_HOME")+"/etc/hadoop/";
            HDFSReadLog.writeLog("Get hadoop home : " + Constant.getSysEnv("HADOOP_HOME"));
            // hdfs conf
            conf.addResource(path+"core-site.xml");
            conf.addResource(path+"hdfs-site.xml");
            conf.addResource(path+"mapred-site.xml");
            conf.addResource(path+"yarn-site.xml");
        }
        return conf;
    }

}

一些常量:

 url : hdfs:ip:prot

 path : HDFS的路径

注: 考虑到读的表,可能不止有一个文件,做了循环。

看下篇,往hbase写数据

时间: 2024-10-10 20:42:48

Java 读取HDFS文件系统的相关文章

贴一段java读取hdfs 解压gz zip tar.gz保存到hdfs的代码

package main.java; import java.io.*;import java.util.LinkedList;import java.util.List;import java.util.zip.*; import org.apache.commons.compress.archivers.ArchiveException; import org.apache.commons.compress.archivers.ArchiveInputStream; import org.a

hadoop系列二:HDFS文件系统的命令及JAVA客户端API

转载请在页首明显处注明作者与出处 http://www.cnblogs.com/zhuxiaojie/p/6391518.html 一:说明 此为大数据系列的一些博文,有空的话会陆续更新,包含大数据的一些内容,如hadoop,spark,storm,机器学习等. 当前使用的hadoop版本为2.6.4 上一篇:hadoop系列一:hadoop集群安装 二:HDFS的shell命令 上一章说完了安装HADOOP集群部分,这一张讲HDFS. 其实基本上操作都是通过JAVA API来操作,所以这里的s

HDFS文件系统基操--Java实现

Java实现对HDFS文件系统的基本操作 1.准备好jar包 2.创建一个类 1. 测试连接 @Test //测试是否连接成功 public void test() { //添加配置 ==> core-site.xml Configuration conf = new Configuration(); //配置默认地址端口 conf.set("fs.defaultFS", "hdfs://192.168.1.105:9000"); try { //加载配置 F

Java API 读取HDFS目录下的所有文件

/** * 获取1号店生鲜食品的分类id字符串 * @param filePath * @return */ public String getYHDSXCategoryIdStr(String filePath) { final String DELIMITER = new String(new byte[]{1}); final String INNER_DELIMITER = ","; // 遍历目录下的所有文件 BufferedReader br = null; try { F

Java API 读取HDFS的单文件

HDFS上的单文件: -bash-3.2$ hadoop fs -ls /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category Found 1 items -rw-r--r-- 2 deploy supergroup 520 2014-08-14 17:03 /user/pms/ouyangyewei/data/input/combineorder/repeat_rec_category/repeatRecCategor

记录一次读取hdfs文件时出现的问题java.net.ConnectException: Connection refused

公司的hadoop集群是之前的同事搭建的,我(小白一个)在spark shell中读取hdfs上的文件时,执行以下指令 >>> word=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt") >>> word.first() 报错:java.net.ConnectException: Call From hadoop/133.0.123.130 to localhost:9000 fail

Hadoop HDFS (3) JAVA访问HDFS

现在我们来深入了解一下Hadoop的FileSystem类.这个类是用来跟Hadoop的文件系统进行交互的.虽然我们这里主要是针对HDFS,但是我们还是应该让我们的代码只使用抽象类FileSystem,这样我们的代码就可以跟任何一个Hadoop的文件系统交互了.在写测试代码时,我们可以用本地文件系统测试,部署时使用HDFS,只需配置一下,不需要修改代码了. 在Hadoop 1.x以后的版本中引入了一个新的文件系统接口叫FileContext,一个FileContext实例可以处理多种文件系统,而

java读写HDFS

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件.删除文件.读取文件内容等操作.下面记录一下使用JAVA API对HDFS中的文件进行操作的过程. 对分HDFS中的文件操作主要涉及一下几个类: Configuration类:该类的对象封转了客户端或者服务器的配置. FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作.FileSystem fs = FileSystem.get(conf);通过FileSystem的静态

Java操作HDFS开发环境搭建以及HDFS的读写流程

Java操作HDFS开发环境搭建 在之前我们已经介绍了如何在Linux上进行HDFS伪分布式环境的搭建,也介绍了hdfs中一些常用的命令.但是要如何在代码层面进行操作呢?这是本节将要介绍的内容: 1.首先使用IDEA创建一个maven工程: maven默认是不支持cdh的仓库的,需要在pom.xml中配置cdh的仓库,如下: <repositories> <repository> <id>cloudera</id> <url>https://re