log4j2发送消息至Kafka

title: 自定义log4j2发送日志到Kafka


图片描述(最多50字)

tags: log4j2,kafka

为了给公司的大数据平台提供各项目组的日志,而又使各项目组在改动上无感知。做了一番调研后才发现log4j2默认有支持将日志发送到kafka的功能,惊喜之下赶紧看了下log4j对其的实现源码!发现默认的实现是同步阻塞的,如果kafka服务一旦挂掉会阻塞正常服务的日志打印,为此本人在参考源码的基础上做了一些修改。

log4j日志工作流程

log4j2对于log4j在性能上有着显著的提升,这点官方上已经有了明确的说明和测试,所以不多赘述。在为了更熟练的使用,还是有必要了解其内部的工作流程。这是 官网 log4j的一张类图

Applications using the Log4j 2 API will request a Logger with a specific name from the LogManager. The LogManager will locate the appropriate LoggerContext and then obtain the Logger from it. If the Logger must be created it will be associated with the LoggerConfig that contains either a) the same name as the Logger, b) the name of a parent package, or c) the root LoggerConfig. LoggerConfig objects are created from Logger declarations in the configuration. The LoggerConfig is associated with the Appenders that actually deliver the LogEvents.

官网已经解释他们之间的关系了,这里不再对每个类的功能和作用做具体介绍,今天的重点是 Appender 类,因为他将决定将日志输出至何方。

Appender
The ability to selectively enable or disable logging requests based on their logger is only part of the picture. Log4j allows logging requests to print to multiple destinations. In log4j speak, an output destination is called an Appender. Currently, appenders exist for the console, files, remote socket servers, Apache Flume, JMS, remote UNIX Syslog daemons, and various database APIs. See the section on Appenders for more details on the various types available. More than one Appender can be attached to a Logger.

核心配置

图片描述(最多50字)

图片描述(最多50字)

上图是log4j2发送日志到kafka的核心类,其实最主要的 KafkaAppender ,其他的几个类是连接 kafka 服务的。

KafkaAppender核心配置
@Plugin(name = "Kafka", category = "Core", elementType = "appender", printObject = true)
public final class KafkaAppender extends AbstractAppender {
/**

<MyKafka name="Kafka" topic="log-test">
自定义配置

有时候我们会用到的属性由于默认的 KafkaAppender 不一定支持,所以需要一定程度的改写。但是改写也比较方便,只需要从构造器的 Properties kafkaProps 属性中取值即可。为了满足项目要求,我这边定义了platform和serviceName两个属性。

通过 KafkaAppender 的源码可知,他发送消息采取的是同步阻塞的方式。经过测试,一旦kafka服务挂掉,那么将会影响项目服务正常的日志输出,而这不是我希望看到的,所以我对他做了一定的程度的修改。

feature::

kafka服务一直正常
这种情况属于最理想的情况,消息将源源不断的发送至kafka broker
kafka服务挂掉,过一段时间后恢复正常
当kafka服务在挂掉的那一刻,后续所有的消息将会输出至 ConcurrentLinkedQueue 队列里面去。同时该队列的消息也会不断的被消费,输出至本地文件。当心跳检测到kafka broker恢复正常了,本地文件的内容将会被读取,然后发送至kafka broker。需要注意的时候, 此时会有大量消息被实例化为ProducerRecord 对象,堆内存的占用率非常高,所以我用线程阻塞了一下!
kafka服务一直挂
所有的消息都会被输出至本地文件。

log4j2发送消息至Kafka

原文地址:http://blog.51cto.com/13952953/2300849

时间: 2024-08-03 12:57:28

log4j2发送消息至Kafka的相关文章

Golang之发送消息至kafka

windows下安装zookeeper 1.安装JAVA-JDK,从oracle下载最新的SDK安装(我用的是1.8的) 2.安装zookeeper3.3.6,下载地址:http://apache.fayea.com/zookeeper/ 3.重命名conf/zoo_sample.cfg 为conf/zoo.cfg 4.编辑 conf/zoo.cfg,修改dataDir=D:\zookeeper-3.3.6\data\ 4.运行bin/zkServer.cmd 启动结果如下: 安装kafka 1

kafka无法发送消息问题处理

背景 在服务器上搭建了一个单机环境的kafka broker,在服务器上使用命令生产消息时,一切正常.当在本地使用JAVA程序发送消息时,一直出错. 抛出的错误为: Exception in thread "main" Failed to send requests for topics test with correlation ids in [0,12] kafka.common.FailedToSendMessageException: Failed to send messag

【原创】Kafka接受发送消息对象Object基础版

首先感谢 kafka 中国社区 王扬庭例子的帮助和指导~~~~~(kafka_2.9.2-0.8.1.1) kafka常用的发送消息的方法如下: Properties props = new Properties(); props.put("zookeeper.connect", "slaves2:2181,slaves3:2181,slaves4:2181"); props.put("serializer.class", "kafka

kafka producer batch 发送消息

1. 使用 KafkaProducer 发送消息,是按 batch 发送的,producer 首先把消息放入 ProducerBatch 中: org.apache.kafka.clients.producer.internals.ProducerBatch 2. KafkaProduer 类中有一个 Thread 属性,负责 IO,发送和接收数据: this.sender = new Sender(logContext, client, this.metadata, this.accumula

java客户端向单机版kafka发送消息没有接收到

kafka版本:kafka_2.11-0.10.0.0 在kafka服务器命令发送消息,消费者可以接受到, 但是在java客户端向kafka发送消息时消费者接受不到, 在kafka/config/sever.properties把这个注解打开 advertised.listeners=PLAINTEXT://ip.137:9092        #本机服务器ip 意思就是说:hostname.port都会广播给producer.consumer.如果你没有配置了这个属性的话,则使用listene

分布式消息队列kafka

下载地址:http://kafka.apache.org/downloads.html 先启动zookeeper服务器 bin/zookeeper-server-start.sh config/zookeeper.properties & 再启动kafka服务器 bin/kafka-server-start.sh -daemon config/server.properties & 创建topic bin/kafka-topics.sh --create --zookeeper local

分布式公布订阅消息系统 Kafka 架构设计

我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础. 如今它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是全部站点在对其站点使用情况做报表时要用到的数据中最常规的部分.活动数据包含页面訪问量(page view).被查看内容方面的信息以及搜索情况等内容.这样的数据通常的处理方式是先把各种活动以日志的形式写

分布式发布订阅消息系统 Kafka 架构设计[转]

分布式发布订阅消息系统 Kafka 架构设计 转自:http://www.oschina.net/translate/kafka-design 我们为什么要搭建该系统 Kafka是一个消息系统,原本开发自LinkedIn,用作LinkedIn的活动流(activity stream)和运营数据处理管道(pipeline)的基础.现在它已为多家不同类型的公司 作为多种类型的数据管道(data pipeline)和消息系统使用. 活动流数据是所有站点在对其网站使用情况做报表时要用到的数据中最常规的部

分布式消息系统-kafka

消息中间件MessageQuene 解耦且可扩展:业务复杂度的提升带来的也是耦合度的提高,消息队列在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一接口.这允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束. 冗余:有些业务在处理过程中如果失败了,数据在未进行持久化的时候就已经消失,消息队列把数据持久化直到他们被处理,避免了数据的丢失 处理并发:大数据量访问的时候我们可以将消息放入队列中,然后在队列里面按照系统的吞吐能力来进行稳定的抽取数据并进行业务处