hive执行流程(3)-Driver类分析1Driver类整体流程

Driver类是对

org.apache.hadoop.hive.ql.processors.CommandProcessor.java

接口的实现,重写了run方法,定义了常见sql的执行方式.

public class Driver implements CommandProcessor

具体的方法调用顺序:

run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal--->
compile--analyzer(BaseSemanticAnalyzer)--->execute

其中compile和execute是两个比较重要的方法:

compile用来完成语法和语义的分析,生成执行计划

execute执行物理计划,即提交相应的mapredjob

通过打印perflog可以看到Driver类的简单地时序图:

下面来看下Driver类的几个常用的方法实现:

1)createTxnManager 用来获取目前设置的用于实现lock的类,比如:

org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager

2)checkConcurrency 用来判断当前hive设置是否支持并发控制:

boolean supportConcurrency = conf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);

主要是通过判断hive.support.concurrency参数,默认是false

3)getClusterStatus 调用JobClient类的getClusterStatus方法来获取集群的状态:

public ClusterStatus getClusterStatus() throws Exception {
    ClusterStatus cs;
    try {
      JobConf job = new JobConf(conf , ExecDriver.class);
      JobClient jc = new JobClient(job);
      cs = jc.getClusterStatus();
    } catch (Exception e) {
      e.printStackTrace();
      throw e;
    }
    LOG.info( "Returning cluster status: " + cs.toString());
    return cs;
  }

4)getSchema   //返回表的schema信息

5)

doAuthorization/doAuthorizationV2/getHivePrivObjects

用来在开启权限验证情况下对sql的权限检测操作

6)

getLockObjects/acquireReadWriteLocks/releaseLocks

都是和锁相关的方法 ,其中getLockObjects用来获取锁的对象(锁的路径,锁的模式等),最终返回一个包含所有锁的list,acquireReadWriteLocks用来控制获取锁,releaseLocks用来释放锁:

getLockObjects:
  private List<HiveLockObj> getLockObjects(Database d, Table t, Partition p, HiveLockMode mode)
      throws SemanticException {
    List<HiveLockObj> locks = new LinkedList<HiveLockObj>();
    HiveLockObjectData lockData =
      new HiveLockObjectData( plan.getQueryId(),
                             String. valueOf(System.currentTimeMillis ()),
                             "IMPLICIT",
                             plan.getQueryStr());
    if (d != null) {
      locks.add( new HiveLockObj(new HiveLockObject(d.getName(), lockData), mode));  //数据库层面的锁
      return locks;
    }
    if (t != null) {  // 表层面的锁
      locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
      locks.add( new HiveLockObj(new HiveLockObject(t, lockData), mode));
      mode = HiveLockMode.SHARED;
      locks.add( new HiveLockObj(new HiveLockObject(t.getDbName(), lockData), mode));
      return locks;
    }
    if (p != null) { //分区层面的锁
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
      if (!(p instanceof DummyPartition)) {
        locks.add( new HiveLockObj(new HiveLockObject(p, lockData), mode));
      }
      // All the parents are locked in shared mode
      mode = HiveLockMode.SHARED;
      // For dummy partitions, only partition name is needed
      String name = p.getName();
      if (p instanceof DummyPartition) {
        name = p.getName().split( "@")[2];
      }
      String partialName = "";
      String[] partns = name.split( "/");
      int len = p instanceof DummyPartition ? partns.length : partns.length - 1;
      Map<String, String> partialSpec = new LinkedHashMap<String, String>();
      for ( int idx = 0; idx < len; idx++) {
        String partn = partns[idx];
        partialName += partn;
        String[] nameValue = partn.split( "=");
        assert(nameValue.length == 2);
        partialSpec.put(nameValue[0], nameValue[1]);
        try {
          locks.add( new HiveLockObj(
                      new HiveLockObject(new DummyPartition(p.getTable(), p.getTable().getDbName()
                                                            + "/" + p.getTable().getTableName()
                                                            + "/" + partialName,
                                                              partialSpec), lockData), mode));
          partialName += "/";
        } catch (HiveException e) {
          throw new SemanticException(e.getMessage());
        }
      }
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable(), lockData), mode));
      locks.add( new HiveLockObj(new HiveLockObject(p.getTable().getDbName(), lockData), mode));
    }
    return locks;
  }

acquireReadWriteLocks调用了锁具体实现类的acquireLocks方法

releaseLocks调用了锁具体实现类的releaseLocks方法

7)

run方法是Driver类的入口方法,调用了runInternal方法,我们主要来看runInternal的方法,大体步骤:

运行hive.exec.driver.run.hooks中设置的hook,
运行HiveDriverRunHook相关类的的preDriverRun方法---->检测是否支持并发,并获取并发实现的类
--->compileInternal---->运行锁相关的操作(判断是否只对mapred job进行锁,获取锁等)
---->调用execute---->释放锁--->运行HiveDriverRunHook相关类的的postDriverRun方法
---->返回CommandProcessorResponse对象

相关代码:

private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
      throws CommandNeedRetryException {
    errorMessage = null;
    SQLState = null;
    downstreamError = null;
    if (!validateConfVariables()) {
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(conf , command);
    // Get all the driver run hooks and pre-execute them.
    List<HiveDriverRunHook> driverRunHooks;
    try {               //运行hive.exec.driver.run.hooks中设置的hook
      driverRunHooks = getHooks(HiveConf.ConfVars.HIVE_DRIVER_RUN_HOOKS, 
          HiveDriverRunHook. class);         
      for (HiveDriverRunHook driverRunHook : driverRunHooks) {
          driverRunHook.preDriverRun(hookContext); //运行HiveDriverRunHook相关类的的preDriverRun方法
      }
    } catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    // Reset the perf logger
    PerfLogger perfLogger = PerfLogger.getPerfLogger( true);
    perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.DRIVER_RUN);
    perfLogger.PerfLogBegin( CLASS_NAME, PerfLogger.TIME_TO_SUBMIT);
    int ret;
    boolean requireLock = false;
    boolean ckLock = false;
    try {
      ckLock = checkConcurrency();  //检测是否支持并发,并获取并发实现的类,比如常用的 org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager
      createTxnManager();
    } catch (SemanticException e) {
      errorMessage = "FAILED: Error in semantic analysis: " + e.getMessage();
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage, "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      ret = 10;
      return new CommandProcessorResponse(ret, errorMessage , SQLState );
    }
    ret = recordValidTxns();
    if (ret != 0) return new CommandProcessorResponse(ret, errorMessage, SQLState);
    if (!alreadyCompiled) {
      ret = compileInternal(command);  //调用compileInternal方法
      if (ret != 0) {
        return new CommandProcessorResponse(ret, errorMessage, SQLState);
      }
    }
    // the reason that we set the txn manager for the cxt here is because each
    // query has its own ctx object. The txn mgr is shared across the
    // same instance of Driver, which can run multiple queries.
    ctx.setHiveTxnManager( txnMgr);
    if (ckLock) {  //断是否只对mapred job进行锁,参数hive.lock.mapred.only.operation,默认为false
      boolean lockOnlyMapred = HiveConf.getBoolVar( conf, HiveConf.ConfVars.HIVE_LOCK_MAPRED_ONLY);
      if(lockOnlyMapred) {
        Queue<Task<? extends Serializable>> taskQueue = new LinkedList<Task<? extends Serializable>>();
        taskQueue.addAll( plan.getRootTasks());
        while (taskQueue.peek() != null) {
          Task<? extends Serializable> tsk = taskQueue.remove();
          requireLock = requireLock || tsk.requireLock();
          if(requireLock) {
            break;
          }
          if (tsk instanceof ConditionalTask) {
            taskQueue.addAll(((ConditionalTask)tsk).getListTasks());
          }
          if(tsk.getChildTasks()!= null) {
            taskQueue.addAll(tsk.getChildTasks());
          }
          // does not add back up task here, because back up task should be the same
          // type of the original task.
        }
      } else {
        requireLock = true;
      }
    }
    if (requireLock) { //获取锁
      ret = acquireReadWriteLocks();
      if (ret != 0) {
        try {
          releaseLocks( ctx.getHiveLocks());
        } catch (LockException e) {
          // Not much to do here
        }
        return new CommandProcessorResponse(ret, errorMessage, SQLState);
      }
    }
    ret = execute(); //job运行
    if (ret != 0) {
      //if needRequireLock is false, the release here will do nothing because there is no lock
      try {
        releaseLocks( ctx.getHiveLocks());
      } catch (LockException e) {
        // Nothing to do here
      }
      return new CommandProcessorResponse(ret, errorMessage , SQLState );
    }
    //if needRequireLock is false, the release here will do nothing because there is no lock
    try {
      releaseLocks( ctx.getHiveLocks());
    } catch (LockException e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    perfLogger.PerfLogEnd( CLASS_NAME, PerfLogger.DRIVER_RUN);
    perfLogger.close(LOG, plan);
    // Take all the driver run hooks and post-execute them.
    try {
      for (HiveDriverRunHook driverRunHook : driverRunHooks) {  //运行HiveDriverRunHook相关类的的postDriverRun方法
          driverRunHook.postDriverRun(hookContext);
      }
    } catch (Exception e) {
      errorMessage = "FAILED: Hive Internal Error: " + Utilities.getNameMessage(e);
      SQLState = ErrorMsg. findSQLState(e.getMessage());
      downstreamError = e;
      console.printError( errorMessage + "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return new CommandProcessorResponse(12, errorMessage , SQLState );
    }
    return new CommandProcessorResponse(ret);
  }

8)

再来看下compileInternal方法
  private static final Object compileMonitor = new Object();
  private int compileInternal(String command) {
    int ret;
    synchronized ( compileMonitor) {
      ret = compile(command);  //调用compile方法
    }
    if (ret != 0) {
      try {
        releaseLocks( ctx.getHiveLocks());
      } catch (LockException e) {
        LOG.warn("Exception in releasing locks. "
            + org.apache.hadoop.util.StringUtils.stringifyException(e));
      }
    }
    return ret;
  }

调用了compile方法,compile方法分析命令,生成Task,关于compile的具体实现后面详细讲解

9.execute方法,提交task并等待task运行完毕,并打印task运行的信息,比如消耗的时间等

(这里信息也比较多,后面单独讲解

时间: 2024-10-22 21:01:53

hive执行流程(3)-Driver类分析1Driver类整体流程的相关文章

从ViewRootImpl类分析View绘制的流程(一)

[出处:从ViewRootImpl类分析View绘制的流程 CSDN 废墟的树] 从上两篇博客 <从setContentView方法分析Android加载布局流程> 和 <从LayoutInflater分析XML布局解析成View的树形结构的过程> 中我们了解到Activity视图UI是怎么添加到Activity的根布局DecorView上面的. 我们知道Activity中的PhoneView对象帮我们创建了一个PhoneView内部类DecorView(父类为FrameLayou

从ViewRootImpl类分析View绘制的流程

[转载请注明出处:从ViewRootImpl类分析View绘制的流程 CSDN 废墟的树] 从上两篇博客 <从setContentView方法分析Android加载布局流程> 和 <从LayoutInflater分析XML布局解析成View的树形结构的过程> 中我们了解到Activity视图UI是怎么添加到Activity的根布局DecorView上面的. 我们知道Activity中的PhoneView对象帮我们创建了一个PhoneView内部类DecorView(父类为Frame

(转)linux内存源码分析 - 内存回收(整体流程)

http://www.cnblogs.com/tolimit/p/5435068.html------------linux内存源码分析 - 内存回收(整体流程) 概述 当linux系统内存压力就大时,就会对系统的每个压力大的zone进程内存回收,内存回收主要是针对匿名页和文件页进行的.对于匿名页,内存回收过程中会筛选出一些不经常使用的匿名页,将它们写入到swap分区中,然后作为空闲页框释放到伙伴系统.而对于文件页,内存回收过程中也会筛选出一些不经常使用的文件页,如果此文件页中保存的内容与磁盘中

linux内存源码分析 - 内存回收(整体流程)

本文为原创,转载请注明:http://www.cnblogs.com/tolimit/ 概述 当linux系统内存压力就大时,就会对系统的每个压力大的zone进程内存回收,内存回收主要是针对匿名页和文件页进行的.对于匿名页,内存回收过程中会筛选出一些不经常使用的匿名页,将它们写入到swap分区中,然后作为空闲页框释放到伙伴系统.而对于文件页,内存回收过程中也会筛选出一些不经常使用的文件页,如果此文件页中保存的内容与磁盘中文件对应内容一致,说明此文件页是一个干净的文件页,就不需要进行回写,直接将此

Cordova Android源码分析系列二(CordovaWebView相关类分析)

本篇文章是Cordova Android源码分析系列文章的第二篇,主要分析CordovaWebView和CordovaWebViewClient类,通过分析代码可以知道Web网页加载的过程,错误出来,多线程处理等. CordovaWebView类分析 CordovaWebView类继承了Android WebView类,这是一个很自然的实现,共1000多行代码.包含了PluginManager pluginManager,BroadcastReceiver receiver,CordovaInt

hive执行流程(2)-CommandProcessor类

在 上一篇的CliDriver 类中介绍了CliDriver 类会引用到CommandProcessor相关类,主要是根据命令来判断具体实现类,比如通过本地的hive cli启动时,运行hive的命令(非list/source/shell命令等)时在processCmd方法中有如下实现: try {         CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);// 根据命令判断具体的Co

hive认证相关类分析

目前的hive版本是支持authentication和authorization的(再加上计费就是3A了,哈哈), 在hive的java.org.apache.hadoop.hive.conf.HiveConf类中定义的权限相关的设置项有: HIVE_AUTHORIZATION_ENABLED("hive.security.authorization.enabled", false),   //是否开启权限验证 HIVE_AUTHORIZATION_MANAGER("hive

hive的shims相关类分析

在hive的源码中经常可以看到shims相关的类,shims相关类是用来兼容不同的hadoop和hive版本的,以HadoopShims为例org.apache.hadoop.hive.shims.HadoopShims是一个接口,具体的实现类为 org.apache.hadoop.hive.shims.Hadoop20Shims org.apache.hadoop.hive.shims.Hadoop20SShims org.apache.hadoop.hive.shims.Hadoop23Sh

Hive学习之路 (二十)Hive 执行过程实例分析

一.Hive 执行过程概述 1.概述 (1) Hive 将 HQL 转换成一组操作符(Operator),比如 GroupByOperator, JoinOperator 等 (2)操作符 Operator 是 Hive 的最小处理单元 (3)每个操作符代表一个 HDFS 操作或者 MapReduce 作业 (4)Hive 通过 ExecMapper 和 ExecReducer 执行 MapReduce 程序,执行模式有本地模式和分 布式两种模式 2.Hive 操作符列表 3.Hive 编译器的