hive入口​CliDriver分析

CliDriver类是hive的入口类。

首先CliDriver类会通过OptionsProcessor类来parse输入的命令。比如解析-e,-s,-h等参数,然后把对应的值存放到对应的CliSessionState类的属性中,最后应用于CliDriver类中。

比如在executeDriver方法中,根据CliSessionState的属性对命令进行处理

CliDriver cli = new CliDriver();
    cli.setHiveVariables(oproc.getHiveVariables());  // 有变量相关的设置时
 
    // use the specified database if specified
    cli.processSelectDatabase(ss); 
    // Execute -i init files (always in silent mode)
    cli.processInitFiles(ss); // 指定了-i和加载.hiverc文件
    if (ss. execString != null ) {  // 指定了 -e时
      int cmdProcessStatus = cli.processLine(ss. execString);  
      return cmdProcessStatus;
    }
    try {   // 指定了-f时
      if (ss. fileName != null) {
        return cli.processFile(ss.fileName );
      }
    } catch (FileNotFoundException e) {
      System. err.println("Could not open input file for reading. (" + e.getMessage() + ")" );
      return 3;
    }

在CliDriver类方法的调用顺序主要有下面几种

1)add xxx/set/compile/reset等命令

main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd--对应processor类的run方法

2)sql命令

main-->run--->executeDriver---->processLine--->processCmd--->processLocalCmd---->Driver类run方法

3)shell命令

main-->run--->executeDriver---->processLine--->processCmd

其中CliDriver类中最重要的方法是processCmd,其定义了不同的命令不同的执行方式:

具体实现:

public int processCmd(String cmd) {
    CliSessionState ss = (CliSessionState) SessionState.get();
    ss.setLastCommand(cmd);
    // Flush the print stream, so it doesn‘t include output from the last command
    ss.err.flush();
    String cmd_trimmed = cmd.trim();
    String[] tokens = tokenizeCmd(cmd_trimmed);
    int ret = 0;
    if (cmd_trimmed.toLowerCase().equals( "quit") || cmd_trimmed.toLowerCase().equals("exit" )) { //如果是quit或者是exit,则直接退出jvm
      ss.close();
      System.exit(0);
    } else if (tokens[0].equalsIgnoreCase("source" )) {  // 如果是source xxx的情况,则按文件处理(调用processFile方法)
      String cmd_1 = getFirstCmd(cmd_trimmed, tokens[0].length());
      File sourceFile = new File(cmd_1);
      if (! sourceFile.isFile()){
        console.printError( "File: "+ cmd_1 + " is not a file." );
        ret = 1;
      } else {
        try {
          this.processFile(cmd_1);
        } catch (IOException e) {
          console.printError( "Failed processing file "+ cmd_1 +" " + e.getLocalizedMessage(),
            stringifyException(e));
          ret = 1;
        }
      }
    } else if (cmd_trimmed.startsWith("!" )) {  // 以!开头的,做为shell命令执行,最终调用Runtime.getRuntime().exec(shell_cmd)
      String shell_cmd = cmd_trimmed.substring(1);
      shell_cmd = new VariableSubstitution().substitute(ss.getConf(), shell_cmd);  //这里也会进行变量替换
      // shell_cmd = "/bin/bash -c \‘" + shell_cmd + "\‘";
      try {
        Process executor = Runtime. getRuntime().exec(shell_cmd);
        StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, ss.out);
        StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, ss.err);
        outPrinter.start();
        errPrinter.start();
        ret = executor.waitFor();
        if (ret != 0) {
          console.printError( "Command failed with exit code = " + ret);
        }
      } catch (Exception e) {
        console.printError( "Exception raised from Shell command " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    } else if (tokens[0].toLowerCase().equals("list" )) { // list命令时,调用SessionState的list_resource方法
      SessionState.ResourceType t;
      if (tokens. length < 2 || (t = SessionState.find_resource_type(tokens[1])) == null) {
        console.printError( "Usage: list ["
            + StringUtils.join(SessionState.ResourceType.values(), "|") + "] [<value> [<value>]*]");
        ret = 1;
      } else {
        List<String> filter = null;
        if (tokens.length >= 3) {
          System. arraycopy(tokens, 2, tokens, 0, tokens.length - 2);
          filter = Arrays. asList(tokens);
        }
        Set<String> s = ss.list_resource(t, filter);
        if (s != null && !s.isEmpty()) {
          ss.out.println(StringUtils.join(s, "\n"));
        }
      }
    } else if (ss.isRemoteMode()) { // remote mode -- connecting to remote hive server   //如果是远程模式,即hiveserver,调用HiveClient类的execute方法
      HiveClient client = ss.getClient();
      PrintStream out = ss.out;
      PrintStream err = ss.err;
      try {
        client.execute(cmd_trimmed);
        List<String> results;
        do {
          results = client.fetchN( LINES_TO_FETCH);
          for (String line : results) {
            out.println(line);
          }
        } while (results.size() == LINES_TO_FETCH);
      } catch (HiveServerException e) {
        ret = e.getErrorCode();
        if (ret != 0) { // OK if ret == 0 -- reached the EOF
          String errMsg = e.getMessage();
          if (errMsg == null) {
            errMsg = e.toString();
          }
          ret = e.getErrorCode();
          err.println( "[Hive Error]: " + errMsg);
        }
      } catch (TException e) {
        String errMsg = e.getMessage();
        if (errMsg == null) {
          errMsg = e.toString();
        }
        ret = -10002;
        err.println( "[Thrift Error]: " + errMsg);
      } finally {
        try {
          client.clean();
        } catch (TException e) {
          String errMsg = e.getMessage();
          if (errMsg == null) {
            errMsg = e.toString();
          }
          err.println( "[Thrift Error]: Hive server is not cleaned due to thrift exception: "
              + errMsg);
        }
      }
    } else { // local mode   // 剩下的情况都作为local模式,比如add xxx,set xxxx,select/insert xxx/show tables/create table,databse/use xxx等命令。
      try {
        CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf);  //会先根据命令获取对应的CommandProcessor 实现类
        ret = processLocalCmd(cmd, proc, ss);  //并调用processLocalCmd方法
      } catch (SQLException e) {
        console.printError( "Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
          org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      }
    }
    return ret;
  }

而processLocalCmd方法会将CommandProcessor的实例作为参数传入,并根据不同的CommandProcessor实现类,来调用不同的类的run方法。

  int processLocalCmd (String cmd, CommandProcessor proc, CliSessionState ss)

hive入口​CliDriver分析,布布扣,bubuko.com

时间: 2024-11-03 21:10:01

hive入口​CliDriver分析的相关文章

hive执行流程分析

转自:http://blog.csdn.net/gexiaobaohelloworld/article/details/7719163 入口:bin/hive脚本中,环境检查后执行ext中的cli.sh,进入主类:CliDriver.main. CliDriver.main:进入cli.processLine,处理分号";"分割为一条一条语句,再进processCmd. processCmd:处理quit/exit,再处理source,处理!,处理list:else建立CommandP

通过hiveserver远程服务构建hive web查询分析工具

(1)hive 三种启动方式及用途,本文主要关注通过hiveserver(可jdbc连接)的方式启动 1, hive  命令行模式,直接输入/hive/bin/hive的执行程序,或者输入 hive --service cli 用于linux平台命令行查询,查询语句基本跟mysql查询语句类似 2, hive  web界面的启动方式,hive --service hwi 用于通过浏览器来访问hive,提供基本的基于web的hive查询服务,可以看作是hive数据平台的demo, 具体用法可见:h

大数据技术之_18_大数据离线平台_04_数据分析 + Hive 之 hourly 分析 + 常用 Maven 仓库地址

二十.数据分析20.1.统计表20.2.目标20.3.代码实现20.3.1.Mapper20.3.2.Reducer20.3.3.Runner20.3.4.测试二十一.Hive 之 hourly 分析21.1.目标21.2.目标解析21.3.创建 Mysql 结果表21.4.Hive 分析21.4.1.创建 Hive 外部表,关联 HBase 数据表21.4.2.创建临时表用于存放 pageview 和 launch 事件的数据(即存放过滤数据)21.4.3.提取 e_pv 和 e_l 事件数据

Flume+Hadoop+Hive的离线分析系统基本架构

PS:历史原因作者账号名为:ymh198816,但事实上作者的生日并不是1988年1月6日 最近在学习大数据的离线分析技术,所以在这里通过做一个简单的网站点击流数据分析离线系统来和大家一起梳理一下离线分析系统的架构模型.当然这个架构模型只能是离线分析技术的一个简单的入门级架构,实际生产环境中的大数据离线分析技术还涉及到很多细节的处理和高可用的架构.这篇文章的目的只是带大家入个门,让大家对离线分析技术有一个简单的认识,并和大家一起做学习交流. 离线分析系统的结构图 整个离线分析的总体架构就是使用F

Hive综合案例分析之开窗函数使用

知识点: 1.Hive的窗口和分析函数进阶 CUME_DIST 小于等于当前行值的行数 / 总行数 PERCENT_RANK 当前rank值-1 / 总行数-1 NTILE 将窗口分成n片 LEAD(col, n, default) 窗口内下n行值 LAG(col, n , default) 窗口内上n行值 FIRST_VALUE 窗口内第一个值 LAST_VALUE 窗口内最后一个值 2.分析函数中包含三个分析子句 分组(Partition By) 排序(Order By) 窗口(Window

Flume-NG + HDFS + HIVE 日志收集分析

最近做了一个POC,目的是系统日志的收集和分析,此前有使用过splunk,虽然用户体验很好,但一是价格昂贵,二是不适合后期开发(splunk已经推出了SDK,后期开发已经变得非常容易).在收集TB级别的日志量上flume-ng是更好的选择,因为后面的存储是扩展性极佳的HDFS.先简要介绍一下测试环境: 5台VM机器(RHEL6.3): 1, collector01 2, namenode 3, datanode01 4, datanode02 5, datanode03 第一台机器collect

Hive综合案例分析之用户上网行为分析

知识点:1.Hive复合数据类型:array collect_set collect_list array_contains sort_array 2.lateral view explode(array) lateral view out 需求: click_log : cookie_id     ad_id      time ad_list: ad_id     ad_url     catalog_list 统计: cookie_catalog: cookie_id     ad_cat

Swift程序入口深度分析

1.swift为什么不需要main 在c/c++及其它语言中都有一个main函数,程序从main作为起点,开始执行程序,如下: int main(int argc, const char * argv[]) { printf("Hello, World!\n"); return 0; } main函数实际上是一个特殊的函数,为了能找到程序入口,大多楼语言都约定main()函数作为入口.那么为什么在Swift中没有这样的一个函数呢?先看一下官方的解释 Code written at gl

Hive的元数据分析

Hive 将元数据存储在 RDBMS 中,一般常用 MySQL 和 Derby.默认情况下,Hive 元数据保存在内嵌的 Derby 数据库中,只能允许一个会话连接,只适合简单的测试.实际生产环境中不适用, 为了支持多用户会话,则需要一个独立的元数据库,使用 MySQL 作为元数据库,Hive 内部对 MySQL 提供了很好的支持,配置一个独立的元数据库需要增加以下步骤.