hive不支持多个字符作为分隔符的解决方案

题记:

  近期在做某个大型银行的大数据项目,当在处理非结构化数据时,却发现他们给的数据并不符合hive和pig的处理要求,数据每行必须需要多个分割符才能完美处理,一下午也没有想到完美的办法解决,今天重新审视了一下整个过程。看来hive的命令行没法搞定了。于是乎,只能通过代码来搞定。

1、重新实现hive的InputFormat了,别急放码过来

package hiveStream;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class MyHiveInputFormat  extends TextInputFormat implements JobConfigurable{

	@Override
	public RecordReader<LongWritable, Text> getRecordReader(
			InputSplit genericSplit, JobConf job, Reporter reporter)
			throws IOException {
		 reporter.setStatus(genericSplit.toString());
	       return new MyRecordReader((FileSplit) genericSplit, job);
	}

}

2、仔细看看下面的方法,不解释,自己领悟。

package hiveStream;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class MyRecordReader implements RecordReader<LongWritable, Text>{

	private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    public MyRecordReader(InputStream in, long offset, long endOffset,
            int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public MyRecordReader(InputStream in, long offset, long endOffset,
            Configuration job) throws IOException {
        this.maxLineLength = job.getInt(
                "mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.end = endOffset;
    }

    // 构造方法
    public MyRecordReader(FileSplit inputSplit, Configuration job)
            throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength",
                Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        // 创建压缩器
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);
        // 打开文件系统
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;

        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }

        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    @Override
	public void close() throws IOException {
    	if (lineReader != null)
            lineReader.close();
	}

	@Override
	public LongWritable createKey() {
		return new LongWritable();
	}

	@Override
	public Text createValue() {
		 return new Text();
	}

	@Override
	public long getPos() throws IOException {
		  return pos;
	}

	@Override
	public float getProgress() throws IOException {
	   if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
	}

	@Override
	public boolean next(LongWritable key, Text value) throws IOException {
		 while (pos < end) {
	            key.set(pos);
	            int newSize = lineReader.readLine(value, maxLineLength,
	                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
	                            maxLineLength));
	            // 把字符串中的"##"转变为"#"
	            String strReplace = value.toString().replaceAll("\\s+", "\001");
	            Text txtReplace = new Text();
	            txtReplace.set(strReplace);
	            value.set(txtReplace.getBytes(), 0, txtReplace.getLength());
	            if (newSize == 0)
	                return false;
	            pos += newSize;
	            if (newSize < maxLineLength)
	                return true;
	        }
	        return false;
	    }
	}

3、处理实例:如下

数据处理要求:

    12 afd   fewf	fewfe  we
    76 vee   ppt	wfew  wefw
    83 tyutr   ppt	wfew  wefw
    45 vbe   ppt	wfew  wefw
    565 wee   ppt	wfew  wefw
    12 sde   ppt	wfew  wefw
注意:字段之间的空格不一致

1、建表:
    create table micmiu_blog(author int, category string, url string,town string,oop string) stored as inputformat ‘hiveStream.MyHiveInputFormat‘ outputformat  ‘org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat‘;
注意:输出咱可没有重写哦

2、加载数据:
    LOAD DATA LOCAL INPATH‘/mnt/test‘ OVERWRITE INTO TABLE micmiu_blog;

3、看看的成果:
    select * from micmiu_blog;

自己去试试,不解释

  

时间: 2024-12-25 05:15:21

hive不支持多个字符作为分隔符的解决方案的相关文章

JS中比較2个字符串内元素的不同(字符1, 字符2, 分隔符可选)

比較2个字符串内元素的不同(字符1, 字符2, 分隔符可选) 文件: diff.js // 演示样例使用方法 /* var str1 = "tie, mao, 55"; var str2 = "tie, mao, csdn"; var result = diff(str1, str2, ','); // 对象 var rs = "" + result; // " 55, csdn" var df1 = result.diff1

Pentaho的Mondrian对Hive的支持

需求描述 考虑直接在Hive或者Impala等Big Data方案,能够支持MDX查询,现调研一下Mondrian对hive的支持情况. 环境准备 hive环境,采用hive-0.10-cdh4.2.1 客户端程序使用的类库:mondrian-3.6.0.olap4j-1.2.0-SNAPSHOT 数据准备 来源于网上一个数据源,准备四张表 Customer - 客户信息维表 Product - 产品维表 ProductType - 产品类表维表 Sale - 销售记录表 为了方便测试数据与MD

如何为SparkSQL添加hive中支持的而SparkSQL暂未支持的命令

以ANALYZE为例描述 ANALYZE在Hive中的使用方法详见:https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables ANALYZE在Hive中使用简单介绍 一张表有4个分区: Partition1: (ds='2008-04-08', hr=11) Partition2: (ds='2008-04-08', hr=12) Partition3: (ds='2008-04-09',

JS中比较2个字符串内元素的不同(字符1, 字符2, 分隔符可选)

比较2个字符串内元素的不同(字符1, 字符2, 分隔符可选) 文件: diff.js // 示例用法 /* var str1 = "tie, mao, 55"; var str2 = "tie, mao, csdn"; var result = diff(str1, str2, ','); // 对象 var rs = "" + result; // " 55, csdn" var df1 = result.diff1; //

phpstudy APACHE支持.htaccess以及 No input file specified解决方案

APACHE支持.htaccess以及 No input file specified解决方案 你的Apache安装文件夹conf里找到httpd.conf文件 索LoadModule rewrite_module modules/mod_rewrite.so 如果前面有注释符号#,请去掉.搜索Options FollowSymLinks,然后将它下面的AllowOverride None 修改为AllowOverride All: [1] 没想到遇见了 No input file specif

Hive中将查询结果导出到指定分隔符的文件中

在Hive0.11.0版本中新引进了一个新的特性,当用户将Hive查询结果输出到文件中时,用户可以指定列的分割符,而在之前的版本是不能指定列之间的分隔符. 在Hive0.11.0之前版本如下使用,无法指定分隔符,默认为\x01: hive (hive)> insertoverwrite local directory '/home/hadoop/export_hive' select * from a; Query ID =hadoop_20150627174342_64852f3a-56ed-

Hive不支持非相等的join

由于 hive 与传统关系型数据库面对的业务场景及底层技术架构都有着很大差异,因此,传统数据库领域的一些技能放到 Hive 中可能已不再适用.关于 hive 的优化与原理.应用的文章,前面也陆陆续续的介绍了一些,但大多都偏向理论层面,本文就介绍一个实例,从实例中一步步加深对 hive 调优的认识与意识. 1.需求 需求我做了简化,很简单,两张表做个 join,求指定城市,每天的 pv,用传统的 RDBMS SQL 写出来就这样的: SELECT t.statdate, c.cname, coun

apache2.2支持URL中文字符

网站环境:OS: centos6.9 X64apache: 2.2.15 网站转移到新环境,客户反映新闻的一些图片无法正常显示.查看URL路径发现调用的图片是中文名称.网上查下,apache支持中文,需要另外的插件支持.mod_encoding.安装mod_encoding需要gcc编译器,要确保系统已经安装了该编译器. gcc编译器安装#yum install gcc 1.mod_encoding 下载mod_encoding_64.tgz文件下载2.安装mod_encoding#tar zx

Hive本地模式安装及遇到的问题和解决方案

Apache Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供简单的SQL查询功能,可以将SQL语句转换为MapReduce任务进行运行. 其优点是学习成本低,可以通过类SQL语句快速实现简单的MapReduce统计,不必开发专门的MapReduce应用,十分适合数据仓库的统计分析. Hive有三种运行模式: 1.内嵌模式:将元数据保存在本地内嵌的Derby数据库中,这得使用Hive最简单的方式,不过使用内嵌模式的话,缺点也比较明显,因为一个内嵌的D