新版插件:
说明: 从5.0开始,插件都独立拆分成gem包,每个插件可独立更新,无需等待Logstash自身整体更新,具体管理命令可参考./bin/logstash-plugin --help帮助信息../bin/logstash-plugin list其实所有的插件就位于本地./vendor/bundle/jruby/1.9/gems/目录下
扩展: 如果GitHub上面(https://github.com/logstash-plugins/)发布了扩展插件,可通过./bin/logstash-plugin install <plugin-name>,当然升级也很方便./bin/logstash-plugin update <plugin-name>,如果要安装更新本地已有的插件可通过./bin/logstash-plugin install/update <plugin-path>即可.
注意: 默认./bin/logstash-plugin install/update时是到https://rubygems.org/下载包,速度非常慢,所以强烈推荐手动从https://github.com/logstash-plugins/或https://rubygems.org/下载下来更新
输入插件: https://www.elastic.co/guide/en/logstash/current/input-plugins.html
说明: 输入插件只用于input区段中,可以通过不同的方式来采集数据,如上为目前Logstash支持的所有输入插件.
插件名称: udp (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-udp.html)
input { udp { port => 25826 workers => 2 queue_size => 20000 buffer_size => 1452 codec => collectd { } type => "collectd" } } output{ stdout{ codec => rubydebug } }
生产案例:
yum -y install collectd* cp -rfp /etc/collectd.conf /etc/collectd.conf_org vim /etc/collectd.conf # Collectd基础信息采集配置 Hostname "xm-server-00001" FQDNLookup false LoadPlugin df LoadPlugin cpu LoadPlugin disk LoadPlugin memory LoadPlugin network <Plugin network> Server "10.2.5.51" "25826" </Plugin> LoadPlugin interface <Plugin interface> Interface "eth0" IgnoreSelected false </Plugin> Include "/etc/collectd.d"
说明: 如上是Logstash Udp插件的生产案例,Logstash配置文件中udp区段port表示监听的udp端口,workers表示读取socket数据的线程数,一般设置为CPU个数即可,queue_size表示udp包可堆积在内存中个数,buffer_size表示读取缓冲区最大大小,codec 表示编码器,type表示手工定义的类型,可用于filter区段中以及后面前端Kibana检索
插件名称: file (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-file.html)
input { file { path => ["/xm-workspace/xm-webs/xmcloud/logs/*.log"] type => "dss-pubserver" codec => json start_position => "beginning" } } output{ stdout{ codec => rubydebug } }
生产案例:
log_format json ‘{"@timestamp":"$time_iso8601",‘ ‘"host":"$server_addr",‘ ‘"clientip":"$remote_addr",‘ ‘"size":$body_bytes_sent,‘ ‘"responsetime":$request_time,‘ ‘"upstreamtime":"$upstream_response_time",‘ ‘"upstreamhost":"$upstream_addr",‘ ‘"http_host":"$host",‘ ‘"url":"$uri",‘ ‘"xff":"$http_x_forwarded_for",‘ ‘"referer":"$http_referer",‘ ‘"agent":"$http_user_agent",‘ ‘"status":"$status"}‘;
说明: 分析网站/接口访问日志时可监听匹配日志文件,同时会生成.sincedb数据库文件,记录被监听文件的inode/major/minor/position,file区段中还支持delimiter表示行分割符,discover_interval表示检查是否有新文件频率,默认15秒,exclude表示排除监听的匹配列表,close_older表示多久没修改就关闭监听句柄,默认3600秒,ignore_older表示只检查多久内的文件,默认86400,sincedb_path表示sincedb存放位置,默认位于(Linux: $HOME/.,sincedb | Windows: C:\Windows\System32\config\systemprofile\.sincedb),sincedb_write_interval表示多久写一次sincedb,默认为15秒,stat_interval为每隔多久检查一次被监听的文件状态,默认为1秒,start_position表示从什么位置读取文件数据,默认为结束位置,类似tail -f,当然也可设定beginning从头读取.
注意: 通常导入原有数据进Elasticsearch前需通过filter/date插件规范默认的@timestamp字段值,file插件并不支持递归监视,如果有需求可以数组形式匹配,支持/path/to/*/*/*/*.log写法,如果需要反复测试可将sincedb_path定义为/dev/null,这样每次重启都会自动从头开始读,还有一点儿需要注意的是Windows由于没有inode概念,所以监听文件不靠谱,推荐使用nxlog作为收集端.
插件名称: stdin (https://www.elastic.co/guide/en/logstash/current/plugins-inputs-stdin.html)
input { stdin { codec => json tags => ["dss-pubserver"] type => "stdin" add_field => { "runenv" => "docker" } } } output{ stdout{ codec => rubydebug } }
生产案例:
{"status": "", "content_length": "", "body_bytes_sent": "", "http_x_forwarded_for": "", "request_time": "", "errors": {"text": "res body is invalid protocol format args", "line": 128, "func": "process_msg()", "file": "dsspub-server.lua"}, "request_body": "", "http_referer": "", "level": "", "hostname": "", "request": "", "remote_addr": "", "upstream_response_time": "", "time_local": "", "upstream_addr": ""}
说明: type和tags是Logstash事件中两个特殊的字段,通常会在输入区段中来标记事件类型,然后在数据处理区域根据事件类型来调用插件处理消息或添加删除tags,最后在输出中通过tags来判断并选择输出
插件名称: syslog(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-syslog.html)
input { syslog { port => "6514" } } output{ stdout{ codec => rubydebug } }
生产案例:
vim /etc/rsyslog.conf *.* @@127.0.0.1:6514
说明: syslog在收集网络设备端日志时也许是几乎唯一可行办法,简单测试只需修改/etc/rsyslog.conf配置,加入网络传输支持,发送到本地的Logstash监听端口6514,默认接收过滤操作都在input中完成,虽然启动时可通过-w指定worker数但是同一个客户端数据的处理以及过滤都在同一个线程中完成,导致处理性能严重下降,如果使用udp+grok的match的%{SYSLOGLINE}和syslog_pri实现,虽然将input和grok拆分拆分到两个线程,但udp接收缓冲区有限制(netstat -plnu | awk ‘NR==1 || $4~/:514$/{print $2}‘),当大于228096时就开始丢包,所以还是推荐使用tcp方式
扩展: 如果实在没法切换到tcp协议,可通过这个小工具来实现异步IO的UDP监听数据输入写入Elasticsearch(https://gist.github.com/chenryn/7c922ac424324ee0d695)
插件名称: tcp(https://www.elastic.co/guide/en/logstash/current/plugins-inputs-tcp.html)
input { tcp { port => "8888" mode => "server" ssl_enable => false } } output{ stdout{ codec => rubydebug } }
生产实例:
nc 127.0.0.1 8888 < error.log
说明: Logstash也可通过Tcp组件实现简单消息队列,但千万别用于生产环境,生产环境还是换用专业的Logstash broker,比较常用的就是配合nc实现旧数据导入,这种导入旧数据的方式相比file好处在于可以准确的知道何时导入完成.