hive执行流程(2)-CommandProcessor类

在 上一篇的CliDriver 类中介绍了CliDriver 类会引用到CommandProcessor相关类,主要是根据命令来判断具体实现类,比如通过本地的hive cli启动时,运行hive的命令(非list/source/shell命令等)时在processCmd方法中有如下实现:

try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);// 根据命令判断具体的CommandProcessor 实现类
        ret = processLocalCmd(cmd, proc, ss);
      } catch (SQLException e) {
        console.printError( "Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }

具体的决定什么样的命令对应什么样的具体实现类由 CommandProcessorFactory 规定:如果是set,reset,dfs,add delete,compile等命令,返回对应的CommandProcessor实现类。其余有效命令比如select,insert 都是返回Driver类。

CommandProcessor相关类在org.apache.hadoop.hive.ql.processors包中,类的具体的uml图如下:

简单看下几个类的实现:

1.HiveCommand类,是一个迭代类,定义了非sql的一些语句

public enum HiveCommand {
  SET(),
  RESET(),
  DFS(),
  ADD(),
  DELETE(),
  COMPILE();
  private static final Set<String> COMMANDS = new HashSet<String>();
  static {
    for (HiveCommand command : HiveCommand. values()) {
      COMMANDS.add(command.name());
    }
  }
  public static HiveCommand find(String[] command) {
    if (null == command){
      return null;
    }
    String cmd = command[0];
    if (cmd != null) {
      cmd = cmd.trim().toUpperCase();
      if (command. length > 1 && "role".equalsIgnoreCase(command[1])) {
        // special handling for set role r1 statement
        return null ;
      } else if (COMMANDS .contains(cmd)) {
        return HiveCommand. valueOf(cmd);
      }
    }
    return null;
  }
}

2.CommandProcessorFactory 类,主要用于获取具体的命令实现类

主要定义了get和getForHiveCommand方法

方法调用get----->getForHiveCommand,其中getForHiveCommand会调HiveCommand 类,HiveCommand 类是一个枚举类型,定义了一些命令。

getForHiveCommand方法中:
  public static CommandProcessor getForHiveCommand(String[] cmd, HiveConf conf)
      throws SQLException {
    HiveCommand hiveCommand = HiveCommand.find(cmd);  // sql语句返回值为null
    if (hiveCommand == null || isBlank(cmd[0])) {
      return null;
    }
    if (conf == null) {
      conf = new HiveConf();
    }
    Set<String> availableCommands = new HashSet<String>();
    for (String availableCommand : conf.getVar(HiveConf.ConfVars.HIVE_SECURITY_COMMAND_WHITELIST).split( ",")) {
      availableCommands.add(availableCommand.toLowerCase().trim());
    }
    if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
      throw new SQLException("Insufficient privileges to execute " + cmd[0], "42000");
    }
    switch (hiveCommand) {  // 每种语句对应的具体的processor类
      case SET:
        return new SetProcessor();
      case RESET:
        return new ResetProcessor();
      case DFS:
        SessionState ss = SessionState.get();
        return new DfsProcessor(ss.getConf());
      case ADD:
        return new AddResourceProcessor();
      case DELETE:
        return new DeleteResourceProcessor();
      case COMPILE:
        return new CompileProcessor();
      default:
        throw new AssertionError("Unknown HiveCommand " + hiveCommand);
    }
  }
get方法:
  public static CommandProcessor get(String[] cmd, HiveConf conf)
      throws SQLException {
    CommandProcessor result = getForHiveCommand(cmd, conf);
    if (result != null) {
      return result;  // 如果result不为空,即命令在HiveCommand 的迭代器中定义的话,直接返回对应的结果
    }
    if (isBlank(cmd[0])) {
      return null;
    } else {  // 为空的话返回Driver类的实例
      if (conf == null) {
        return new Driver();    
      }
      Driver drv = mapDrivers.get(conf);
      if (drv == null) {
        drv = new Driver();
        mapDrivers.put(conf, drv);
      }
      drv.init();
      return drv;
    }
  }

3.CommandProcessorResponse类封装了processor的返回信息,比如返回码,错误信息等。

4.CommandProcessor 类是一个接口,具体的实现类由下面几个:

AddResourceProcessor/CompileProcessor/DeleteResourceProcessor/DfsProcessor/ResetProcessor/SetProcessor/Driver

主要实现方法在各个实现类的run方法中,run方法返回一个CommandProcessorResponse的对象。

下面简单的说下常用的几个实现类:

a.AddResourceProcessor类是处理add xxx命令的。

主要有两个步骤:

1)判断命令的合法性(长度,类型是否在FILE,JAR,ARCHIVE 3种之内)

2)调用SessionState的add_resource方法(

SessionState.add_resource方法---->调用SessionState.downloadResource--->
调用FileSystem的copyToLocalFile方法,把文件下载到本地

)

 public CommandProcessorResponse run(String command) {
    SessionState ss = SessionState.get();
    command = new VariableSubstitution().substitute(ss.getConf(),command);
    String[] tokens = command.split( "\\s+");
    SessionState.ResourceType t;
    if (tokens. length < 2
        || (t = SessionState.find_resource_type(tokens[0])) == null) {
      console.printError( "Usage: add ["
          + StringUtils.join(SessionState .ResourceType.values(), "|" )
          + "] <value> [<value>]*" );
      return new CommandProcessorResponse(1);
    }
    for ( int i = 1; i < tokens.length ; i++) {
      String resourceFile = ss.add_resource(t, tokens[i]);
      if(resourceFile == null){
        String errMsg = tokens[i]+ " does not exist.";
        return new CommandProcessorResponse(1,errMsg,null);
      }
    }
    return new CommandProcessorResponse(0);
  }

b.相反的DeleteResourceProcessor是用来处理delete  xxx命令的。

最终调用了SessionState的delete_resource方法,把resource从HashMap中去掉。

SessionState的delete_resource方法
  public boolean delete_resource(ResourceType t, String value) {
    if (resource_map.get(t) == null) {
      return false;
    }
    if (t. hook != null) {
      if (!t. hook.postHook( resource_map.get(t), value)) {
        return false;
      }
    }
    return ( resource_map.get(t).remove(value));
  }

c.DfsProcessor类用来处理dfs 命令,即已“!dfs”开头的命令,最终调用了FsShell的run方法

d.SetProcessor类用来处理set xxx等命令,可以用来设置参数,变量等。

设置参数时

1)以system: 开头的调用了 System.getProperties().setProperty方法。

比如

hive> set system:user.name=xxxx;
hive> set system:user.name;    
system:user.name=xxxx

2)以hiveconf:开头:

调用了HiveConf的verifyAndSet方法

3)以hivevar:开头:

ss.getHiveVariables().put方法

Driver的实现比较复杂,放在下篇讲解。

时间: 2024-10-25 19:00:02

hive执行流程(2)-CommandProcessor类的相关文章

hive执行流程(3)-Driver类分析1Driver类整体流程

Driver类是对 org.apache.hadoop.hive.ql.processors.CommandProcessor.java 接口的实现,重写了run方法,定义了常见sql的执行方式. public class Driver implements CommandProcessor 具体的方法调用顺序: run--->runInternal--->(createTxnManager+recordValidTxns)----->compileInternal---> com

hive执行流程分析

转自:http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163 入口:bin/hive脚本中,环境检查后执行ext中的cli.sh,进入主类:CliDriver.main. CliDriver.main:进入cli.processLine,处理分号";"分割为一条一条语句,再进processCmd. processCmd:处理quit/exit,再处理source,处理!,处理list:else建立CommandP

HIVE 执行流程

Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口 原文地址:https://www.cnblogs.com/xiangyuguan/p/11381323.html

Hive SQL的执行流程

[为什么要了解hive执行流程] .当我们写了一个sql,但是执行起来很慢,这时如果我们知道这个sql的底层执行流程是怎样的,就会比较容易去优化 .如果我们在面试中被问及对hive的理解,如果说就是写sql会显得很片面,如果我们了解hive的执行流程,就会知道,虽然表面上是写sql,但是在从hive的sql,到最终出来执行结果,中间经历了MR流程,其中MR的map,combiner,shuffle,reduce具体是执行了hive的那个部分,这样就会比较全面. [分析基于hadoop之上的SQL

Hive SQL执行流程分析

转自 http://www.tuicool.com/articles/qyUzQj 最近在研究Impala,还是先回顾下Hive的SQL执行流程吧. Hive有三种用户接口: cli (Command line interface) bin/hive或bin/hive –service cli 命令行方式(默认) hive-server/hive-server2 bin/hive –service hiveserver 或bin/hive –service hiveserver2 通过JDBC/

hive之执行流程

一.简介 hive运行的本质就是将hql语句,转换为一组操作符 operator.这里的 operator 代表 mapreduce操作和hdfs的操作,是hive执行hql语句的最小单位. 二.几个典型语句的分析 1.join 2.group by 3.order by shuffle 的排序,二次排序 4.去重 三.注意 以上这几个关键字的出现,都会转化为mapreduce. 原文地址:https://www.cnblogs.com/zhangxiaofan/p/11110448.html

spark-sql执行流程分析

spark-sql 架构 图1 图1是sparksql的执行架构,主要包括逻辑计划和物理计划几个阶段,下面对流程详细分析. sql执行流程 总体流程 parser:基于antlr框架对 sql解析,生成抽象语法树 变量替换,通过正则表达式找出符合规则的字符串,替换成系统缓存环境的变量 SQLConf中的`spark.sql.variable.substitute`,默认是可用的:参考` SparkSqlParser` parser:将antlr的tree转成spark catalyst的Logi

(一)熟悉执行流程——基于ThinkPHP3.2的内容管理框架OneThink学习

ThinkPHP作为国内具有代表性的PHP框架,经过多年的发展,受到越来越多公司与开发者的青睐.我也在忙里偷闲中抽出部分时间,来学习这个优秀的框架.在开始学习这个框架时,最好通过实例来学习,更容易结合实际的生产情况,促进学习的效果:这里我就选择由ThinkPHP团队开发的基于ThinkPHP3.2的内容管理框架OneThink来学习,从了解它的执行流程→熟悉流程中各个细节→了解模版标签→自己实际去使用标签→再了解它的实际执行过程……通过这样一个流程来熟悉如何基于ThinkPHP开发出一套CMS系

使用Caffe进行手写数字识别执行流程解析

之前在 http://blog.csdn.net/fengbingchun/article/details/50987185 中仿照Caffe中的examples实现对手写数字进行识别,这里详细介绍下其执行流程并精简了实现代码,使用Caffe对MNIST数据集进行train的文章可以参考  http://blog.csdn.net/fengbingchun/article/details/68065338 : 1.   先注册所有层,执行layer_factory.hpp中类LayerRegis