1. Writing an Avro file to HDFS with Java

The program below packs every file in a local directory into a single Avro container file on HDFS: each record stores the source file's path in the username field and its raw bytes in the password field.
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsAvroTest {

    // Each record holds a file path ("username") and the file's raw bytes ("password").
    // "password" must be declared as bytes, not string, because a ByteBuffer is appended below.
    public static final String SCHEMA_JSON = "{\"type\": \"record\",\"name\": \"SmallFilesTest\", "
            + "\"fields\": ["
            + "{\"name\":\"username\",\"type\":\"string\"},"
            + "{\"name\":\"password\",\"type\":\"bytes\"}]}";
    public static final Schema SCHEMA = new Schema.Parser().parse(SCHEMA_JSON);

    public static void writeToAvro(File srcPath, OutputStream outputStream)
            throws IOException {
        DataFileWriter<Object> writer = new DataFileWriter<Object>(
                new GenericDatumWriter<Object>()).setSyncInterval(100);
        writer.setCodec(CodecFactory.snappyCodec());
        writer.create(SCHEMA, outputStream);
        // Walk the top level of the source directory and append one record per file.
        for (Object obj : FileUtils.listFiles(srcPath, null, false)) {
            File file = (File) obj;
            String filename = file.getAbsolutePath();
            byte[] content = FileUtils.readFileToByteArray(file);
            GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("username", filename);
            record.put("password", ByteBuffer.wrap(content));
            writer.append(record);
        }
        IOUtils.cleanup(null, writer);
        IOUtils.cleanup(null, outputStream);
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        File sourceDir = new File(args[0]);  // local directory of small files
        Path destFile = new Path(args[1]);   // target Avro file on HDFS
        OutputStream os = hdfs.create(destFile);
        writeToAvro(sourceDir, os);
    }
}
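A typical way to run the writer is via hadoop jar, with the Avro, commons-io, and snappy-java dependencies on the classpath. The jar name and paths below are placeholders, not from the original post:

hadoop jar hdfs-avro-test.jar HdfsAvroTest /tmp/small-files /user/hadoop/smallfiles.avro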
2. Reading an Avro file on HDFS with Java
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsReadAvro {

    public static void readFromAvro(InputStream is) throws IOException {
        DataFileStream<Object> reader = new DataFileStream<Object>(is,
                new GenericDatumReader<Object>());
        for (Object o : reader) {
            GenericRecord r = (GenericRecord) o;
            // "username" is the original file path; "password" is a ByteBuffer with
            // the file's contents, so its toString() only shows the buffer state.
            System.out.println(r.get("username") + ":" + r.get("password"));
        }
        // Close the reader before the underlying stream.
        IOUtils.cleanup(null, reader);
        IOUtils.cleanup(null, is);
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        FileSystem hdfs = FileSystem.get(config);
        Path destFile = new Path(args[0]);  // Avro file on HDFS
        InputStream is = hdfs.open(destFile);
        readFromAvro(is);
    }
}
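Printing the ByteBuffer only shows its position and limit, not the file contents. If the goal is to recover the packed files, a minimal sketch like the following decodes each record back into a local file; the class name, the extractFromAvro helper, and the output-directory parameter are assumptions for illustration, not part of the original post:

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.FileUtils;

public class HdfsExtractAvro {

    // Hypothetical helper: unpack every record into outDir, one file per record.
    public static void extractFromAvro(InputStream is, File outDir) throws IOException {
        DataFileStream<GenericRecord> reader = new DataFileStream<GenericRecord>(is,
                new GenericDatumReader<GenericRecord>());
        try {
            for (GenericRecord r : reader) {
                // "username" holds the original absolute path; keep only the base name.
                String name = new File(r.get("username").toString()).getName();
                ByteBuffer buf = (ByteBuffer) r.get("password");
                byte[] bytes = new byte[buf.remaining()];
                buf.get(bytes);
                FileUtils.writeByteArrayToFile(new File(outDir, name), bytes);
            }
        } finally {
            reader.close();  // also releases the underlying input stream
        }
    }
}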