使用JMX监控Kafka

Kafka可以配置使用JMX进行运行状态的监控,既可以通过JDK自带Jconsole来观察结果,也可以通过Java API的方式来.

关于监控指标的描述,可以参考:http://kafka.apache.org/documentation.html#monitoring

开启JMX端口

修改bin/kafka-server-start.sh,添加JMX_PORT参数,添加后样子如下

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

通过Jconsole测试时候可以连接

通过JavaAPI来访问

通过以下方法获取目标值

public class KafkaDataProvider{
    protected final Logger LOGGER = LoggerFactory.getLogger(getClass());
    private static final String MESSAGE_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec";
    private static final String BYTES_IN_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec";
    private static final String BYTES_OUT_PER_SEC = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec";
    private static final String PRODUCE_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce";
    private static final String CONSUMER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchConsumer";
    private static final String FLOWER_REQUEST_PER_SEC = "kafka.network:type=RequestMetrics,name=RequestsPerSec,request=FetchFollower";
    private static final String ACTIVE_CONTROLLER_COUNT = "kafka.controller:type=KafkaController,name=ActiveControllerCount";
    private static final String PART_COUNT = "kafka.server:type=ReplicaManager,name=PartitionCount";
    public String extractMonitorData() {
        //TODO 通过调用API获得IP以及参数
        KafkaRoleInfo monitorDataPoint = new KafkaRoleInfo();
        String jmxURL = "service:jmx:rmi:///jndi/rmi://192.168.40.242:9999/jmxrmi";
        try {
            MBeanServerConnection jmxConnection = MetricDataUtils.getMBeanServerConnection(jmxURL);
            ObjectName messageCountObj = new ObjectName(MESSAGE_IN_PER_SEC);
            ObjectName bytesInPerSecObj = new ObjectName(BYTES_IN_PER_SEC);
            ObjectName bytesOutPerSecObj = new ObjectName(BYTES_OUT_PER_SEC);
            ObjectName produceRequestsPerSecObj = new ObjectName(PRODUCE_REQUEST_PER_SEC);
            ObjectName consumerRequestsPerSecObj = new ObjectName(CONSUMER_REQUEST_PER_SEC);
            ObjectName flowerRequestsPerSecObj = new ObjectName(FLOWER_REQUEST_PER_SEC);
            ObjectName activeControllerCountObj = new ObjectName(ACTIVE_CONTROLLER_COUNT);
            ObjectName partCountObj = new ObjectName(PART_COUNT);
            Long messagesInPerSec = (Long) jmxConnection.getAttribute(messageCountObj, "Count");
            Long bytesInPerSec = (Long) jmxConnection.getAttribute(bytesInPerSecObj, "Count");
            Long bytesOutPerSec = (Long) jmxConnection.getAttribute(bytesOutPerSecObj, "Count");
            Long produceRequestCountPerSec = (Long) jmxConnection.getAttribute(produceRequestsPerSecObj, "Count");
            Long consumerRequestCountPerSec = (Long) jmxConnection.getAttribute(consumerRequestsPerSecObj, "Count");
            Long flowerRequestCountPerSec = (Long) jmxConnection.getAttribute(flowerRequestsPerSecObj, "Count");
            Integer activeControllerCount = (Integer) jmxConnection.getAttribute(activeControllerCountObj, "Value");
            Integer partCount = (Integer) jmxConnection.getAttribute(partCountObj, "Value");
            monitorDataPoint.setMessagesInPerSec(messagesInPerSec);
            monitorDataPoint.setBytesInPerSec(bytesInPerSec);
            monitorDataPoint.setBytesOutPerSec(bytesOutPerSec);
            monitorDataPoint.setProduceRequestCountPerSec(produceRequestCountPerSec);
            monitorDataPoint.setConsumerRequestCountPerSec(consumerRequestCountPerSec);
            monitorDataPoint.setFlowerRequestCountPerSec(flowerRequestCountPerSec);
            monitorDataPoint.setActiveControllerCount(activeControllerCount);
            monitorDataPoint.setPartCount(partCount);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (MalformedObjectNameException e) {
            e.printStackTrace();
        } catch (AttributeNotFoundException e) {
            e.printStackTrace();
        } catch (MBeanException e) {
            e.printStackTrace();
        } catch (ReflectionException e) {
            e.printStackTrace();
        } catch (InstanceNotFoundException e) {
            e.printStackTrace();
        }
        return monitorDataPoint.toString();
    }
    public static void main(String[] args) {
        System.out.println(new KafkaDataProvider().extractMonitorData());
    }
    /**
     * 获得MBeanServer 的连接
     *
     * @param jmxUrl
     * @return
     * @throws IOException
     */
    public MBeanServerConnection getMBeanServerConnection(String jmxUrl) throws IOException {
        JMXServiceURL url = new JMXServiceURL(jmxUrl);
        JMXConnector jmxc = JMXConnectorFactory.connect(url, null);
        MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
        return mbsc;
    }
}

其他工具

除了自己编写定制化的监控程序外

kafka-web-console

https://github.com/claudemamo/kafka-web-console

部署sbt:

http://www.scala-sbt.org/0.13/tutorial/Manual-Installation.html

http://www.scala-sbt.org/release/tutorial/zh-cn/Installing-sbt-on-Linux.html

KafkaOffsetMonitor

https://github.com/quantifind/KafkaOffsetMonitor/releases/tag/v0.2.0

java -cp KafkaOffsetMonitor-assembly-0.2.0.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb --zk localhost:12181 --port 8080 --refresh 5.minutes --retain 1.day

Mx4jLoader

时间: 2024-07-31 11:23:38

使用JMX监控Kafka的相关文章

jmxtrans监控kafka

我们知道jmx可以将程序内部的信息暴露出来,但是要想监控这些信息的话,就还需要自己写java程序调用jmx接口去获取数据,并按照某种格式发送到其他地方(如监控程序zabbix,ganglia).这时jmxtrans就派上用场了,jmxtrans的作用是自动去jvm中获取所有jmx格式数据,并按照某种格式(json文件配置格式)输出到其他应用程序(常用的有ganglia) 安装 主页:https://github.com/jmxtrans/jmxtrans(这里面也有一个下载地址,貌似版本更高)

监控Kafka消费进度

使用Kafka作为消息中间件消费数据时,监控Kafka消费的进度很重要.其中,在监控消费进度的过程中,主要关注消费Lag. 常用监控Kafka消费进度的方法有三种,分别是使用Kafka自带的命令行工具.使用Kafka Consumer API和Kafka自带的JMX监控指标,这里介绍前两种方法. 注: 内网IP:10.12.100.126 10.12.100.127 10.12.100.128 外网IP:47.90.133.76 47.90.133.77 47.90.133.78 用户名:ser

运用JMX监控Tomcat

1.先配Tomcat的启动语句,window下tomcat的bin/catalina.bat(linux为catalina.sh),在头上注释部分(.bat为rem..sh为#)后面加上set JAVA_OPTS=-Dcom.sun.management.jmxremote.port=8999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=true (linux为JAVA_

监控应用服务器使用JMX监控Tomcat (转收藏)

前言:做了一个监控应用服务器的项目(支持Tocmat.WebSphere.WebLogic各版本), 过程也算是磕磕绊绊,由于网上缺少相关资料,或者深陷于知识的海洋难以寻觅到有效的资料,因而走过不少弯路,遇过不少困难.为了留下点印记,给后来人留下 点经验之谈,助之少走弯路,故将这些经验整理出来,与大家分享.水平有限,难免疏漏,还望指正.如有疑问,欢迎留言,或者加入Q群参与讨 论:35526521. 一.激活Tomcat的JMX远程配置 要通过JMX远程监控Tomcat,首先需要激活Tomcat的

zabbix通过JMX监控tomcat状态

因为公司大量使用tomcat作为应用服务,所以,这两天催生了一个想法,通过zabbix监控tomcat的运行状态,从而能够更快的发现tomcat服务出现的问题以及判断问题出现在哪块. 在网上找了一些资料来看,写的都不是很全面(PS:对于我这种菜鸟来说,还有很多东西不知道的,所以需要有解释的详细点的文档来帮助我更好理解原理,于是就有了本篇博文的诞生!) 首先,zabbix监控tomcat等这一类java平台的应用不是直接通过agentd来实现的,而是使用jmx来获取到tomcat这类应用的状态,然

zabbix jmx监控

jmx监控的步骤: 1. 添加主机监控的时候,要加上JMX interface要填写, ip为主机IP, 端口为java端开启的远程端口,默认是12345 2. 前端添加item的时候,Type选择JMX agent, key为 jmx[....]样子(jmx怎么获取的就不知道了) 3. 代理或者Server端的软件 确定安装了yum install zabbix.xx-java-gateway.x86_64 4. 代理或者Server端确定配置文件开启了JavaGateway JavaGate

14、Zabbix如何使用JMX监控

JMX(Java Management Extensions,即Java管理扩展)是java平台上为应用程序.设备.系统等植入管理功能的框架. JMX工作原理:  zabbix_server想知道一台主机上的特定的JMX值时,它向Zabbix-Java-gateway询问,而Zabbix-Javagateway使用"JMXmanagementAPI"去查询特定的应用程序,而前提是应用程序这端在开启时需要"-Dcom.sun.management.jmxremote"

Jetty服务器jmx监控

Jetty 服务器增加jmx,jmx-remote模块 1.修改对应jetty服务器的配置文件start.ini追加如下两行–module=jmx–module=jmx-remote 2.取消etc/jetty-jmx.xml关于jmx的配置注释 [html] view plain copy print? <New id="ConnectorServer" class="org.eclipse.jetty.jmx.ConnectorServer"> &l

Kafka 消息监控 - Kafka Eagle

1.概述 在开发工作当中,消费 Kafka 集群中的消息时,数据的变动是我们所关心的,当业务并不复杂的前提下,我们可以使用 Kafka 提供的命令工具,配合 Zookeeper 客户端工具,可以很方便的完成我们的工作.随着业务的复杂化,Group 和 Topic 的增加,此时我们使用 Kafka 提供的命令工具,已预感到力不从心,这时候 Kafka 的监控系统此刻便尤为显得重要,我们需要观察消费应用的详情. 监控系统业界有很多杰出的开源监控系统.我们在早期,有使用 KafkaMonitor 和