filebeat读取nginx日志并写入kafka

filebeat写入kafka的配置:

filebeat.inputs:
- type: log
  paths:
    - /tmp/access.log
  tags: ["nginx-test"]
  fields:
    type: "nginx-test"
    log_topic: "nginxmessages"
  fields_under_root: true
processors:
- drop_fields:
    fields: ["beat","input","source","offset"]
name: 10.10.5.119
output.kafka:
  enabled: true
  hosts: ["10.78.1.85:9092","10.78.1.87:9092","10.78.1.71:9092"]
  topic: "%{[log_topic]}"
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000

logstash从kafka中读取的配置:

input {
    kafka {
        bootstrap_servers => "10.78.1.85:9092,10.78.1.87:9092,10.78.1.71:9092"
        topics => ["nginxmessages"]
        codec => "json"
    }
}

原文地址:http://blog.51cto.com/liuzhengwei521/2323576

时间: 2024-10-08 22:51:22

filebeat读取nginx日志并写入kafka的相关文章

filebeat 获取nginx日志 发送给ElasticSearch

目的:通过filebeat获取nginx日志,发送给ElasticSearch,filebeat可以解析json格式日志,所以设置nginx日志为json格式. 1.配置nginx配置文件     log_format jsonTest '{"@timestamp":"$time_iso8601",'                   '"host":"$server_addr",'                   '&

python利用inotify实现把nginx日志实时写入数据库

利用了pyinotify库,我用的是这里的这个,https://github.com/seb-m/pyinotify 其实网上yum上也有pyinotify库可以安装. 写入数据库是pymysql这里做一下记录, 先务pyinotify实现一个tail -f 的功能: #!/opt/python3/bin/python3 # import pyinotify import time import os import sys class ProcessTransientFile(pyinotify

ELK整合Filebeat监控nginx日志

ELK 日志分析 1. 为什么用到 ELK 一般我们需要进行日志分析场景:直接在日志文件中 grep. awk 就可以获得自己想要的信息.但在规模较大的场景中,此方法效率低下,面临问题包括日志量太大如何归档.文本搜索太慢怎么办.如何多维度查询.需要集中化的日志管理,所有服务器上的日志收集汇总.常见解决思路是建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问. 一般大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位

利用Python实现读取Nginx日志,并将需要信息写入到数据库。

#!/usr/bin/env python # coding: utf-8 # Auther:liangkai # Date:2018/6/26 11:26 # License: (C) Copyright 2013-2017, Node Supply Chain Manager Corporation Limited. # Describe: import pymysql import re import datetime # DB variables dbhost = "192.168.18

使用Log4j将程序日志实时写入Kafka

第一部分 搭建Kafka环境 安装Kafka 下载:http://kafka.apache.org/downloads.html tar zxf kafka-<VERSION>.tgz cd kafka-<VERSION> 启动Zookeeper 启动Zookeeper前需要配置一下config/zookeeper.properties: 接下来启动Zookeeper bin/zookeeper-server-start.sh config/zookeeper.properties

Python读取NGINX日志将其存入数据库

日志数据: #/usr/bin/env python #-*-coding:UTF-8 -*- from  datetime  import  datetime stat_days = [] import   pymysql #print(datetime.now().strftime("%Y-%m-%d %H:%M:%S")) #2018-05-25 22:23:44 #print(datetime.now().strftime("%d/%b-%Y %H:%M:%S&quo

python的配置文件读取,日志文件写入,批量写入mysql(python2.7,win7)

# -*- coding: utf-8 -*-import log_configimport ConfigParserimport pymysqllogger = log_config.getlogger('getdataabdupdata', 'getdataabdupdata.log')conf = ConfigParser.ConfigParser()conf.read("mysql.conf")user = conf.get("mysql", "u

ELK系列一:ELK安装配置及nginx日志分析

本文分三个部分介绍了elk.elk安装配置及基于filebeat分析nginx日志的配置. 第一部分:elk介绍 一.什么是elk ELK 其实并不是一款软件,而是一整套解决方案,是三个软件产品的首字母缩写,Elasticsearch,Logstash 和 Kibana.这三款软件都是开源软件,通常是配合使用. 二.Elasticsearch 1.Elasticsearch介绍 Elasticsearch 是一个实时的分布式搜索和分析引擎,它可以用于全文搜索,结构化搜索以及分析.它是一个建立在全

(一个)kafka-jstorm集群实时日志分析 它 ---------kafka实时日志处理

package com.doctor.logbackextend; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import