大数据学习——hdfs客户端流式操作代码的实现

package cn.itcast.bigdata.hdfs.diceng;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

/**
 * 相对那些封装好的方法而言的更底层一些的操作方式
 * 上层那些mapreduce   spark等运算框架,去hdfs中获取数据的时候,就是调的这种底层的api
 *
 * @author
 */
public class StreamAccess {

    FileSystem fs = null;

    @Before
    public void init() throws Exception {

        Configuration conf = new Configuration();
        System.setProperty("HADOOP_USER_NAME", "root");
        conf.set("fs.defaultFS", "hdfs://mini1:9000");
        fs = FileSystem.get(conf);
//        fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "root");

    }

    @Test
    public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{

        //先获取一个文件的输入流----针对hdfs上的
        FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));

        //再构造一个文件的输出流----针对本地的
        FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));

        //再将输入流中数据传输到输出流
        IOUtils.copyBytes(in, out, 4096);

    }

    @Test
    public void testUploadByStream() throws Exception{

        //hdfs文件的输出流
        FSDataOutputStream fsout = fs.create(new Path("/aaa.txt"));

        //本地文件的输入流
        FileInputStream fsin = new FileInputStream("c:/111.txt");

        IOUtils.copyBytes(fsin, fsout,4096);

    }

    /**
     * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
     * 用于上层分布式运算框架并发处理数据
     * @throws IllegalArgumentException
     * @throws IOException
     */
    @Test
    public void testRandomAccess() throws IllegalArgumentException, IOException{
        //先获取一个文件的输入流----针对hdfs上的
        FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));

        //可以将流的起始偏移量进行自定义
        in.seek(22);

        //再构造一个文件的输出流----针对本地的
        FileOutputStream out = new FileOutputStream(new File("d:/iloveyou.line.2.txt"));

        IOUtils.copyBytes(in,out,19L,true);

    }

    /**
     * 读取指定的block
     * @throws IOException
     * @throws IllegalArgumentException
     */
    @Test
    public void testCat() throws IllegalArgumentException, IOException{

        FSDataInputStream in = fs.open(new Path("/weblog/input/access.log.10"));
        //拿到文件信息
        FileStatus[] listStatus = fs.listStatus(new Path("/weblog/input/access.log.10"));
        //获取这个文件的所有block的信息
        BlockLocation[] fileBlockLocations = fs.getFileBlockLocations(listStatus[0], 0L, listStatus[0].getLen());

        //第一个block的长度
        long length = fileBlockLocations[0].getLength();
        //第一个block的起始偏移量
        long offset = fileBlockLocations[0].getOffset();

        System.out.println(length);
        System.out.println(offset);

        //获取第一个block写入输出流
//        IOUtils.copyBytes(in, System.out, (int)length);
        byte[] b = new byte[4096];

        FileOutputStream os = new FileOutputStream(new File("d:/block0"));
        while(in.read(offset, b, 0, 4096)!=-1){
            os.write(b);
            offset += 4096;
            if(offset>length) return;
        };

        os.flush();
        os.close();
        in.close();

    }

}

原文地址:https://www.cnblogs.com/feifeicui/p/10216842.html

时间: 2024-07-31 14:46:54

大数据学习——hdfs客户端流式操作代码的实现的相关文章

大数据学习——hdfs客户端操作

package cn.itcast.hdfs; import org.apache.commons.io.IOUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.junit.Before; import org.junit.Test; import java.io.File; import java.io.FileNotFoundException; impor

大数据学习之ubuntu的基础操作

时隔两个星期,学习了和复习了大数据的一些基本操作,第一系列是ubuntu的基础! 基础涉及到: 1.virtualBOX的安装和使用 2.FTP工具传输 3.root用户密码设置和vim的安装 4.创建用户并设置权限:这里使用Hadoop用户 5.ssh免密码登入的设置 6.Java环境的安装和配置 7.文件权限的修改和操作 原文地址:https://www.cnblogs.com/R-HLC0401/p/10085690.html

大数据学习:Scala隐式转换和并发编程(DT大数据梦工厂)

很多Spark代码中使用了隐式转换.隐式参数.隐式类.隐式对象 如果不掌握,基本在读写复杂代码的时候读不懂 并发编程,怎么样进行高效并发,相互之间怎么通信,Spark这种分布式并发肯定非常重要 (Actor.Akka) ==========隐式转换函数============ 可以手动指定将某种类型的对象转换成其它类型的对象或者类 转换原因:假设制定好接口 比如File,我们想要File.dtSpark的方法,在JAVA中不行 如果在Scala里面我们可以进行升级,将File编程其它类型,就用之

大数据读书笔记(2)-流式计算

早期和当前的"流式计算"系统分别称为"连续查询处理类"和"可扩展数据流平台类"计算系统. 流式计算系统的特点: 1)低延迟 2)极佳的系统容错性 3)极强的系统扩展能力 4)灵活强大的应用逻辑表达能力 目前典型的流式计算系统: S4,storm,millwheel,samza,d-stream,hadoop online,mupd8等. 其中storm和millwheel是各方面比较突出的. 流式计算系统架构: 常见的流式计算系统架构分为两种:主

大数据学习——点击流日志每天都10T,在业务应用服务器上,需要准实时上传至(Hadoop HDFS)上

点击流日志每天都10T,在业务应用服务器上,需要准实时上传至(Hadoop HDFS)上 1需求说明 点击流日志每天都10T,在业务应用服务器上,需要准实时上传至(Hadoop HDFS)上 2需求分析 一般上传文件都是在凌晨24点操作,由于很多种类的业务数据都要在晚上进行传输,为了减轻服务器的压力,避开高峰期. 如果需要伪实时的上传,则采用定时上传的方式 3技术分析 HDFS SHELL:  hadoop fs  –put   xxxx.log  /data    还可以使用 Java Api

大数据学习总结(8)大数据场景

大数据场景一.各种标签查询 查询要素:人.事.物.单位 查询范围:A范围.B范围.... 查询结果:pic.name.data from 1.痛点:对所有文本皆有实时查询需求2.难点:传统SQL使用WHERE子句匹配LIKE关键词,在庞大的数据字段中搜索某些想要的字,需遍历所有数据页或者索引页,查询效率底,当出现千万级以上数据时,耗时较高,无法满足实时要求3.方案:使用全文检索方案,分布式架构,即使PB级量级也可做到毫秒级查询 大数据场景二.客户事件查询 查询条件:城市.区域.时间跨度(2017

大数据学习总结(7)we should...

大数据场景一.各种标签查询 查询要素:人.事.物.单位 查询范围:A范围.B范围.... 查询结果:pic.name.data from 1.痛点:对所有文本皆有实时查询需求2.难点:传统SQL使用WHERE子句匹配LIKE关键词,在庞大的数据字段中搜索某些想要的字,需遍历所有数据页或者索引页,查询效率底,当出现千万级以上数据时,耗时较高,无法满足实时要求3.方案:使用全文检索方案,分布式架构,即使PB级量级也可做到毫秒级查询 大数据场景二.客户事件查询 查询条件:城市.区域.时间跨度(2017

好程序员大数据学习路线分享Scala系列之集合操作函数

好程序员大数据学习路线继续为大家分享Scala系列之集合操作函数4.6 集合的重要函数4.6.1sum/max/min/count在序列中查找最大或最小值是一个极常见的需求,如下:val numbers = Seq(11, 2, 5, 1, 6, 3, 9) numbers.max //11 numbers.min //1 更高级的例子,其中包含一个书的序列case class Book(title: String, pages: Int) val books = Seq( Book("Futu

大数据学习路径

大数据学习路径我vx ①⑤零零③④③⑥⑨③① 备注 A 更多大数据第一阶段linux+搜索+hadoop体系 Linux基础→shell编程→高并发架构→lucene,solr搜索→hadoop体系→HDFS→mapreduce→hbase→zookeeper→hive→flume→sqoop→项目实战一 第二阶段机器学习 R语言→mahout→项目实战二 第三阶段storm流式计算 kafka→storm→redis→项目实战三 第四阶段spark内存计算 scala编程→spark core