Logstash详解
一,简介
? Logstash是一款开源的日志采集,处理,输出的软件,每秒可以处理数以万计条数据,可以同时从多个来源采集数据,转换数据,然后将数据输出至自己喜欢的存储库中(官方推荐的存储库为Elasticsearch)
? 如图所示logstash的工作机制,数据从来源进入logstash,进行过滤最后输出到目标.
Logstash处理日志需要借助于大量的插件来完成.主要有以下三类插件,
- Input plugins
- Filter plugins
- Output plugins
二,安装配置
2.1、二进制包方式安装
? 软件下载链接 https://www.elastic.co/downloads/logstash
? 由于logstash是java语言开发的,所以需要安装下jdk, 这里安装的logstash为6.x的版本,官方要求使用java8
#安装jdk,去oracle官网下载jdk
tar xvf jdk-8u181-linux-x64.tar.gz -C /usr/java/
#配置java环境变量
vim /etc/profile.d/java.sh #内容如下
JAVA_HOME=/usr/java/jdk1.8.0_181
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
#加载java的环境变量
source /etc/profile
#下载tar.gz安装包
wgethttps://artifacts.elastic.co/downloads/logstash/logstash-6.4.1.tar.gz
tar xvf logstash-6.4.1.tar.gz -C /usr/local/
cd /usr/local/
mv logstash-6.4.1 logstash
2.2、rpm安装
#yum方式安装openjdk
yum install java-1.8.0-openjdk
#下载logstash rpm 包
#下载页面,自行下载https://www.elastic.co/downloads/logstash
#下载完成后安装
rpm -ivh logstash-6.4.2.rpm
#配置文件目录
/etc/logstash
#执行程序目录
/usr/share/logstash/bin
2.3、命令行启动
#一个简单的测试示例,获取标准输入并且标准输出
/usr/local/logstash/bin/logstash -e ‘input { stdin{} } output { stdout{ codec => rubydebug} }‘
#等待进程启动,会有点慢等最底下一行出现Successfully字样则说明进程启动成功,此时随便输入一些字符串就会被logstash input 获取到.并根据定义输出
? 以配置文件的方式运行,如果是以rpm或yum方式安装的则配置文件需保存至/etc/logstash/conf.d/目录下, 如果是二进制 tar.gz zip安装则自己定义一个目录来存放配置文件启动的时候传入正确的路径即可
vim test.conf # 内容如下
input {
file {
path =>["/var/log/nginx/access.log"]
type =>"nginx"
start_position=> "beginning"
}
}
output {
stdout { }
}
# End
/usr/local/logstash/bin/logstash -f test.conf
以上两种方法一般用于调试的时候使用,
2.4、后台方式运行logsatsh进程
一般来说都是写配置文件来定义logsatsh的
如果是rpm或yum方式安装的则直接可以使用service来控制程序的启动关闭等
# centos 6.x
service logstash start
chkconfig logstash on
# cnetos 7.x
systemctl start logstash
systemctl enable logstash
? 如果是二进制方式安装则需自己编写sysv或system风格的启动脚本或者使用nohup命令来启动
三,插件
2.1, Input plugins
? 所谓的输入插件就是支持的数据来源,每一个插件就是一种数据来源.目前官方支持输入插件见官方文档 https://www.elastic.co/guide/en/logstash/current/input-plugins.html
? 这里列出一些常用的
插件名称 | 说明 |
---|---|
beats | 一款轻量级的数据收集引擎,占用资源较少 |
file | 从文件中读取数据 |
jdbc | 从数据库中读取数据,需写sql查询语句 |
kafka | 从kafka消息队列中消费数据 |
redis | 从redis缓存中取数据 |
log4j | 直接从log4j中获取数据 |
stdin | 标准输入获取数据 |
syslog | 从syslog中获取数据 |
2.2,Filter plugins
插件名称 | 说明 |
---|---|
grok | 可以将非结构化的数据通过正则表达式处理为结构化数据 |
geoip | 解析IP地址的经纬度及国家城市名称等 |
json | 处理原字段是json格式的数据 |
kv | 将字段拆分为key/value |
date | 解析日期字段,然后使用该日期字段作为日志存储的时间戳 |
mutate | 此插件可以对字段做出改变 如重命名、删除、替换和修改等 |
2.3, Output plugins
输出的目标不一定非得是存储目标,也有可能是其他的接收目标如kafka,redis,
官方文档 https://www.elastic.co/guide/en/logstash/current/output-plugins.html
插件名称 | 说明 |
---|---|
elasticsearch | 输出至elasticsearch存储 |
file | 输入至文件存储 |
Redis | 输出至redis中 |
Stdout | 标准输出 |
Hadoop_webhdfs | 输出至hdfs中 |
四,应用场景
4.1、Nginx日志处理
处理nginx日志一般可以使用grok、json、split来处理,最为方便的是json方式,
首先定制nginx日志为json格式输出,而后logstash的filter插件使用json就直接解析处理好日志格式了.如下
4.1.1、json
1、定义nginx输出日志格式为json格式
vim /etc/nginx/nginx.conf # 修改替换log_format 配置段
log_format main ‘{"remote_addr":"$remote_addr",‘
‘"host":"$host",‘
‘"status":"$status",‘
‘"@timestamp":"$time_local",‘
‘"request":"$request",‘
‘"size":$body_bytes_sent,‘
‘"body_bytes_sent":$body_bytes_sent,‘
‘"http_referer": "$http_referer",‘
‘"http_user_agent": "$http_user_agent",‘
‘"http_x_forwarded_for": "$http_x_forwarded_for",‘
‘"uri": "$uri",‘
‘"server_addr": "$server_addr",‘
‘"upstream_response_time": "$upstream_response_time",‘
‘"request_time": "$request_time"‘
‘}‘;
# End
#重启nginx
service nginx restart
2、配置logstash处理及存储数据
vim /etc/logstash/conf.d/nginx_log.conf
# 数据来源为redis
input{
redis {
type => "nginx-log"
host =>"10.57.1.202"
key =>"nginx-log"
data_type =>"list"
db => 0
}
}
filter {
# 这里就是使用json 来处理解析数据的配置. 配置很简单定义源字段直接就可以了
# 由于源字段已被解析成N个字段 所以源字段则就不需要存在了,
# 主要是为了节省存储空间
json {
source => "message"
remove_field => "message"
}
# 使用geoip解析IP地址的经纬度及城市信息等,提供地图位置展示等
# 这里判断如果内网地址的话则不做geoip解析处理
#这里的内网地址是机房及公司内部的IP段
if "10.57.1." not in [remote_addr] and "192.168." not in [remote_addr] {
geoip{
source => "remote_addr"
target => "geoip"
database => "/etc/logstash/geoip/GeoLite2-City.mmdb"
}
}
# 处理接口后面跟的参数,需要的话可将注释去掉
# kv {
# source => "request"
# field_split => "&"
# value_split => "="
# prefix => "url_args_"
# }
}
output {
# 如果是客户端是内网IP的则存储在xxx-lan-xxx的索引里,主要是为了区分内外网数据
if "10.57.1" in [remote_addr] or "192.168.1" in [remote_addr]{
elasticsearch {
hosts => "10.57.1.201"
# index这需要注意的是开头
# 一定要是 logstash才能匹配到es的index模板
# 名称必须全小写
index => "logstash-nginx-lan%{+YYYY.MM.dd}"
}
}
# 如果客户端IP不是内网的则肯定就是外网IP了,外网来源的数据则创建索引 *wan*
else{
elasticsearch {
hosts => "10.57.1.201"
index => "logstash-nginx-wan-%{+YYYY.MM.dd}"
}
}
}
# End
#重启logstash
service logstash restart
3、调试模式的输出结果
五、生产部署
? 生产环境使用logstash一般情况不单一使用,大多数场景需要结合缓存、消息队列、存储系统、分析展示系统等
更多好文关注马哥linux运维
原文地址:https://blog.51cto.com/ant595/2482947