大数据实战之Logstash采集->Kafka->ElasticSearch检索

1. Logstash概述

 Logstash的官网地址为:https://www.elastic.co/cn/products/logstash,以下是官方对Logstash的描述。

 Logstash是与Flume类似,也是一种数据采集工具,区别在于组件和特性两大方面。常用的数据采集工具有Sqoop、Flume、Logstash,计划将单独写一篇博文论述它们之间的区别,所以这里就不赘述,感兴趣可关注后期的博文。

2. Kafka概述

 Kafka的官网是:http://kafka.apache.org/,官方的介绍如下图:

  总结来说,Kafka是一个分布式消息队列,具有生产者和消费者的功能,它依赖Zookeeper集群来保存meta数据,根据Topic来归类存储的消息,Kafka集群由多个实例组成,每个实例称为broker。

3. ElasticSearch概述

 ElasticSearch是一个分布式的搜索和数据分析引擎。它的官网是:https://www.elastic.co/cn/products/elasticsearch,官方对ElasticSearch的描述如下,通过官方的描述能够对ElasticSearch有一个整体的了解。

3. 编程实战

3.1 小项目介绍

 在VM的linux本地logserver目录下存有模拟数据data.log,启动一个logstash监视Linux的logserver目录的data.log日志文件,当日志文件发生了修改,将日志文件采集到Kafka消息队列的名为logs的Topic中,另启动一个logstash将Kafka的消息采集到ElashticSearch,使用ElasticSearch检索数据。

  

3.2 开发环境

 系统环境: VM中存在三台Linux机器(bigdata12,bigdata14,bigdata15)

 软件环境:kafka_2.11-0.9.0.1、zookeeper-3.4.10、elasticsearch-2.4.4、logstash-2.3.1

3.3 环境准备

 1. 首先在三台机器开启zookeeper,各机器运行zkServer.sh start,Linux下查看是否有然后使用zkServer.sh status查看zookeeper的状态,如果看到leader和follower角色的出现就代表运行正常。

 2. 三台启动Kafka,到kafka目录下,运行 nohup bin/kafka-server-start.sh conf/serverproperties.conf。使用

 3. 使用非root用户启动elasticsearch,使用非root用户进入elasticsearch目录执行: bin/elasticsearch -d

 注意,必须是非root用户,否则会报错。如果没有,就创建一个用户。

例如创建一个用户为zhou的话,执行:

  (1) 添加用户:useradd bigdata,

  (2) 为用户添加密码 :echo 123456 | passwd --stdin zhou,

  (3) 将zhou添加到sudoers: echo "bigdata ALL = (root) NOPASSWD:ALL" | tee /etc/sudoers.d/zhou

  (4) 修改权限: chmod 0440 /etc/sudoers.d/zhou

  (5) 从root切换成zhou: su - zhou

  (6) 然后再执行启动elasticsearch命令

 4. 检查进程运行情况

  在Linux环境下执行jps命令查看进程是否正常启动,每台机器查看是否有以下进程

  

 在elasticsearch安装了head的前提下,在windows环境开启浏览器,在地址栏输入http://ip地址:9200/_plugin/head ,例如,根据我的配置,输入了http://192.168.243.11:9200/_plugin/head。出现以下界面,表示Elasticsearch启动正常

 

 在以上环节确认后,就代表环境启动运行正常,可以进行正常开发程序。

3.4 开发

3.4.1 编写logstash配置

 在bigdata12机器中进入logstash的conf目录:

 vi dataTokafka.conf

 1 input {
 2   file {
 3         codec => plain {
 4         charset => "UTF-8"
 5     }
 6     path => "/root/logserver/supernova.log"
 7     discover_interval => 5
 8     start_position => "beginning"
 9   }
10 }
11
12 output {
13     kafka {
14           topic_id => "supernova"
15           codec => plain {
16           format => "%{message}"
17           charset => "UTF-8"
18       }
19           bootstrap_servers => "bigdata12:9092,bigdata14:9092,bigdata15:9092"
20     }
21 }

 在bigdata14机器中进入logstash的conf目录:

  vi dataToElastic.conf

input {
  kafka {
    type => "supernova"
    auto_offset_reset => "smallest"
    codec => "plain"
    group_id => "elas2"
    topic_id => "supernova"
    zk_connect => "bigdata12:2181,bigdata14:2181,bigdata15:2181"
  }
}
filter {
  if [type] == "supernova" {
    mutate {
      split => { "message" => "|" }
      add_field => {
                "id" => "%{message[0]}"
                "time" => "%{message[1]}"
                "ip" => "%{message[2]}"
                "user" => "%{message[3]}"
     }
     remove_field => [ "message" ]
   }
  }
}
output {
  if [type] == "supernova" {
    elasticsearch {
      index => "supernova"
      codec => plain {
        charset => "UTF-16BE"
      }
      hosts => ["bigdata12:9200", "bigdata14:9200", "bigdata15:9200"]
    }
  }
}

 3.4.2 运行

 (1) 在bigdata12机器中,使用3.4.1中的dataTokakfa.conf启动logstash。执行:bin/logstash -f conf/dataTokakfa.conf,监听supernova.log文件

 (2) 在bigdata14机器中,使用3.4.1中的dataToElastic.conf启动logstach。执行:bin/logstash -f conf/dataToElastic.conf,将Kafka数据采集到Elasticsearch。

 (3) 为了便于观察,在bigdata15机器中,启动kafka消费者,查看Topic中的数据。执行:bin/kafka-console-consumer.sh --zookeeper bigdata11:2181 --from-beginning --topic logs,用于消费Kafka中Topic名为logs的消息。

 (4) 编辑修改Logstash监听的supernova.log文件。

 启动】:

 【修改】在bigdata15中修改了数据(右下角窗口)

 

【监视过程】:bigdata15中(右上),kafka的consumer消费到了supernova.log文件中的数据,在bigdata14中,可以看到将数据传至ElasticSearch的数据(左下)

【ElasticSeach结果】

 可以看到Elastic集群中,产生了一个supernova的type(类似关系数据库中的table)

 【查看ElasticSearch数据】

 3.4.2 ElasticSearch检索

使用Junit单元测试的方法来编写测试方法,代码如下:

 EalsticSearch.java

package novaself;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.junit.Before;
import org.junit.Test;

import java.net.InetAddress;
import java.util.Iterator;

/**
 * @author Supernova
 * @date 2018/06/22
 */
public class ElasticSearch  {

    private Client client;

    /**
     * 获取客户端
     */
    @Before
    public void getClient() throws Exception {
        // ElasticSearch服务默认端口9300
        Settings settings = Settings.settingsBuilder()
                .put("cluster.name", "bigdata").build();
        client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata12"), 9300))
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata14"), 9300))
                .addTransportAddress(new InetSocketTransportAddress(
                        InetAddress.getByName("bigdata15"), 9300));
    }

    /**
     * 词条查询: 用户名中有"新"字的数据
     */
    @Test
    public void testTermQuery(){
        /*
         * termQuery词条查询: 只匹配指定字段中含有该词条的文档
         * 查询user字段为超新星的记录
         */
        SearchResponse response = client.prepareSearch("supernova")
                .setTypes("supernova")
                .setQuery(QueryBuilders.termQuery("user","新"))
                .get();

        // 获取结果集对象、命中数
        SearchHits hits = response.getHits();
        // 使用迭代器遍历数据
        Iterator<SearchHit> iter = hits.iterator();
        while(iter.hasNext()){
            SearchHit hit = iter.next();
            // 以Json格式输出
            String result = hit.getSourceAsString();
            System.out.println(result);
        }

        //关闭客户端
        client.close();
    }
    /**
     * 模糊查询: 星期四的数据
     */
    @Test
    public void testWildcardQuery() throws Exception{
        /*
         * wildcardQuery模糊查询,time字段中包含"四"的数据
         */
        SearchResponse response = client.prepareSearch("supernova")
                .setTypes("supernova")
                .setQuery(QueryBuilders.wildcardQuery("time","四"))
                .get();

        // 获取结果集对象、命中数
        SearchHits hits = response.getHits();
        // 使用迭代器遍历数据
        Iterator<SearchHit> iter = hits.iterator();
        while(iter.hasNext()){
            SearchHit hit = iter.next();
            // 以Json格式输出
            String result = hit.getSourceAsString();
            System.out.println(result);
        }

        //关闭客户端
        client.close();
    }
}

【检索结果】:

词条查询:testTermQuery( )方法的运行结果:

 模糊查询:testWildcardQuery ( )方法的运行结果:

原文地址:https://www.cnblogs.com/snova/p/9203425.html

时间: 2024-11-05 11:46:58

大数据实战之Logstash采集->Kafka->ElasticSearch检索的相关文章

【大数据实战】Logstash采集-&gt;Kafka-&gt;ElasticSearch检索

[大数据实战]Logstash采集->Kafka->ElasticSearch检索 https://www.cnblogs.com/snova/p/9203425.html 原文地址:https://www.cnblogs.com/chengjun/p/9280514.html

《OD大数据实战》驴妈妈旅游网大型离线数据电商分析平台

一.环境搭建 1. <OD大数据实战>Hadoop伪分布式环境搭建 2. <OD大数据实战>Hive环境搭建 3. <OD大数据实战>Sqoop入门实例 4. <OD大数据实战>Flume入门实例 5. <OD大数据实战>Kafka入门实例 6. <OD大数据实战>Oozie环境搭建 7. <OD大数据实战>HBase环境搭建 二.

下载大数据实战课程第一季Python基础和网络爬虫数据分析

python语言近年来越来越被程序相关人员喜欢和使用,因为其不仅简单容易学习和掌握,而且还有丰富的第三方程序库和相应完善的管理工具:从命令行脚本程序到gui程序,从B/S到C/S,从图形技术到科学计算,软件开发到自动化测试,从云计算到虚拟化,所有这些领域都有python的身影:python已经深入到程序开发的各个领域,并且会越来越多的人学习和使用. 大数据实战课程第一季Python基础和网络爬虫数据分析,刚刚入手,转一注册文件,视频的确不错,可以先下载看看:链接:http://pan.baidu

王家林亲授《DT大数据梦工厂》大数据实战视频 Scala 深入浅出实战经典 - 第71讲

王家林亲授<DT大数据梦工厂>大数据实战视频 Scala 深入浅出实战经典(1-71讲)完整视频.PPT.代码下载:百度云盘:http://pan.baidu.com/s/1c0noOt6 腾讯微云:http://url.cn/TnGbdC 360云盘:http://yunpan.cn/cQ4c2UALDjSKy 访问密码 45e2 技术爱好者尤其是大数据爱好者 可以加DT大数据梦工厂的qq群 DT大数据梦工厂① :462923555 DT大数据梦工厂②:437123764 DT大数据梦工厂③

《OD大数据实战》hive环境搭建

一.搭建hadoop环境 <OD大数据实战>hadoop伪分布式环境搭建 二.hive环境搭建 1. 准备安装文件 下载地址: http://archive.cloudera.com/cdh5/cdh/5/ hive-0.13.1-cdh5.3.6.tar.gz 2. 解压 tar -zxvf hive-0.13.1-cdh5.3.6.tar.gz -C /opt/modules/cdh/ 3. 修改配置 cd /opt/modules/cdh/hive-0.13.1-cdh5.3.6/con

SparkSQL大数据实战:揭开Join的神秘面纱

本文来自 网易云社区 . Join操作是数据库和大数据计算中的高级特性,大多数场景都需要进行复杂的Join操作,本文从原理层面介绍了SparkSQL支持的常见Join算法及其适用场景. Join背景介绍 Join是数据库查询永远绕不开的话题,传统查询SQL技术总体可以分为简单操作(过滤操作-where.排序操作-limit等),聚合操作-groupby以及Join操作等.其中Join操作是最复杂.代价最大的操作类型,也是OLAP场景中使用相对较多的操作.因此很有必要对其进行深入研究. 另外,从业

Kylin大数据 实战 教程

Kylin大数据实战教程链接:https://pan.baidu.com/s/17vuLNQDjBGUirQV_IdXgfg提取码:bj4b 复制这段内容后打开百度网盘手机App,操作更方便哦课程学习地址:https://www.xuetuwuyou.com/course/316请添加链接描述课程出自学途无忧网:www.xuetuwuyou.com咨询QQ:2591905126 本课程为专题课,通过全面讲解Kylin架构原理.分布式集群搭建以及项目案例,让你快速掌握Kylin实时大数据BI技术,

数据工程师必须掌握的7个大数据实战项目

简介: 值得收藏,数据工程师必须掌握的7个大数据实战项目 原创: Lenis 有关SQL 1作为一名电影爱好者,我阅片无数,有些片子还经常翻来覆去看个好几遍.小时候因为这事儿,没少被我妈抓耳朵,“看过的片子为啥还要倒二遍?”我也说不上来,就是单纯的爱看. 男人爱看的电影,以武侠,动作,科技为多,也认识了一帮明星,比如尼古拉斯凯奇,史泰龙,李小龙,成龙,李连杰,甄子丹等等.这些人很猛,有男人气.只要是他们的片儿,肯定不落下.在我眼里,他们就是好片代名词. 不知几何时,电影上开始出现一些不认识的男明

大数据实战项目必备技能三:storm

导读: Storm是一个分布式计算框架,主要使用Clojure与Java语言编写,最初是由Nathan Marz带领Backtype公司团队创建,在Backtype公司被Twitter公司收购后进行开源.最初的版本是在2011年9月17日发行,版本号0.5.0. 2013年9月,Apache基金会开始接管并孵化Storm项目.Apache Storm是在Eclipse Public License下进行开发的,它提供给大多数企业使用.经过1年多时间,2014年9月,Storm项目成为Apache