Java读写hdfs上的avro文件

1、通过Java往hdfs写avro文件

 1 import java.io.File;
 2 import java.io.IOException;
 3 import java.io.OutputStream;
 4 import java.nio.ByteBuffer;
 5
 6 import org.apache.avro.Schema;
 7 import org.apache.avro.file.CodecFactory;
 8 import org.apache.avro.file.DataFileWriter;
 9 import org.apache.avro.generic.GenericData;
10 import org.apache.avro.generic.GenericDatumWriter;
11 import org.apache.avro.generic.GenericRecord;
12 import org.apache.commons.io.FileUtils;
13 import org.apache.hadoop.conf.Configuration;
14 import org.apache.hadoop.fs.FileSystem;
15 import org.apache.hadoop.fs.Path;
16 import org.apache.hadoop.io.IOUtils;
17
18 public class HdfsAvroTest {
19
20     public static final String SCHEMA_JSON = "{\"type\": \"record\",\"name\": \"SmallFilesTest\", "
21             + "\"fields\": ["
22             + "{\"name\":\""
23             + "username"
24             + "\",\"type\":\"string\"},"
25             + "{\"name\":\""
26             + "password"
27             + "\", \"type\":\"string\"}]}";
28     public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);
29
30     public static void writeToAvro(File srcPath, OutputStream outputStream)
31             throws IOException {
32         DataFileWriter<Object> writer = new DataFileWriter<Object>(
33                 new GenericDatumWriter<Object>()).setSyncInterval(100);
34         writer.setCodec(CodecFactory.snappyCodec());
35         writer.create(SCHEMA, outputStream);
36         for (Object obj : FileUtils.listFiles(srcPath, null, false)) {
37             File file = (File) obj;
38             String filename = file.getAbsolutePath();
39             byte content[] = FileUtils.readFileToByteArray(file);
40             GenericRecord record = new GenericData.Record(SCHEMA);
41             record.put("username", filename);
42             record.put("password", ByteBuffer.wrap(content));
43             writer.append(record);
44         }
45         IOUtils.cleanup(null, writer);
46         IOUtils.cleanup(null, outputStream);
47     }
48
49     public static void main(String[] args) throws Exception {
50         Configuration config = new Configuration();
51         FileSystem hdfs = FileSystem.get(config);
52         File sourceDir = new File(args[0]);
53         Path destFile = new Path(args[1]);
54         OutputStream os = hdfs.create(destFile);
55         writeToAvro(sourceDir, os);
56     }
57 }

2、Java读hdfs上的avro文件

 1 import java.io.IOException;
 2 import java.io.InputStream;
 3
 4 import org.apache.avro.file.DataFileStream;
 5 import org.apache.avro.generic.GenericDatumReader;
 6 import org.apache.avro.generic.GenericRecord;
 7 import org.apache.hadoop.conf.Configuration;
 8 import org.apache.hadoop.fs.FileSystem;
 9 import org.apache.hadoop.fs.Path;
10 import org.apache.hadoop.io.IOUtils;
11
12 public class HdfsReadAvro {
13
14
15     public static void readFromAvro(InputStream is) throws IOException {
16         DataFileStream<Object> reader = new DataFileStream<Object>(is,
17                 new GenericDatumReader<Object>());
18         for (Object o : reader) {
19             GenericRecord r = (GenericRecord) o;
20             System.out.println(r.get("username")+ ":"+r.get("password"));
21         }
22         IOUtils.cleanup(null, is);
23         IOUtils.cleanup(null, reader);
24     }
25
26     public static void main(String[] args) throws Exception {
27         Configuration config = new Configuration();
28         FileSystem hdfs = FileSystem.get(config);
29         Path destFile = new Path(args[0]);
30         InputStream is = hdfs.open(destFile);
31         readFromAvro(is);
32     }
33 }
时间: 2024-11-07 14:10:30

Java读写hdfs上的avro文件的相关文章

java读写HDFS

HDFS是一个分布式文件系统,既然是文件系统,就可以对其文件进行操作,比如说新建文件.删除文件.读取文件内容等操作.下面记录一下使用JAVA API对HDFS中的文件进行操作的过程. 对分HDFS中的文件操作主要涉及一下几个类: Configuration类:该类的对象封转了客户端或者服务器的配置. FileSystem类:该类的对象是一个文件系统对象,可以用该对象的一些方法来对文件进行操作.FileSystem fs = FileSystem.get(conf);通过FileSystem的静态

hbase 启动不了,删除了hdfs 上的hbase文件夹和文件

无意将hdfs上的hbase文件(hbase的rootDir为hdfs:master1:9000/hbase)删掉了,重启hbase和ZK都不行 解决方案:使用hbase的ZK 即设置 hbase-env.sh 中的  HBASE_MANAGES_ZK 为 true 在重启hbase 即可生成hbase文件 之后在使用单独的ZK来管理hbase 即可

Delphi调用JAVA的WebService上传XML文件(XE10.2+WIN764)

相关资料:1.http://blog.csdn.net/luojianfeng/article/details/512198902.http://blog.csdn.net/avsuper/article/details/8764165 注意事项: 1.生成WSDL文件时,D7可能有的无法生成,用XE版本可以全部生成. 返回字节流: 1 function GetByte(AString: string): TByteDynArray; 2 var 3 sByte: TByteDynArray;

java web图片上传和文件上传

图片上传和文件上传本质上是一样的,图片本身也是文件.文件上传就是将图片上传到服务器,方式虽然有很多,但底层的实现都是文件的读写操作. 注意事项 1.form表单一定要写属性enctype="multipart/form-data" 2.为了能保证文件能上传成功file控件的name属性值要和你提交的控制层变量名一致, 例如空间名是file那么你要在后台这样定义 private File file; //file控件名 private String fileContentType;//图

Java通过ftp上传Linux文件权限问题

背景:Java使用ftp上传文件,此文件允许别的用户去访问.Linux上ftp如下: lftp-3.7.11-4.el5ftp-0.17-35.el5tftp-server-0.49-2 ftp服务器为lftp,可以看到,支持site命令: 所以,我们需要在ftp连接成功后发送命令使用"site umask 022",如下: ftpClient.sendCommand("site umask 022"). 注:权限最高为666,rwrwrw 022代表的是644,r

java读写hdfs简单demo

环境:eclipse + eclipse hadoop插件, hadoop + rhel6.4 package test; import java.io.IOException; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.ha

Java 利用FTP上传,下载文件,遍历文件目录

Java实现FTP上传下载文件的工具包有很多,这里我采用Java自带的API,实现FTP上传下载文件.另外JDK1.7以前的版本与其之后版本的API有了较大的改变了. 例如: JDK1.7之前 JDK1.7 ftpClient = new FtpClinet() ftpClient = FtpClient.create(ip) ftpclient.login(user,password) ftpclient.login(user,null,password) ftpclient.binary()

HDFS设计思路,HDFS使用,查看集群状态,HDFS,HDFS上传文件,HDFS下载文件,yarn web管理界面信息查看,运行一个mapreduce程序,mapreduce的demo

26 集群使用初步 HDFS的设计思路 l 设计思想 分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析: l 在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务 l 重点概念:文件切块,副本存放,元数据 26.1 HDFS使用 1.查看集群状态 命令:   hdfs  dfsadmin –report 可以看出,集群共有3个datanode可用 也可打开web控制台查看HDFS集群

Spark学习笔记——读写HDFS

使用Spark读写HDFS中的parquet文件 文件夹中的parquet文件 build.sbt文件 name := "spark-hbase" version := "1.0" scalaVersion := "2.11.8" libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % "2.1.0", &quo