Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)

  不多说,直接上代码。

 代码

package zhouls.bigdata.myWholeHadoop.HDFS.hdfs7;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
/**
* function 合并小文件至 HDFS
*
*
*/
public class MergeSmallFilesToHDFS
{
private static FileSystem fs = null; //定义文件系统对象,是HDFS上的
private static FileSystem local = null; //定义文件系统对象,是本地上的
/**
* @function main
* @param args
* @throws IOException
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException,URISyntaxException
{
list();
}

/**
*
* @throws IOException
* @throws URISyntaxException
*/
public static void list() throws IOException, URISyntaxException
{
// 读取hadoop配置文件
Configuration conf = new Configuration();
// 文件系统访问接口和创建FileSystem对象,在本地上运行模式
URI uri = new URI("hdfs://HadoopMaster:9000");
fs = FileSystem.get(uri, conf);
// 获得本地文件系统
local = FileSystem.getLocal(conf);
// 过滤目录下的 svn 文件
FileStatus[] dirstatus = local.globStatus(new Path("D://Data/tvdata/*"),new RegexExcludePathFilter("^.*svn$"));
//获取D:\Data\tvdata目录下的所有文件路径
Path[] dirs = FileUtil.stat2Paths(dirstatus);
FSDataOutputStream out = null;
FSDataInputStream in = null;
for (Path dir : dirs)
{//比如拿2012-09-17为例
//将文件夹名称2012-09-17的-去掉,直接,得到20120901文件夹名称
String fileName = dir.getName().replace("-", "");//文件名称
//只接受20120917日期目录下的.txt文件
FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
// 获得20120917日期目录下的所有文件
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
// 输出路径
Path block = new Path("hdfs://HadoopMaster:9000/middle/tv/"+ fileName + ".txt");
System.out.println("合并后的文件名称:"+fileName+".txt");
// 打开输出流
out = fs.create(block);
//循环20120917日期目录下的所有文件
for (Path p : listedPaths)
{
in = local.open(p);// 打开输入流
IOUtils.copyBytes(in, out, 4096, false); // 复制数据
// 关闭输入流
in.close();
}
if (out != null)
{
// 关闭输出流
out.close();
}
//当循环完20120917日期目录下的所有文件之后,接着依次20120918,20120919,,,
}
}

/**
*
* @function 过滤 regex 格式的文件
*
*/
public static class RegexExcludePathFilter implements PathFilter
{
private final String regex;

public RegexExcludePathFilter(String regex)
{
this.regex = regex;
}

public boolean accept(Path path)
{
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return !flag;
}

}

/**
*
* @function 接受 regex 格式的文件
*
*/
public static class RegexAcceptPathFilter implements PathFilter
{
private final String regex;

public RegexAcceptPathFilter(String regex)
{
this.regex = regex;
}

public boolean accept(Path path)
{
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return flag;
}

}
}

时间: 2024-10-05 05:21:15

Hadoop HDFS编程 API入门系列之合并小文件到HDFS(三)的相关文章

Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce.ParseTVDataCompressAndCounter; import java.net.URI; import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Co

Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(九)

下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUnit 框架 MRUnit是Cloudera公司专为Hadoop MapReduce写的单元测试框架,API非常简洁实用.MRUnit针对不同测试对象使用不同的Driver: MapDriver:针对单独的Map测试  ReduceDriver:针对单独的Reduce测试    MapReduceDri

Hadoop HDFS编程 API入门系列之从本地上传文件到HDFS(一)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs5; import java.io.IOException; import java.net.URI;import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.ha

Hadoop HDFS编程 API入门系列之路径过滤上传多个文件到HDFS(二)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs6; import java.io.IOException;import java.net.URI;import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;import org.apa

Hadoop HDFS编程 API入门系列之HDFS_HA(五)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs3; import java.io.FileInputStream;import java.io.InputStream;import java.io.OutputStream;import java.net.URI; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSyst

Hadoop HDFS编程 API入门系列之HdfsUtil版本1(六)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs2; import java.io.FileOutputStream;import java.io.IOException; import org.apache.commons.io.IOUtils;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FSDataInputStream;

Hadoop HDFS编程 API入门系列之简单综合版本1(四)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs4; import java.io.IOException; import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.BlockLocation;import org.apache.hadoop.fs.FileStatus

Hadoop HDFS编程 API入门系列之HdfsUtil版本2(七)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.HDFS.hdfs1; import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.net.URI; import org.apache.commons.io.IOUtils;im

Hadoop HDFS编程 API入门系列之RPC版本1(八)

不多说,直接上代码. 代码 package zhouls.bigdata.myWholeHadoop.RPC.rpc1; import java.io.IOException;import java.net.InetSocketAddress; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.ipc.RPC; public class LoginController { public static void