Hadoop3集群搭建之——hive添加自定义函数UDTF (一行输入,多行输出)

上篇:

Hadoop3集群搭建之——虚拟机安装

Hadoop3集群搭建之——安装hadoop,配置环境

Hadoop3集群搭建之——配置ntp服务

Hadoop3集群搭建之——hive安装

Hadoop3集群搭建之——hbase安装及简单操作

Hadoop3集群搭建之——hive添加自定义函数UDF

Hadoop3集群搭建之——hive添加自定义函数UDTF

上篇中,udtf函数,只有为一行输入,一行输出。udtf是可以一行输入,多行输出的。

简述下需求:  

输入开始时间,结束时间,返回每个小时的时长

直接上代码:

package com.venn.udtf;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;

/**
 * Created by venn on 5/20/2018.
 * SplitHour : split hour
 */
public class SplitHour extends GenericUDTF {

    /**
     * add the column name
     * @param args
     * @return
     * @throws UDFArgumentException
     */
    @Override
    public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
            throw new UDFArgumentLengthException("ExplodeMap takes only one argument");
        }
        if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentException("ExplodeMap takes string as a parameter");
        }

        ArrayList<String> fieldNames = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        fieldNames.add("begintime");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("endtime");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("hour");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldNames.add("seconds");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    /**
     * process the column
     * @param objects
     * @throws HiveException
     */
    public void process(Object[] objects) throws HiveException {

        String [] input = objects[0].toString().split(",");
        // 2018-06-06 10:25:35
        String beginTime = input[0];
        String endTime = input[1];

        String[] result = new String[4];
        result[0] = beginTime;
        result[1] = endTime;

        // begintime
        int bhour = Integer.parseInt(beginTime.substring(11, 13));
        int bmin = Integer.parseInt(beginTime.substring(14, 16));
        int bsecond = Integer.parseInt(beginTime.substring(17, 19));
        // endtime
        int ehour = Integer.parseInt(endTime.substring(11, 13));
        int emin = Integer.parseInt(endTime.substring(14, 16));
        int esecond = Integer.parseInt(endTime.substring(17, 19));

        // 1.if begin hour equal end hour, second is : (emin - bmin) * 60 + (esecond - bsecond)
        if (bhour == ehour) {
            result[2] = String.valueOf(bhour);
            result[3] = String.valueOf((emin - bmin) * 60 + (esecond - bsecond));
            forward(result);
            return;
        }

        boolean flag = true;
        //TODO 待优化,先输出第一个循环的时长,再循环后面的就不用判断
        while (bhour != ehour) {
            result[2] = String.valueOf(bhour);

            if(flag){
                flag = false;
            // 2. if begintime hour != endtime, the first hour, second is : 3600 - bmin * 60 - bsecond
                result[3] = String.valueOf(3600 - bmin * 60 - bsecond);
            }else {
                // 3. next hour is 3600
                result[3] = String.valueOf(3600);
            }
            bhour += 1;
            // 输出到hive
            forward(result);
        }

        result[2] = String.valueOf(bhour);
        // 4. the end hour is : emin  * 60 + esecond
        result[3] = String.valueOf( emin  * 60 + esecond);
        forward(result);

    }

    public void close() throws HiveException {

    }

}

udtf 函数介绍参加上篇

使用方式见上篇

Hadoop3集群搭建之——hive添加自定义函数UDTF

样例:

hive> select split_hour( concat(begintime,‘,‘,endtime)) from viewlog where log_date=20180401 limit 10;
OK
begintime    endtime    hour    seconds
2018-04-01 10:26:14    2018-04-01 10:26:21    10    7
2018-04-01 07:21:47    2018-04-01 07:22:23    7    36
2018-04-01 15:18:08    2018-04-01 15:18:11    15    3
2018-04-01 18:05:13    2018-04-01 18:05:28    18    15
2018-04-01 07:18:34    2018-04-01 07:18:52    7    18
2018-04-01 23:28:32    2018-04-01 23:29:44    23    72
2018-04-01 06:34:11    2018-04-01 06:34:17    6    6
2018-04-01 14:02:40    2018-04-01 14:03:33    14    53
2018-04-01 17:30:23    2018-04-01 17:30:26    17    3
2018-04-01 12:15:07    2018-04-01 12:15:11    12    4
2018-04-01 06:53:40    2018-04-01 07:02:09    6    380
2018-04-01 06:53:40    2018-04-01 07:02:09    7    129
Time taken: 2.238 seconds, Fetched: 12 row(s)

搞定

原文地址:https://www.cnblogs.com/Springmoon-venn/p/9286670.html

时间: 2024-12-08 09:08:22

Hadoop3集群搭建之——hive添加自定义函数UDTF (一行输入,多行输出)的相关文章

大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单机的搭建,是因为作为个人学习的话,单机已足以,好吧,说实话是自己的电脑不行,使用虚拟机实在太卡了... 整个的集群搭建是在公司的测试服务搭建的,在搭建的时候遇到各种各样的坑,当然也收获颇多.在成功搭建大数据集群之后,零零散散的做了写笔记,然后重新将这些笔记整理了下来.于是就有了本篇博文. 其实我在搭

rabbitmq集群搭建(centos6.5)

一:rabbitmq的安装: 参考:http://www.blogjava.net/hellxoul/archive/2014/06/25/415135.html http://blog.haohtml.com/archives/15249 说明:修改机器名字后再安装(为后面集群做准备) vi /etc/sysconfig/network 修改名字 vi /etc/hosts 修改地址映射表,如192.168.1.112   rabbitmq-node1.com rabbitmq-node1 #

Shark集群搭建配置

一.Shark简介 Shark是基于Spark与Hive之上的一种SQL查询引擎,官网的架构图及性能测试图如下: 我们涉及到了2个依赖组件,1是Apache Spark, 另外一个是AMPLAB的Hive0.11. 这里注意版本的选择,一定要选择官方的推荐版本: Spark0.91 + AMPLAB Hive0.11 + Shark0.91 一定要自己编译好它们,适用于自己的集群. 二.Shark集群搭建 1. 搭建Spark集群,这个可以参照:Spark集群搭建. 2. 编译AMPLAB的Hi

presto集群安装&整合hive|mysql|jdbc

Presto是一个运行在多台服务器上的分布式系统. 完整安装包括一个coordinator(调度节点)和多个worker. 由客户端提交查询,从Presto命令行CLI提交到coordinator. coordinator进行解析,分析并执行查询计划,然后分发处理队列到worker中. 目录: 搭建前环境准备 集群计划 连接器 安装步骤 配置文件 运行presto 整合hive测试 整合mysql测试 整合jdbc测试 1.搭建前环境准备 CentOS 6.7 java8 Python3.4.4

Flume 学习笔记之 Flume NG高可用集群搭建

Flume NG高可用集群搭建: 架构总图: 架构分配: 角色 Host 端口 agent1 hadoop3 52020 collector1 hadoop1 52020 collector2 hadoop2 52020 agent1配置(flume-client.conf): #agent1 name agent1.channels = c1 agent1.sources = r1 agent1.sinks = k1 k2 #set gruop agent1.sinkgroups = g1 #

CDH集群搭建步骤

CDH集群搭建步骤详细文档 一.关于CDH和Cloudera Manager CDH (Cloudera's Distribution,including Apache Hadoop),是Hadoop众多分支中的一种,由Cloudera维护,基于稳定版本的Apache Hadoop构建,并集成了很多补丁,可直接用于生产环境. Cloudera Manager则是为了便于在集群中进行Hadoop等大数据处理相关的服务安装和监控管理的组件,对集群中主机.Hadoop.Hive.Spark等服务的安装

Hadoop学习之路(四)Hadoop集群搭建和简单应用

概念了解 主从结构:在一个集群中,会有部分节点充当主服务器的角色,其他服务器都是从服务器的角色,当前这种架构模式叫做主从结构. 主从结构分类: 1.一主多从 2.多主多从 Hadoop中的HDFS和YARN都是主从结构,主从结构中的主节点和从节点有多重概念方式: 1.主节点 从节点 2.master slave 3.管理者 工作者 4.leader follower Hadoop集群中各个角色的名称: 服务 主节点 从节点 HDFS NameNode DataNode YARN Resource

MHA 高可用集群搭建(二)

MHA 高可用集群搭建安装scp远程控制http://www.cnblogs.com/kevingrace/p/5662839.html yum install openssh-clients mysql5.7运行环境:centos6.51 主机部署 manager:192.168.133.141test1: 192.168.133.138test2:192.168.133.139 (为master1的备用)test3: 192.168.133.140 test1为主,test2和test3为备

Hadoop2.7.2安装与集群搭建

1.环境准备 jdk需要1.7以上版本64位. 创建hadoop用户. 在hadoop用户目录下解压安装包hadoop-2.7.2.tar.gz 2.配置免密码登录 各节点分别执行 生成公钥和私钥:ssh-keygen -t rsa 四次enter. 将公钥添加进公钥库:cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys 修改authorized_keys权限:chmod 600 ~/.ssh/authorized_keys 验证:ssh local