Filebeat7 Kafka Gunicorn Flask Web应用程序日志采集

本文的内容

  • 如何用filebeat kafka es做一个好用,好管理的日志收集工具
  • 放弃logstash,使用elastic pipeline
  • gunicron日志格式与filebeat/es配置
  • flask日志格式与异常日志采集与filebeat/es配置
  • 以上的配置

概况

我有一个HTTP请求,经过的路径为

Gateway(kong)-->WebContainer(gunicorn)-->WebApp(flask)

我准备以下流向处理我的日志

file --> filebeat --> kafka topic--> filebeat --> elastic pipeline --> elasticsearch
                       |
                       |  ----------> HBase

为什么这么做

Logstash去哪里了?

  • Logstash太重了,不过这不是问题,也就是多个机器加点钱的问题。能把事情处理就行。
  • Logstash不美,Logstash虽然是集中管理配置,但是一个logstash好像总是不够,Logstash好像可以分开配置,但是你永远不知道如何划分哪些配置应该放在一个配置文件,哪些应该分开。
  • 删除一个配置?不可能的,我怎么知道应该删除什么配置。
  • 如果用了Logstash. As a ‘poor Ops guys having to understand and keep up with all the crazy input possibilities. ^_^

Filebeat的痛处

  • 看看这个Issue吧, 万人血书让filebeat支持grok, 但是就是不支持,不过给了我们两条路,比如你可以用存JSON的日志啊, 或者用pipeline
  • Filebeat以前是没有一个好的kafka-input。只能自己写kafka-es的转发工具

简单点

我想要的日志采集就是简简单单,或者说微服务的内聚力。 一条日志采集线就不该和其他业务混合。最好的就是以下这种状态

onefile -> filebeat_config -> kafka_topic -> filebeat_config -> elastic pipepline -> es index

Gunicorn日志

gunicorn日志

gunicorn日志采集如下的信息

  • time
  • client_ip
  • http method
  • http scheme
  • url
  • url query string
  • response status code
  • client name
  • rt
  • trace id
  • remote ips

    日志格式

%(t)s [%(h)s] [%(m)s] [%(H)s] [%(U)s] [%(q)s] [%(s)s] [%(a)s] [%(D)s] [%({Kong-Request-ID}i)s] [%({X-Forwarded-For}i)s]

日志例子

[15/Nov/2019:10:23:37 +0000] [172.31.37.123] [GET] [HTTP/1.1] [/api/v1/_instance/json_schema/Team/list] [a=1] [200] [Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36] [936] [9cbf6a3b-9c3a-4835-a2ef-02e03ee826d7#16] [137.59.103.3, 172.30.17.253, 172.30.18.12]

Es processing解析

es processing是6.0之后的功能,相当于es之前自带了一个logstash.对于复杂日志有多种processing,
可以使用grok或者dissect.某些情况下dissect更加快一些.
经过kafka,再有filebeat打到ES, 需要删除多余的信息

PUT _ingest/pipeline/gunicorn
{
  "description" : "devops gunicorn pipeline",
  "processors" : [
    {
        "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
        "json": {
            "field": "message",
            "add_to_root": true
        }
    },
    {
        "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{client_ip}] [%{method}] [%{scheme}] [%{path}] [%{query_string}] [%{status}] [%{client}] [%{rt_millo}] [%{trace_id}] [%{remote_ips}]"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

Es mapping

这里比较关键的是ES时间格式文档的定义, 如果某些字段我们觉得有必要分词,就是用text。否则使用keyword。这样可以更加
方便的聚合和查询日志数据, 开启_source方便做一些数据统计

PUT _template/gunicorn
{
  "index_patterns": ["*gunicorn*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "dd/LLL/yyyy:HH:mm:ss Z"
      },
      "client_ip": {
"type": "ip"
      },
      "method": {
        "type": "keyword"
      },
      "scheme": {
        "type": "keyword"
      },
      "path": {
        "type": "text"
      },
     "query_string": {
        "type": "text"
      },
     "status": {
        "type": "integer"
      },
            "client": {
        "type": "text"
      },
            "rt_millo": {
        "type": "long"
      },
            "trace_id": {
        "type": "keyword"
      },
      "remote_ips": {
        "type": "text"
      }
    }
  }
}

filebeat 采集到kafka配置文件

filebeat.inputs:
  - type: log
    paths:
      - /yourpath/gunicorn-access.log
    multiline.pattern: '^\['
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts:  ["kafka-01","kafka-02","kafka-03"]
  topic: 'gunicron_access'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

filebeat 从kafka消费配置文件

filebeat.inputs:
- type: kafka
  hosts:  ["kafka-01","kafka-02","kafka-03"]
  topics: ["gunicron_access"]
  group_id: "filebeat_gunicron"

output.elasticsearch:
  hosts: ["es-url"]
  pipeline: "gunicorn"
  index: "gunicorn-%{+yyyy.MM.dd}"

setup.template.name: "gunicorn"
setup.template.pattern: "gunicorn-*"
setup.ilm.enabled: false
setup.template.enabled: false

Flask日志

Flask日志是我们程序打印的,用于查看一些异常和错误的日志。在上线初期,info日志是可以打开debug的日志的。这样方便我们进行调试。
在稳定之后应该将日志接受级别调高。info日志不适合做统计,只是除了问题我们可以快速定位问题所在。 异常应该打到info日志中

INFO日志可以使用我建议的格式。我们关心

  • time
  • levelname: 日志级别
  • host, process, thread: 用于定位到某台机器的某个进程下的某个线程(一些复杂的bug需要,或者开启了异步进程)
  • name, funcname, filename, lineno: 用于定位日志发生的代码位置
  • message: 日志内容

日志格式

{
    "format": "[%(asctime)s.%(msecs)03d] [%(levelname)s] [{}:%(process)d:%(thread)d] [%(name)s:%(funcName)s] [%(filename)s:%(lineno)d] %(message)s".format(HOST),
    "datefmt": "%Y-%m-%d %H:%M:%S"
}

日志例子

[2019-11-18 08:47:49.424] [INFO] [cmdb-008069:5990:140482161399552] [cmdb:execute_global_worker] [standalone_scheduler.py:116] RUN_INFO: tiny_collector_ali starting at 2019-11-18 08:47:49, next run will be at approximately 2019-11-18 09:47:49
[2019-11-18 08:11:27.715] [ERROR] [cmdb-008069:5985:140184204932928] [cmdb:common_handler] [error.py:48] 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.
Traceback (most recent call last):
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1805, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1783, in dispatch_request
    self.raise_routing_exception(req)
  File "/home/server/venv3/lib/python3.6/site-packages/flask/app.py", line 1766, in raise_routing_exception
    raise request.routing_exception
  File "/home/server/venv3/lib/python3.6/site-packages/flask/ctx.py", line 336, in match_request
    self.url_adapter.match(return_rule=True)
  File "/home/server/venv3/lib/python3.6/site-packages/werkzeug/routing.py", line 1799, in match
    raise NotFound()
werkzeug.exceptions.NotFound: 404 Not Found: The requested URL was not found on the server. If you entered the URL manually please check your spelling and try again.

Es processing解析

经过kafka,再有filebeat打到ES, 需要删除多余的信息

PUT _ingest/pipeline/info
{
  "description" : "devops info pipeline",
  "processors" : [
    {
        "remove": {"field": ["agent", "ecs", "host", "input", "kafka"]}
    },
    {
        "json": {
            "field": "message",
            "add_to_root": true
        }
    },
    {
        "remove": {"field": ["@metadata", "ecs", "agent", "input"]}
    },
    {
      "dissect" : {
        "field": "message",
        "pattern": "[%{@timestamp}] [%{level}] [%{host}:%{process_id}:%{thread_id}] [%{name}:%{func_name}] [%{file}:%{line_no}] %{content}"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

Es mapping

thread_id 要给一个long字段, python如果获取不到会给一个超出integer范围的数字

PUT _template/info
{
  "index_patterns": ["*info*"],
  "settings": {
    "number_of_shards": 1
  },
  "version": 1,
  "mappings": {
    "_source": {
      "enabled": true
    },
    "properties": {
      "@timestamp": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss.SSS"
      },
      "level": {
        "type": "keyword"
      },
      "host": {
        "type": "keyword"
      },
      "process_id": {
        "type": "integer"
      },
     "thread_id": {
        "type": "long"
      },
       "name": {
        "type": "keyword"
      },
            "func_name": {
        "type": "keyword"
      },
             "file": {
        "type": "keyword"
      },
             "line_no": {
        "type": "integer"
      },
      "content": {
          "type": "text"
      }
    }
  }
}

filebeat 采集到Kafka配置文件

这里采用^\[20\d{2}来区分行首

filebeat.inputs:
  - type: log
    paths:
      - /you_path/app.log
    multiline.pattern: '^\[20\d{2}'
    multiline.negate: true
    multiline.match: after
    tail_files: true

queue.mem:
  events: 4096
  flush.min_events: 512
  flush.timeout: 5s

output.kafka:
  hosts: ["kafka-01", "kafka-02", "kafka-03"]
  topic: 'devops_app'
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

filebeat 从kafka消费配置文件

filebeat.inputs:
- type: kafka
  hosts:   ["kafka-01", "kafka-02", "kafka-03"]
  topics: ["devops_app"]
  group_id: "filebeat_app"

output.elasticsearch:
  hosts: ["es_url"]
  pipeline: "info"
  index: "app-info-%{+yyyy.MM.dd}"

setup.template.name: "info"
setup.template.pattern: "app-info-*"
setup.ilm.enabled: false
setup.template.enabled: false

原文地址:https://www.cnblogs.com/ohbonsai/p/12092835.html

时间: 2024-10-11 08:21:41

Filebeat7 Kafka Gunicorn Flask Web应用程序日志采集的相关文章

如何使用Nginx和uWSGI或Gunicorn在Ubuntu上部署Flask Web应用

我在很多的博客中都看过有关Flask应用的部署,也有很多博主在开博后都记录了部署的教程,因为其中的坑可以说不少.一开始我在网上看到相比较与Ubuntu,CentOS因为更新少作为服务器的操作系统会更加稳定.所以在第一次购买云服务器时,我选择了CentOS,后来由于CentOS不同发行版的Nginx缘故,我又换成了Ubuntu的镜像 首先呢,我们先来了解下关于Web服务器与Web应用还有WSGI之间的联系 一.介绍 WSGI(Web Server Gateway Interface),翻译为Pyt

Flask web应用

Flask web应用 一.介绍 最近开发要用一个测试环境,是这样的Nginx+uwsgi+flask 的一个结构.下面是一些记录,在Centos 系统上使用Flask 架构部署一个简单的Python应用.然后使用Nginx作为前端反向代理,设置uWSGI应用网关处理web应用程序. 二.条件 1)         环境要求 Server OS:最小化安装 Centos 6.8 Nginx :反向代理 Python2.6~2.7:开发语言 Flask/uWSGI:框架 Pip:python包管理

基于Flume+LOG4J+Kafka的日志采集架构方案

本文将会介绍如何使用 Flume.log4j.Kafka进行规范的日志采集. Flume 基本概念 Flume是一个完善.强大的日志采集工具,关于它的配置,在网上有很多现成的例子和资料,这里仅做简单说明不再详细赘述.Flume包含Source.Channel.Sink三个最基本的概念: Source——日志来源,其中包括:Avro Source.Thrift Source.Exec Source.JMS Source.Spooling Directory Source.Kafka Source.

Flask之旅《Flask Web开发:基于Python的Web应用开发实战》学习笔记

<Flask Web开发:基于Python的Web应用开发实战> 点击上方的"目录"快速到达哦! 虽然简单的网站(Flask+Python+SAE)已经上线,但只是入门.开发大型网站,系统地学习一遍还是有必要的. 1 虚拟环境 2016-6-8 书上介绍了 virtualenv,每个venv都会拷贝一份packages到项目 /venv目录. virtualenv venv venv\Scripts\activate.bat (venv) $ pip freeze >

《Flask Web开发:基于Python的Web应用开发实战》pdf 免费下载

<Flask Web开发:基于Python的Web应用开发实战>pdf 免费下载链接: https://u253469.ctfile.com/fs/253469-292665036 第一部分 Flask 简介第1 章 安装 .........................................................................................................................................

对 Web 应用程序进行性能调优

动态的 Web 应用程序能够存储大量信息,让用户能够通过熟悉的界面立即访问这些信息.但是,随着应用程序越来越受欢迎,可能会发现对请求的响应速度没有以前那么快了.开发人员应该了解 Web 应用程序处理 Web 请求的方式,知道在 Web 应用程序开发中可以做什么,不能做什么,这有助于减少日后的麻烦. 静态的 Web 请求(比如图 1 所示的请求)很容易理解.客户机连接服务器(通常通过 TCP 端口 80),使用 HTTP 协议发出一个简单的请求. 图 1. 客户机通过 HTTP 请求静态的文件 服

利用C#自带组件强壮程序日志

前言 在项目正式上线后,如果出现错误.异常.崩溃等情况, 我们往往第一想到的事就是查看日志. 所以日志对于一个系统的维护是非常重要的. 声明 本文中的示例代码旨在这个框架是怎么工作的,具体实现可以自由发挥. 贯穿所有的日志系统 日志系统,往往是贯穿一个程序的所有代码的; 试想一下,如果你的日志完全是由第三方组件提供的; 那么就意味着,你的所有项目都必须引用这个dll; 也许你会说自己可以2次封装,那么依然需要所有项目都引用你的这个封装后的log项目, 另一方面 一些log组件需要实例化后才可以使

NET5 Web应用程序

ASP.NET5 Web应用程序结构 本文参考ASP.NET5 官方文档 Understanding ASP.NET 5 Web Apps,加入了一些个人理解,理解不对的地方希望大家能指出,互相学习. ASP.NET 5 针对WEB编程引入了几个新的基本概念,理解这些概念对快速开发WEB应用来说很重要.或许这些这些概念对你来说不是新的,但是对那些使用ASP.NET 和 Visual Studio 进行传统的WEB应用开发的程序员来说,这些概念可能是新的. 本文主要内容包括: ASP.NET 项目

SharePoint 2013 创建web应用程序报错&amp;quot;This page can’t be displayed&amp;quot;

错误描写叙述 This page can't be displayed ?Make sure the web address http://centeradmin is correct. ?Look for the page with your search engine. ?Refresh the page in a few minutes. 发生过程 创建web应用程序,填写完基本信息以后点击确定,就是"这不会花费非常长时间"的提示,然后就报错了,而且SharePoint和iis的