Hadoop HDFS Java API

[toc]


Hadoop HDFS Java API

主要是Java操作HDFS的一些常用代码,下面直接给出代码:

package com.uplooking.bigdata.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * 列出目录的内容:listStatus
 * 读取文件:open
 * 创建目录:mkdirs
 * 创建文件:create
 * 删除文件或目录:delete
 * 显示文件存储位置:getFileBlockLocations
 */
public class HDFSTest {
    private FileSystem fs;
    private DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm");

    /**
     * 初始化资源
     * @throws Exception
     */
    @Before
    public void setUp() throws Exception {
        URI uri = new URI("hdfs://uplooking01:9000");
        Configuration configuration = new Configuration();
        fs = FileSystem.get(uri, configuration);
    }

    /**
     * 列出目录的内容:listStatus
     * 模仿:
     * $ hdfs dfs -ls /
     * -rw-r--r--   1 uplooking supergroup         28 2018-02-28 12:29 /hello
     * drwxr-xr-x   - uplooking supergroup          0 2018-02-28 12:31 /output
     * drwx------   - uplooking supergroup          0 2018-02-28 12:31 /tmp
     *
     * @throws IOException
     */
    @Test
    public void testList() throws IOException {
        FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : fileStatuses) {
            // 先定义好需要判断才能确定的项
            String prefix = "d";
            String repliaction = "-";

            // 获取文件类型
            if (fileStatus.isFile()) {
                prefix = "-";
            }

            // 获取权限列表
            FsPermission permission = fileStatus.getPermission();
            String uacl = permission.getUserAction().SYMBOL;
            String gacl = permission.getGroupAction().SYMBOL;
            String oacl = permission.getOtherAction().SYMBOL;
            String acl = uacl + gacl + oacl;

            // 获取复制因子数
            if (fileStatus.isFile()) {
                repliaction = fileStatus.getReplication() + "";
            }

            // 获取文件属主
            String owner = fileStatus.getOwner();
            // 获取文件属组
            String group = fileStatus.getGroup();

            // 获取文件大小
            long len = fileStatus.getLen();
            // 获取文件修改时间
            String mTime = df.format(new Date(fileStatus.getModificationTime()));
            // 获取文件路径
            Path path = fileStatus.getPath();

            // 格式化输出
            System.out.println(prefix + acl + "\t" + repliaction + "\t" + owner + "  " + group + "\t" + mTime + "\t" + path);
        }
    }

    /**
     * 读取文件:open
     *
     * @throws IOException
     */
    @Test
    public void testOpen() throws IOException {
        FSDataInputStream fis = fs.open(new Path("hdfs://uplooking01:9000/hello"));
        // 方式1:
       /* byte[] bytes = new byte[1024];
        int len = 0;
        while ((len = fis.read(bytes)) != -1) {
            System.out.println(new String(bytes, 0, len));
        }
        fis.close();*/

        // 方式2:
        /*BufferedReader br = new BufferedReader(new InputStreamReader(fis));
        String line = null;
        while ((line = br.readLine()) != null) {
            System.out.println(line);
        }
        fis.close();*/

        // 方式3:
        IOUtils.copyBytes(fis, System.out, 1024, false);
    }

    /**
     * 创建目录:mkdirs
     *
     * @throws IOException
     */
    @Test
    public void testMkdir() throws IOException {
        boolean ret = fs.mkdirs(new Path("/input/hdfs"));
        System.out.println(ret ? "创建目录成功" : "创建目录失败");
    }

    /**
     * 创建文件:create
     *
     * @throws IOException
     */
    @Test
    public void testCreate() throws IOException {
        // 第二个参数为是否覆盖,Files are overwritten by default
        FSDataOutputStream fos = fs.create(new Path("/input/hdfs/word.txt"), false);
        fos.write("hello\n".getBytes());
        fos.write("xpleaf\n".getBytes());
        fos.close();
    }

    /**
     * 删除文件或目录:delete
     *
     * @throws IOException
     */
    @Test
    public void testDelete() throws IOException {
        // 第二个参数为是否递归删除(当删除目录时)
        boolean ret = fs.delete(new Path("/input/hdfs/word.txt"), false);
        System.out.println(ret ? "删除成功" : "删除失败");
    }

    /**
     * 显示文件存储位置:getFileBlockLocations
     *
     * @throws IOException
     */
    @Test
    public void testLocations() throws IOException {
        Path path = new Path("/hadoop-2.6.4.tar.gz");
        FileStatus fileStatus = fs.getFileStatus(path);
        // 参数分别为:文件路径   偏移起始位置  文件长度
        BlockLocation[] locations = fs.getFileBlockLocations(path, 0, fileStatus.getLen());
        System.out.println(locations);
        for (BlockLocation location : locations) {
            System.out.println(location);
        }
        /**
         * 0,134217728,uplooking01          (偏移量从0开始,大小为128MB的块存储在节点uplooking01上)
           134217728,61798247,uplooking01   (偏移量从128M开始,大小为59M的块(就是剩余大小)存储在节点uplooking01上)
           可以看到,两个块都只存在uplooking01上的,这是因为这里的hadoop环境是伪分布式的
         */
    }

    /**
     * 释放资源
     * @throws IOException
     */
    @After
    public void cleanUp() throws IOException {
        fs.close();
    }
}

原文地址:http://blog.51cto.com/xpleaf/2074023

时间: 2024-11-09 03:52:36

Hadoop HDFS Java API的相关文章

HDFS Java API 常用操作

package com.luogankun.hadoop.hdfs.api; import java.io.BufferedInputStream; import java.io.File; import java.io.FileInputStream; import java.io.InputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.

【Hadoop】HA 场景下访问 HDFS JAVA API Client

客户端需要指定ns名称,节点配置,ConfiguredFailoverProxyProvider等信息. 代码示例: package cn.itacst.hadoop.hdfs; import java.io.FileInputStream; import java.io.InputStream; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; impor

HDFS Java API的使用举例

HDFS是Hadoop应用程序使用的主要分布式存储.HDFS集群主要由管理文件系统元数据的NameNode和存储实际数据的DataNodes组成,HDFS架构图描述了NameNode,DataNode和客户端之间的基本交互.客户端联系NameNode进行文件元数据或文件修改,并直接使用DataNodes执行实际的文件I / O. Hadoop支持shell命令直接与HDFS进行交互,同时也支持JAVA API对HDFS的操作,例如,文件的创建.删除.上传.下载.重命名等. HDFS中的文件操作主

Hadoop HDFS编程 API入门系列之HDFS_HA(五)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs3; import java.io.FileInputStream;import java.io.InputStream;import java.io.OutputStream;import java.net.URI; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSyst

Hadoop HDFS编程 API入门系列之简单综合版本1(四)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs4; import java.io.IOException; import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FileStatus

Hadoop HDFS Java编程

import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.net.URI; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apac

hadoop学习记录(二)HDFS java api

FSDateinputStream 对象 FileSystem对象中的open()方法返回的是FSDateInputStream对象,改类继承了java.io.DateInoutStream接口.支持随机访问 Seekable接口 支持在文件中找到指定位置,并提供一个查询当前位置相对于文件起始位置偏移量的查询方法. public interface Seekable{ //seek()可以移到文件中任意一个绝对位置 void seek(long pos); long getPos(); bool

HDFS Java API使用之读取上传文件

package com.ibeifeng.hadoop.senior.hdfs; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOu

Hadoop HDFS编程 API入门系列之HdfsUtil版本1(六)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs2; import java.io.FileOutputStream;import java.io.IOException; import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;