HDFS utility classes

Reading file data from HDFS

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.StringWriter;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
/**
 * Reads file data from HDFS.
 *
 * Created: 2016-03-08 09:37:49
 */
public class ReadHDFSDatas {

    static Configuration conf = new Configuration();
    /**
     * Reads every file under the given location (skipping marker files such as
     * _SUCCESS and transparently decompressing compressed files) and returns
     * the contents as a list of lines.
     *
     * @param location file or directory path on HDFS
     * @param conf     Hadoop configuration
     * @return all lines read from the files under {@code location}
     * @throws Exception if the file system cannot be reached or a file cannot be read
     */
    public static List<String> readLines( Path location, Configuration conf )
        throws Exception {
        FileSystem fileSystem = FileSystem.get( location.toUri(), conf );
        CompressionCodecFactory factory = new CompressionCodecFactory( conf );
        FileStatus[] items = fileSystem.listStatus( location );
        if ( items == null )
            return new ArrayList<String>();
        List<String> results = new ArrayList<String>();
        for ( FileStatus item : items ) {

            // ignore marker files like _SUCCESS
            if ( item.getPath().getName().startsWith( "_" ) ) {
                continue;
            }

            // use a decompressing stream when the file is compressed (e.g. .gz)
            CompressionCodec codec = factory.getCodec( item.getPath() );
            InputStream stream = null;
            if ( codec != null ) {
                stream = codec.createInputStream( fileSystem.open( item.getPath() ) );
            }
            else {
                stream = fileSystem.open( item.getPath() );
            }

            StringWriter writer = new StringWriter();
            IOUtils.copy( stream, writer, "UTF-8" );
            String raw = writer.toString();
            // split on line breaks so each entry in the result is one line
            for ( String str : raw.split( "\n" ) ) {
                results.add( str );
            }
            stream.close();
        }
        return results;
    }

    /**
     * Reads the whole file at the given HDFS path into a single string.
     *
     * @param hdfs file path on HDFS
     * @return the file contents as a UTF-8 string
     * @throws IOException if the file cannot be opened
     */
    public String ReadFile( String hdfs )
        throws IOException {
        StringBuffer sb = new StringBuffer();
        FileSystem fs = FileSystem.get( URI.create( hdfs ), conf );
        FSDataInputStream hdfsInStream = fs.open( new Path( hdfs ) );
        try {
            byte[] b = new byte[10240];
            int numBytes = 0;
            // read() returns -1 at end of file; append every chunk that was read
            while ( ( numBytes = hdfsInStream.read( b ) ) > 0 ) {
                sb.append( new String( b, 0, numBytes, "UTF-8" ) );
            }
        }
        catch ( IOException e ) {
            e.printStackTrace();
        }
        finally {
            hdfsInStream.close();
            fs.close();
        }
        return sb.toString();
    }

    /**
     * Prints every line of the file at the given HDFS path to stdout.
     *
     * @param filePath file path on HDFS
     * @return the last line that was read, or an empty string for an empty file
     * @throws IOException if the file cannot be opened
     */
    public static String getFile( String filePath ) throws IOException {
        String lastLine = "";
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
            Path pathq = new Path( filePath );
            // FSDataInputStream.readLine() is deprecated, so wrap the stream in a BufferedReader
            BufferedReader reader = new BufferedReader( new InputStreamReader( fs.open( pathq ), "UTF-8" ) );
            String line;
            while ( ( line = reader.readLine() ) != null ) {
                System.out.println( line );
                lastLine = line;
            }
            reader.close();
            fs.close();
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        return lastLine;
    }

    /**
     * Reads the file at the given HDFS path and returns its lines as a list.
     *
     * @param filePath file path on HDFS
     * @return the lines of the file
     */
    public static List<String> getDatas( String filePath ) {
        List<String> list = new ArrayList<String>();
        try {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get( URI.create( filePath ), conf );
            Path pathq = new Path( filePath );
            BufferedReader reader = new BufferedReader( new InputStreamReader( fs.open( pathq ), "UTF-8" ) );
            String line;
            while ( ( line = reader.readLine() ) != null ) {
                list.add( line );
            }
            reader.close();
            fs.close();
        }
        catch ( Exception e ) {
            e.printStackTrace();
        }
        return list;
    }
    public static void main( String[] args ){
        // e.g. "hdfs://node4:9000/hive/warehouse/u_data/u.data" for a fully qualified path
        String hdfs = "/datas/u.data";
        // each line of u.data holds four tab-separated fields:
        // userid INT, movieid INT, rating INT, weekday INT
        List<String> listDatas = getDatas( hdfs );
        for ( int i = 0; i < listDatas.size(); i++ ){
            String[] split = listDatas.get( i ).split( "\t" );
            String userid = split[0];
            String movieid = split[1];
            String rating = split[2];
            String weekday = split[3];
            // salt the rowkey, then insert with the put API
            String makeRowKey = RegionSeverSplit.makeRowKey( userid );
            HBaseUtils.addRows( "t1", makeRowKey, "f1", "weekday-rating",
                    ( movieid + "-" + rating + "-" + weekday ).getBytes() );
        }
        System.out.println( "success......" );
    }
}
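
A minimal usage sketch of the class above (not part of the original code). It reads /datas/u.data once with getDatas, which loads the whole file into memory, and once as a stream, which is the safer choice for large files; the demo class name and the line counting are illustrative only.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadHDFSDatasDemo {

    public static void main( String[] args ) throws Exception {
        // getDatas builds its own Configuration, so the NameNode address
        // comes from the core-site.xml / hdfs-site.xml on the classpath
        List<String> lines = ReadHDFSDatas.getDatas( "/datas/u.data" );
        System.out.println( "lines read: " + lines.size() );

        // for large files, stream line by line instead of collecting a List
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get( URI.create( "/datas/u.data" ), conf );
        BufferedReader reader = new BufferedReader(
                new InputStreamReader( fs.open( new Path( "/datas/u.data" ) ), "UTF-8" ) );
        String line;
        long count = 0;
        while ( ( line = reader.readLine() ) != null ) {
            count++;   // process one line at a time here
        }
        System.out.println( "streamed lines: " + count );
        reader.close();
        fs.close();
    }
}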

HBase rowkey pre-processing: generating a salted rowkey

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

import org.apache.commons.codec.binary.Hex;

public class RegionSeverSplit {

    /**
     * Prefixes the id with a 7-character hex hash so that rowkeys are spread
     * evenly across region servers instead of hot-spotting a single region.
     */
    public static String makeRowKey(String id){
        String md5_content = null;
        try {
            MessageDigest messageDigest = MessageDigest.getInstance("MD5");
            messageDigest.reset();
            messageDigest.update(id.getBytes());
            byte[] bytes = messageDigest.digest();
            md5_content = new String(Hex.encodeHex(bytes));
        } catch (NoSuchAlgorithmException e1) {
            e1.printStackTrace();
        }
        // take the first 7 hex digits of the MD5, shift right one bit, and
        // left-pad with zeros so the prefix is always 7 characters long
        String right_md5_id = Integer.toHexString(Integer.parseInt(md5_content.substring(0, 7), 16) >> 1);
        while (right_md5_id.length() < 7){
            right_md5_id = "0" + right_md5_id;
        }
        return right_md5_id + "-" + id;
    }

    public static void main(String[] args){
        String rowkey = makeRowKey("asdfasdf");
        System.out.println(rowkey);
    }
}
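
The hex prefix only spreads writes if table t1 is actually split along those prefixes. Below is a minimal sketch (not from the original article) that pre-splits the table at creation time, using the same pre-1.0 admin API style as the rest of the code. Because makeRowKey shifts the 7-digit prefix right by one bit, prefixes fall in the range 0000000-7ffffff; the split points shown are illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class PreSplitTableDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);

        HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("t1"));
        desc.addFamily(new HColumnDescriptor("f1"));

        // split points chosen inside the 0000000-7ffffff range that makeRowKey produces,
        // so salted rowkeys are distributed over four regions from the start
        byte[][] splitKeys = new byte[][] {
                Bytes.toBytes("2000000"),
                Bytes.toBytes("4000000"),
                Bytes.toBytes("6000000")
        };
        admin.createTable(desc, splitKeys);
        admin.close();
    }
}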

HBase utility class: inserting single rows or batches with the put API

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.mapreduce.InputSplit;

import cn.tansun.bd.hdfs.ReadHDFSDatas;

/**
 *
 * @author root
 *
 */

public class HBaseUtils {
    private static HBaseAdmin hadmin = null;
    private static Configuration conf;
    private static HTable htable = null;

    static {
        conf = new Configuration();
        String filePath = "hbase-site.xml";
        Path path = new Path(filePath);
        conf.addResource(path);
        conf = HBaseConfiguration.create(conf);
    }

    /**
     * Inserts a single row.
     *
     * @param tableName    table to write to
     * @param rowkey       rowkey of the new row
     * @param columnFamily column family
     * @param columnName   column qualifier
     * @param values       cell value
     * @return true if the put succeeded
     */
    public static boolean addRow(String tableName, String rowkey,
            String columnFamily, String columnName, byte[] values) {
        boolean flag = true;
        if (tableName != null) {
            HTablePool hTpool = new HTablePool(conf, 1000);
            HTableInterface table = hTpool.getTable(tableName);
            Put put = new Put(rowkey.getBytes());
            put.addColumn(columnFamily.getBytes(), columnName.getBytes(),
                    values);
            try {
                table.put(put);
                // close() returns the table to the pool
                table.close();
                System.out.println("addRow success... tableName: " + tableName);
            } catch (IOException e) {
                flag = false;
                e.printStackTrace();
            }
        } else {
            flag = false;
            System.out.println("please select a tableName");
        }

        return flag;
    }

    public static void main(String[] args) {
        /*String makeRowKey = RegionSeverSplit.makeRowKey("adcdfef");
        String tableName = "student";
        String columnfianly = "info";
        String columnName = "name";
        String values = "zhangsan";
        addRow(tableName, makeRowKey, columnfianly, columnName,
                values.getBytes());*/
        ReadHDFSDatas readh = new ReadHDFSDatas();
        String hdfs = "/datas/u.data";
        List<String> getDatas = readh.getDatas(hdfs);
        for (int i = 0; i < getDatas.size(); i++){
            if (i < 100){
                System.out.println(getDatas.get(i));
            }
        }
    }

    /**
     * Queues puts on the client side and writes them to the table in batches.
     *
     * @param tableName    table to write to
     * @param rowkey       rowkey of the row
     * @param columnFamily column family
     * @param columnName   column qualifier
     * @param values       cell value
     * @return the list used to queue the puts
     */
    public static List<Put> addRows(String tableName, String rowkey,
            String columnFamily, String columnName, byte[] values) {
        List<Put> lists = null;
        long start = System.currentTimeMillis();
        if (tableName != null && rowkey != null) {
            HTablePool hTablePool = new HTablePool(conf, 1000);
            HTableInterface table = hTablePool.getTable(tableName);
            try {
                // buffer puts on the client and push them to the server in batches
                table.setAutoFlush(false);
                table.setWriteBufferSize(1024 * 1024 * 1);
                lists = new ArrayList<Put>();
                int count = 100;
                // demo: all puts in this call target the same rowkey and cell
                for (int i = 0; i < count; i++) {
                    Put put = new Put(rowkey.getBytes());
                    put.add(columnFamily.getBytes(), columnName.getBytes(), values);
                    lists.add(put);
                    // flush whenever 100 puts have been queued locally
                    if (lists.size() == 100) {
                        table.put(lists);
                        table.flushCommits();
                        lists.clear();
                    }
                }
                // flush any puts that are still buffered, then return the table to the pool
                if (!lists.isEmpty()) {
                    table.put(lists);
                    table.flushCommits();
                }
                table.close();
            } catch (IOException e) {

                e.printStackTrace();
            }

        } else {
            System.out.println("tableName and rowkey must not be null");
        }
        long end = System.currentTimeMillis();
        long times = end - start;
        System.out.println(times * 1.0 / 1000 + " s ..... finished");
        return lists;
    }

    /**
     * Reads data from a local file by name (not yet implemented).
     *
     * @param fileName file to read
     * @return the lines of the file
     */
    public List<String> getFileDatas(String fileName){

        return null;
    } 

    /**
     * Reads HDFS data by file name (not yet implemented).
     *
     * @param fileName file to read
     * @return the lines of the file
     */
    public static List<String> getHdfsDatas(String fileName){

    /*    List<String> getDatas = ReadHDFSDatas.getDatas(fileName);
        for (int i = 0; i < getDatas.size(); i++){
            if (i < 100){
                System.out.println(getDatas.get(i));
            }
        }
        return getDatas;*/
        return null;
    }
    /**
     * Computes the input splits for the given key range (not yet implemented).
     *
     * @param startKey start of the key range
     * @param endKey   end of the key range
     * @return the splits covering the range
     */
    public List<InputSplit> getSplits(byte[] startKey, byte[] endKey) {
        return null;
    }
}
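
HTablePool, setAutoFlush(false) and flushCommits() were removed from the client API in HBase 1.0. For comparison, here is a minimal sketch (not part of the original article) of the same buffered batch write using the 1.x Connection / BufferedMutator API; table t1, family f1 and the weekday-rating qualifier follow the example above, while the generated rowkeys and values are made up for illustration.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutDemo {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Connection connection = ConnectionFactory.createConnection(conf);

        // BufferedMutator buffers puts on the client and flushes them in batches,
        // replacing setAutoFlush(false) + flushCommits() from the old API
        BufferedMutator mutator = connection.getBufferedMutator(TableName.valueOf("t1"));

        List<Put> puts = new ArrayList<Put>();
        for (int i = 0; i < 1000; i++) {
            Put put = new Put(Bytes.toBytes(RegionSeverSplit.makeRowKey(String.valueOf(i))));
            put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("weekday-rating"),
                    Bytes.toBytes("demo-value-" + i));
            puts.add(put);
        }
        mutator.mutate(puts);
        mutator.flush();

        mutator.close();
        connection.close();
    }
}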