大数据阶段的重要课程划分
离线分析 : hadoop生态圈 HDFS, MapReduce(概念偏多), hive(底层是MapReduce), 离线业务分析80%都是使用hive
实时分析 : spark
数据结构 : 二叉树(面试) 动态规划, redis数据库, SSM三大框架
1. spring
2. springMVC
3. mybatis
HDFSAPI
HDFS创建目录
@Test
public void mkdirTest() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 创建目录
fs.mkdirs(new Path("/customMk"));
//3 关闭资源
fs.close();
System.out.println("创建完成");
}
HDFS删除目录
@Test
public void deleteMk() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 创建目录
fs.delete(new Path("/customMk"),true);
//3 关闭资源
fs.close();
System.out.println("删除完成");
}
HDFS修改文件名
@Test
public void testRename() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 操作HDFS API 修改名称
fs.rename(new Path("/a.txt"), new Path("/d.txt"));
//3 关闭资源
fs.close();
System.out.println("修改完成");
}
获得文件详细信息
@Test
public void getFileInfo() throws IOException {
//1 获得文件系统的连接
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
/*
*
* while(String str:strs){
*
* System.out.println(str);
* }
*
* */
RemoteIterator<LocatedFileStatus> listfiles=fs.listFiles(new Path("/"), true);
while(listfiles.hasNext()) {
LocatedFileStatus status=listfiles.next();
//输出文件名称
System.out.println(status.getPath().getName());
//输出文件块
System.out.println(status.getBlockSize());
//文件的长度
System.out.println(status.getLen());
//文件的权限
System.out.println(status.getPermission());
//文件的副本数量
System.out.println(status.getReplication());
//文件的所属者
System.out.println(status.getOwner());
}
}
hadoop环境变量
- 解压hadoop2.7.2到一个非中文的路径下
- 为hadoop设置环境变量** HADOOP_HOME**
- 配置path路径(不详细介绍,网上很多)%HADOOP_HOME%/bin
- 把编译过后的本地库文件拷贝到hadoop目录下的bin文件夹下
通过IO流操作HDFS
字符流和字节流
- 字节流: 字节流什么都能读 , 字节, 图片 , 音乐, 视频, 文件
- 字符流: 只能读取文本
- 字节流直接操作的是文件的本身, 字符流操作之中还存在缓冲区, 使用普通字节流处理中文的时候想要读取很多行可能出现乱码
- 使用字节流操作文本的时候, 如果不关闭资源, 同样可以把内容输出到文本中, 但是使用字符流操作文本的话, 如果没有关闭资源, 或者没有清空缓存区. 内容时不会输出到文本中的
IO流上传
@Test
public void filePut() throws IllegalArgumentException, IOException {
//1 获得文件系统
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
// 2创建输入流
FileInputStream input=new FileInputStream(new File("c:/hello.txt"));
// 3获得输出流
FSDataOutputStream fos=fs.create(new Path("/hello21.txt"));
// 流拷贝 把input中的内容交给 output输出
IOUtils.copyBytes(input, fos, config);
// 5关闭资源
IOUtils.closeStream(input);
IOUtils.closeStream(fos);
}
IO流下载
@Test
public void getFile() throws IllegalArgumentException, IOException {
//1 获得文件系统
Configuration config=new Configuration();
config.set("fs.defaultFS", "hdfs://10.0.152.47:8020");
FileSystem fs=FileSystem.get(config);
//2 获得输入流,从HDFS读取
FSDataInputStream input=fs.open(new Path("/hello21.txt"));
//3 获得输出流
FileOutputStream output=new FileOutputStream(new File("c:/hello21.txt"));
//4 流拷贝
IOUtils.copyBytes(input, output, config);
IOUtils.closeStream(input);
IOUtils.closeStream(output);
}
MapReduce核心思想
- MapReduce优点:
- 编写分布式程序简单, 可以简单的实现两个接口, 皆可以开发出一套分布式计 算程序
- 由于是在hadoop上运行的, 可以很方便的横向扩展
- MapReduce缺点:
- 延迟高, 不能实时进行分析处理, 只能做离线分析
- MapReduce分阶段处理:
- 文本文件 --> inputFormat --> map --> shuffle --> reduce --> outputFormat
WordCount单词统计案例
map阶段
对数据进行切片, 每一个切片对应一个mapTask , 假如一个文件被切成了10个切片, 就存在10个mapTask任务并行运行, 互不干扰
reduce阶段
把每个mapTask阶段的输出进行整合
步骤:
- 文本 : inputFormat 输出到 map<K,V>
java python hadoop
hdfs html css
javascript scala
scala css hdfs
- FileInputFormat 会读取一行文本输出给map 切分为 <>k ,v > 对, key 是文本中的偏移量, V 是文本中的内容 <0, java python hadoop>
<1, hdfs html css >
<2, javascript scala > ... <10 , scala css hdfs >
String word = value.toString(); ---->java python hadoop String[ ] words = word.split(" ");----> [java , python , hadoop]
- 接下来 , map 要输出给reduce <K, V > K:文本的内容, V: 单词出现的次数
- Reduce进行接收 <K, V > K:文本的内容, V: 单词出现的次数
在整个MapReduce运行的时候存在如下进程: 1. mapTask 2. reduceTask 3. MRAPPMaster 任务管理的进程
hadoop的序列化
如果要进行对象的传输, 则传输的内容必须进行序列化, 所以hadoop就创建了一些序列化类型
对象 | 序列化 |
---|---|
Long | longWritable |
Int | IntWrieable |
String | Text |
Wordcount代码
本地运行
1. 编写WordCountMap
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
*
* @author wubo
*
*Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
*
*输入 key 文本中偏移量
*value 文本中的内容
*
*输出 key 是文本的内容
*
*value 是单词出现的次数
*/
public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text k=new Text();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
//1 获取一行的数据
String line=value.toString();
//2 切割 按照空格切分
String[] words=line.split(" ");
for(String word:words) {
k.set(word); //把String类型的word 转换为Text类型
//3 输出到Reduce
context.write(k, new IntWritable(1));
}
}
//需要实现Map方法编写业务逻辑
}
2. 编写WordCountReduce
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
*
* @author wubo
*
*
*hello 1
*hadoop 1
*
*hadoop 1
*
*hadoop 2
*
*把相同key的values进行累加
*/
public class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
// TODO Auto-generated method stub
int sum=0;
for(IntWritable count:values) {
sum+=count.get();
}
//输出
context.write(key, new IntWritable(sum));
}
}
3. 编写Driver
package org.qianfeng.wordcount;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Driver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//1 获得配置信息
Configuration config=new Configuration();
// 实例化 job类 并且把配置信息传给job
Job job=Job.getInstance(config);
// 通过反射机制 加载主类的位置
job.setJarByClass(Driver.class);
//设置map和reduce类
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
//设置map的输出
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//设置redue的输出
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//设置文件的输入 输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]]));
FileOutputFormat.setOutputPath(job, new Path(args[1]]));
//提交任务
boolean result=job.waitForCompletion(true);
System.exit(result?0:1);
}
}
打成jar包在集群上运行
1. 导出jar包
2. 上传jar包到Linux 在集群中运行
- 上传的方法很多, 这里不说
- 在Linux中启动hadoop
- 上传文件到HDFS
$HADOOP_HOME/bin/hdfs dfs -put a.txt /a.txt
- 输入如下命令
$HADOOP_HOME/bin/hadoop jar wc.jar /a.txt /output
- 打开文件查看结果
$HADOOP_HOME/bin/hdfs dfs -cat /output/pa...
原文地址:https://www.cnblogs.com/xiayangdream/p/9975158.html
时间: 2024-10-09 17:42:32