Presto源码解读1-客户端提交查询-吕信(原创)

客户端请求处理就是接收交互式/非交互式命令,通过httpclient发送到服务器(coordinator),客户端通过httpclient更新执行状态打印给用户看,最后收集结果打印给用户。启动Presto cli 客户端的代码文件为:

presto-cli\src\main\java\com\facebook\presto\cli\Presto.java

public final class Presto

{

private Presto() {}

public static void main(String[] args)

throws Exception

{

Console console = singleCommand(Console.class).parse(args);

if (console.helpOption.showHelpIfRequested()) {

return;

}

console.run();

}

}

Console类对应的源文件为:presto-cli\src\main\java\com\facebook\presto\cli\Console.java,在该类中对查询的处理方法为:

private static void process(QueryRunner queryRunner, String sql, OutputFormat outputFormat, boolean interactive)

{

try (Query query = queryRunner.startQuery(sql)) {

query.renderOutput(System.out, outputFormat, interactive);

// update session properties if present

if (!query.getSetSessionProperties().isEmpty() || !query.getResetSessionProperties().isEmpty()) {

Map<String, String> sessionProperties = new HashMap<>(queryRunner.getSession().getProperties());

sessionProperties.putAll(query.getSetSessionProperties());

sessionProperties.keySet().removeAll(query.getResetSessionProperties());

queryRunner.setSession(withProperties(queryRunner.getSession(), sessionProperties));

}

}

catch (RuntimeException e) {

System.out.println("Error running command: " + e.getMessage());

if (queryRunner.getSession().isDebug()) {

e.printStackTrace();

}

}

}

Query.java StatementCliet.java Console.java QueryRunner.java

  1. 客户端初始化:console.run()

@Override

public void run()

{

//将登录的配置参数(--catalog –server –schema 日志级别等)装载到ClientSession,附录1:客户端默认session:

ClientSession session = clientOptions.toClientSession();

//命令行查询 --execute

boolean hasQuery = !Strings.isNullOrEmpty(clientOptions.execute);

//--file 执行的文件

boolean isFromFile = !Strings.isNullOrEmpty(clientOptions.file);

if (!hasQuery || !isFromFile) {

AnsiConsole.systemInstall();

}

initializeLogging(session.isDebug());

String query = clientOptions.execute;

if (hasQuery) {

query += ";";

}

//冲突检查,--execute和—file不能同时使用

if (isFromFile) {

if (hasQuery) {

throw new RuntimeException("both --execute and --file specified");

}

try {

query = Files.toString(new File(clientOptions.file), Charsets.UTF_8);

hasQuery = true;

}

catch (IOException e) {

throw new RuntimeException(format("Error reading from file %s: %s", clientOptions.file, e.getMessage()));

}

}

//有查询就执行查询,没查询就运行客户端

try (QueryRunner queryRunner = QueryRunner.create(session)) {

if (hasQuery) {

executeCommand(queryRunner, query, clientOptions.outputFormat);

}

else {

runConsole(queryRunner, session);

}

}

}

  1. 执行命令行:com.facebook.presto.cli.Console.runConsole(QueryRunner queryRunner, ClientSession session)

private static void runConsole(QueryRunner queryRunner, ClientSession session)

{

try (TableNameCompleter tableNameCompleter = new TableNameCompleter(queryRunner);

LineReader reader = new LineReader(getHistory(), tableNameCompleter)) {

tableNameCompleter.populateCache();

StringBuilder buffer = new StringBuilder();

while (true) {

// read a line of input from user

String prompt = PROMPT_NAME + ":" + session.getSchema();

if (buffer.length() > 0) {

prompt = Strings.repeat(" ", prompt.length() - 1) + "-";

}

String line = reader.readLine(prompt + "> ");

// add buffer to history and clear on user interrupt

if (reader.interrupted()) {

String partial = squeezeStatement(buffer.toString());

if (!partial.isEmpty()) {

reader.getHistory().add(partial);

}

buffer = new StringBuilder();

continue;

}

// exit on EOF

if (line == null) {

return;

}

// check for special commands if this is the first line

if (buffer.length() == 0) {

String command = line.trim();

if (command.endsWith(";")) {

command = command.substring(0, command.length() - 1).trim();

}

switch (command.toLowerCase()) {

case "exit":

case "quit":

return;

case "help":

System.out.println();

System.out.println(getHelpText());

continue;

}

}

// not a command, add line to buffer

buffer.append(line).append("\n");

// execute any complete statements

String sql = buffer.toString();

StatementSplitter splitter = new StatementSplitter(sql, ImmutableSet.of(";", "\\G"));

for (Statement split : splitter.getCompleteStatements()) {

System.out.printf("Execute query:" + split.statement());

Optional<Object> statement = getParsedStatement(split.statement());

if (statement.isPresent() && isSessionParameterChange(statement.get())) {

session = processSessionParameterChange(statement.get(), session);

queryRunner.setSession(session);

tableNameCompleter.populateCache();

}

else {

OutputFormat outputFormat = OutputFormat.ALIGNED;

if (split.terminator().equals("\\G")) {

outputFormat = OutputFormat.VERTICAL;

}

process(queryRunner, split.statement(), outputFormat, true);

}

reader.getHistory().add(squeezeStatement(split.statement()) + split.terminator());

}

// replace buffer with trailing partial statement

buffer = new StringBuilder();

String partial = splitter.getPartialStatement();

if (!partial.isEmpty()) {

buffer.append(partial).append(‘\n‘);

}

}

}

catch (IOException e) {

System.err.println("Readline error: " + e.getMessage());

}

}

  1. 发送请求到Coordinator:

在方法:com.facebook.presto.cli.QueryRunner.startInternalQuery(String query)中创建一个StatementClient对象,在调用StatementClient类的构造方法中发送请求到Coordinator中。

public StatementClient(HttpClient httpClient, JsonCodec<QueryResults> queryResultsCodec, ClientSession session, String query)

{

checkNotNull(httpClient, "httpClient is null");

checkNotNull(queryResultsCodec, "queryResultsCodec is null");

checkNotNull(session, "session is null");

checkNotNull(query, "query is null");

this.httpClient = httpClient;

this.responseHandler = createFullJsonResponseHandler(queryResultsCodec);

this.debug = session.isDebug();

this.timeZoneId = session.getTimeZoneId();

this.query = query;

Request request = buildQueryRequest(session, query);

//发送请求给Coordinator

currentResults.set(httpClient.execute(request, responseHandler).getValue());

}

  1. 打印查询执行过程和结果

public void renderOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)

{

SignalHandler oldHandler = Signal.handle(SIGINT, new SignalHandler()

{

@Override

public void handle(Signal signal)

{

if (ignoreUserInterrupt.get() || client.isClosed()) {

return;

}

try {

if (!client.cancelLeafStage()) {

client.close();

}

}

catch (RuntimeException e) {

log.debug(e, "error canceling leaf stage");

client.close();

}

}

});

try {

renderQueryOutput(out, outputFormat, interactive);

}

finally {

Signal.handle(SIGINT, oldHandler);

}

}

private void renderQueryOutput(PrintStream out, OutputFormat outputFormat, boolean interactive)

{

StatusPrinter statusPrinter = null;

@SuppressWarnings("resource")

PrintStream errorChannel = interactive ? out : System.err;

if (interactive) {

//交互式打印,非交互式会等待输出结果

statusPrinter = new StatusPrinter(client, out);

statusPrinter.printInitialStatusUpdates();

}

else {

waitForData();

}

打印执行情况的方法

public void printInitialStatusUpdates()

{

long lastPrint = System.nanoTime();

try {

while (client.isValid()) {

try {

// exit status loop if there is there is pending output

if (client.current().getData() != null) {

return;

}

// update screen if enough time has passed

if (Duration.nanosSince(lastPrint).getValue(SECONDS) >= 0.5) {

console.repositionCursor();

printQueryInfo(client.current());

lastPrint = System.nanoTime();

}

// fetch next results (server will wait for a while if no data)

client.advance();

}

catch (RuntimeException e) {

log.debug(e, "error printing status");

}

}

}

finally {

console.resetScreen();

}

}

附录1:客户端默认session:

public class ClientOptions

{

@Option(name = "--server", title = "server", description = "Presto server location (default: localhost:8080)")

public String server = "localhost:8080";

@Option(name = "--user", title = "user", description = "Username")

public String user = System.getProperty("user.name");

@Option(name = "--source", title = "source", description = "Name of source making query")

public String source = "presto-cli";

@Option(name = "--catalog", title = "catalog", description = "Default catalog")

public String catalog = "default";

@Option(name = "--schema", title = "schema", description = "Default schema")

public String schema = "default";

@Option(name = {"-f", "--file"}, title = "file", description = "Execute statements from file and exit")

public String file;

@Option(name = "--debug", title = "debug", description = "Enable debug information")

public boolean debug;

@Option(name = "--execute", title = "execute", description = "Execute specified statements and exit")

public String execute;

@Option(name = "--output-format", title = "output-format", description = "Output format for batch mode (default: CSV)")

public OutputFormat outputFormat = OutputFormat.CSV;

时间: 2024-10-09 22:53:23

Presto源码解读1-客户端提交查询-吕信(原创)的相关文章

Spark源码解读-JOB的提交与执行

本文以wordCount为例,详细说明spark创建和运行job的过程,重点是在进程及线程的创建. 实验环境搭建 在进行后续操作前,确保下列条件已满足. 1. 下载spark binary 0.9.1 2. 安装scala 3. 安装sbt 4. 安装java 启动spark-shell单机模式运行,即local模式 local模式运行非常简单,只要运行以下命令即可,假设当前目录是$SPARK_HOME MASTER=local bin/spark-shell "MASTER=local&quo

源码解读Mybatis List列表In查询实现的注意事项

转自:http://www.blogjava.net/xmatthew/archive/2011/08/31/355879.html 源码解读Mybatis List列表In查询实现的注意事项 在SQL开发过程中,动态构建In集合条件查询是比较常见的用法,在Mybatis中提供了foreach功能,该功能比较强大,它允许你指定一个集合,声明集合项和索引变量,它们可以用在元素体内.它也允许你指定开放和关闭的字符串,在迭代之间放置分隔符.这个元素是很智能的,它不会偶然地附加多余的分隔符.下面是一个演

Spark学习之路 (十六)SparkCore的源码解读(二)spark-submit提交脚本

讨论QQ:1586558083 目录 一.概述 二.源码解读 2.2 find-spark-home 2.3 spark-class 2.4 SparkSubmit 正文 回到顶部 一.概述 上一篇主要是介绍了spark启动的一些脚本,这篇主要分析一下Spark源码中提交任务脚本的处理逻辑,从spark-submit一步步深入进去看看任务提交的整体流程,首先看一下整体的流程概要图: 回到顶部 二.源码解读 2.1 spark-submit # -z是检查后面变量是否为空(空则真) shell可以

Retrofit2 源码解读

开源库地址:https://github.com/square/retrofit 解读版本:2.1.0 基本概念 Retrofit 是一个针对Java/Android类型安全的Http请求客户端. 基本使用如下: 首先定义一个接口,抽象方法的返回值必须为Call<XX>. public interface GitHubService { @GET("users/{user}/repos") Call<List<Repo>> listRepos(@Pa

AFNetworking 3.0 源码解读 总结

终于写完了 AFNetworking 的源码解读.这一过程耗时数天.当我回过头又重头到尾的读了一篇,又有所收获.不禁让我想起了当初上学时的种种情景.我们应该对知识进行反复的记忆和理解.下边是我总结的 AFNetworking 中能够学到的知识点. 1.枚举(enum) 使用原则:当满足一个有限的并具有统一主题的集合的时候,我们就考虑使用枚举.这在很多框架中都验证了这个原则.最重要的是能够增加程序的可读性. 示例代码: /** * 网络类型 (需要封装为一个自己的枚举) */ typedef NS

HttpClient 4.3连接池参数配置及源码解读

目前所在公司使用HttpClient 4.3.3版本发送Rest请求,调用接口.最近出现了调用查询接口服务慢的生产问题,在排查整个调用链可能存在的问题时(从客户端发起Http请求->ESB->服务端处理请求,查询数据并返回),发现原本的HttpClient连接池中的一些参数配置可能存在问题,如defaultMaxPerRoute.一些timeout时间的设置等,虽不能确定是由于此连接池导致接口查询慢,但确实存在可优化的地方,故花时间做一些研究.本文主要涉及HttpClient连接池.请求的参数

(转)go语言nsq源码解读二 nsqlookupd、nsqd与nsqadmin

转自:http://www.baiyuxiong.com/?p=886 ----------------------------------------------------------------------- 上一篇go语言nsq源码解读-基本介绍  介绍了最基本的nsq环境搭建及使用.在最后使用时,我们用到了几个命令:nsqlookupd.nsqd.nsqadmin.curl及 nsq_to_file,并看到用curl命令写入的几个”hello world”被nsq_to_file命令保

solr源码解读(转)

solr源码解读(转)原文地址:http://blog.csdn.net/duck_genuine/article/details/6962624 配置 solr 对一个搜索请求的的流程 在solrconfig.xml会配置一个handler.配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件 [html] view plaincopy <requestHandler name="standard" class="solr

SpringMVC源码解读 - RequestMapping注解实现解读 - RequestCondition体系

一般我们开发时,使用最多的还是@RequestMapping注解方式. @RequestMapping(value = "/", param = "role=guest", consumes = "!application/json") public void myHtmlService() { // ... } 台前的是RequestMapping ,正经干活的却是RequestCondition,根据配置的不同条件匹配request. @Re