Hadoop之HDFS(客户端操作) 2 HDFS的API操作 3 HDFS的I/O流操作

2 HDFS的API操作

2.1 HDFS文件上传(测试参数优先级)

  1.编写源代码

        // 文件上传
    @Test
    public void testPut() throws Exception {

        Configuration conf = new Configuration();
        conf.set("dfs.replication", "2");
        // 1.获取fs对象
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        // 2.执行上传API
        fs.copyFromLocalFile(new Path("D:\\Ztest\\yema.png"), new Path("/diyo/dashen/dengzhiyong/yema3.png"));

        // 3.关闭资源
        fs.close();
        System.out.println("上传over");
    }    

  2.将hdfs-site.xml拷贝到项目的根目录下

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

  3.参数优先级

  参数优先级排序:(1)客户端代码中设置的值 >(2)ClassPath下的用户自定义配置文件 >(3)然后是服务器的默认配置

2.2 HDFS文件下载

        // 文件下载
    @Test
    public void testGet() throws Exception {
     //1 获取文件系统 
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
     //2 执行下载操作
//        fs.copyToLocalFile(new Path("/diyo/dashen/dengzhiyong/yema3.png"), new Path("D:\\Ztest\\yema2.png"));
        // delSrc是否删除源,路径,路径,useRawLocalFileSystem是否使用本地校验true(不产生crc校验)
        fs.copyToLocalFile(false, new Path("/diyo/dashen/dengzhiyong/yema3.png"), new Path("D:\\Ztest\\yema3.png"),
                true);
     //3 关闭资源
        fs.close();

        System.out.println("下载over");
    }    

2.3 HDFS文件夹删除

    // 文件/文件夹删除
    @Test
    public void testRmdir() throws Exception {
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        // 删除(recursive:true :递归删除)
        fs.delete(new Path("/diyo/dashen/dengzhiyong/yema3.png"), true);
        fs.close();
        System.out.println("删除over");
    }

2.4 HDFS文件名更改

    // 更改文件名
    @Test
    public void testReName() throws Exception {
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        fs.rename(new Path("/diyo/dashen/dengzhiyong/yema2.png"), new Path("/diyo/dashen/dengzhiyong/yema3.png"));

        fs.close();
        System.out.println("重命名over");
    }

2.5 HDFS文件详情查看

    // 查看文件详情:名称、权限、长度、块信息
    @Test
    public void testListFile() throws Exception {

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/Diyo"), true);
        while (listFiles.hasNext()) { // 迭代器 : 有没有文件信息
            LocatedFileStatus fileStatus = listFiles.next(); // 如果有,拿到信息
            // 名称
            String name = fileStatus.getPath().getName();
            System.out.println("name:\t" + name);
            // 权限
            FsPermission permission = fileStatus.getPermission();
            System.out.println("permission:\t" + permission);
            // 长度
            long len = fileStatus.getLen();
            System.out.println("len:\t" + len);
            // 分组
            String group = fileStatus.getGroup();
            System.out.println("group:\t" + group);

            // 块信息(数组是因为有多个副本)
            BlockLocation[] blockLocations = fileStatus.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                System.out.println("blockLocation:\t" + blockLocation);
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println("host:\t" + host);
                }
            }
            System.out.println("-----------------");
        }
    }

2.6 HDFS文件和文件夹判断

    // 文件和文件夹的判断
    @Test
    public void testListStatus() throws Exception {

        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus fileStatus : listStatus) {
            if (fileStatus.isFile()) {
                System.out.println("文件-:" + fileStatus.getPath().getName());
            }
            if (fileStatus.isDirectory()) {
                System.out.println("文件夹r:/" + fileStatus.getPath().getName());
                fs.listFiles(fileStatus.getPath(), true);
            }
        }

        /*
         * RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"),
         * true); while (listFiles.hasNext()) { LocatedFileStatus fileStatus =
         * listFiles.next(); // fileStatus.getPath();
         *
         * FileStatus[] listStatus = fs.listStatus(fileStatus.getPath());
         *
         * for (FileStatus status : listStatus) { if (status.isFile()) {
         * System.out.println("文件-:" + status.getPath().getName()); } else {
         * System.out.println("文件夹d:" + status.getPath().getName()); } } }
         */
        fs.close();
        System.out.println("判断over");
    }

2.7 HDFS查看文件内容目录结构

    //查看文件内容
    @Test
    public void testCatFileContext() throws Exception{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        FSDataInputStream fdis = fs.open(new Path("/xsync"));
        int len = 0;
        while((len = fdis.read())!=-1) {
            System.out.print((char)len);
        }
    }

    //查看目录结构
    @Test
    public void showTree() throws Exception{
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for (FileStatus sta : listStatus) {
            if (sta.isFile() && sta.getLen() > 0) {
                showDetail(sta);
//                System.out.println("------------");
            }else if (sta.isDirectory()) {
                showDetail(sta);

            }
        }

    }
    private void showDetail(FileStatus sta) {
        System.out.println
            (sta.getPath()+"\t"+
            sta.getLen()+"\t"+
            sta.getOwner()+"\t"+
            sta.getAccessTime());
    }

3 HDFS的I/O流操作

3.1 HDFS文件上传

  1.需求:把本地文件上传到HDFS根目录

  2.编写代码

@Test
public void putFileToHDFS() throws IOException, InterruptedException, URISyntaxException {

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 创建输入流
    FileInputStream fis = new FileInputStream(new File("e:/banhua.txt"));

    // 3 获取输出流
    FSDataOutputStream fos = fs.create(new Path("/banhua.txt"));

    // 4 流对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 关闭资源
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

3.2 HDFS文件下载

  1.需求:从HDFS上下载banhua.txt文件到本地e盘上

  2.编写代码

// 文件下载
@Test
public void getFileFromHDFS() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 获取输入流
    FSDataInputStream fis = fs.open(new Path("/banhua.txt"));

    // 3 获取输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/banhua.txt"));

    // 4 流的对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 5 关闭资源
    IOUtils.closeStream(fos);
    IOUtils.closeStream(fis);
    fs.close();
}

3.3 定位文件读取

  1.需求:分块读取HDFS上的大文件,比如根目录下的/hadoop-2.7.2.tar.gz

  2.编写代码

  (1)下载第一块

@Test
public void readFileSeek1() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 获取输入流
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

    // 3 创建输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part1"));

    // 4 流的拷贝
    byte[] buf = new byte[1024];

    for(int i =0 ; i < 1024 * 128; i++){
        fis.read(buf);
        fos.write(buf);
    }

    // 5关闭资源
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
fs.close();
}

  (2)下载第二块

@Test
public void readFileSeek2() throws IOException, InterruptedException, URISyntaxException{

    // 1 获取文件系统
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "atguigu");

    // 2 打开输入流
    FSDataInputStream fis = fs.open(new Path("/hadoop-2.7.2.tar.gz"));

    // 3 定位输入数据位置
    fis.seek(1024*1024*128);

    // 4 创建输出流
    FileOutputStream fos = new FileOutputStream(new File("e:/hadoop-2.7.2.tar.gz.part2"));

    // 5 流的对拷
    IOUtils.copyBytes(fis, fos, configuration);

    // 6 关闭资源
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

  3)合并文件

  在Window命令窗口中进入到目录E:\,然后执行如下命令,对数据进行合并

  type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1

  合并完成后,将hadoop-2.7.2.tar.gz.part1重新命名为hadoop-2.7.2.tar.gz。解压发现该tar

个人代码:

package com.diyo.hdfs;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSIO {

    // 从本地上传到HDFS
    @Test
    public void testputFileToHDFS() throws Exception {

        // 1 获取对象
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        // 2 获取输入流
        FileInputStream fis = new FileInputStream("D:/Ztest/yema.png");

        // 3 获取输出流
        FSDataOutputStream fos = fs.create(new Path("/newyama.png"));

        // 4 流的对拷
        IOUtils.copyBytes(fis, fos, conf);

        // 5 关闭资源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }

    // 从HDFS下载到本地
    @Test
    public void testgetFileFromHDFS() throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        FSDataInputStream fis = fs.open(new Path("/newyama.png"));

        FileOutputStream fos = new FileOutputStream("d:/Ztest/newyema.png");

        IOUtils.copyBytes(fis, fos, conf);

        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();
        System.out.println("over");
    }

    // 定位文件读取(下载第一块)
    @Test
    public void testReadFileSeek1() throws Exception {

        Configuration conf = new Configuration();
        // 获取对象
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        // 获取输入流
        FSDataInputStream fis = fs.open(new Path("/hadoop-3.1.0.tar.gz"));

        // 获取输出流
        FileOutputStream fos = new FileOutputStream("d:/Ztest/hadoop-3.1.0.tar.gz.part1");

        // 流的对拷
        byte[] buf = new byte[1024];
        for (int i = 0; i < 1024 * 128; i++) {
            fis.read(buf);
            fos.write(buf);
        }

        // 关闭资源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();

        System.out.println("over");
    }

    // 定位文件读取(下载第二块)
    @Test
    public void testReadFileSeek2() throws Exception {

        Configuration conf = new Configuration();
        // 获取对象
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");

        // 获取输入流
        FSDataInputStream fis = fs.open(new Path("/hadoop-3.1.0.tar.gz"));

        // 设置指定读取的起点
        fis.seek(1024*1024*128);

        // 获取输出流
        FileOutputStream fos = new FileOutputStream("d:/Ztest/hadoop-3.1.0.tar.gz.part2");

        // 流的对拷
        IOUtils.copyBytes(fis, fos, conf);

        //关闭资源
        IOUtils.closeStream(fos);
        IOUtils.closeStream(fis);
        fs.close();

        System.out.println("over");
    }
}

原文地址:https://www.cnblogs.com/Diyo/p/11355695.html

时间: 2024-11-03 23:13:13

Hadoop之HDFS(客户端操作) 2 HDFS的API操作 3 HDFS的I/O流操作的相关文章

Hadoop之HDFS客户端操作

1. HDFS 客户端环境准备 1.1 windows 平台搭建 hadoop 2.8.5 2. 创建Maven工程 # pom.xml <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>RELEASE</version> </dependency> <dep

IO流操作

一.IO流 1.  IO流是用来处理设备之间的数据传输,java是通过流的方式完成对数据的传输. 2.  操作的基本规律 1>明确源和目的. 2>操作的数据是否是纯文本. 3>当体系明确后,明确要使用哪个具体的对象. 二.IO流结构 IO流 |---字节流 |--输入流抽象基类:InputStream |--输出流抽象基类:Outputtream |---字符流 |--输入流抽象基类:Reader |--输出流抽象基类:Writer 注:由这四个类派生出来的子类名称都是以其父类名作为子类

还看不懂同事的代码?超强的 Stream 流操作姿势还不学习一下

Java 8 新特性系列文章索引. Jdk14都要出了,还不能使用 Optional优雅的处理空指针? Jdk14 都要出了,Jdk8 的时间处理姿势还不了解一下? 还看不懂同事的代码?Lambda 表达式.函数接口了解一下 前言 我们都知道 Lambda 和 Stream 是 Java 8 的两大亮点功能,在前面的文章里已经介绍过 Lambda 相关知识,这次介绍下 Java 8 的 Stream 流操作.它完全不同于 java.io 包的 Input/Output Stream ,也不是大数

高效 告别996,开启java高效编程之门 3-6流操作分类

1 重点 理解流程操作分类 常用的方法 2 Stream流操作分类: 2.1 流操作分类之中间操作(Intermediate): 无状态操作——filter/map/peek等有状态操作——dictinct/sorted/limit等 2.2 流操作分类之终端操作(Termina1):非短路操作——forEach/collect/count等短路操作——anyMatch/findFirst/findAny等 2.3 有状态操作无状态操作区别: 无状态操作: 比如map或者filter会从输入流中

Hadoop之HDFS(客户端操作) 1.环境准备

HDFS客户端操作 1.HDFS客户端环境准备 1.根据自己电脑的操作系统拷贝对应的编译后的hadoop jar包到非中文路径(例如:D:\Develop\hadoop-2.7.2),如图所示. 图 编译后的hadoop jar包 2.配置HADOOP_HOME环境变量,如图所示. 图  配置HADOOP_HOME环境变量 3. 配置Path环境变量,如图所示. 图  配置Path环境变量 4.创建一个Maven工程HdfsClientDemo 5.导入相应的依赖坐标+日志添加 <depende

大数据技术之_04_Hadoop学习_01_HDFS_HDFS概述+HDFS的Shell操作(开发重点)+HDFS客户端操作(开发重点)+HDFS的数据流(面试重点)+NameNode和SecondaryNameNode(面试开发重点)

第1章 HDFS概述1.1 HDFS产出背景及定义1.2 HDFS优缺点1.3 HDFS组成架构1.4 HDFS文件块大小(面试重点)第2章 HDFS的Shell操作(开发重点)第3章 HDFS客户端操作(开发重点)3.1 HDFS客户端环境准备3.2 HDFS的API操作3.2.1 HDFS文件上传(测试参数优先级)3.2.2 HDFS文件下载3.2.3 HDFS文件夹删除3.2.4 HDFS文件名更改3.2.5 HDFS文件详情查看3.2.6 HDFS文件和文件夹判断3.3 HDFS的I/O

【Hadoop】HDFS客户端开发示例

1.原理.步骤 2.HDFS客户端示例代码 package com.ares.hadoop.hdfs; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.URISyntaxException; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.C

HDFS客户端操作

客户端的理解 hdfs的客户端有多种形式: 1.网页形式 2.命令行形式 3.客户端在哪里运行,没有约束,只要运行客户端的机器能够跟hdfs集群联网 文件的切块大小和存储的副本数量,都是由客户端决定! 所谓的由客户端决定,是通过配置参数来定的 hdfs的客户端会读以下两个参数,来决定切块大小.副本数量: 切块大小的参数: dfs.blocksize 副本数量的参数: dfs.replication 上面两个参数应该配置在客户端机器的hadoop目录中的hdfs-site.xml中配置 <prop

使用HDFS客户端java api读取hadoop集群上的信息

本文介绍使用hdfs java api的配置方法. 1.先解决依赖,pom <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.7.2</version> <scope>provided</scope> </dependency> 2.配置文