A Flume-Based Logging System

Approach

  1. All services write their logs to Kafka
  2. A Flume agent acts as the Kafka consumer and forwards the logs to Elasticsearch
  3. Kibana displays the log data

Preparation

  1. Versions: Flume 1.8, Kafka 1.1.0, Elasticsearch & Kibana 6.5.4
  2. Projects typically use a logging framework such as log4j, so a custom JsonLayout is needed
  3. The Elasticsearch version supported by the stock Flume sink is too old, so a custom Flume ES sink is needed
  4. Elasticsearch uses UTC by default, so log timestamps must be written in UTC to stay consistent

JsonLayout

All the custom JsonLayout has to do is construct a PatternLayout internally to format the log message. The log record structure is as follows:

public class JsonLogBean {
    private String system;
    private String ip;
    private String message;
    private String level;
    private String time;

    // getters and setters omitted
    public JsonLogBean() {}

    public JsonLogBean(String system, String ip, String message, String level, String time) {
        this.system = system;
        this.ip = ip;
        this.message = message;
        this.level = level;
        this.time = time;
    }

}
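For reference, with the pattern used in the test configuration later, a single serialized event produced by the layouts below looks roughly like this (all field values here are illustrative; fastjson emits the fields in alphabetical order by default):

{"ip":"192.168.1.10","level":"INFO","message":"[main] com.mine.log.LogTest info message...\n","system":"mlog","time":"2019-01-10T13:31:06.000Z"}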

log4j1

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.log4j.PatternLayout;
import org.apache.log4j.spi.LoggingEvent;

import com.alibaba.fastjson.JSON; // JSON serialization (fastjson assumed)

public class JsonPartternLayout extends PatternLayout {

    private String system;

    // PatternLayout hands throwables to the WriterAppender by default;
    // returning false lets format() below render them itself
    @Override
    public boolean ignoresThrowable() {
        return false;
    }

    private static String ip;

    // note: a shared SimpleDateFormat is not thread-safe; acceptable for a
    // demo, but consider a ThreadLocal in production
    private static SimpleDateFormat utcFormater;

    static {
        utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        // format in UTC so timestamps match Elasticsearch's default
        utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // ignore
        }
    }

    @Override
    public String format(LoggingEvent event) {
        StringBuilder sb = new StringBuilder();
        sb.append(super.format(event));
        String[] s = event.getThrowableStrRep();
        if (s != null) {
            int len = s.length;
            for (int i = 0; i < len; i++) {
                sb.append(s[i]);
                sb.append("\n");
            }
        }
        String time = utcFormater.format(new Date(event.getTimeStamp()));
        return JSON.toJSONString(new JsonLogBean(system, ip, sb.toString(), event.getLevel().toString(), time)) + "\n";

    }

    public String getSystem() {
        return system;
    }

    public void setSystem(String system) {
        this.system = system;
    }
}
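To wire this layout into log4j 1.x, a configuration along the following lines should work. The appender class comes from the kafka-log4j-appender artifact, and the com.mine.log package name is assumed for illustration:

log4j.rootLogger=INFO, kafka
log4j.appender.kafka=org.apache.kafka.log4jappender.KafkaLog4jAppender
log4j.appender.kafka.topic=test1
log4j.appender.kafka.brokerList=localhost:9092
log4j.appender.kafka.layout=com.mine.log.JsonPartternLayout
log4j.appender.kafka.layout.System=mlog
log4j.appender.kafka.layout.ConversionPattern=[%t] %c %m%n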

log4j2

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.DefaultConfiguration;
import org.apache.logging.log4j.core.config.Node;
import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginBuilderFactory;
import org.apache.logging.log4j.core.config.plugins.PluginConfiguration;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.layout.AbstractStringLayout;
import org.apache.logging.log4j.core.layout.PatternLayout;
import org.apache.logging.log4j.core.layout.PatternSelector;
import org.apache.logging.log4j.core.pattern.RegexReplacement;
import org.apache.logging.log4j.util.PropertiesUtil;

import com.alibaba.fastjson.JSON; // JSON serialization (fastjson assumed)

import static org.apache.logging.log4j.core.layout.PatternLayout.DEFAULT_CONVERSION_PATTERN;
import static org.apache.logging.log4j.core.layout.PatternLayout.newSerializerBuilder;

@Plugin(name = "JsonPartternLayout", category = Node.CATEGORY, elementType = Layout.ELEMENT_TYPE, printObject = true)
public class JsonPartternLayout extends AbstractStringLayout {

    private PatternLayout patternLayout;

    private String system;

    private static String ip;

    // note: a shared SimpleDateFormat is not thread-safe; acceptable for a
    // demo, but consider a ThreadLocal in production
    private static SimpleDateFormat utcFormater;

    static {
        utcFormater = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
        // format in UTC so timestamps match Elasticsearch's default
        utcFormater.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // ignore
        }
    }

    public JsonPartternLayout(final Configuration config, final RegexReplacement replace, final String eventPattern,
             final PatternSelector patternSelector, final Charset charset, final boolean alwaysWriteExceptions,
             final boolean disableAnsi, final boolean noConsoleNoAnsi, final String headerPattern,
             final String footerPattern, final String system) {
        super(config, charset,
                newSerializerBuilder()
                        .setConfiguration(config)
                        .setReplace(replace)
                        .setPatternSelector(patternSelector)
                        .setAlwaysWriteExceptions(alwaysWriteExceptions)
                        .setDisableAnsi(disableAnsi)
                        .setNoConsoleNoAnsi(noConsoleNoAnsi)
                        .setPattern(headerPattern)
                        .build(),
                newSerializerBuilder()
                        .setConfiguration(config)
                        .setReplace(replace)
                        .setPatternSelector(patternSelector)
                        .setAlwaysWriteExceptions(alwaysWriteExceptions)
                        .setDisableAnsi(disableAnsi)
                        .setNoConsoleNoAnsi(noConsoleNoAnsi)
                        .setPattern(footerPattern)
                        .build());

        this.patternLayout = PatternLayout.newBuilder()
                .withPattern(eventPattern)
                .withPatternSelector(patternSelector)
                .withConfiguration(config)
                .withRegexReplacement(replace)
                .withCharset(charset)
                .withDisableAnsi(disableAnsi)
                .withAlwaysWriteExceptions(alwaysWriteExceptions)
                .withNoConsoleNoAnsi(noConsoleNoAnsi)
                .withHeader(headerPattern)
                .withFooter(footerPattern)
                .build();
        this.system = system;
    }

    /**
     * Renders the event with the inner PatternLayout, then wraps the result,
     * together with host IP, level and a UTC timestamp, into one JSON line.
     */
    @Override
    public String toSerializable(LogEvent event) {
        String msg = this.patternLayout.toSerializable(event);
        String time = utcFormater.format(new Date(event.getTimeMillis()));
        return JSON.toJSONString(new JsonLogBean(system, ip, msg, event.getLevel().name(), time)) + "\n";
    }

    @PluginBuilderFactory
    public static Builder newBuilder() {
        return new Builder();
    }

    /**
     * Builder for JsonPartternLayout; mirrors PatternLayout.Builder and adds
     * the extra "system" attribute.
     */
    public static class Builder implements org.apache.logging.log4j.core.util.Builder<JsonPartternLayout> {

        @PluginBuilderAttribute
        private String pattern = DEFAULT_CONVERSION_PATTERN;

        @PluginElement("PatternSelector")
        private PatternSelector patternSelector;

        @PluginConfiguration
        private Configuration configuration;

        @PluginElement("Replace")
        private RegexReplacement regexReplacement;

        // LOG4J2-783 use platform default by default
        @PluginBuilderAttribute
        private Charset charset = Charset.defaultCharset();

        @PluginBuilderAttribute
        private boolean alwaysWriteExceptions = true;

        @PluginBuilderAttribute
        private boolean disableAnsi = !useAnsiEscapeCodes();

        @PluginBuilderAttribute
        private boolean noConsoleNoAnsi;

        @PluginBuilderAttribute
        private String header;

        @PluginBuilderAttribute
        private String footer;

        @PluginBuilderAttribute
        private String system;

        private Builder() {
        }

        private boolean useAnsiEscapeCodes() {
            PropertiesUtil propertiesUtil = PropertiesUtil.getProperties();
            boolean isPlatformSupportsAnsi = !propertiesUtil.isOsWindows();
            boolean isJansiRequested = !propertiesUtil.getBooleanProperty("log4j.skipJansi", true);
            return isPlatformSupportsAnsi || isJansiRequested;
        }

        public JsonPartternLayout build() {
            // fall back to DefaultConfiguration
            if (configuration == null) {
                configuration = new DefaultConfiguration();
            }
            return new JsonPartternLayout(configuration, regexReplacement, pattern, patternSelector, charset,
                    alwaysWriteExceptions, disableAnsi, noConsoleNoAnsi, header, footer, system);
        }
    }

}

ES Sink

Here we modify the flume-ng-elasticsearch-sink source directly; to avoid clashing with the stock sink I renamed its packages (see the source here). The pom configuration is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-high-eslog-sink</artifactId>
    <version>1.8</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.rat</groupId>
                <artifactId>apache-rat-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <dependencies>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-sdk</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.8.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.5.4</version>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.5.4</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.6.1</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.6.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.5</version>
        </dependency>

        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>11.0.2</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>

    </dependencies>
</project>
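Once built, the sink jar goes into Flume's lib directory. A minimal sketch of the build-and-deploy step, assuming FLUME_HOME points at the Flume installation:

mvn clean package
cp target/flume-ng-high-eslog-sink-1.8.jar $FLUME_HOME/lib/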

The flume-ng-elasticsearch-sink codebase is fairly small; the points below are the important parts of the change.
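The heart of the change is porting the sink onto the Elasticsearch 6.x TransportClient API. The sketch below shows what the client bootstrap ends up looking like; the class and method names are mine for illustration, not the actual patched source:

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

public class EsClientSketch {

    // Illustrative: the 6.x client bootstrap that replaces the much older
    // client API the original flume-ng-elasticsearch-sink was written against
    public static TransportClient openClient(String host, int port, String clusterName)
            throws UnknownHostException {
        Settings settings = Settings.builder()
                .put("cluster.name", clusterName) // must match the ES cluster name
                .build();
        return new PreBuiltTransportClient(settings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName(host), port));
    }
}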

Copy the jars under Elasticsearch's lib directory into Flume's lib directory. The listing below shows the jars I ended up copying; if a jar is missing or conflicts, Flume's own log makes the problem easy to track down.

-rw-r--r--@ 1 zhanghuan  staff    10M  1  4 22:26 elasticsearch-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff    16K  1  4 22:26 elasticsearch-cli-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff    36K  1  4 22:26 elasticsearch-core-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff    12K  1  4 22:26 elasticsearch-launchers-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff    11K  1  4 22:26 elasticsearch-secure-sm-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff   110K  1  4 22:26 elasticsearch-x-content-6.5.4.jar
-rw-r--r--@ 1 zhanghuan  staff   1.6M  1  4 22:37 lucene-analyzers-common-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff    98K  1  4 22:37 lucene-backward-codecs-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   2.9M  1  4 22:37 lucene-core-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff    85K  1  4 22:37 lucene-grouping-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   202K  1  4 22:37 lucene-highlighter-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   143K  1  4 22:37 lucene-join-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff    50K  1  4 22:37 lucene-memory-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff    93K  1  4 22:37 lucene-misc-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   259K  1  4 22:37 lucene-queries-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   373K  1  4 22:37 lucene-queryparser-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   259K  1  4 22:37 lucene-sandbox-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff    14K  1  4 22:37 lucene-spatial-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   231K  1  4 22:37 lucene-spatial-extras-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   295K  1  4 22:37 lucene-spatial3d-7.5.0.jar
-rw-r--r--@ 1 zhanghuan  staff   240K  1  4 22:37 lucene-suggest-7.5.0.jar
-rw-r--r--  1 zhanghuan  staff   7.4K  1  7 23:04 transport-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    77K  1  7 23:08 transport-netty4-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff   107K  1  7 23:10 reindex-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    72K  1  7 23:11 percolator-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    60K  1  7 23:12 lang-mustache-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff    75K  1  7 23:13 parent-join-client-6.5.4.jar
-rw-r--r--  1 zhanghuan  staff   1.1M  1  7 23:16 hppc-0.7.1.jar
-rw-r--r--  1 zhanghuan  staff   258K  1  7 23:18 log4j-api-2.11.1.jar
-rw-r--r--  1 zhanghuan  staff   1.5M  1  7 23:19 log4j-core-2.11.1.jar
-rw-r--r--  1 zhanghuan  staff    50K  1  8 19:29 t-digest-3.2.jar
-rw-r--r--@ 1 zhanghuan  staff   3.6M  1  8 19:38 netty-all-4.1.25.Final.jar
-rw-r--r--@ 1 zhanghuan  staff   276K  1  8 19:42 jackson-core-2.8.11.jar
-rw-r--r--@ 1 zhanghuan  staff    50K  1  8 19:42 jackson-dataformat-cbor-2.8.11.jar
-rw-r--r--@ 1 zhanghuan  staff    72K  1  8 19:42 jackson-dataformat-smile-2.8.11.jar
-rw-r--r--@ 1 zhanghuan  staff    40K  1  8 19:42 jackson-dataformat-yaml-2.8.11.jar
-rw-r--r--  1 zhanghuan  staff    28K  1 10 21:33 flume-ng-high-eslog-sink-1.8.jar
-rw-r--r--@ 1 zhanghuan  staff    62K  1 12 19:35 log4j-1.2-api-2.11.1.jar

Testing

We'll use log4j2 for the test. Note the packages attribute in the configuration below: it must point at the package containing the custom layout so that log4j2 can discover the @Plugin. The configuration:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="WARN" packages="com.mine.log">
    <Appenders>
        <Console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="%d{yyyy-MM-dd HH:mm:ss.SSS}[%t] %-5p %c %msg%xEx%n" />
        </Console>

        <Kafka name="Kafka" topic="test1">
            <JsonPartternLayout system="mlog" pattern="[%t] %c %msg%xEx%n"/>
            <Property name="bootstrap.servers">localhost:9092</Property>
        </Kafka>
    </Appenders>

    <Loggers>
        <Root level="debug">
            <AppenderRef ref="Console"/>
            <AppenderRef ref="Kafka"/>
        </Root>
    </Loggers>
</Configuration>

Test code

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LogTest {

    private static final Logger logger = LogManager.getLogger(LogTest.class);

    @org.junit.Test
    public void test() {
        logger.info("info message...");
        logger.trace("trace message...");
        logger.debug("debug message...");
        logger.warn("warn message...");
        try {
            new Thread(new Runnable() {
                public void run() {
                    logger.warn("test from another thread...");
                }
            }).start();
            // deliberately trigger a ClassNotFoundException to exercise error logging
            Class.forName("123");
        } catch (Exception e) {
            logger.error("An error occurred while processing business logic...", e);
        }
    }
}

Flume configuration

#Name the components on this agent
agent.sources = r1
agent.sinks = k1
agent.channels = c1

#Describe/configure the source
agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.r1.channels = c1
agent.sources.r1.batchSize = 5000
agent.sources.r1.batchDurationMillis = 2000
agent.sources.r1.kafka.bootstrap.servers = localhost:9092
agent.sources.r1.kafka.topics = test1
agent.sources.r1.kafka.consumer.group.id = custom.g.id

#Describe the sink
agent.sinks.k1.type = org.apache.flume.sink.hielasticsearch.ElasticSearchSink
agent.sinks.k1.hostNames = 127.0.0.1:9300
agent.sinks.k1.indexName = log_index
agent.sinks.k1.indexType = log_table
#agent.sinks.k1.clusterName = log_cluster
agent.sinks.k1.batchSize = 500
agent.sinks.k1.ttl = 5d
agent.sinks.k1.serializer = org.apache.flume.sink.hielasticsearch.ElasticSearchDynamicSerializer

#Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1

Now just start Kafka, Flume, Elasticsearch, and Kibana; their basic setup works out of the box after download, so I'll skip it. Once everything is up, the logs appear in Kibana.
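For reference, the agent can be launched with flume-ng, assuming the configuration above is saved as conf/log-agent.conf (the file name is mine); the --name value must match the "agent" prefix used in the properties:

bin/flume-ng agent --conf conf --conf-file conf/log-agent.conf --name agent -Dflume.root.logger=INFO,console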

Original post: https://www.cnblogs.com/adia/p/10261273.html
