Kafka+Flume+Morphline+Solr+Hue数据组合索引

背景:Kafka消息总线的建成,使各个系统的数据得以在kafka节点中汇聚,接下来面临的任务是最大化数据的价值,让数据“慧”说话。

环境准备:

Kafka服务器*3。

CDH 5.8.3服务器*3,安装Flume,Solr,Hue,HDFS,Zookeeper服务。

Flume提供了可扩展的实时数据传输通道,Morphline提供了轻量级的ETL功能,SolrCloud+Hue提供了高性能搜索引擎和多样的数据展现形式。

一.环境安装(略)

二.修改CDH默认配置:

1.在Flume配置界面配置Flume依赖Solr。

2.在Solr配置界面配置Solr使用Zookeeper存储配置文件,使用HDFS存储索引文件。

3.在Hue配置界面配置Hue依赖Solr

4.配置Hue界面可以被外网访问。

三.按场景配置各CDH服务及开发代码。

Kafka Topic: eventCount

Topic数据格式:

{
    "timestamp": "1481077173000",
    "accountName": "旺小宝",
    "tagNames": [
        "incoming"
    ],
    "account": "WXB",
    "eventType": "phone",
    "eventTags": [
        {
            "value": 1,
            "name": "incoming"
        }
    ]
}

1.Solr创建对应Collection。

1)登录任意CDH节点。生成collection配置文件骨架。

$ solrctl instancedir --generate $HOME/solr_configs

2)找到文件夹中的schema.xml文件,修改collection的schema。

第一步:修改filed。schema.xml中预定义了很多filed,除了name=id,_root_,_version_不能去掉之外,其他的全部可以去掉。field对应的是json中需要被索引的字段。

(Notice:json中的timestamp对应的是下面的eventTime,而下面的timestamp是flume接受kafka数据的时间。这是通过Morphline配置实现的转换)

<field name="id" type="string" indexed="true" stored="true" required="true" multiValued="false" /> 

   <!-- points to the root document of a block of nested documents. Required for nested
        document support, may be removed otherwise
   -->
   <field name="_root_" type="string" indexed="true" stored="false"/>
   <field name="account" type="string" indexed="true" stored="true"/>
   <field name="accountName" type="string" indexed="true" stored="true"/>
   <field name="subaccount" type="string" indexed="true" stored="true"/>
   <field name="subaccountName" type="string" indexed="true" stored="true"/>
   <field name="eventTime" type="tlong" indexed="false" stored="true"/>
   <field name="eventType" type="string" indexed="true" stored="true"/>
   <field name="eventTags" type="string" indexed="true" stored="true" multiValued="true"/>
   <field name="_attachment_body" type="string" indexed="false" stored="true"/>
   <field name="timestamp" type="tlong" indexed="false" stored="true"/>
   <field name="_version_" type="long" indexed="true" stored="true"/>

第二步:去掉所有copy field。

第三步:添加动态字段dynamicFiled。

<dynamicField name="tws_*" type="text_ws" indexed="true" stored="true" multiValued="true"/>

3) 上传配置,创建collection

$ solrctl instancedir --create event_count_records solr_configs
$ solrctl collection --create event_count_records -s 3 -c event_count_records

2.Flume配置

创建一个新的角色组kafka2solr,修改代理名称为kafka2solr,并为该角色组分配服务器。

# 配置 source  channel sink 的名字
kafka2solr.sources = source_from_kafka
kafka2solr.channels = mem_channel
kafka2solr.sinks = solrSink

# 配置Source类别为kafka
kafka2solr.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSource
kafka2solr.sources.source_from_kafka.channels = mem_channel
kafka2solr.sources.source_from_kafka.batchSize = 100
kafka2solr.sources.source_from_kafka.kafka.bootstrap.servers= kafkanode0:9092,kafkanode1:9092,kafkanode2:9092
kafka2solr.sources.source_from_kafka.kafka.topics = eventCount
kafka2solr.sources.source_from_kafka.kafka.consumer.group.id = flume_solr_caller
kafka2solr.sources.source_from_kafka.kafka.consumer.auto.offset.reset=latest

#配置channel type为memory,通常生产环境中设置为file或者直接用kafka作为channel
kafka2solr.channels.mem_channel.type = memory
kafka2solr.channels.mem_channel.keep-alive = 60

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
kafka2solr.channels.mem_channel.capacity = 10000
kafka2solr.channels.mem_channel.transactionCapacity = 3000  

# 配置sink到solr,并使用morphline转换数据
kafka2solr.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
kafka2solr.sinks.solrSink.channel = mem_channel
kafka2solr.sinks.solrSink.morphlineFile = morphlines.conf
kafka2solr.sinks.solrSink.morphlineId=morphline1
kafka2solr.sinks.solrSink.isIgnoringRecoverableExceptions=true

3.Flume-NG的Solr接收器配置

SOLR_LOCATOR : {
  # Name of solr collection
  collection : event_count_records

  # ZooKeeper ensemble 
  #CDH的专有写法,开源版本不支持。
  zkHost : "$ZK_HOST"
  }

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]

    commands : [
{
  #Flume传过来的kafka的json数据是用二进制流的形式,需要先读取json
   readJson{}
}

{
 #读出来的json字段必须转换成filed才能被solr索引到
extractJsonPaths {
 flatten:true
 paths:{
account:/account
accountName:/accountName
subaccount:/subaccount
subaccountName:/subaccountName
eventTime:/timestamp
eventType:/eventType
eventTags:"/eventTags[]/name"
#按分钟存timestamp
eventTimeInMinute_tdt:/timestamp
#按小时存timestamp
eventTimeInHour_tdt:/timestamp
#按天存timestamp
eventTimeInDay_tdt:/timestamp
#_tdt后缀会被动态识别为日期类型的索引字段
#按不同时间间隔存索引以增加查询性能
}

}
}

#转换long型时间为Date格式
{convertTimestamp {
  field : eventTimeInMinute_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/MINUTE‘"
  outputTimezone : Asia/Shanghai
}}

{convertTimestamp {
  field : eventTimeInHour_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/HOUR‘"
  outputTimezone : Asia/Shanghai
}}
{convertTimestamp {
  field : eventTimeInDay_tdt
  inputFormats : ["unixTimeInMillis"]
  inputTimezone : UTC
  outputFormat : "yyyy-MM-dd‘T‘HH:mm:ss.SSS‘Z/DAY‘"
  outputTimezone : Asia/Shanghai
}}

#kafka中的json数据传到flume中时会被放入_attachment_body字段,readJson后会变成JsonNode对象,需要toString之后才能保存
{toString { field : _attachment_body }}

#为每一条记录生成一个UUID
{generateUUID {
  field : id
}}

#对未定义的Solr字段加tws前缀,根据schema.xml中定义的tws_*为text_ws类型,会动态未未定义的字段建索引。
          {
        sanitizeUnknownSolrFields {
          # Location from which to fetch Solr schema
          solrLocator : ${SOLR_LOCATOR}
          renameToPrefix:"tws_"
        }
      }  

#将数据导入到solr中
      {loadSolr {solrLocator : ${SOLR_LOCATOR}}}
    ]
  }
]

重启被影响的Flume节点,数据开始导入solr。

3.通过Hue查询Solr中的数据。

Solr+Hue实战

时间: 2024-10-25 18:19:27

Kafka+Flume+Morphline+Solr+Hue数据组合索引的相关文章

Flume+Morphline+Solr+Hue实时索引调试及问题定位

Technorati Tags: Solr,Hue,Flume,Morphline,大数据 1.Flume和Morphline添加日志打印 log4j.logger.org.apache.flume.sink.solr=DEBUG log4j.logger.org.kitesdk.morphline=TRACE 2.在线更新solr collection配置 $ solrctl instancedir --update url_analysis_records url_analysis_conf

搜索引擎系列十:Solr(solrj 、索引API 、 结构化数据导入)

一.SolrJ介绍 1. SolrJ是什么? Solr提供的用于JAVA应用中访问solr服务API的客户端jar.在我们的应用中引入solrj: <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId> <version>7.3.0</version> </dependency> 2. SolrJ的核

大数据架构-使用HBase和Solr将存储与索引放在不同的机器上

摘要:HBase和Solr可以通过协处理器Coprocessor的方式向Solr发出请求,Solr对于接收到的数据可以做相关的同步:增.删.改索引的操作,这样就可以同时使用HBase存储量大和Solr检索性能高的优点了,更何况HBase和Solr都可以集群.这对海量数据存储.检索提供了一种方式,将存储与索引放在不同的机器上,是大数据架构的必须品. 关键词:HBase, Solr, Coprocessor, 大数据, 架构 正如我的之前的博客“Solr与HBase架构设计”http://http:

[转载] 利用flume+kafka+storm+mysql构建大数据实时系统

原文: http://mp.weixin.qq.com/s?__biz=MjM5NzAyNTE0Ng==&mid=205526269&idx=1&sn=6300502dad3e41a36f9bde8e0ba2284d&key=c468684b929d2be22eb8e183b6f92c75565b8179a9a179662ceb350cf82755209a424771bbc05810db9b7203a62c7a26&ascene=0&uin=Mjk1ODMy

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

利用Flume将MySQL表数据准实时抽取到HDFS

转自:http://blog.csdn.net/wzy0623/article/details/73650053 一.为什么要用到Flume 在以前搭建HAWQ数据仓库实验环境时,我使用Sqoop抽取从MySQL数据库增量抽取数据到HDFS,然后用HAWQ的外部表进行访问.这种方式只需要很少量的配置即可完成数据抽取任务,但缺点同样明显,那就是实时性.Sqoop使用MapReduce读写数据,而MapReduce是为了批处理场景设计的,目标是大吞吐量,并不太关心低延时问题.就像实验中所做的,每天定

中国移动实时数据分析-基于spark+kafka+flume

这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper 主要是为了熟悉一下整个的一个spark-streaming的一个整个流程,还有就是了解调优的地方. 上述假设已经安装好了相应的组件,然后就开始正式的踩坑之路: 1.编写一个java程序去读取原始数据文件,模拟1s进行文件的插入一行,原始的数据文件格式如下: 坑a .整个的数据格式是js

oracle 优化——索引与组合索引

1.索引结构.第一张图是索引的官方图解,右侧是存储方式的图解. 图中很清晰的展示了索引存储的状况. 在leaf 节点中存储了一列,索引所对应项的 :值,rowId,长度,头信息(控制信息) 这样我们就能很清楚.如果通过索引查找数据,而只需要这个索引的值的时候,写上列名,就可以不需要回表. 2.索引在一般的数据量情况下,只有三层.leaf 是目录,branch 是目录的目录.可以做一个测试 1 drop table t1 purge; 2 drop table t2 purge; 3 drop t

(solr系列:五) solr定时实时重建索引和增量更新

将mysql中的数据导入到了solr中之后,如果数据库中的数据有变动,solr中还是第一次导入的旧的数据,那该如何是好呢?该如何实现mysql数据库中的数据定时同步到solr中呢?下面将做详细的介绍. 准备工作要做好: 1.下载jar包:solr-dataimportscheduler-1.1.jar http://pan.baidu.com/s/1hsySs2S 2.新建文件:dataimport.properties,文件复制下面的就好,具体配置含义已给出注释: ##############