pinpoint web报警机制源码解析

背景:(简述)

Pinpoint 是一套APM (Application Performance Management)工具,主要用于帮助分析系统的总体结构和组件如何相互调用,也可用于追踪线上性能问题,方便定位出现问题的点。

Pinpoint主要有如下几个组成部分:

  • Pinpoint Agent :通过字节码增强技术,附加到 用户的java 应用来做采样,程序启动时指定javaagent以及agentId,pplicationName。
  • HBase :用于存储agent采样的数据。
  • Pinpoint Collector :信息的收集者,部署在tomcat中,由于收集agent采样的数据并存入hbase。
  • Pinpoint Web :提供WEB_UI界面,部署在tomcat中,提供可视化页面,并且提供监控报警功能。(需要自行实现)

博文的主要内容与主要目的:

  pinpoint的报警功能是需要自行拓展实现的,网上有很多的实现方法,但是没有一个对于此报警机制的源码分析,本文旨在填补此处空白,让读者有一些基本的了解,如此调试报警的时候才能够得心应手。

背景知识:(详情可以百度一下,后面也会介绍相关内容)

  Spring Task:Spring内置的定时任务。

  Spring Batch: 一个大数据量的并行处理框架。

概述:

   通过Spring Task的定时任务,每分钟做一次通过SpringBatch的批处理检查。

该模块github源码地址:

  https://github.com/naver/pinpoint/tree/master/web/src/main/java/com/navercorp/pinpoint/web/alarm

源码解析:(本文重点)

  1.定时任务入口: src/main/resources/batch/applicationContext-batch-schedule.xml

<task:scheduled-tasks scheduler="scheduler">
       <task:scheduled ref="batchJobLauncher" method="alarmJob" cron="0 0/1 * * * *" />
       <!--省略-->
</task:scheduled-tasks>
<task:scheduler id="scheduler" pool-size="5"/>

此段代码标签使用的是SpringTask的标签,大概意思为,定义了一个线程池大小为5的调度器scheduler。

scheduler执行的任务有三个,batchJobLauncher的alarmJob方法,每一分钟执行一次,alarmJob,顾名思义,报警的任务。

  2.批处理任务入口: src/main/resources/batch/applicationContext-alarmJob.xml

前置说明-SpringBatch批处理框架中与此相关的解释:

如果是批处理的话,自然有批处理任务(对应下面的Job标签),每个任务自然有一个或者多个步骤(对应下面的Step标签)。

每个步骤有三个操作,读取数据(对应reader),处理数据(对应processor),回写数据(对应writer)。这三者中的参数是按照顺序传递的。

大家可能会想,报警机制和读取数据,处理数据,回写数据有什么关系吗?下面我说明一下pinpoint相关的对应的业务关系:

reader:读取数据 => 通过用户配置的规则提供Checker,即异常校验器。

processor:处理数据 => 用Checker进行校验,标记异常状态。

writer:回写数据 => 判断Checker是否有异常情况,有则报警。

下面的配置的源码:
<!--定义了一个alramJob的批处理任务-->
  <batch:job id="alarmJob">
    <batch:step id="alarmPartitionStep">
      <!--此alarmJob只有一个Step-->
        <batch:partition step="alarmStep" partitioner="alarmPartitioner">
            <!--设置执行的线程池-->
            <batch:handler task-executor="alarmPoolTaskExecutorForPartition" />
        </batch:partition>
    </batch:step>
    <batch:listeners>
       <batch:listener ref="jobFailListener"/>
    </batch:listeners>
</batch:job>

<batch:step id="alarmStep">
    <!--代表step的一种处理策略-->
    <batch:tasklet>
        <!--批处理流程-->
        <!-- 顺序执行
        reader:读取数据 => 提供Checker,即异常校验器,见下面的bean
        processor:处理数据 => 用Checker进行校验,见下面的bean
        writer:回写数据 => 判断Checker是否有异常情况,有则报警,见下面的bean
        -->
        <batch:chunk reader="reader" processor="processor" writer="writer" commit-interval="1"/>
    </batch:tasklet>
</batch:step>

<bean id="alarmPartitioner" class="com.navercorp.pinpoint.web.alarm.AlarmPartitioner"/>

<bean id="reader" class="com.navercorp.pinpoint.web.alarm.AlarmReader" scope="step"/>
<bean id="processor" class="com.navercorp.pinpoint.web.alarm.AlarmProcessor" scope="step"/>
<bean id="writer" class="com.navercorp.pinpoint.web.alarm.AlarmWriter" scope="step"/>

  3.通过对AlarmReader、AlarmProcessor、AlarmWriter的解析,梳理报警原理

(1) com.navercorp.pinpoint.web.alarm.AlarmReader:实现了ItemReader接口
// StepExecutionListener监听器,可以定义Step开始前后的操作
public class AlarmReader implements ItemReader<AlarmChecker>, StepExecutionListener {
    ...
   // 报警所用的Checker在内存里
    private final Queue<AlarmChecker> checkers = new ConcurrentLinkedDeque<>();
    ...

    // Checker出队,供processor使用,
   @Override
    public AlarmChecker read() {
        return checkers.poll();
    }

    // 批处理之前,将应用的报警规则加入Checker之中
    @Override
    public void beforeStep(StepExecution stepExecution) {
     // 查询所有的应用
        List<Application> applicationList = applicationIndexDao.selectAllApplicationNames();
     // 根据应用用户配置的规则,添加Checker到队列当中
        for (Application application : applicationList) {
            addChecker(application);
        }
    }

    private void addChecker(Application application) {
     // 根据应用名称获取所有的规则,应用名称就是配置agent的时候,指定的applicationName
        List<Rule> rules = alarmService.selectRuleByApplicationId(application.getName());
        long timeSlotEndTime = System.currentTimeMillis();
        Map<DataCollectorCategory, DataCollector> collectorMap = new HashMap<>();
        // 遍历规则
        for (Rule rule : rules) {
        // CheckerCategory是一个枚举类,预置了所有的报警规则模版,比如失败请求次数、慢请求次数等
            CheckerCategory checkerCategory = CheckerCategory.getValue(rule.getCheckerName());
        // 数据收集器是为检验规则准备的,例如Rule是失败请求次数,但是次数从哪里来,就是从这个收集器来的

            DataCollector collector = collectorMap.get(checkerCategory.getDataCollectorCategory());
       // 这里是一个基于Map的缓存
            if (collector == null) {
                collector = dataCollectorFactory.createDataCollector(checkerCategory, application, timeSlotEndTime);
                collectorMap.put(collector.getDataCollectorCategory(), collector);
            }
            // 创建Checker,有兴趣的读者可以看看CheckerCategroy的源码,设计的还是很不错的。
        // AlaramChecker是一个抽象方法,具体的功能由子类实现
            AlarmChecker checker = checkerCategory.createChecker(collector, rule);
       // 加入队列
            checkers.add(checker);
        }

    }
    ...
}
(2) com.navercorp.pinpoint.web.alarm.AlarmProcessor:实现了ItemProcessor接口
public class AlarmProcessor implements ItemProcessor<AlarmChecker, AlarmChecker> {
   // 此处的AlarmChecker是上面的read()方法传递过来的    @Override
    public AlarmChecker process(AlarmChecker checker) {
        // check,顾名思义,检验,标记是否有异常情况,check()方法见下
        checker.check();
        return checker;
    }

}
com.navercorp.pinpoint.web.alarm.checker.AlarmProcessor
protected abstract boolean decideResult(T value);

    public void check() {
        // 收集数据
        dataCollector.collect();
        // 标记是否有异常情况,意为是否满足报警的阀值,decideResult是一个抽象方法
     // detected字段在后续的Writter中会被检查
        detected = decideResult(getDetectedValue()); }
(3)com.navercorp.pinpoint.web.alarm.AlarmWriter:实现了ItemWriter接口
public class AlarmWriter implements ItemWriter<AlarmChecker> {

    // 需要用户自定义配置在Spring中的AlarmMessageSender,如果不配置,则是一个空实现
    @Autowired(required = false)
    private AlarmMessageSender alarmMessageSender = new EmptyMessageSender();

    @Autowired
    private AlarmService alarmService;
   // 实现的接口的方法,主要内容
    @Override
    public void write(List<? extends AlarmChecker> checkers) throws Exception {
        Map<String, CheckerResult> beforeCheckerResults = alarmService.selectBeforeCheckerResults(checkers.get(0).getRule().getApplicationId());
        // 遍历上面传递的Checker
        for (AlarmChecker checker : checkers) {
            CheckerResult beforeCheckerResult = beforeCheckerResults.get(checker.getRule().getCheckerName());

            if (beforeCheckerResult == null) {
                beforeCheckerResult = new CheckerResult(checker.getRule().getApplicationId(), checker.getRule().getCheckerName(), false, 0, 1);
            }
            // 对上面的Processor标记的detected值进行检查
            if (checker.isDetected()) {
                sendAlarmMessage(beforeCheckerResult, checker);
            }
            // 记录报警历史
            alarmService.updateBeforeCheckerResult(beforeCheckerResult, checker);
        }
    }

    private void sendAlarmMessage(CheckerResult beforeCheckerResult, AlarmChecker checker) {
        if (isTurnToSendAlarm(beforeCheckerResult)) {
            // 是否配置了发送报警短信
            if (checker.isSMSSend()) {
                alarmMessageSender.sendSms(checker, beforeCheckerResult.getSequenceCount() + 1);
            }
            // 是否配置了发送报警邮件
            if (checker.isEmailSend()) {
                alarmMessageSender.sendEmail(checker, beforeCheckerResult.getSequenceCount() + 1);
            }
        }

    }
  // ...
}

  

原文地址:https://www.cnblogs.com/langshiquan/p/9497464.html

时间: 2024-11-08 09:06:52

pinpoint web报警机制源码解析的相关文章

Qt高级——Qt信号槽机制源码解析

Qt高级--Qt信号槽机制源码解析 基于Qt4.8.6版本 一.信号槽机制的原理 1.信号槽简介 信号槽是观察者模式的一种实现,特性如下:A.一个信号就是一个能够被观察的事件,或者至少是事件已经发生的一种通知:B.一个槽就是一个观察者,通常就是在被观察的对象发生改变的时候--也可以说是信号发出的时候--被调用的函数:C.信号与槽的连接,形成一种观察者-被观察者的关系:D.当事件或者状态发生改变的时候,信号就会被发出:同时,信号发出者有义务调用所有注册的对这个事件(信号)感兴趣的函数(槽).信号和

Qt信号槽机制源码解析

Qt信号槽机制源码解析 来源 https://blog.51cto.com/9291927/2070398 一.信号槽机制的原理 1.信号槽简介 信号槽是观察者模式的一种实现,特性如下:A.一个信号就是一个能够被观察的事件,或者至少是事件已经发生的一种通知:B.一个槽就是一个观察者,通常就是在被观察的对象发生改变的时候——也可以说是信号发出的时候——被调用的函数:C.信号与槽的连接,形成一种观察者-被观察者的关系:D.当事件或者状态发生改变的时候,信号就会被发出:同时,信号发出者有义务调用所有注

安卓中的事件分发机制源码解析

安卓中的事件分发机制主要涉及到两类控件,一类是容器类控件ViewGroup,如常用的布局控件,另一类是显示类控件,即该控件中不能用来容纳其它控件,它只能用来显示一些资源内容,如Button,ImageView等控件.暂且称前一类控件为ViewGroup类控件(尽管ViewGroup本身也是一个View),后者为View类控件. 安卓中的事件分发机制主要涉及到dispatchTouchEvent(MotionEvent ev).onInterceptTouchEvent(MotionEvent e

Android Handler消息机制源码解析

好记性不如烂笔头,今天来分析一下Handler的源码实现 Handler机制是Android系统的基础,是多线程之间切换的基础.下面我们分析一下Handler的源码实现. Handler消息机制有4个类合作完成,分别是Handler,MessageQueue,Looper,Message Handler : 获取消息,发送消息,以及处理消息的类 MessageQueue:消息队列,先进先出 Looper : 消息的循环和分发 Message : 消息实体类,分发消息和处理消息的就是这个类 主要工

聊聊Dubbo - Dubbo可扩展机制源码解析

摘要: 在Dubbo可扩展机制实战中,我们了解了Dubbo扩展机制的一些概念,初探了Dubbo中LoadBalance的实现,并自己实现了一个LoadBalance.是不是觉得Dubbo的扩展机制很不错呀,接下来,我们就深入Dubbo的源码,一睹庐山真面目. 在Dubbo可扩展机制实战中,我们了解了Dubbo扩展机制的一些概念,初探了Dubbo中LoadBalance的实现,并自己实现了一个LoadBalance.是不是觉得Dubbo的扩展机制很不错呀,接下来,我们就深入Dubbo的源码,一睹庐

Android View 事件分发机制 源码解析 (上)

一直想写事件分发机制的文章,不管咋样,也得自己研究下事件分发的源码,写出心得~ 首先我们先写个简单的例子来测试View的事件转发的流程~ 1.案例 为了更好的研究View的事件转发,我们自定以一个MyButton继承Button,然后把跟事件传播有关的方法进行复写,然后添加上日志~ MyButton [java] view plain copy package com.example.zhy_event03; import android.content.Context; import andr

Guava Futures异步回调机制源码解析

本文是在学习中的总结,欢迎转载但请注明出处:http://blog.csdn.net/pistolove/article/details/51758194 1.前言 在前两篇文章中简单阐述了Java Future 和Guava ListenableFuture及其相关的应用.我们发现Guava ListenableFuture提供了比Java Future更加强大的功能,而在Google Guava并发包中,某些情况下,Futures这个类起到了不可或缺的作用,而ListenableFuture

android handler机制源码解析【异步回调】

流程总结 Looper.prepare():本线程中保存一个Looper实例,然后该实例中保存一个MessageQueue对象:因为Looper.prepare()在一个线程中只能调用一次,所以MessageQueue在一个线程中只会存在一个. Looper.loop():轮询MessageQueue,回调msg.target.dispatchMessage(msg)方法. Handler构造方法:得到当前线程中保存的Looper实例,进而与Looper实例中的MessageQueue想关联.

jquery源码解析:jQuery数据缓存机制详解1

jQuery中有三种添加数据的方法,$().attr(),$().prop(),$().data().但是前面两种是用来在元素上添加属性值的,只适合少量的数据,比如:title,class,name等.对于json这种数据量大的,就适合用data方法来添加,而data方法就是jQuery缓存机制最重要的方法. jQuery中为什么要用缓存机制系统呢?因为DOM元素与js对象之间互相引用,在大部分浏览器下会引起内存泄漏.为了解决这个问题,jQuery就写了一个缓存机制系统.举个例子: var di