分布式框架-日志系统思路及实现

转自:https://www.jianshu.com/p/ce30c31111ca

背景

随着互联网时代数据规模的爆发式增长,传统的单机系统在性能和可用性上已经无法胜任,分布式应用和服务化应用开始走进大家的视野,但是分布式的部署也会带来另外的问题,日志分散在各个应用服务节点中,出现问题不方便及时排查,尤其是服务化的应用中,分析问题时可能需要查看多个日志文件才能定位问题,如果相关项目不是一个团队维护时沟通成本更是直线上升,怎么将日志文件归集,怎么将日志文件呈现成了很多公司需要面对的问题,因此日志系统应运而生。

dapeng日志系统的选型

日志系统通常有三部分组成,采集器、解析器、存储器

采集器通常部署在各个应用结点中,它监控本地文件的变化,对于新产生的日志变化,它实时收集并发送给对应的解析器,常见的采集器有flume、logstash、fluentd以及更轻量级的fluent-bit

解析器通常和采集器结合在一起,也有一部分解析器是通过接收缓冲队列,将日志解析成json格式数据后,把数据发往存储器进行存储

存储器用于存储对应的数据,提供相关的查询,常见的存储有hdfs、elasticsearch

我们dapeng选取的是fluent-bit+fluentd+kafka+elasticsearch作为日志系统的方案,zookeeper、elasticsearch、kafka都采用集群模式,示例图中采用单结点fluent-bit收集各个docker容器中的日志文件发往fluentd,fluentd做为中转收集所有的日志发往kafak用于削峰填谷,削峰后的数据再经由fluentd发送给elasticsearch进行存储

image

为了支持fluent-bit<=>fluentd的高可用, 我们改动了fluent-bit的源码. 另外, 生产环境上, 上述结构图中的每一个环节都不能省, 以免数据量太大发生不可预料的错误.

目前我们生产环境, 小规模应用的情况下, 每天大概产生5千万条日志记录.

关于MDC的小插曲

Logback中有一项功能很好使-MDC,映射诊断环境(Mapped Diagnostic Context)MDC本质上是使用的ThreadLocal。系统调用链可能很长,为了方便日志跟踪,统一打印标识。我们dapeng使用MDC来保存sessionTid,在一个完整的调用链中使sessionTid在各个服务中进行传递,将服务进行串联,方便问题定位,具体的logback如下

<appender name="SIMPLEFILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <prudent>false</prudent>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${soa.base}/logs/simple-dapeng-container.%d{yyyy-MM-dd}.log</fileNamePattern>
            <maxHistory>30</maxHistory>
        </rollingPolicy>
        <encoder>
            <pattern>%d{MM-dd HH:mm:ss SSS} %t %p [%X{sessionTid}] - %m%n</pattern>
        </encoder>
    </appender>

配置采集器

[SERVICE]
    Flush        5
    Daemon       On
    Log_Level    error
    Log_File     /fluent-bit/log/fluent-bit.log
    Parsers_File parse_dapeng.conf

[INPUT]
    Name tail
    Path /dapeng-container/logs/*.log
    Exclude_Path  /dapeng-container/logs/fluent*.log,/dapeng-container/logs/gc*.log
    Tag  dapeng
    Multiline  on
    Buffer_Chunk_Size 2m
    buffer_max_size  30m
    Mem_Buf_Limit  32m
    DB.Sync  Normal
    db_count 400
    Parser_Firstline dapeng_multiline
    db  /fluent-bit/db/logs.db

[FILTER]
    Name record_modifier
    Match *
    Record hostname ${HOSTNAME}
    Record tag ${serviceName}

[OUTPUT]
    Name  Forward
    Match *
    Host  fluentd
    Port  24224
    HostStandby fluentdStandby
    PortStandby 24224

record_modifer用于在解析出的json中增加hostname标识和tag标识方便日志检索

chunk及buffer块的设置根据各系统日志的大小来进行设置

HostStandby和PortStandby是我们dapeng基于原生fluent-bit进行改造添,当主fluentd挂掉后,日志事件会相应的发送给fluentdstandBy进行处理

解析器的配置

[PARSER]
    Name        dapeng_multiline
    Format      regex
    Regex       (?<logtime>\d{1,2}-\d{1,2} \d{1,2}:\d{1,2}:\d{1,2} \d{1,3}) (?<threadPool>.*) (?<level>.*) \[(?<sessionTid>.*)\] - (?<message>.*)

解析器这块对应上面的logback配置,将日志消息处理成比较直观的JSON数据进行存储

转发器fluentd的配置(用于接收消息发送kafka)

<system>
        log_level error
        flush_thread_count 8
        workers 8
</system>
<source>
        @type  forward
        port  24224
</source>
<source>
        @type monitor_agent
        port 24225
</source>

<match dapeng tomcat>
        @type kafka_buffered
        brokers ${kafkabrokers}
        topic_key messages
       #zookeeper 192.168.20.200:2181
        buffer_type file
        buffer_path /tmp/buffer
        flush_interval 60s
        default_topic messages
        output_data_type json
        compression_codec gzip
        max_send_retries 3
        required_acks -1
        discard_kafka_delivery_failed true
</match>

monitor_agent是fluentd的一个插件,可以及时获取fluentd响应用于fluentd的健康度检查

[[email protected] etc]# curl 192.168.20.200:24225/api/plugins.json
{"plugins":[{"plugin_id":"object:3ff681f97a88","plugin_category":"input","type":"forward","config":{"@type":"forward","port":"24224"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3ff681c37078","plugin_category":"input","type":"monitor_agent","config":{"@type":"monitor_agent","port":"24225"},"output_plugin":false,"retry_count":null},{"plugin_id":"object:3ff681c19ca8","plugin_category":"output","type":"kafka_buffered","config":{"@type":"kafka_buffered","brokers":"192.168.20.200:9092","topic_key":"messages","buffer_type":"file","buffer_path":"/tmp/buffer","flush_interval":"60s","default_topic":"messages","output_data_type":"json","compression_codec":"gzip","max_send_retries":"3","required_acks":"-1","discard_kafka_delivery_failed":"true"},"output_plugin":true,"buffer_queue_length":0,"buffer_total_queued_size":1174144,"retry_count":6,"retry":{}}]}

转发器fluentd的配置(用于接收kafka中的消息发送elasticsearch)

<system>
        log_level info
        flush_thread_count 8
        workers 8
</system>
<source>
        @type kafka_group
        brokers 192.168.20.200:9092
        consumer_group dapeng_consume2
        topics messages
        format json
        start_from_beginning false
</source>
<source>
        @type monitor_agent
        port 24225
</source>
<match>
        @type elasticsearch
        host 192.168.20.200
        port 9200
        index_name dapeng_log_index
        type_name  dapeng_log
        content_type application/x-ndjson
        buffer_type file
        buffer_path /tmp/buffer_file
        buffer_chunk_limit 30m
        buffer_queue_limit 512
        flush_mode interval
        flush_interval 60s
        request_timeout 15s
        flush_thread_count 8
        reload_on_failure true
        resurrect_after 30s
        reconnect_on_error true
        with_transporter_log true
        logstash_format true
        logstash_prefix dapeng_log_index
        template_name dapeng_log_index
        template_file  /fluentd/etc/template.json
        num_threads 8
        utc_index  false
</match>

start_from_beginning默认为true,代表从消息队列中起始读取数据,当fluentd重启会造成日志消息冗余,因此这里配置false,如果需要恢复日志索引,可以配置成true让日志消息再消息一次(我们日志kafka消息保留的策略是保留1天,因此当出现故障时我们可以快速恢复1天内的日志)

logstash_format 用于配置将日志索引按天数来存放

template.json模板配置

{
 "mappings": {
   "dapeng_log": {
    "properties": {

       "logtime": {
        "type": "date",
        "format": "MM-dd HH:mm:ss SSS"
       },
       "threadPool": {
        "type": "string",
        "index": "not_analyzed"
       },
       "level": {
        "type": "string",
        "index": "not_analyzed"
       },
        "tag": {
        "type": "string",
        "index": "not_analyzed"
       },
       "message": {
        "type": "string",
        "index": "not_analyzed",
        "ignore_above":256
       },
       "hostname":{
        "type": "string",
        "index": "not_analyzed"
       },
       "sessionTid":{
        "type": "string",
        "index": "not_analyzed"
       },
       "log":{
        "type": "string",
        "index": "not_analyzed"
       }
    }
   }
  },
  "settings": {
   "index": {
    "max_result_window": "100000000",
    "number_of_shards": "1",
    "number_of_replicas": "1",
    "refresh_interval": "60s"
   }
  },
  "warmers": {},
  "template": "dapeng_log_index-*"
}

配置使用的是es2的配置,线上我们使用的是5.6.9的版本,es这块可以向下兼容将string类型的转换为keyword

日志查询

查询服务调用关系

通过sessionTid来查询服务间的调用关系,这里sessionTid正是上面MDC中设置的,在服务的调用中通过InvocationContext(dapeng上下文)进行传递

服务调用关系.png

查询堆栈异常

堆栈异常.png

按天进行错误分组

GET dapeng_log_index-2018.07.25/_search
{
  "size": 0,
  "query": {
    "bool": {
      "must": [
        {
          "term": {
            "level": "ERROR"
          }
        }
      ],
      "filter": {
        "script": {
          "script": {
            "source": "doc[‘message‘].values.length==0"
          }
        }
      }
    }
  },
  "aggs": {

    "group_by_tag": {

      "terms": {
        "field": "tag",
        "size": 100
      }
    }
  }
}

坑及优化

fluent-bit报Invalid indentation level

fluent-bit对配置文件的要求比较高,请保持配置用空格对齐,不要使用tab键

fluent-bit高内存占用

根据官方文档描述,在某些环境中,通常会发现被摄取的日志或数据比将其刷新到某些目的地的速度要快。 常见的情况是从大日志文件读取并通过网络将日志分派到后端,这需要一些时间来响应,这样会产生背压,导致服务中的高内存消耗。为了避免背压,Fluent Bit在引擎中实现了一种限制数据量的机制,通过配置参数Mem_Buf_Limit完成的。

我们这里通过配置Mem_Buf_Limit来优化,另外fluent-bit默认使用Glibc来管理分配内存,这里我们使用jmalloc,这是一种替代内存分配器,它具有更好的策略来减少其他碎片以获得更好的性能

image

fluentd隔天写入索引

写入es中的日志会比当前时间提前8个小时,例如0-8点的日志会写入到昨天的索引中,这里我们配置utc-index为false即可

elasticsearch长期报GC

由于业务高峰日志量导致瞬时写入较大,es会长时间报gc,影响数据的写入,这里我们引入kafka作消息缓冲,另外我们弃用elasticsearch默认的垃圾回收器,使用G1回收器

jdb2高io使用

最开始,我们在网站上检索关于jdb2高iowait的解决方案,给出的方案都是ext4的bug,差一点我就信了,linux的bug也能遇到,但是转过来一想这bug也好多年了,内核早就修复了,应该不是这方面的问题,我们使用top查看cpu的使用情况,比较空闲,但是wait比较高

image

使用iotop来查看磁盘的io使用情况,基本都是fluent-bit产生的

image

接下来我们使用 blktrace来收集更进一步的详细信息

image

最后我们使用wc来统计43这一秒内fluent-bit产生的IO请求数(Q表示即将生成IO请求)

image

问题元凶找到了,fluent-bit读取的日志文件后会在写出的时候更新文件位置索引,将索引保存在sqllite中,根据上面的统计,每秒钟产生的IO操作在101次(由于有4个fluent-bit)正是由于fluent-bit频繁的更新sqlite中的文件索引,造成文件合并引起的高iowait,因此需要对sqlite的写入次数加限制,这里我们基于fluent-bit改造了两种 方案,第一种,每次都只从尾部读取文件,这样就省掉了文件索引的保存达到减少磁盘IO,第二种,增加db_count参数用于对chunk块计数,当发送chunk块计数达到配置的参数时保存文件的位置索引,我们dapeng对这两块都进行了个性化改造实现,改造后的效果对比图如下

image

elasticsearch内存使用优化

es这块按天来存放对应的日志索引,长期不用的索引会占用大量内存。一般日志索引只需要开放近三天的索引即可,日志索引保留近一月即可

  #!/bin/bash
date=`date -d "3 days ago" +%Y.%m.%d`
date1=`date -d "30 days ago" +%Y.%m.%d`
echo $date
echo $date1
curl -XPOST http://192.168.20.200:9200/dapeng_log_index-$date/_close
curl -XDELETE "http://192.168.20.200:9200/dapeng_log_index-${date1}"

基于日志系统的衍生扩展

目前我们基于现有的日志系统,做了生产故障实时告警系统,直接钉钉推送给相关的业务系统负责人,具体方案有两种,一种是根据索引去过滤近30分钟的日志异常推送,另外一种是从kafak中提取消息后过滤推送,第一种是假实时,错误有所延迟,第二种是完全实时,我们现在采取的是第一种方案,第二种方案有待实现

image

总结

到这一步,我们的日志系统已经搭建成功了,当服务器扩容时,由于fluent-bit是集成在dapeng容器中,只需要在环境变量中简单配置serviceName和hostname以及fluentdhost即可,日志消息就会写入到es存储中。

日志系统是一个非常重要的功能组成部分,我们可以使用日志系统来进行错误编排,系统优化,根据这些信息调整系统的行为,提高系统的可用性。(想了解更多?请关注dapeng开源)

作者:洋洋_3720
链接:https://www.jianshu.com/p/ce30c31111ca
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

原文地址:https://www.cnblogs.com/barrywxx/p/10288441.html

时间: 2024-10-17 05:26:58

分布式框架-日志系统思路及实现的相关文章

分布式实时日志系统(一)环境搭建之 Jstorm 集群搭建过程/Jstorm集群一键安装部署

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 淘宝开源了许多产品组件

分布式实时日志系统(二) 环境搭建之 flume 集群搭建/flume ng资料

最近公司业务数据量越来越大,以前的基于消息队列的日志系统越来越难以满足目前的业务量,表现为消息积压,日志延迟,日志存储日期过短,所以,我们开始着手要重新设计这块,业界已经有了比较成熟的流程,即基于流式处理,采用 flume 收集日志,发送到 kafka 队列做缓冲,storm 分布式实时框架进行消费处理,短期数据落地到 hbase.mongo中,长期数据进入 hadoop 中存储. 接下来打算将这其间所遇到的问题.学习到的知识记录整理下,作为备忘,作为分享,带给需要的人. 学习flume ng的

分布式实时日志系统(四) 环境搭建之centos 6.4下hbase 1.0.1 分布式集群搭建

一.hbase简介 HBase是一个开源的非关系型分布式数据库(NoSQL),它参考了谷歌的BigTable建模,实现的编程语言为 Java.它是Apache软件基金会的Hadoop项目的一部分,运行于HDFS文件系统之上,为 Hadoop 提供类似于BigTable 规模的服务.因此,它可以容错地存储海量稀疏的数据.HBase在列上实现了BigTable论文提到的压缩算法.内存操作和布隆过滤器.HBase的表能够作为MapReduce任务的输入和输出,可以通过Java API来存取数据,也可以

java日志框架与日志系统

日志框架:提供日志调用的接口,实际的日志输出委托给日志系统实现. JCL(Jakarta Commons Logging):比较流行的日志框架,很多框架都依赖JCL,例如Spring等. SLF4j:提供新的API,初衷是配合Logback使用,但同时兼容Log4j. 日志系统:负责输出日志 Log4j:较早的日志系统,可以单独使用,也可配合日志框架JCL使用 Logback:Log4j的替代产品,需要配合日志框架SLF4j使用 JUL(java.util.logging):JDK提供的日志系统

基于zipkin分布式链路追踪系统预研第一篇

分布式服务追踪系统起源于Google的论文“Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”(译文可参考此处),Twitter的zipkin是基于此论文上线较早的分布式链路追踪系统了,而且由于开源快速被各社区所研究,也诞生了很多的版本. 在这里也是对zipkin进行研究,先贴出Twitter zipkin结构图. 结构比较简单,大概流程为: Trace数据的收集至Scribe(Facebook开源的日志传输通路)或

zipkin分布式链路追踪系统

基于zipkin分布式链路追踪系统预研第一篇 分布式服务追踪系统起源于Google的论文“Dapper, a Large-Scale Distributed Systems Tracing Infrastructure”(译文可参考此处),Twitter的zipkin是基于此论文上线较早的分布式链路追踪系统了,而且由于开源快速被各社区所研究,也诞生了很多的版本. 在这里也是对zipkin进行研究,先贴出Twitter zipkin结构图. 结构比较简单,大概流程为: Trace数据的收集至Scr

利用开源架构ELK构建分布式日志系统

本文介绍了如何使用成熟的经典架构ELK(即Elastic search,Logstash和Kibana)构建分布式日志监控系统,很多公司采用该架构构建分布式日志系统,包括新浪微博,freewheel,畅捷通等. 背景日志,对每个系统来说,都是很重要,又很容易被忽视的部分.日志里记录了程序执行的关键信息,ERROR和WARNING信息等等.我们可以根据日志做很多事情,做数据分析,系统监控,排查问题等等 .但是,任何一个中大型系统都不可能是单台Server,日志文件散落在几十台甚至成千上万台Serv

asp.Net Core免费开源分布式异常日志收集框架Exceptionless安装配置以及简单使用图文教程

原文:asp.Net Core免费开源分布式异常日志收集框架Exceptionless安装配置以及简单使用图文教程 最近在学习张善友老师的NanoFabric 框架的时了解到Exceptionless : https://exceptionless.com/ !因此学习了一下这个开源框架!下面对Exceptionless的学习做下笔记! Exceptionless是什么?能做什么呢? “Exceptionless”这个词的定义是:没有异常.Exceptionless可以为您的ASP.NET.We

日志系统之基于Zookeeper的分布式协同设计

最近这段时间在设计和实现日志系统,在整个日志系统系统中Zookeeper的作用非常重要--它用于协调各个分布式组件并提供必要的配置信息和元数据.这篇文章主要分享一下Zookeeper的使用场景.这里主要涉及到Zookeeper在日志系统中的使用,但其实它在我们的消息总线和搜索模块中也同样非常重要. 日志元数据 日志的类型和日志的字段这里我们统称为日志的元数据.我们构建日志系统的目的最终主要是为了:日志搜索,日志分析.这两大块我们很大程度上依赖于--ElasticSearch(关于什么是Elast