FileSystem实例化过程

HDFS案例代码

Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration);

InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));
OutputStream out = new FileOutputStream(new File("log4j_download.properties"));
IOUtils.copyBytes(in, out, 4096, true); //最后一个参数表示完成拷贝之后关闭输入/出流

FileSystem.java

static final Cache CACHE = new Cache();

public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme();   //hdfs
    String authority = uri.getAuthority();  //hadoop000:8020

    return CACHE.get(uri, conf);
}

FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    return getInternal(uri, conf, key);
}

private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
        fs = map.get(key);
    }

    //根据URI取得一个FileSystem实例,如果允许缓存,会中从缓存中取出,否则将调用createFileSystem创建一个新实例
    if (fs != null) {
        return fs;
    }

    fs = createFileSystem(uri, conf);
    synchronized (this) {
        FileSystem oldfs = map.get(key);
        ... //放入到CACHE中秋
        return fs;
    }
}

private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // 返回的是:org.apache.hadoop.hdfs.DistributedFileSystem
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf); //初始化DistributedFileSystem
    return fs;
}

public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) { //文件系统是否被加载过,刚开始时为false
        loadFileSystems();
    }
    Class<? extends FileSystem> clazz = null;
    if (conf != null) {
        clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null); //fs.hdfs.impl ,此时我们并没有在core-default.xml和core-site.xml中配置该属性
    }
    if (clazz == null) {
        clazz = SERVICE_FILE_SYSTEMS.get(scheme); //class org.apache.hadoop.hdfs.DistributedFileSystem
    }
    if (clazz == null) {
        throw new IOException("No FileSystem for scheme: " + scheme);
    }
    return clazz;
}

private static void loadFileSystems() {
    synchronized (FileSystem.class) {
        if (!FILE_SYSTEMS_LOADED) {
            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
            for (FileSystem fs : serviceLoader) {
                SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
            }
            FILE_SYSTEMS_LOADED = true; //标识为已经从系统中加载过
        }
    }
}

loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:

file=class org.apache.hadoop.fs.LocalFileSystem,
ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem,
hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem,
hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem,
webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem,
s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem,
viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem,
swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem,
har=class org.apache.hadoop.fs.HarFileSystem,
s3=class org.apache.hadoop.fs.s3.S3FileSystem,
hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem

DistributedFileSystem.java

DFSClient dfs; //重点属性:客户端与服务端交互操作需要先拿到DFSClient

@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost();  //hadoop000

    this.dfs = new DFSClient(uri, conf, statistics);
    this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
    this.workingDir = getHomeDirectory();
}

DFSClient.java

final ClientProtocol namenode; //重点属性:客户端与NameNode通信的PRC接口

public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {

    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy(); //org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
}

NameNodeProxies.java

public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
}

public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface,
    UserGroupInformation ugi, boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);

    T proxy;
    if (xface == ClientProtocol.class) {
      proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
    } ...
    return new ProxyAndInfo<T>(proxy, dtService);
}

private static ClientProtocol createNNProxyWithClientProtocol(
    InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {

    //Client与NameNode的RPC交互接口
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
        ClientNamenodeProtocolPB.class, version, address, ugi, conf,
        NetUtils.getDefaultSocketFactory(conf),
        org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy)
            .getProxy();

    if (withRetries) {
        //使用jdk的动态代理创建实例
        proxy = (ClientNamenodeProtocolPB) RetryProxy.create(
          ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>(
              ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
    }
    return new ClientNamenodeProtocolTranslatorPB(proxy);
}

RetryProxy.java

public static <T> Object create(Class<T> iface,FailoverProxyProvider<T> proxyProvider, RetryPolicy retryPolicy) {
    return Proxy.newProxyInstance(
        proxyProvider.getInterface().getClassLoader(),
        new Class<?>[] { iface },
        new RetryInvocationHandler<T>(proxyProvider, retryPolicy)
    );
}

获取FileSystem实例源码分析总结:

1、FileSystem.get通过反射实例化了一个DistributedFileSystem;

2、DistributedFileSystem中new DFSCilent()把他作为自己的成员变量;

3、在DFSClient构造方法里面,调用了createProxy使用RPC机制得到了一个NameNode的代理对象,就可以和NameNode进行通信;

4、整个流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。

时间: 2024-08-28 03:22:20

FileSystem实例化过程的相关文章

深夜睡不着,第二篇随笔,说说js的创建实例化过程

媳妇白天加班太累了,我呢,白天睡太多了,晚上太过于亢奋,自己一个人偷偷的拿着笔记本到客厅写博客~ 上一篇可能很多人看到了觉得就是个joke,那个真的是一个joke,但是在实际开发过程中,很多年轻的coder对于写不写分号很不以为然,要知道,真实生产环境下的代码要远比我栗子中给的代码要复杂得多,因此很有可能不用我的误导,你就看不出来,因此浪费了一下午的宝贵开发时间,所以写代码还是要规范一些. 第二篇文章我依旧不想讲太过于深入的技术,还是说两个“花边新闻”,聊以自慰罢了,看官有兴致你就看,没兴致也可

Mybatis深入之DataSource实例化过程

Mybatis深入之DataSource实例化过程 简介 主要介绍Mybatis启动过程中DataSource实例化的过程.为后面解析一个完整SQL执行过程做个前章. Mybatis中DataSource体系 MybatisDataSource整体简介 Mybatis中关于数据库的类都在org.apache.ibatis.datasource包中 Mybatis配置文件中关于数据库的配置: <environments default="development"> <e

java4android (继承中的子类实例化过程)

生成子类的过程 见代码: class Person { String name; int age; Person(){ System.out.print("Person的无参数构造函数"); } Person(String name,int age){ this.name = name; this.age = age; System.out.print("Person的有2个参数的构造函数"); } void eat(){ System.out.print(&quo

【Spring源码分析】非懒加载的Bean实例化过程(下篇)

doCreateBean方法 上文[Spring源码分析]非懒加载的Bean实例化过程(上篇),分析了单例的Bean初始化流程,并跟踪代码进入了主流程,看到了Bean是如何被实例化出来的.先贴一下AbstractAutowireCapableBeanFactory的doCreateBean方法代码: 1 protected Object doCreateBean(final String beanName, final RootBeanDefinition mbd, final Object[]

面向对象(子父类中构造函数的特点-子类实例化过程)

/* 3,子父类中的构造函数. 在对子类对象进行初始化时,父类的构造函数也会运行, 那是因为子类的构造函数默认第一行有一条隐式的语句 super(); super():会访问父类中空参数的构造函数.而且子类中所有的构造函数默认第一行都是super(); 为什么子类一定要访问父类中的构造函数. 因为父类中的数据子类可以直接获取.所以子类对象在建立时,需要先查看父类是如何对这些数据进行初始化的.//指在在父类构造函数初始化 所以子类在对象初始化时,要先访问一下父类中的构造函数. 如果要访问父类中指定

一个对象的实例化过程【重点】

一.过程  Person p = new Person();  1,JVM会去读取指定路径下的Person.class文件,并加载进内存,    并会先加载Person的父类(如果有直接父类的情况下)  2,在堆内存中开辟空间,分配地址.  3,并在对象空间中,对对象中的属性进行默认初始化  4,调用对应的构造函数,进行初始化  5,在构造函数中,第一行会先调用父类中的构造函数进行初始化.  6,父类初始化完毕后,再对子类的属性,进行显示初始化.  7,指定构造函数的特定初始化  8,初始化完毕

python基础8之类的实例化过程剖析

一.概述 之前我们说关于python中的类,都一脸懵逼,都想说,类这么牛逼到底是什么,什么才是类?下面我们就来讲讲,什么是类?它具有哪些特性. 二.类的语法 2.1 语法 class dog(object): #用class定义类 "dog class" #对类的说明 def __init__(self,name): #构造函数或者是构造方法,也可以叫初始化方法 self.name = name def sayhi(self): #类方法 "sayhi funcation&q

【python】-- 类的实例化过程、特征、共有属性和私有属性

实例化过程 1.类的定义和语法 class dog(object): #用class定义类 "dog class" #对类的说明 def __init__(self,name): #构造函数或者是构造方法,也可以叫初始化方法 self.name = name def sayhi(self): #类方法 "sayhi funcation" #对类方法的说明 print("hello,i am a dog,my name is ",self.name

10. JavaSE-子类实例化过程 & 构造方法间调用

子类的实例化过程: 子类中所有的构造函数默认都会访问父类中空参数的构造函数. class Parent { Parent(){ System.out.println("parent class run"); } } class Children extends Parent { Children(){ //super();//调用的就是父类空参数的构造函数 System.out.println("children class run"); //return; /*说