通过hive自定义函数直接回写数据到数据库

hive一般用来执行离线统计分析相关的功能,然后将执行的结果导入到数据库的表中供前端报表可视化展现来查询。

导回数据库的方式有许多,sqoop、hive jdbc、mr jdbc等等,但是这几种方式都会有一个二次处理环节(数据需要人工)。

这次介绍另外一种处理方式,直接将对数据库的操作集成在udf中,这样直接写一个hql查询语句就可以了。

代码如下:

package com.taisenki.tools;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;

import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;

/**
 * 在hive0.13版本之上才能注册永久函数,否则只能注册临时函数
 * @author taisenki
 *
 */
@Description(name = "batch_import",  value = "_FUNC_(sql, args1[, args2,...][, config_path]) - Return ret ")
public class SqlBatchImportUDF extends GenericUDF {

    public static final String DEFAULT_CONFIG_ROOT_PATH = "/user/hive/udf/sjk/";
    public static final String DEFAULT_CONFIG_FILE_NAME = "sjk.properties";
    public static final String DEFAULT_CONFIG_FILE_SUFFIX = "properties";
    private IntObjectInspector retValInspector;
    private String sql;
    private PrimitiveObjectInspector[] paramsInspectors;
    private int insert = 0;
    private Connection conn;
    private PreparedStatement psi;
    private int count = 0;

    @Override
    public void close() throws IOException {
        // TODO Auto-generated method stub
        try {if (insert > 0) {
                psi.executeBatch();
                conn.commit();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally{
            try {
                if(conn != null)
                    conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        super.close();
    }

    @Override
    public Object evaluate(DeferredObject[] arg0) throws HiveException {
        // TODO Auto-generated method stub
        try {
            for (int i = 0; i < count; i++) {
                Object param = paramsInspectors[i].getPrimitiveJavaObject(arg0[i + 1].get());
                psi.setObject(i + 1, param);
            }
            psi.addBatch();
            insert++;
            if(insert>1000){
                psi.executeBatch();
                conn.commit();
                insert = 0;
            }
            IntWritable iw = new IntWritable(insert);
            return retValInspector.getPrimitiveWritableObject(iw);  

        } catch (SQLException e) {
            e.printStackTrace();
            throw new HiveException(e);
        }
    }

    @Override
    public String getDisplayString(String[] arg0) {
        // TODO Auto-generated method stub
        return "batch_import(sql, args1[, args2,...][, config_path])";
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arg0)
            throws UDFArgumentException {
        // TODO Auto-generated method stub
        if (arg0.length < 2) {
            throw new UDFArgumentException(" Expecting  at least two arguments ");
        }
        insert = 0;
        //第一个参数校验,必须是一个非空的sql语句
        if (arg0[0].getCategory() == Category.PRIMITIVE
                && ((PrimitiveObjectInspector) arg0[0]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
            if (!(arg0[0] instanceof ConstantObjectInspector)) {
                throw new UDFArgumentException("the frist arg   must be a sql string constant");
            }
            ConstantObjectInspector sqlInsp = (ConstantObjectInspector) arg0[0];
            this.sql = sqlInsp.getWritableConstantValue().toString();
            int i = -1;
            count = 0;
            while (true) {
                i = sql.indexOf("?", i + 1);
                if (i == -1) {
                    break;
                }
                count++;
            }
            if (this.sql == null || this.sql.trim().length() == 0) {
                throw new UDFArgumentException("the frist arg   must be a sql string constant and not nullable");
            }
        }  

        if (count+1 > arg0.length){
            throw new UDFArgumentException("arguments not enough with this sql["+(arg0.length-1)/count+"]");
        }

        //默认情况
        String fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + SqlBatchImportUDF.DEFAULT_CONFIG_FILE_NAME;
        //判断是否存在指定的配置文件路径
        if (count+1 < arg0.length){
            //第一个参数校验
            if (arg0[count+1].getCategory() == Category.PRIMITIVE
                    && ((PrimitiveObjectInspector) arg0[count+1]).getPrimitiveCategory() == PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                if (!(arg0[count+1] instanceof ConstantObjectInspector)) {
                    throw new UDFArgumentException("mysql connection pool config path  must be constant");
                }
                ConstantObjectInspector propertiesPath = (ConstantObjectInspector) arg0[count+1];  

                fileName1 = propertiesPath.getWritableConstantValue().toString();
                Path path1 = new Path(fileName1);
                if (path1.toUri().getScheme() == null) {
                    if (!"".equals(FilenameUtils.getExtension(fileName1)) && !DEFAULT_CONFIG_FILE_SUFFIX.equals(FilenameUtils.getExtension(fileName1))) {
                        throw new UDFArgumentException("不支持的文件扩展名,目前只支持properties文件!");
                    }
                    //如果是相对路径,补齐根路径
                    if (!fileName1.startsWith("/")) {
                        fileName1 = SqlBatchImportUDF.DEFAULT_CONFIG_ROOT_PATH + fileName1;
                    }
                }
                //如果只写了文件前缀的话,补上后缀
                if (!FilenameUtils.isExtension(fileName1, DEFAULT_CONFIG_FILE_SUFFIX)) {
                    fileName1 = fileName1 + FilenameUtils.EXTENSION_SEPARATOR_STR + DEFAULT_CONFIG_FILE_SUFFIX;
                }
            }
        }

        Properties properties = new Properties();
        Configuration conf = new Configuration();
        Path path2 = new Path(fileName1);  

        try (FileSystem fs = FileSystem.newInstance(path2.toUri(), conf); //这里不能用FileSystem.get(path2.toUri(), conf),必须得重新newInstance,get出来的是共享的连接,这边关闭的话,会导致后面执行完之后可能出现FileSystem is closed的异常
             InputStream in = fs.open(path2)) {
            properties.load(in);
        } catch (FileNotFoundException ex) {
            throw new UDFArgumentException("在文件系统中或者是HDFS上没有找到对应的配置文件");
        } catch (Exception e) {
            e.printStackTrace();
            throw new UDFArgumentException(e);
        }  

        try {
            Class.forName(properties.getProperty("driverClassName"));
            System.out.println(properties.getProperty("driverClassName"));
            System.out.println(properties.getProperty("url"));
            conn = DriverManager.getConnection(properties.getProperty("url"), properties);
            psi = conn.prepareStatement(sql);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            throw new UDFArgumentException(e);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            throw new UDFArgumentException(e);
        }
        //中间为参数
        paramsInspectors = new PrimitiveObjectInspector[count];
        for (int i = 0; i < count; i++) {
            paramsInspectors[i] = (PrimitiveObjectInspector) arg0[i+1];
        }
        retValInspector = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
        return retValInspector;
    }

}

然后上传jar包,注册udf,注意此处需把对应数据库的驱动包一同进行注册操作:

如回写oracle: create function default.oracleSave as ‘com.taisenki.tools.SqlBatchImportUDF‘ using jar ‘hdfs://cdh5/data/lib/test.jar‘, jar ‘hdfs://cdh5/data/lib/ojdbc6.jar‘;

然后写一个HQL测试一下:

select oracleSave(‘insert into test111 values (?)‘,b.id) from (select 2 id from dual) b;

UDF第一个参数是静态参数,是对应数据库的sql语句,描述入库方式,然后后面的参数就不固定了,一一对应sql语句中的占位符,比如我上面有1个占位符,然后我后面就跟了1个参数。

若传入的参数恰好比占位符多1个的时候,最后一个参数则为指定数据库配置文件名,里面配置了如何开启连接池连接哪个数据库什么的。

附上一个默认的sjk.properties:

driverClassName=oracle.jdbc.driver.OracleDriver
url=jdbc:oracle:thin:@host:port:inst
user=test
password=test

此处注意,如果是hive 0.13以下的版本,是不支持注册永久function的,请使用
create temporaryfunction来进行,而且只支持session级别的,断开后自动消失……

时间: 2024-10-24 10:38:44

通过hive自定义函数直接回写数据到数据库的相关文章

Hive自定义函数的学习笔记(1)

前言: hive本身提供了丰富的函数集, 有普通函数(求平方sqrt), 聚合函数(求和sum), 以及表生成函数(explode, json_tuple)等等. 但不是所有的业务需求都能涉及和覆盖到, 因此hive提供了自定义函数的接口, 方便用户扩展. 自己好像很久没接触hadoop了, 也很久没博客了, 今天趁这个短期的项目, 对hive中涉及的自定义函数做个笔记. 准备: 编写hive自定义函数前, 需要了解下当前线上hive的版本. hive --vesion 比如作者使用到的hive

hive自定义函数(UDF)

首先什么是UDF,UDF的全称为user-defined function,用户定义函数,为什么有它的存在呢?有的时候 你要写的查询无法轻松地使用Hive提供的内置函数来表示,通过写UDF,Hive就可以方便地插入用户写的处理代码并在查询中使用它们,相当于在HQL(Hive SQL)中自定义一些函数,首先UDF必须用java语言编写,Hive本身就是用java写的. 编写UDF需要下面两个步骤: 1.继承org.apache.hadoop.hive.ql.UDF 2.实现evaluate函数,这

hive自定义函数UDF UDTF UDAF

Hive 自定义函数 UDF UDTF UDAF 1.UDF:用户定义(普通)函数,只对单行数值产生作用: UDF只能实现一进一出的操作. 定义udf 计算两个数最小值 public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } else { return a

Hive自定义函数UDAF开发

Hive支持自定义函数,UDAF是接受多行,输出一行. 通常是group by时用到这种函数. 其实最好的学习资料就是官方自带的examples了. 我这里用的是0.10版本hive,所以对于的examples在 https://github.com/apache/hive/tree/branch-0.10/contrib/src/java/org/apache/hadoop/hive/contrib/udaf/example 我这里的功能需求是: actionCount(act_code,ac

Hive自定义函数UDF示例

简单自定义函数只需继承UDF类,然后重构evaluate函数即可 LowerCase.java: package com.example.hiveudf; import org.apache.hadoop.hive.ql.exec.UDF; public final class LowerCase extends UDF { public String evaluate(final String s) { if (s == null) { return null; } return new St

Hive 自定义函数(转)

Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法.Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中. 可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令. Hive可以允许用户编写自己定义的函数UDF,来在查询中使用.Hive中有3种UDF: UDF:操作单个

[Hive]Hive自定义函数UDF

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数 用户自定义函数(user defined function),针对单条记录. 编写一个UDF,需要继承UDF类,并实现evaluate()函数.在查询执行过程中,查询中对应的每个应用到这个函数的地方都会对这个类进行实例化.对于每行输入都会调用到evaluate()函数.而evaluate()函数处理的值会返回给Hive.同时用户是可以重载evaluate方法的.Hive会像Java的方法重载一样,自动选择匹配的

Hive自定义函数(UDF、UDAF)

当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数. UDF 用户自定义函数(user defined function)–针对单条记录. 创建函数流程 1.自定义一个Java类 2.继承UDF类 3.重写evaluate方法 4.打成jar包 6.在hive执行add jar方法 7.在hive执行创建模板函数 8.hql中使用 Demo01: 自定义一个Java类 package UDFDemo; import org.apache.hadoop.hive.

Hive 自定义函数 UDF UDAF UDTF

UDF:用户定义(普通)函数,只对单行数值产生作用: 继承UDF类,添加方法 evaluate() /** * @function 自定义UDF统计最小值 * @author John * */ public class Min extends UDF { public Double evaluate(Double a, Double b) { if (a == null) a = 0.0; if (b == null) b = 0.0; if (a >= b) { return b; } el