logstash安装配置

vim /usr/local/logstash/etc/hello_search.conf

输入下面:

input {
stdin {
type => "human"
}
}

output {
stdout {
codec => rubydebug
}

elasticsearch {
host => "192.168.33.10"
port => 9300
}
}

注意事项 port 为9300 而不是9200

启动:
/usr/local/logstash/bin/logstash agent -f /usr/local/logstash/etc/hello_search.conf

标准流方式启动
bin/logstash -e ‘input { stdin { } } output { stdout {} }‘
改变输出表现
bin/logstash -e ‘input { stdin { } } output { stdout { codec => rubydebug } }‘
多重输出
bin/logstash -e ‘input { stdin { } } output { elasticsearch { host => localhost } stdout { } }‘
默认配置 - 按照每日日期建立索引

你将发现Logstash可以足够灵巧的在Elasticsearch上建立索引... 每天会按照默认格式是logstash-YYYY.MM.DD来建立索引。在午夜(GMT),Logstash自动按照时间戳更新索引。我们可以根据追溯多长时间的数据作为依据来制定保持多少数据,当然你也可以把比较老的数据迁移到其他的地方(重新索引)来方便查询,此外如果仅仅是简单的删除一段时间数据我们可以使用Elasticsearch Curator。

事件的生命周期

Inputs,Outputs,Codecs,Filters构成了Logstash的核心配置项。Logstash通过建立一条事件处理的管道,从你的日志提取出数据保存到Elasticsearch中,为高效的查询数据提供基础。为了让你快速的了解Logstash提供的多种选项,让我们先讨论一下最常用的一些配置。更多的信息,请参考Logstash事件管道。
Inputs
input 及输入是指日志数据传输到Logstash中。其中常见的配置如下: file:从文件系统中读取一个文件,很像UNIX命令 "tail -0a"syslog:监听514端口,按照RFC3164标准解析日志数据redis:从redis服务器读取数据,支持channel(发布订阅)和list模式。redis一般在Logstash消费集群中作为"broker"角色,保存events队列共Logstash消费。lumberjack:使用lumberjack协议来接收数据,目前已经改为 logstash-forwarder。 Filters
Fillters 在Logstash处理链中担任中间处理组件。他们经常被组合起来实现一些特定的行为来,处理匹配特定规则的事件流。常见的filters如下: grok:解析无规则的文字并转化为有结构的格式。Grok 是目前最好的方式来将无结构的数据转换为有结构可查询的数据。有120多种匹配规则,会有一种满足你的需要。mutate:mutate filter 允许改变输入的文档,你可以从命名,删除,移动或者修改字段在处理事件的过程中。drop:丢弃一部分events不进行处理,例如:debug events。clone:拷贝 event,这个过程中也可以添加或移除字段。geoip:添加地理信息(为前台kibana图形化展示使用) Outputs
outputs是logstash处理管道的最末端组件。一个event可以在处理过程中经过多重输出,但是一旦所有的outputs都执行结束,这个event也就完成生命周期。一些常用的outputs包括: elasticsearch:如果你计划将高效的保存数据,并且能够方便和简单的进行查询...Elasticsearch是一个好的方式。是的,此处有做广告的嫌疑,呵呵。file:将event数据保存到文件中。graphite:将event数据发送到图形化组件中,一个很流行的开源存储图形化展示的组件。http://graphite.wikidot.com/。statsd:statsd是一个统计服务,比如技术和时间统计,通过udp通讯,聚合一个或者多个后台服务,如果你已经开始使用statsd,该选项对你应该很有用。 Codecs
codecs 是基于数据流的过滤器,它可以作为input,output的一部分配置。Codecs可以帮助你轻松的分割发送过来已经被序列化的数据。流行的codecs包括 json,msgpack,plain(text)。 json:使用json格式对数据进行编码/解码multiline:将汇多个事件中数据汇总为一个单一的行。比如:java异常信息和堆栈信息 获取完整的配置信息,请参考 Logstash文档中 "plugin configuration"部分。
更多有趣Logstash内容

使用配置文件

使用-e参数在命令行中指定配置是很常用的方式,不过如果需要配置更多设置则需要很长的内容。这种情况,我们首先创建一个简单的配置文件,并且指定logstash使用这个配置文件。如我们创建一个文件名是"logstash-simple.conf"的配置文件并且保存在和Logstash相同的目录中。内容如下:
input { stdin { } }
output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}
接下来,执行命令:
bin/logstash -f logstash-simple.conf
我们看到logstash按照你刚刚创建的配置文件来运行例子,这样更加的方便。注意,我们使用-f参数来从文件获取而代替之前使用-e参数从命令行中获取配置。以上演示非常简单的例子,当然解析来我们继续写一些复杂一些的例子。
过滤器

filters是一个行处理机制将提供的为格式化的数据整理成你需要的数据,让我们看看下面的一个例子,叫grok filter的过滤器。
input { stdin { } }

filter {
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}
执行Logstash按照如下参数:
bin/logstash -f logstash-filter.conf
现在粘贴下面一行信息到你的终端(当然Logstash就会处理这个标准的输入内容):
127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] "GET /xampp/status.php HTTP/1.1" 200 3891 "http://cadenza/xampp/navi.php" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0"
你将看到类似如下内容的反馈信息:
{
"message" => "127.0.0.1 - - [11/Dec/2013:00:01:45 -0800] \"GET /xampp/status.php HTTP/1.1\" 200 3891 \"http://cadenza/xampp/navi.php\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\"",
"@timestamp" => "2013-12-11T08:01:45.000Z",
"@version" => "1",
"host" => "cadenza",
"clientip" => "127.0.0.1",
"ident" => "-",
"auth" => "-",
"timestamp" => "11/Dec/2013:00:01:45 -0800",
"verb" => "GET",
"request" => "/xampp/status.php",
"httpversion" => "1.1",
"response" => "200",
"bytes" => "3891",
"referrer" => "\"http://cadenza/xampp/navi.php\"",
"agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:25.0) Gecko/20100101 Firefox/25.0\""
}
正像你看到的那样,Logstash(使用了grok过滤器)能够将一行的日志数据(Apache的"combined log"格式)分割设置为不同的数据字段。这一点对于日后解析和查询我们自己的日志数据非常有用。比如:HTTP的返回状态码,IP地址相关等等,非常的容易。很少有匹配规则没有被grok包含,所以如果你正尝试的解析一些常见的日志格式,或许已经有人为了做了这样的工作。如果查看详细匹配规则,参考logstash grok patterns。
另外一个过滤器是date filter。这个过滤器来负责解析出来日志中的时间戳并将值赋给timestame字段(不管这个数据是什么时候收集到logstash的)。你也许注意到在这个例子中@timestamp字段是设置成December 11, 2013, 说明logstash在日志产生之后一段时间进行处理的。这个字段在处理日志中回添到数据中的,举例来说... 这个值就是logstash处理event的时间戳。
实用的例子

Apache 日志(从文件获取)

现在,让我们使用一些非常实用的配置... apache2访问日志!我们将从本地读取日志文件,并且通过条件设置处理满足我们需要的event。首先,我们创建一个文件名是logstash-apache.conf的配置文件,内容如下(你可以根据实际情况修改你的文件名和路径):
input {
file {
path => "/tmp/access_log"
start_position => beginning
}
}

filter {
if [path] =~ "access" {
mutate { replace => { "type" => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
}

output {
elasticsearch {
host => localhost
}
stdout { codec => rubydebug }
}
接下来,我们按照上面的配置创建一个文件(在例子中是"/tmp/access.log"),可以将下面日志信息作为文件内容(也可以用你自己的webserver产生的日志):
71.141.244.242 - kurt [18/May/2011:01:48:10 -0700] "GET /admin HTTP/1.1" 301 566 "-" "Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.9.2.3) Gecko/20100401 Firefox/3.6.3"
134.39.72.245 - - [18/May/2011:12:40:18 -0700] "GET /favicon.ico HTTP/1.1" 200 1189 "-" "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 2.0.50727; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; InfoPath.2; .NET4.0C; .NET4.0E)"
98.83.179.51 - - [18/May/2011:19:35:08 -0700] "GET /css/main.css HTTP/1.1" 200 1837 "http://www.safesand.com/information.htm" "Mozilla/5.0 (Windows NT 6.0; WOW64; rv:2.0.1) Gecko/20100101 Firefox/4.0.1"
现在使用-f参数来执行一下上面的例子:
bin/logstash -f logstash-apache.conf
你可以看到apache的日志数据已经导入到ES中了。这里logstash会按照你的配置读取,处理指定的文件,任何后添加到文件的内容也会被捕获处理最后保存到ES中。此外,数据中type的字段值会被替换成"apache_access"(这个功能在配置中已经指定)。
这个配置只是让Logstash监控了apache access_log,但是在实际中往往并不够用可能还需要监控error_log,只要在上面的配置中改变一行既可以实现,如下:
input {
file {
path => "/tmp/*_log"
...

现在你可以看到logstash处理了error日志和access日志。然而,如果你检查了你的数据(也许用elasticsearch-kopf),你将发现access_log日志被分成不同的字段,但是error_log确没有这样。这是因为我们使用了“grok”filter并仅仅配置匹配combinedapachelog日志格式,这样满足条件的日志就会自动的被分割成不同的字段。我们可以通过控制日志按照它自己的某种格式来解析日志,不是很好的吗?对吧。
此外,你也许还会发现Logstash不会重复处理文件中已经处理过得events。因为Logstash已经记录了文件处理的位置,这样就只处理文件中新加入的行数。漂亮!
条件判断

我们利用上一个例子来介绍一下条件判断的概念。这个概念一般情况下应该被大多数的Logstash用户熟悉掌握。你可以像其他普通的编程语言一样来使用if,else if和else语句。让我们把每个event依赖的日志文件类型都标记出来(access_log,error_log其他以log结尾的日志文件)。
input {
file {
path => "/tmp/*_log"
}
}

filter {
if [path] =~ "access" {
mutate { replace => { type => "apache_access" } }
grok {
match => { "message" => "%{COMBINEDAPACHELOG}" }
}
date {
match => [ "timestamp" , "dd/MMM/yyyy:HH:mm:ss Z" ]
}
} else if [path] =~ "error" {
mutate { replace => { type => "apache_error" } }
} else {
mutate { replace => { type => "random_logs" } }
}
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

我想你已经注意到了,我们使用"type"字段来标记每个event,但是我们实际上没有解析"error"和”random"类型的日志... 而实际情况下可能会有很多很多类型的错误日志,如何解析就作为练习留给各位读者吧,你可以依赖已经存在的日志。
Syslog

Ok,现在我们继续了解一个很实用的例子:syslog。Syslog对于Logstash是一个很长用的配置,并且它有很好的表现(协议格式符合RFC3164)。Syslog实际上是UNIX的一个网络日志标准,由客户端发送日志数据到本地文件或者日志服务器。在这个例子中,你根本不用建立syslog实例;我们通过命令行就可以实现一个syslog服务,通过这个例子你将会看到发生什么。
首先,让我们创建一个简单的配置文件来实现logstash+syslog,文件名是 logstash-syslog.conf
input {
tcp {
port => 5000
type => syslog
}
udp {
port => 5000
type => syslog
}
}

filter {
if [type] == "syslog" {
grok {
match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
add_field => [ "received_at", "%{@timestamp}" ]
add_field => [ "received_from", "%{host}" ]
}
syslog_pri { }
date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
}
}
}

output {
elasticsearch { host => localhost }
stdout { codec => rubydebug }
}

执行logstash:
bin/logstash -f logstash-syslog.conf

通常,需要一个客户端链接到Logstash服务器上的5000端口然后发送日志数据。在这个简单的演示中我们简单的使用telnet链接到logstash服务器发送日志数据(与之前例子中我们在命令行标准输入状态下发送日志数据类似)。首先我们打开一个新的shell窗口,然后输入下面的命令:
telnet localhost 5000
你可以复制粘贴下面的样例信息(当然也可以使用其他字符,不过这样可能会被grok filter不能正确的解析):
Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154]
Dec 23 14:42:56 louis named[16000]: client 199.48.164.7#64817: query (cache) ‘amsterdamboothuren.com/MX/IN‘ denied
Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)
Dec 22 18:28:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type ‘lightweight‘.

之后你可以在你之前运行Logstash的窗口中看到输出结果,信息被处理和解析!
{
"message" => "Dec 23 14:30:01 louis CRON[619]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"@timestamp" => "2013-12-23T22:30:01.000Z",
"@version" => "1",
"type" => "syslog",
"host" => "0:0:0:0:0:0:0:1:52617",
"syslog_timestamp" => "Dec 23 14:30:01",
"syslog_hostname" => "louis",
"syslog_program" => "CRON",
"syslog_pid" => "619",
"syslog_message" => "(www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log)",
"received_at" => "2013-12-23 22:49:22 UTC",
"received_from" => "0:0:0:0:0:0:0:1:52617",
"syslog_severity_code" => 5,
"syslog_facility_code" => 1,
"syslog_facility" => "user-level",
"syslog_severity" => "notice"
}

恭喜各位,看到这里你已经成为一个合格的Logstash用户了。你将可以轻松的配置,运行Logstash,还可以发送event给Logstash,但是这个过程随着使用还会有很多值得深挖的地方。

参考http://www.2cto.com/os/201411/352015.html

kafka配置 http://www.tuicool.com/articles/IRZvIz

http://my.oschina.net/abcfy2/blog/372138

中文文档资料 http://kibana.logstash.es/content/

时间: 2024-11-04 18:13:47

logstash安装配置的相关文章

ELK 架构之 Logstash 和 Filebeat 安装配置

上一篇:ELK 架构之 Elasticsearch 和 Kibana 安装配置 阅读目录: 1. 环境准备 2. 安装 Logstash 3. 配置 Logstash 4. Logstash 采集的日志数据,在 Kibana 中显示 5. 安装配置 Filebeat 6. Filebeat 采集的日志数据,在 Kibana 中显示 7. Filebeat 采集日志数据,Logstash 过滤 8. Filebeat 采集的日志数据,Logstash 过滤后,在 Kibana 中显示 上一篇主要说

ELK部署logstash安装部署及应用(二)

Logstash 安装部署注意事项: Logstash基本概念: logstash收集日志基本流程: input-->codec-->filter-->codec-->output input:从哪里收集日志. filter:发出去前进行过滤 output:输出至Elasticsearch或Redis消息队列 codec:输出至前台,方便边实践边测试 数据量不大日志按照月来进行收集 如果通过logstash来采集日志,那么每个客户端都需要安装logstash 安装需要前置系统环境

ELK的安装配置使用

ELK的安装配置 一.ES集群的安装: 搭建ElasticSearch集群: 使用三台服务器搭建集群 node-1(主节点) 10.170.13.1 node-2(从节点) 10.116.35.133 node-3(从节点) 10.44.79.57 下载安装包 地址:https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.4.3.rpm 在三台服务器上分别下载安装elasticsearch-5.4.3.rpm 安装

ELK日志分析系统 介绍 安装配置

ELK日志分析系统 一.ELK介绍 ELK顾名思义:是由Elasticsearch,Logstash 和 Kibana三部分组成的. 其中Elasticsearch 是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析.它是一个建立在全文搜索引擎 Apache Lucene 基础上的搜索引擎,使用 Java 语言编写.目前,最新的版本是 5.4. 主要特点 实时分析 分布式实时文件存储,并将每一个字段都编入索引 文档导向,所有的对象全部是文档 高可用性,易扩展,支持集群(Cl

elk集群安装配置详解

#  一:简介 ``` Elasticsearch作为日志的存储和索引平台: Kibana 用来从 Elasticsearch获取数据,进行数据可视化,定制数据报表: Logstash 依靠强大繁多的插件作为日志加工平台: Filebeat 用来放到各个主机中收集指定位置的日志,将收集到日志发送到 Logstash: Log4j 直接与 Logstash 连接,将日志直接 Logstash(当然此处也可以用 Filebeat 收集 tomcat 的日志). ``` ####  port ```

ELK 学习笔记之 Logstash安装

Logstash安装: https://www.elastic.co/downloads/logstash 下载解压: tar –zxvf logstash-5.6.1.tar.gz 在/usr/local/logstash-5.6.1/bin下编辑conf:(因为配置了输出到es和console上,所以必须先启动es.) vi logstash-simple.conf input { stdin { } } output { elasticsearch { hosts => ["loca

ELK系列一:ELK安装配置及nginx日志分析

本文分三个部分介绍了elk.elk安装配置及基于filebeat分析nginx日志的配置. 第一部分:elk介绍 一.什么是elk ELK 其实并不是一款软件,而是一整套解决方案,是三个软件产品的首字母缩写,Elasticsearch,Logstash 和 Kibana.这三款软件都是开源软件,通常是配合使用. 二.Elasticsearch 1.Elasticsearch介绍 Elasticsearch 是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析.它是一个建立在全

Logstash安装及部署

Logstash 安装及部署 一.环境配置 操作系统:Cent OS 7 Logstash版本:2.1.1.tar.gz JDK版本:1.7.0_51 SSH Secure Shell版本:XShell 5 二.操作过程 1.下载指定版本的logstash并解压 下载:curl -O https://download.elasticsearch.org/logstash/logstash/logstash-2.1.1.tar.gz 解压:tar zxvf logstash-2.1.1.tar.g

Elasticsearch+Logstash+Kibana配置

Elasticsearch+Logstash+Kibana配置 关于Elasticsearch+Logstash+Kibana的安装有很多文章,这里不复述了,这里仅记录一些比较细节的内容. AWS EC2中安装需要的注意事项 9200,9300,5601端口要记得打开 elasticsearch的地址不要写外部IP,否则会很浪费data,写内部ip elasticsearch { host => "ip-10-160-94-102.ap-northeast-1.compute.intern