Elasticsearch(二)

一、Java API操作

Elasticsearch的Java客户端非常强大;它可以建立一个嵌入式实例并在必要时运行管理任务

运行一个Java应用程序和Elasticsearch时,有两种操作模式可供使用。该应用程序可在Elasticsearch集群中扮演更加主动或更加被动的角色。在更加主动的情况下(称为Node Client),应用程序实例将从集群接收请求,确定哪个节点应处理该请求,就像正常节点所做的一样。(应用程序甚至可以托管索引和处理请求。)另一种模式称为Transport Client,它将所有请求都转发到另一个Elasticsearch节点,由后者来确定最终目标

1. API基本操作

1.1 操作环境准备

1)创建maven工程

2)添加pom文件


<dependencies>

<dependency>

<groupId>junit</groupId>

<artifactId>junit</artifactId>

<version>4.10</version>

<scope>test</scope>

</dependency>

<dependency>

<groupId>org.elasticsearch</groupId>

<artifactId>elasticsearch</artifactId>

<version>6.1.1</version>

</dependency>

<dependency>

<groupId>org.elasticsearch.client</groupId>

<artifactId>transport</artifactId>

<version>6.1.1</version>

</dependency>

<dependency>

<groupId>org.apache.logging.log4j</groupId>

<artifactId>log4j-core</artifactId>

<version>2.9.0</version>

</dependency>

</dependencies>


3)等待依赖的jar包下载完成

当直接在ElasticSearch 建立文档对象时,如果索引不存在的,默认会自动创建,映射采用默认方式

1.2 获取Transport Client

(1)ElasticSearch服务默认端口9300

(2)Web管理平台端口9200


private TransportClient client;

@SuppressWarnings("unchecked")

@Before

public void getClient() throws Exception {

// 1 设置连接的集群名称

Settings settings = Settings.builder().put("cluster.name", "my-application").build();

// 2 连接集群

client = new PreBuiltTransportClient(settings);

client.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("hsiehchou121"), 9300));

// 3 打印集群名称

System.out.println(client.toString());

}


(3)显示log4j2报错,在resource目录下创建一个文件命名为log4j2.xml并添加如下内容


<?xml version="1.0" encoding="UTF-8"?>

<Configuration status="warn">

<Appenders>

<Console name="Console" target="SYSTEM_OUT">

<PatternLayout pattern="%m%n"/>

</Console>

</Appenders>

<Loggers>

<Root level="INFO">

<AppenderRef ref="Console"/>

</Root>

</Loggers>

</Configuration>


1.3 创建索引

源代码


@Test

public void createIndex_blog(){

// 1 创建索引

client.admin().indices().prepareCreate("blog2").get();

// 2 关闭连接

client.close();

}


1.4 删除索引

源代码


@Test

public void deleteIndex(){

// 1 删除索引

client.admin().indices().prepareDelete("blog2").get();

// 2 关闭连接

client.close();

}


1.5 新建文档(源数据json串)

当直接在ElasticSearch建立文档对象时,如果索引不存在的,默认会自动创建,映射采用默认方式

源代码


@Test

public void createIndexByJson() throws UnknownHostException {

// 1 文档数据准备

String json = "{" + "\"id\":\"1\"," + "\"title\":\"基于Lucene的搜索服务器\","

+ "\"content\":\"它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口\"" + "}";

// 2 创建文档

IndexResponse indexResponse = client.prepareIndex("blog", "article", "1").setSource(json).execute().actionGet();

// 3 打印返回的结果

System.out.println("index:" + indexResponse.getIndex());

System.out.println("type:" + indexResponse.getType());

System.out.println("id:" + indexResponse.getId());

System.out.println("version:" + indexResponse.getVersion());

System.out.println("result:" + indexResponse.getResult());

// 4 关闭连接

client.close();

}


1.6 新建文档(源数据map方式添加json)

源代码

Test

public void createIndexByMap() {


    // 1 文档数据准备

Map<String, Object> json = new HashMap<String, Object>();

json.put("id", "2");

json.put("title", "基于Lucene的搜索服务器");

json.put("content", "它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口");

// 2 创建文档

IndexResponse indexResponse = client.prepareIndex("blog", "article", "2").setSource(json).execute().actionGet();

// 3 打印返回的结果

System.out.println("index:" + indexResponse.getIndex());

System.out.println("type:" + indexResponse.getType());

System.out.println("id:" + indexResponse.getId());

System.out.println("version:" + indexResponse.getVersion());

System.out.println("result:" + indexResponse.getResult());

// 4 关闭连接

client.close();

}


1.7 新建文档(源数据es构建器添加json)

源代码


@Test

public void createIndex() throws Exception {

// 1 通过es自带的帮助类,构建json数据

XContentBuilder builder = XContentFactory.jsonBuilder().startObject().field("id", 3).field("title", "基于Lucene的搜索服务器").field("content", "它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。")

.endObject();

// 2 创建文档

IndexResponse indexResponse = client.prepareIndex("blog", "article", "3").setSource(builder).get();

// 3 打印返回的结果

System.out.println("index:" + indexResponse.getIndex());

System.out.println("type:" + indexResponse.getType());

System.out.println("id:" + indexResponse.getId());

System.out.println("version:" + indexResponse.getVersion());

System.out.println("result:" + indexResponse.getResult());

// 4 关闭连接

client.close();

}


1.8 搜索文档数据(单个索引)

源代码

@Test

public void getData() throws Exception {


    // 1 查询文档

GetResponse response = client.prepareGet("blog", "article", "1").get();

// 2 打印搜索的结果

System.out.println(response.getSourceAsString());

// 3 关闭连接

client.close();

}


1.9 搜索文档数据(多个索引)

源代码


@Test

public void getMultiData() {

// 1 查询多个文档

MultiGetResponse response = client.prepareMultiGet().add("blog", "article", "1").add("blog", "article", "2", "3").add("blog", "article", "2").get();

// 2 遍历返回的结果

for(MultiGetItemResponse itemResponse:response){

GetResponse getResponse = itemResponse.getResponse();

// 如果获取到查询结果

if (getResponse.isExists()) {

String sourceAsString = getResponse.getSourceAsString();

System.out.println(sourceAsString);

}

}

// 3 关闭资源

client.close();

}


1.10 更新文档数据(update)

源代码


@Test

public void updateData() throws Throwable {

// 1 创建更新数据的请求对象

UpdateRequest updateRequest = new UpdateRequest();

updateRequest.index("blog");

updateRequest.type("article");

updateRequest.id("3");

updateRequest.doc(XContentFactory.jsonBuilder().startObject()

// 对没有的字段添加, 对已有的字段替换

.field("title", "基于Lucene的搜索服务器")

.field("content","它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。大数据前景无限")

.field("createDate", "2017-8-22").endObject());

// 2 获取更新后的值

UpdateResponse indexResponse = client.update(updateRequest).get();

// 3 打印返回的结果

System.out.println("index:" + indexResponse.getIndex());

System.out.println("type:" + indexResponse.getType());

System.out.println("id:" + indexResponse.getId());

System.out.println("version:" + indexResponse.getVersion());

System.out.println("create:" + indexResponse.getResult());

// 4 关闭连接

client.close();

}


1.11 更新文档数据(upsert)

设置查询条件, 查找不到则添加IndexRequest内容,查找到则按照UpdateRequest更新


@Test

public void testUpsert() throws Exception {

// 设置查询条件, 查找不到则添加

IndexRequest indexRequest = new IndexRequest("blog", "article", "5")

.source(XContentFactory.jsonBuilder().startObject().field("title", "搜索服务器").field("content","它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条款下的开放源码发布,是当前流行的企业级搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。").endObject());

// 设置更新, 查找到更新下面的设置

UpdateRequest upsert = new UpdateRequest("blog", "article", "5")

.doc(XContentFactory.jsonBuilder().startObject().field("user", "李四").endObject()).upsert(indexRequest);

client.update(upsert).get();

client.close();

}


1.12 删除文档数据(prepareDelete)

源代码


@Test

public void deleteData() {

// 1 删除文档数据

DeleteResponse indexResponse = client.prepareDelete("blog", "article", "5").get();

// 2 打印返回的结果

System.out.println("index:" + indexResponse.getIndex());

System.out.println("type:" + indexResponse.getType());

System.out.println("id:" + indexResponse.getId());

System.out.println("version:" + indexResponse.getVersion());

System.out.println("found:" + indexResponse.getResult());

// 3 关闭连接

client.close();

}


2. 条件查询QueryBuilder

2.1 查询所有(matchAllQuery)

源代码


@Test

public void matchAllQuery() {

// 1 执行查询

SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")

.setQuery(QueryBuilders.matchAllQuery()).get();

// 2 打印查询结果

SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象

System.out.println("查询结果有:" + hits.getTotalHits() + "条");

for (SearchHit hit : hits) {

System.out.println(hit.getSourceAsString());//打印出每条结果

}

// 3 关闭连接

client.close();

}


2.2 对所有字段分词查询(queryStringQuery)

源代码


@Test

public void query() {

// 1 条件查询

SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")

.setQuery(QueryBuilders.queryStringQuery("全文")).get();

// 2 打印查询结果

SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象

System.out.println("查询结果有:" + hits.getTotalHits() + "条");

for (SearchHit hit : hits) {

System.out.println(hit.getSourceAsString());//打印出每条结果

}

// 3 关闭连接

client.close();

}


2.3 通配符查询(wildcardQuery)

  • :表示多个字符(0个或多个字符)

    ?:表示单个字符

    源代码


@Test

public void wildcardQuery() {

// 1 通配符查询

SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")

.setQuery(QueryBuilders.wildcardQuery("content", "*全*")).get();

// 2 打印查询结果

SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象

System.out.println("查询结果有:" + hits.getTotalHits() + "条");

for (SearchHit hit : hits) {

System.out.println(hit.getSourceAsString());//打印出每条结果

}

// 3 关闭连接

client.close();

}


2.4 词条查询(TermQuery)

源代码


@Test

public void termQuery() {

// 1 第一field查询

SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")

.setQuery(QueryBuilders.termQuery("content", "全文")).get();

// 2 打印查询结果

SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象

System.out.println("查询结果有:" + hits.getTotalHits() + "条");

for (SearchHit hit : hits) {

System.out.println(hit.getSourceAsString());//打印出每条结果

}

// 3 关闭连接

client.close();

}


2.5 模糊查询(fuzzy)

源代码


@Test

public void fuzzy() {

// 1 模糊查询

SearchResponse searchResponse = client.prepareSearch("blog").setTypes("article")

.setQuery(QueryBuilders.fuzzyQuery("title", "lucene")).get();

// 2 打印查询结果

SearchHits hits = searchResponse.getHits(); // 获取命中次数,查询结果有多少对象

System.out.println("查询结果有:" + hits.getTotalHits() + "条");

Iterator<SearchHit> iterator = hits.iterator();

while (iterator.hasNext()) {

SearchHit searchHit = iterator.next(); // 每个查询对象

System.out.println(searchHit.getSourceAsString()); // 获取字符串格式打印

}

// 3 关闭连接

client.close();

}


3. 映射相关操作

源代码


@Test

public void createMapping() throws Exception {

// 1设置mapping

XContentBuilder builder = XContentFactory.jsonBuilder()

.startObject()

.startObject("article")

.startObject("properties")

.startObject("id1")

.field("type", "string")

.field("store", "yes")

.endObject()

.startObject("title2")

.field("type", "string")

.field("store", "no")

.endObject()

.startObject("content")

.field("type", "string")

.field("store", "yes")

.endObject()

.endObject()

.endObject()

.endObject();

// 2 添加mapping

PutMappingRequest mapping = Requests.putMappingRequest("blog4").type("article").source(builder);

client.admin().indices().putMapping(mapping).get();

// 3 关闭资源

client.close();

}


二、IK分词器

针对词条查询(TermQuery),查看默认中文分词器的效果:

curl -XGET ‘http://hsiehchou:9200/_analyze?pretty&analyzer=standard’ -d ‘中华人民共和国’

{

“tokens” : [

{

“token” : “中”,

“start_offset” : 0,

“end_offset” : 1,

“type” : “”,

“position” : 0

},

{

“token” : “华”,

“start_offset” : 1,

“end_offset” : 2,

“type” : “”,

“position” : 1

},

{

“token” : “人”,

“start_offset” : 2,

“end_offset” : 3,

“type” : “”,

“position” : 2

},

{

“token” : “民”,

“start_offset” : 3,

“end_offset” : 4,

“type” : “”,

“position” : 3

},

{

“token” : “共”,

“start_offset” : 4,

“end_offset” : 5,

“type” : “”,

“position” : 4

},

{

“token” : “和”,

“start_offset” : 5,

“end_offset” : 6,

“type” : “”,

“position” : 5

},

{

“token” : “国”,

“start_offset” : 6,

“end_offset” : 7,

“type” : “”,

“position” : 6

}

]

}

1. IK分词器的安装

1.1 前期准备工作

1)CentOS联网

配置CentOS能连接外网。Linux虚拟机ping www.baidu.com 是畅通的

2)jar包准备

(1)elasticsearch-analysis-ik-master.zip

(下载地址:https://github.com/medcl/elasticsearch-analysis-ik)

(2)apache-maven-3.0.5-bin.tar.gz

1.2 jar包安装

1)Maven解压、配置 MAVEN_HOME和PATH。

tar -zxvf apache-maven-3.0.5-bin.tar.gz -C /opt/module/

sudo vi /etc/profile


#MAVEN_HOME

export MAVEN_HOME=/opt/module/apache-maven-3.0.5

export PATH=$PATH:$MAVEN_HOME/bin


source /etc/profile

验证命令:mvn -version

2)Ik分词器解压、打包与配置

ik分词器解压

unzip elasticsearch-analysis-ik-master.zip -d ./

进入ik分词器所在目录

cd elasticsearch-analysis-ik-master

使用maven进行打包

mvn package -Pdist,native -DskipTests -Dtar

打包完成之后,会出现 target/releases/elasticsearch-analysis-ik-{version}.zip

pwd /opt/software/elasticsearch-analysis-ik-master/target/releases

对zip文件进行解压,并将解压完成之后的文件拷贝到es所在目录下的/plugins/

unzip elasticsearch-analysis-ik-6.0.0.zip

cp -r elasticsearch /opt/module/elasticsearch-6.1.1/plugins/

需要修改plugin-descriptor.properties文件,将其中的es版本号改为你所使用的版本号,即完成ik分词器的安装

vi plugin-descriptor.properties

71行

修改为

elasticsearch.version=6.1.1

至此,安装完成,重启ES!

注意:需选择与es相同版本的ik分词器。

安装方法(2种):

1.

./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.1.1/elasticsearch-analysis-ik-6.1.1.zip

2.

cp elasticsearch-analysis-ik-6.1.1.zip ./elasticsearch-6.1.1/plugins/

unzip elasticsearch-analysis-ik-6.1.1.zip -d ik-analyzer

3、elasticsearch-plugin install -f file:///usr/local/elasticsearch-analysis-ik-6.1.1.zip

2. IK分词器的使用

2.1 命令行查看结果

ik_smart模式

[itstar@hadoop102 elasticsearch]$ curl -XGET ‘http://hadoop104:9200/_analyze?pretty&analyzer=ik_smart’ -d ‘中华人民共和国’

curl -H “Content-Type:application/json” -XGET ‘http://192.168.116.121:9200/_analyze?pretty’ -d ‘{“analyzer”:”ik_smasysctl -prt”,”text”:”中华人民共和国”}’

{

“tokens” : [

{

“token” : “中华人民共和国”,

“start_offset” : 0,

“end_offset” : 7,

“type” : “CN_WORD”,

“position” : 0

}

]

}

ik_max_word模式

[itstar@hadoop102 elasticsearch]$ curl -XGET ‘http://hadoop121:9200/_analyze?pretty&analyzer=ik_max_word’ -d ‘中华人民共和国’

curl -H “Content-Type:application/json” -XGET ‘http://192.168.116.124:9200/_analyze?pretty’ -d ‘{“analyzer”:”ik_max_word”,”text”:”中华人民共和国”}’

{

“tokens” : [

{

“token” : “中华人民共和国”,

“start_offset” : 0,

“end_offset” : 7,

“type” : “CN_WORD”,

“position” : 0

},

{

“token” : “中华人民”,

“start_offset” : 0,

“end_offset” : 4,

“type” : “CN_WORD”,

“position” : 1

},

{

“token” : “中华”,

“start_offset” : 0,

“end_offset” : 2,

“type” : “CN_WORD”,

“position” : 2

},

{

“token” : “华人”,

“start_offset” : 1,

“end_offset” : 3,

“type” : “CN_WORD”,

“position” : 3

},

{

“token” : “人民共和国”,

“start_offset” : 2,

“end_offset” : 7,

“type” : “CN_WORD”,

“position” : 4

},

{

“token” : “人民”,

“start_offset” : 2,

“end_offset” : 4,

“type” : “CN_WORD”,

“position” : 5

},

{

“token” : “共和国”,

“start_offset” : 4,

“end_offset” : 7,

“type” : “CN_WORD”,

“position” : 6

},

{

“token” : “共和”,

“start_offset” : 4,

“end_offset” : 6,

“type” : “CN_WORD”,

“position” : 7

},

{

“token” : “国”,

“start_offset” : 6,

“end_offset” : 7,

“type” : “CN_CHAR”,

“position” : 8

}

]

}

2.2 JavaAPI操作

1)创建索引

//创建索引(数据库)


@Test

public void createIndex() {

//创建索引

client.admin().indices().prepareCreate("blog4").get();

//关闭资源

client.close();

}


2)创建mapping

//创建使用ik分词器的mapping


 @Test

public void createMapping() throws Exception {

// 1设置mapping

XContentBuilder builder = XContentFactory.jsonBuilder()

.startObject()

.startObject("article")

.startObject("properties")

.startObject("id1")

.field("type", "string")

.field("store", "yes")

.field("analyzer","ik_smart")

.endObject()

.startObject("title2")

.field("type", "string")

.field("store", "no")

.field("analyzer","ik_smart")

.endObject()

.startObject("content")

.field("type", "string")

.field("store", "yes")

.field("analyzer","ik_smart")

.endObject()

.endObject()

.endObject()

.endObject();

// 2 添加mapping

PutMappingRequest mapping = Requests.putMappingRequest("blog4").type("article").source(builder);

client.admin().indices().putMapping(mapping).get();

// 3 关闭资源

client.close();

}


3)插入数据

//创建文档,以map形式


@Test

public void createDocumentByMap() {

HashMap<String, String> map = new HashMap<>();

map.put("id1", "2");

map.put("title2", "Lucene");

map.put("content", "它提供了一个分布式的web接口");

IndexResponse response = client.prepareIndex("blog4", "article", "3").setSource(map).execute().actionGet();

//打印返回的结果

System.out.println("结果:" + response.getResult());

System.out.println("id:" + response.getId());

System.out.println("index:" + response.getIndex());

System.out.println("type:" + response.getType());

System.out.println("版本:" + response.getVersion());

//关闭资源

client.close();

}


4) 词条查询

//词条查询


@Test

public void queryTerm() {

SearchResponse response = client.prepareSearch("blog4").setTypes("article").setQuery(QueryBuilders.termQuery("content","提供")).get();

//获取查询命中结果

SearchHits hits = response.getHits();

System.out.println("结果条数:" + hits.getTotalHits());

for (SearchHit hit : hits) {

System.out.println(hit.getSourceAsString());

}

}


Store 的解释:

官方文档说 store 默认是 no ,想当然的理解为也就是说这个 field 是不会 store 的,但是查询的时候也能查询出来。

经过查找资料了解到原来 store 的意思是,是否在 _source 之外在独立存储一份。这里要说一下 _source 这是源文档,当索引数据的时候, elasticsearch 会保存一份源文档到 _source 。如果文档的某一字段设置了 store 为 yes (默认为 no),这时候会在 _source 存储之外再为这个字段独立进行存储,这么做的目的主要是针对内容比较多的字段。

如果放到 _source 返回的话,因为_source 是把所有字段保存为一份文档,命中后读取只需要一次 IO,包含内容特别多的字段会很占带宽影响性能。通常我们也不需要完整的内容返回(可能只关心摘要),这时候就没必要放到 _source 里一起返回了(当然也可以在查询时指定返回字段)。

三、Logstash

1. Logstash简介

Logstash is a tool for managing events and logs. You can use it to collect logs, parse them, and store them for later use (like, for searching).

logstash是一个数据分析软件,主要目的是分析log日志。整一套软件可以当作一个MVC模型,logstash是controller层,Elasticsearch是一个model层,kibana是view层。

首先将数据传给logstash,它将数据进行过滤和格式化(转成JSON格式),然后传给Elasticsearch进行存储、建搜索的索引,kibana提供前端的页面再进行搜索和图表可视化,它是调用Elasticsearch的接口返回的数据进行可视化。logstash和Elasticsearch是用Java写的,kibana使用node.js框架。

这个软件官网有很详细的使用说明,https://www.elastic.co/,除了docs之外,还有视频教程。这篇博客集合了docs和视频里面一些比较重要的设置和使用。

2. Logstash 安装

直接下载官方发布的二进制包的,可以访问 https://www.elastic.co/downloads/logstash 页面找对应操作系统和版本,点击下载即可。

在终端中,像下面这样运行命令来启动 Logstash 进程:

输入(读取数据):file、es。 输出:file、es、kafka


bin/logstash -e ‘input{stdin{}}output{stdout{codec=>rubydebug}}‘


-f文件 -e命令 标准输入、输出(命令行)

注意:如果出现如下报错,请调高虚拟机内存容量

Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c5330000, 986513408, 0) failed; error=’Cannot allocate memory’ (errno=12)

然后你会发现终端在等待你的输入。没问题,敲入 Hello World,回车,

{

“@version” => “1”,

“host” => “*“,

“message” => “hello world”,

“@timestamp” => 2019-03-18T02:51:18.578Z

}

每位系统管理员都肯定写过很多类似这样的命令

cat randdata | awk ‘{print $2}’ | sort | uniq -c | tee sortdata

Logstash 就像管道符一样!

你输入(就像命令行的 cat )数据,然后处理过滤(就像 awk 或者 uniq 之类)数据,最后输出(就像 tee )到其他地方

3. Logstash 配置

3.1 input配置

读取文件(File)


input {

file {

path => ["/var/log/*.log", "/var/log/message"]

type => "system"

start_position => "beginning"

}

}

output{stdout{codec=>rubydebug}}


有一些比较有用的配置项,可以用来指定 FileWatch 库的行为

discover_interval

logstash 每隔多久去检查一次被监听的 path 下是否有新文件。默认值是 15 秒

exclude

不想被监听的文件可以排除出去,这里跟 path 一样支持 glob 展开

close_older

一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是 3600 秒,即一小时

ignore_older

在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是 86400 秒,即一天

sincedb_path

如果你不想用默认的 $HOME/.sincedb(Windows 平台上在 C:\Windows\System32\config\systemprofile.sincedb),可以通过这个配置定义 sincedb 文件到其他位置

sincedb_write_interval

logstash 每隔多久写一次 sincedb 文件,默认是 15 秒

stat_interval

logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒

start_position

logstash 从什么位置开始读取文件数据,默认是结束位置,也就是说 logstash 进程会以类似 tail -F 的形式运行。如果你是要导入原有数据,把这个设定改成 “beginning”,logstash 进程就从头开始读取,类似 less +F 的形式运行

启动命令:../bin/logstash -f ./input_file.conf

测试命令:echo ‘hehe’ >> test.log

echo ‘hehe2’ >> message

标准输入(Stdin)

我们已经见过好几个示例使用 stdin 了。这也应该是 logstash 里最简单和基础的插件了


input {

stdin {

add_field => {"key" => "value"}

codec => "plain"

tags => ["add"]

type => "std"

}

}

output{stdout{codec=>rubydebug}}


用上面的新 stdin 设置重新运行一次最开始的 hello world 示例。我建议大家把整段配置都写入一个文本文件,然后运行命令:../bin/logstash -f ./input_stdin.conf。输入 “hello world” 并回车后,你会在终端看到如下输出


{

"message" => "hello world",

"@version" => "1",

"@timestamp" => "2014-08-08T06:48:47.789Z",

"type" => "std",

"tags" => [

[0] "add"

],

"key" => "value",

"host" => "raochenlindeMacBook-Air.local"

}


解释

type 和 tags 是 logstash 事件中两个特殊的字段。通常来说我们会在输入区段中通过 type 来标记事件类型。而 tags 则是在数据处理过程中,由具体的插件来添加或者删除的

最常见的用法是像下面这样


input {

stdin {

type => "web"

}

}

filter {

if [type] == "web" {

grok {

match => ["message", %{COMBINEDAPACHELOG}]

}

}

}

output {

if "_grokparsefailure" in [tags] {

nagios_nsca {

nagios_status => "1"

}

} else {

elasticsearch {

}

}

}


3.2 codec配置

Codec 是 logstash 从 1.3.0 版开始新引入的概念(Codec 来自 Coder/decoder 两个单词的首字母缩写)

在此之前,logstash 只支持纯文本形式输入,然后以过滤器处理它。但现在,我们可以在输入期处理不同类型的数据,这全是因为有了 codec 设置

所以,这里需要纠正之前的一个概念。Logstash 不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的

codec 的引入,使得 logstash 可以更好更方便的与其他有自定义数据格式的运维产品共存,比如 graphite、fluent、netflow、collectd,以及使用 msgpack、json、edn 等通用数据格式的其他产品等

事实上,我们在第一个 “hello world” 用例中就已经用过 codec 了 —— rubydebug 就是一种 codec!虽然它一般只会用在 stdout 插件中,作为配置测试或者调试的工具

采用 JSON 编码

在早期的版本中,有一种降低 logstash 过滤器的 CPU 负载消耗的做法盛行于社区(在当时的 cookbook 上有专门的一节介绍):直接输入预定义好的 JSON 数据,这样就可以省略掉 filter/grok 配置!

这个建议依然有效,不过在当前版本中需要稍微做一点配置变动 —— 因为现在有专门的 codec 设置

配置示例


input {

stdin {

add_field => {"key" => "value"}

codec => "json"

type => "std"

}

}

output{stdout{codec=>rubydebug}}


输入:

{“simCar”:18074045598,”validityPeriod”:”1996-12-06”,”unitPrice”:9,”quantity”:19,”amount”:35,”imei”:887540376467915,”user”:”test”}

运行结果:

{

“imei” => 887540376467915,

“unitPrice” => 9,

“user” => “test”,

“@timestamp” => 2019-03-19T05:01:53.451Z,

“simCar” => 18074045598,

“host” => “zzc-203”,

“amount” => 35,

“@version” => “1”,

“key” => “value”,

“type” => “std”,

“validityPeriod” => “1996-12-06”,

“quantity” => 19

}

3.3 filter配置

Grok插件

logstash拥有丰富的filter插件,它们扩展了进入过滤器的原始数据,进行复杂的逻辑处理,甚至可以无中生有的添加新的 logstash 事件到后续的流程中去!Grok 是 Logstash 最重要的插件之一。也是迄今为止使蹩脚的、无结构的日志结构化和可查询的最好方式。Grok在解析 syslog logs、apache and other webserver logs、mysql logs等任意格式的文件上表现完美

这个工具非常适用于系统日志,Apache和其他网络服务器日志,MySQL日志等。

配置:


input {

stdin {

type => "std"

}

}

filter {

grok {

match=>{"message"=> "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }

}

}

output{stdout{codec=>rubydebug}}


输入:55.3.244.1 GET /index.html 15824 0.043

输出:

{

“@version” => “1”,

“host” => “zzc-203”,

“request” => “/index.html”,

“bytes” => “15824”,

“duration” => “0.043”,

“method” => “GET”,

“@timestamp” => 2019-03-19T05:09:55.777Z,

“message” => “55.3.244.1 GET /index.html 15824 0.043”,

“type” => “std”,

“client” => “55.3.244.1”

}

grok模式的语法如下:

%{SYNTAX:SEMANTIC}

SYNTAX:代表匹配值的类型,例如3.44可以用NUMBER类型所匹配,127.0.0.1可以使用IP类型匹配。

SEMANTIC:代表存储该值的一个变量名称,例如 3.44 可能是一个事件的持续时间,127.0.0.1可能是请求的client地址。所以这两个值可以用 %{NUMBER:duration} %{IP:client} 来匹配。

你也可以选择将数据类型转换添加到Grok模式。默认情况下,所有语义都保存为字符串。如果您希望转换语义的数据类型,例如将字符串更改为整数,则将其后缀为目标数据类型。例如%{NUMBER:num:int}将num语义从一个字符串转换为一个整数。目前唯一支持的转换是int和float。

Logstash附带约120个模式。你可以在这里找到它们https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

自定义类型

更多时候logstash grok没办法提供你所需要的匹配类型,这个时候我们可以使用自定义。

创建自定义 patterns 文件。

①创建一个名为patterns其中创建一个文件postfix (文件名无关紧要,随便起),在该文件中,将需要的模式写为模式名称,空格,然后是该模式的正则表达式。例如:

POSTFIX_QUEUEID [0-9A-F]{10,11}

②然后使用这个插件中的patterns_dir设置告诉logstash目录是你的自定义模式。

配置:


input {

stdin {

type => "std"

}

}

filter {

grok {

patterns_dir => ["./patterns"]

match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }

}

}

output{stdout{codec=>rubydebug}}


输入:

Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1

输出:

{

“queue_id” => “BEF25A72965”,

“message” => “Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver1”,

“pid” => “21403”,

“program” => “postfix/cleanup”,

“@version” => “1”,

“type” => “std”,

“logsource” => “mailserver14”,

“host” => “zzc-203”,

“timestamp” => “Jan 1 06:25:43”,

“syslog_message” => “message-id=<20130101142543.5828399CCAF@mailserver1”,

“@timestamp” => 2019-03-19T05:31:37.405Z

}

GeoIP 地址查询归类

GeoIP 是最常见的免费 IP 地址归类查询库,同时也有收费版可以采购。GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。

配置:


input {

stdin {

type => "std"

}

}

filter {

geoip {

source => "message"

}

}

output{stdout{codec=>rubydebug}}


输入:183.60.92.253

输出:

{

“type” => “std”,

“@version” => “1”,

“@timestamp” => 2019-03-19T05:39:26.714Z,

“host” => “zzc-203”,

“message” => “183.60.92.253”,

“geoip” => {

“country_code3” => “CN”,

“latitude” => 23.1167,

“region_code” => “44”,

“region_name” => “Guangdong”,

“location” => {

“lon” => 113.25,

“lat” => 23.1167

},

“city_name” => “Guangzhou”,

“country_name” => “China”,

“continent_code” => “AS”,

“country_code2” => “CN”,

“timezone” => “Asia/Shanghai”,

“ip” => “183.60.92.253”,

“longitude” => 113.25

}

}

3.4 output配置

标准输出(Stdout)

保存成文件(File)

通过日志收集系统将分散在数百台服务器上的数据集中存储在某中心服务器上,这是运维最原始的需求。Logstash 当然也能做到这点。

和 LogStash::Inputs::File 不同, LogStash::Outputs::File 里可以使用 sprintf format 格式来自动定义输出到带日期命名的路径。

配置:


input {

stdin {

type => "std"

}

}

output {

file {

path => "../data_test/%{+yyyy}/%{+MM}/%{+dd}/%{host}.log"

codec => line { format => "custom format: %{message}"}

}

}


启动后输入,可看到文件

服务器间传输文件(File)

配置:

接收日志服务器配置:


input {

tcp {

mode => "server"

port => 9600

ssl_enable => false

}

}

filter {

json {

source => "message"

}

}

output {

file {

path => "/home/hduser/app/logstash-6.6.2/data_test/%{+YYYY-MM-dd}/%{servip}-%{filename}"

codec => line { format => "%{message}"}

}

}


发送日志服务器配置:


input{

file {

path => ["/home/hduser/app/logstash-6.6.2/data_test/send.log"]

type => "ecolog"

start_position => "beginning"

}

}

filter {

if [type] =~ /^ecolog/ {

ruby {

code => "file_name = event.get(‘path‘).split(‘/‘)[-1]

event.set(‘file_name‘,file_name)

event.set(‘servip‘,‘接收方ip‘)"

}

mutate {

rename => {"file_name" => "filename"}

}

}

}

output {

tcp {

host  => "接收方ip"

port  => 9600

codec => json_lines

}

}


从发送方发送message,接收方可以看到写出文件。

写入到ES

配置:


input {

stdin {

type => "log2es"

}

}

output {

elasticsearch {

hosts => ["192.168.109.133:9200"]

index => "logstash-%{type}-%{+YYYY.MM.dd}"

document_type => "%{type}"

sniffing => true

template_overwrite => true

}

}


在head插件中可以看到数据。

sniffing : 寻找其他es节点

实战举例:将错误日志写入es。

配置:


input {

file {

path => ["/usr/local/logstash-6.6.2/data_test/run_error.log"]

type => "error"

start_position => "beginning"

}

}

output {

elasticsearch {

hosts => ["192.168.109.133:9200"]

index => "logstash-%{type}-%{+YYYY.MM.dd}"

document_type => "%{type}"

sniffing => true

template_overwrite => true

}

}


四、Kibana

Kibana是一个开源的分析和可视化平台,设计用于和Elasticsearch一起工作。

你用Kibana来搜索,查看,并和存储在Elasticsearch索引中的数据进行交互。

你可以轻松地执行高级数据分析,并且以各种图标、表格和地图的形式可视化数据

Kibana使得理解大量数据变得很容易。它简单的、基于浏览器的界面使你能够快速创建和共享动态仪表板,实时显示Elasticsearch查询的变化

安装步骤:

解压:tar -zxvf kibana-6.6.2-linux-x86_64.tar.gz

修改 kibana.yml 配置文件:

server.port: 5601

server.host: “192.168.116.121” ———-部署kinana服务器的ip

elasticsearch.hosts: [“http://192.168.116.121:9200“]

kibana.index: “.kibana”

启动kibana,报错:

[error][status][plugin:remote_clusters@6.6.2] Status changed from red to red - X-Pack plugin is not installed on the [data] Elasticsearch cluster.

解决,卸载x-pack插件

elasticsearch-plugin remove x-pack

kibana-plugin remove x-pack

安装好后启动即可。页面操作

原文地址:https://www.cnblogs.com/hsiehchou/p/10587761.html

时间: 2024-11-06 15:35:13

Elasticsearch(二)的相关文章

ElasticSearch(二) 关于DSL

关于Lucene里面的查询评分,其实是基于一个公式:TF/ IDF(Term-Frequency/ Inverse Document Frequency),词频率/ 倒排文档频率,这个公式讲了一个故事,就是一个不具备区分度的词,就是它的在各个文档中都有出现(在每个文档中出现次数并不重要),那么这个词就不具备区分度,这个词的权重也就越低,这个就是倒排文档频率的概念. 关于查询改写 我们知道ES是基于Lucene的,对上提供了良好的接口和简易的DSL,但是其实es是做了解析的,其中一种解析是可以通过

ELKStack 实战之 Elasticsearch [一]

ELKStack 实战之 Elasticsearch [一] 标签(空格分隔): ELKStack ELKStack简介 ELK Stack 是 Elasticsearch.Logstash.Kibana 三个开源软件的组合.在实时数据检索和分析场合,三者通常是配合共用,而且又都先后归于 Elastic.co 公司名下,故有此简称. ELK Stack 在最近两年迅速崛起,成为机器数据分析,或者说实时日志处理领域,开源界的第一选择.和传统的日志处理方案相比,ELK Stack 具有如下几个优点:

Elasticsearch 搜索引擎

简介: Elasticsearch 是一个实时的分布式搜索和分析引擎.它可以帮助你用前所未有的速度去处理大规模数据.它可以用于全文搜索,结构化搜索以及分析.    分布式实时文件存储,并将每一个字段都编入索引,使其可以被搜索. 实时分析的分布式搜索引擎. 可以扩展到上百台服务器,处理PB级别的结构化或非结构化数据. 下载地址:https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/e

为Elasticsearch安装中文分词IK

注:Elasticsearch版本:1.4.2 一.安装与配置 1.从https://github.com/medcl/elasticsearch-analysis-ik下载elasticsearch-analysis-ik-master.zip 2.解压elasticsearch-analysis-ik-master.zip unzip elasticsearch-analysis-ik-master.zip 3.进入elasticsearch-analysis-ik-master,编译源码

centos从零开始安装elasticSearch

前言:elasticSearch作为一款优秀的分布式搜索工具,被广泛用在数据搜集和整理的业务中,知名的比如有github就是采用es来精准的搜索几千万行代码,百度也大量应用es做数据爬取分析,本篇博客就来探讨一下es如何安装.我选择的环境为centos6.5,之所以说是从零开始,是因为这个服务器是海外租来的,目前刚不久被我清零然后重新装了一遍系统,所以基本的运行环境都是么有的,废话不多说,start! 本篇博客的目录 一:下载ElasticSearch 二:安装java的基础环境jdk 三:启动

Elasticsearch入门教程(一):Elasticsearch及插件安装

原文:Elasticsearch入门教程(一):Elasticsearch及插件安装 版权声明:本文为博主原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明. 本文链接:https://blog.csdn.net/vbirdbest/article/details/79194244 分享一个朋友的人工智能教程(请以"右键"->"在新标签页中打开连接"的方式访问).比较通俗易懂,风趣幽默,感兴趣的朋友可以去看看. 一:安装Elasti

Elasticsearch分布式搜索

ElasticSearch之介绍 一 Elasticsearch产生背景 1.1 大规模数据如何检索 如:当系统数据量上了10亿.100亿条的时候,我们在做系统架构的时候通常会从以下角度去考虑问题:1)用什么数据库好?(mysql.oracle.mongodb.hbase…)2)如何解决单点故障:(lvs.F5.A10.Zookeep.MQ)3)如何保证数据安全性:(热备.冷备.异地多活)4)如何解决检索难题:(数据库代理中间件:mysql-proxy.Cobar.MaxScale等;)5)如何

x-pack

x-pack安装>官网安装步骤https://www.elastic.co/downloads/x-pack >x-pack简介X-Pack是一个Elastic Stack的扩展,将安全,警报,监视,报告和图形功能包含在一个易于安装的软件包中.在Elasticsearch 5.0.0之前,您必须安装单独的Shield,Watcher和Marvel插件才能获得在X-Pack中所有的功能 >思考题思考一:为什么要elasticksearch 和 kabana 都装x-pack? 思考二:每个

Windows下安装Elasticsearch6.4.1

1.安装jdk1.8(步骤略) 2.安装git(步骤略)3.安装nodejs(步骤略)4.下载elasticsearch6.4.1,将下载后的es解压,进入bin文件夹,执行elasticsearch.bat,正常情况es发布在9200端口,访问http://localhost:9200,若出现下图则说明es安装正常. 一.安装ElasticSearch 二.安装ElasticSearch-Head 下载es-head.git clone git://github.com/mobz/elasti

使用docker-compose 一键部署你的分布式调用链跟踪框架skywalking

原文:使用docker-compose 一键部署你的分布式调用链跟踪框架skywalking 一旦你的程序docker化之后,你会遇到各种问题,比如原来采用的本地记日志的方式就不再方便了,虽然你可以挂载到宿主机,但你使用 --scale 的话,会导致 记录日志异常,所以最好的方式还是要做日志中心化,另一个问题,原来一个请求在一个进程中的痉挛失败,你可以在日志中巡查出调用堆栈,但是docker化之后, 原来一个进程的东西会拆成几个微服务,这时候最好就要有一个分布式的调用链跟踪,类似于wcf中的sv