flume_kafka_hdfs_hive数据的处理

     使用flume收集数据,将数据传递给kafka和hdfs,kafka上的数据可以使用storm构建实时计算,而hdfs上的数据,经过MR处理之后可以导入hive中进行处理。

     环境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。

     说明:所有服务都架设在一台机器上。

     1:安装hadoop:这篇文章写得比较完整,可以看看:Ubuntu 12.10 安装JDK、Hadoop全过程

我在安装过程中出现:Does not contain a valid host:port authority: file:///,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml没有发现错误,还特地看了些hosts配置,最后网上找到,fs.default.name中default写错了,启动hadoop。

     2:安装hive:下载解压之后,设置HIVE_HOME,将HIVE_HOME/bin加入到PATH变量中,直接输入hive即可启动。默认hive是使用嵌入模式的Derby数据库,它的特点是小巧,而且老爹也是apache,但存在单session,无法多用户共享,这里参考网上的资料将元数据存储到mysql中去:Hive集成Mysql作为元数据,这里我出现了点问题,只能使用localhost进行连接,无法使用[email protected],试着按照文章中查找my.conf,但没有找到相关配置,在实验环境下这样也可以用:

     3:安装flume:

Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

Flume是由Cloudera推出的一个高效收集,处理,移动大量日志数据的分布式,可靠地,高可用的服务。它有简单并且灵活的架构基于数据流之上。它使用可靠地协调性,容错转移,恢复机制使它强健性并且容错。它使用简单可伸缩的数据模型能实现在线数据分析。

Flume安agent划分,一个agent包括Source,Channel,Sink三个部分,Source从Web Server中取数据,push交给Channel,Sink将pull Channel得到数据,一个agent可以有多个Channel, Sink。

     配置FLUME_HOME,将PATH中加入Flume下的执行路径,将conf下的flume-conf.properties.template重命名为flume-conf.properties,然后进行配置,在单机情况下:

agent.sources = r1                //agent中添加source,命名为r1
agent.sinks = s1                  //agent中添加sink,命名为s1
agent.channels = c1               //agent中添加channel,命名为c1
agent.sinks.s1.channel = c1       //s1从c1中取数据
agent.sources.r1.channels = c1    //r1将数据交给c1

#describe the source
agent.sources.r1.type = exec      //定义r1的类型为exec
agent.sources.r1.command = tail -F /root/input/loginfo    //r1执行的命令

#use a channel which buffers events in memory
agent.channels.c1.type = memory    //定义c1的类型memory
agent.channels.c1.capacity = 1000  //c1的容量
agent.channels.c1.transactionCapacity = 100  //channel获取或者sink获得一次最大的数据量 

#sinks type
agent.sinks.s1.type = logger       //定义s1的类型为logger

完成后启动flume,测试:

bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent

4:安装kafka:关于kafka的介绍:Kafka快速入门,简单来说,kafka集群中的一台服务器就是一个broker,消息按名字分类,叫做topic,消息的产生是producer,消息的获取方为customer。kafka的安装方法同上。由于使用默认配置,kafka中的config下不需要配置了,直接启动即可进行模拟:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

相关可以看apache-kafka

5:进行整合:数据的处理过程包括数据收集,数据清理,数据存储,数据分析,数据展现。在这里数据的收集由flume负责,定期从web server中收集log相关信息,对于实时数据的处理,将数据直接发送到kafka,然后交给后面的storm处理(这个没有做),对于离线部分,经过简单的mr处理后存储到hdfs上,然后使用hive操作。

总的架构图:

Flume的设计:

在搭建之前,先安装maven:安装步骤同上Flume与Kafka

安装完后echo $PATH:

/usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin

Flume与Kafka之间整合需要一个插件:这里介绍个flume-kafka插件,flume1.4,kafka0.7.2的基础上,将代码下载下来,进入目录,使用maven打包成jar文件,将生成的jar包放到flume的lib或相关目录下,依次将hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar导入,mvn package过程中可能会报错,找不到kafka0.7.2.jar,你需要将额外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再进行package。

对于myggg开启6个终端:

对于发送到kafka中的数据以后在处理,现在主要是针对hadoop中的数据,首先使用MR处理,格式化文本。

6:后续:解压eclipse,将之前准备的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目录下,使用vnc连接到机器,编写MR程序。

使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:

1,b
2,c
3,d
4,e
5,f
6,g
7,z
0,o

编写MR,将一行记录拆分为key,value:

public static class MyMapper
         extends Mapper<Object, Text, IntWritable, Text>{
      private IntWritable hello = new IntWritable();
      private Text world = new Text();

      public void map(Object key, Text value, Context context
                      ) throws IOException, InterruptedException {
        String[] array = value.toString().split(",");
        if (array.length == 2) {
          hello.set(Integer.parseInt(array[0]));
          world.set(array[1]);

          context.write(hello, world);
        }
      }
    }

查看结果:

[[email protected] eclipse]# hadoop fs -cat /myoutput/part-r-00000
0    o
1    b
2    c
3    d
4    e
5    f
6    g
7    z

使用hive建立外部表查看数据:

create external table employee(id int, name string)
row format delimited fields terminated by ‘\t‘
lines terminated by ‘\n‘
stored as textfile
location ‘/myoutput‘;

然后就可以进行相关查询与处理了。

时间: 2024-08-24 22:15:31

flume_kafka_hdfs_hive数据的处理的相关文章

记一次MySQL找回用户数据

事情经过 有天,我们公司外区的一个销售C说他8月3号以前的工作流记录找不到了.问清缘由,原来是更新了微信号(我们公司的工作流是基于企业微信开发的).经过分析,微信号和流程数据并没什么关系,所以初步得出结论:本来只需要更新微信号的,结果我们公司的流程系统管理员把用户先删除,再创建了新的用户. 解决过程 1.首先想到的是直接从定时备份数据里面找回原来的用户ID,结果发现系统只备份了十天的记录,而工作流系统上显示销售C只有8月3号以后的流程记录,距今已经40多天,从自动备份的数据里已经无法恢复. 2.

使用 Chrome 浏览器插件 Web Scraper 10分钟轻松实现网页数据的爬取

本文标签: WebScraper Chrome浏览器插件 网页数据的爬取 使用Chrome 浏览器插件 Web Scraper 可以轻松实现网页数据的爬取,不写代码,鼠标操作,点哪爬哪,还不用考虑爬虫中的登陆.验证码.异步加载等复杂问题. Web Scraper插件 Web Scraper 官网中的简介: Web Scraper Extension (Free!)Using our extension you can create a plan (sitemap) how a web site

Day4 - 迭代器&amp;生成器、装饰器、Json &amp; pickle 数据序列化、软件目录结构规范

---恢复内容开始--- 本节内容 迭代器&生成器 装饰器 Json & pickle 数据序列化 软件目录结构规范 作业:ATM项目开发 1.列表生成式,迭代器&生成器 列表生成式 需求:列表a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],要求把列表里的每个值加1 1 a = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 2 b = [] 3 for i in a: 4 b.append(i+1) 5 a = b 6 print(a) 普通青

Oracle 10g通过创建物化视图实现不同数据库间表级别的数据同步

摘自:http://blog.csdn.net/javaee_sunny/article/details/53439980 目录(?)[-] Oracle 10g 物化视图语法如下 实例演示 主要步骤 在A节点创建原表和物化视图日志 在B节点创建连接A节点的远程链接 在B节点处创建目标表和与目标表名称相同的物化视图 在B节点处刷新物化视图 升级采用存储过程定时任务JOB方式定时刷新物化视图 进一步优化 文章更新记录 参考文章 Oracle 10g 物化视图语法如下: create materia

MySQL(九)之数据表的查询详解(SELECT语法)二

上一篇讲了比较简单的单表查询以及MySQL的组函数,这一篇给大家分享一点比较难得知识了,关于多表查询,子查询,左连接,外连接等等.希望大家能都得到帮助! 在开始之前因为要多表查询,所以搭建好环境: 1)创建数据表suppliers 前面已经有一张表是book表,我们在建立一张suppliers(供应商)表和前面的book表对应. 也就是说 让book中s_id字段值指向suppliers的主键值,创建一个外键约束关系. 其实这里并没有达到真正的外键约束关系,只是模拟,让fruits中的s_id中

C#如何拿到从http上返回JSON数据?

在实际开发中,我们经常会使用到API,所谓API一般就是一个地址,我们称之为接口.然后我们通过用C#对这地址发送请求,请求后,服务器就会给我们返回数据,一般是XML或者JSON,这里我们主要讲述的是JSON. 为了演示,我们这里准备了一个接口,这是一个查询物流的接口.(读者读到这篇文章的时候,接口可能有效,也可能失效,因为接口是网上找的,不是笔者自己写的,但是原理是一样的.) 接口:  http://www.kuaidi100.com/query?type=快递公司编码&postid=物流单号

C#中导出数据到Excel表格中

之前PM交给我一个自动化测试的Case,让我抓取页面上的数据到Excel表格中,刚好又接了一个之前人家做的系统, 刚好看到可以用NPOI导数据,就动手试试,成功导出. 由于鄙人比较菜,也比较懒, 怕自己忘记了,今天就总结一下,以防下次用可以参考. 1.要使用NPOI,首先需要在Project中Install NPOI的 Package. 右键点击Project------>Manage NuGet Packages---->Search NPOI----->点击搜索到的NPOI然后点击等

Oracle in 查询数据

问题描述: 查询所有的数据,查询结果:146360 select count(1) from bank_cde; in查询的获取部分数据,查询结果 :73080 select count(1) from bank_cde t where t.belongcode2 in('ABC','BCOM','BOC','CCB','CEB','CGB','CIB','CITIC','CMBC','ICBC','PAB','POST','SPDB'); not in查询数据,结果为0 select coun

SQL基础:数据表的创建

1. 先选择创建表所在的数据库 2. 创建表 3. 查看表是否创建成功 4. 主键:要求主键列的数据唯一,且不允许为空.主键能够唯一的标识表中的每一条记录,可以结合外键来定义不同数据表之间的关系,并且可以加快数据库查询的速度. 主键分为:单字段主键和多字段联合主键. 4.1 单字段主键 4.1.1 定义方式一:在定义列的同时指定主键 4.1.2 定义方式二:在定义完所有的列之后指定主键 4.2 多字段联合主键 5. 使用外键约束 外键用来在两个表之间建立连接,可以是一列或多列.一个表的外键可以是