CAT跨语言服务加拿大28平台搭建链监控(七)消息分析器与报表

CrossAnalyzer-调用链加拿大28平台搭建论坛:haozbbs.com Q1446595067分析

在分布式环境中,应用是运行在独立的进程中的,有可能是不同的机器,或者不同的服务器进程。那么他们如果想要彼此联系在一起,形成一个调用链,在Cat中,CrossAnalyzer会统计不同服务之间调用的情况,包括服务的访问量,错误量,响应时间,QPS等,这里的服务主要指的是 RPC 服务,在微服务监控中,这是核心。

在讲 CrossAnalyzer 的处理逻辑之前,我们先看下客户端的埋点的一个模拟情况。

一般情况下不同服务会通过几个ID进行串联。这种串联的模式,基本上都是一样的。在Cat中,我们需要3个ID:

RootId,用于标识唯一的一个调用链
ParentId,父Id是谁?谁在调用我
ChildId,我在调用谁?
那么我们如何传递这些ID?Cat为我们提供了一个内部接口 Cat.Context,但是我们需要自己实现Context,在下面代码中我们首先在before函数中实现了Context 上下文,然后在rpcClient中开启消息事务,并调用 Cat.logRemoteCallClient(context) 去填充Context的这3个MessageID。当然,该函数还记录了一个RemoteCall类型的Event消息。

 随后我们用rpcService函数中开启新线程模拟远程RPC服务,并将context上传到 RPC 服务器,在真实环境中,Context是需要跨进程网络传输,因此需要实现序列化接口。

在rpcService中,我们会调用 Cat.logRemoteCallServer(context) 将从rpcClient传过来的Context设置到自己的 Transaction 当中。

随着业务处理逻辑的结束, rpcServer 和 rpcClient 都会分别将自己的消息树上传到CAT服务器分析。

需要注意的是,Service的client和app需要和Call的server以及app对应上,要不然图表是分析不出东西的!

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
public Map<String, String> maps = new HashMap<String, String>();

public Cat.Context context;

@Before
public void before() {
    context = new Cat.Context() {
        @Override
        public void addProperty(String key, String value) { maps.put(key, value); }

        @Override
        public String getProperty(String key) { return maps.get(key); }
    };
}

@Test
public void simulateHierarchyTransaction() throws Exception {
        ...
        //RPC调用开始
        rpcClient();
        rpcClient2();
        ...
}

protected void rpcClient() {
    //客户端埋点,Domain为RpcClient,调用服务端提供的Echo服务
    Transaction parent = Cat.newTransaction("Call", "CallServiceEcho");
    Cat.getManager().getThreadLocalMessageTree().setDomain("RpcClient");

    Cat.logEvent("Call.server","localhost");
    Cat.logEvent("Call.app","RpcService");
    Cat.logEvent("Call.port","8888");
    Cat.logRemoteCallClient(context, "RpcClient");

    //开启新线程模拟远程RPC服务,将context上传到 RPC 服务器
    rpcService(context);

    parent.complete();
}

protected void rpcClient2() {
    ...
    //模拟另外一个RpcClient调用Echo服务
    rpcService(context, "RpcClient2");
    ...
}

protected void rpcService(final Cat.Context context, final String clientDomain) {
    Thread thread = new Thread() {
        @Override
        public void run() {
            //服务器埋点,Domain为 RpcService 提供Echo服务
            Transaction child = Cat.newTransaction("Service", "Echo");
            Cat.getManager().getThreadLocalMessageTree().setDomain("RpcService");

            Cat.logEvent("Service.client", localhost); //填客户端地址
            Cat.logEvent("Service.app", clientDomain);
            Cat.logRemoteCallServer(context);

            //to do your business

            child.complete();
        }
    };

    thread.start();

    try {
        thread.join();
    } catch (InterruptedException e) {
    }
}

}

接下来我们看看CAT服务器端CrossAnalyzer的逻辑。

    我们依然会为每个周期时间内的每个Domain创建一张报表(Cro***eport),然后不同的IP会分配不同的Local对象统计,每个IP又可能会接收来自不同Remote端的调用。

由于这里一个完整的调用链会涉及多个端的多个消息树,我们首先会根据Transaction的类型来判断是RpcService还是RpcClient,如果Type等于PigeonService或Service则该消息来自RpcService,如果Type等于 PigeonCall或Call则来自RpcClient。

先来看看RpcService端消息树的上报处理逻辑,CAT会调用 parsePigeonServerTransaction 函数去填充 CrossInfo 信息,CrossInfo包含的具体内容如下:

 localAddress : RpcService的IP地址

 remoteAddress : 服务调用者(RpcClient)的IP地址,由type="Service.client" 的Event子消息提供,注意,在处理RpcClient的上报时,我们会根据上报信息中的remoteAddress再次统计该RpcService数据,大家可能会疑惑这里是不是重复统计,事实上他们所处的视角是不一样的,前者是站在服务提供者的视角来统计我完成这次服务所耗费的时间、资源等,而后者则是站在RpcClient视角去统计自己从发出请求到得到结果所需的时长、资源等等,比如这中间就包含网络IO的消耗,这些在后续的报表中会有体现。

   app:客户端的Domain, 由type="Service.app"的Event子消息提供。

   remoteRole:固定为 Pigeon.Client , 表示远端角色为 Rpc 客户端。

   detailType: 固定为 PigeonService , 表示自己角色为 Rpc 服务端。

    最后,我们将用CrossInfo信息来更新报表(Cro***eport),我们首先根据 localAddress 即 RpcService的 IP 找到或创建 Local对象,然后根据 remoteAddress+remoteRole 找到或创建 Remote 对象,然后统计服务的访问量,错误量,处理时间,QPS。

   RpcService提供不只一个服务,不同的服务我们按名字分别统计在不同的Name对象里,比如上面案例,RpcService提供的是Echo服务。

我们再来看看RpcClient端上报处理逻辑,CAT调用parsePigeonClientTransaction函数填充CrossInfo信息,具体如下:

localAddress : RpcClient的IP地址

remoteAddress :服务提供者(RpcService)的地址,由 type="Call.server" 的Event子消息提供。

app:服务提供者的Domain,由type="Call.app" 的Event子消息提供,在统计完RpcClient端数据之后,会通过该属性获取服务提供者的CrossInfo。从RpcClient的视角再次统计RpcService的数据。

port:客户端端口,由 type="Call.port" 的Event子消息提供。

remoteRole:固定为 Pigeon.Server, 表示远端角色为服务提供者。

detailType: 固定为 PigeonCall , 表示自己角色为服务调用者。

然后,我们将用CrossInfo信息来更新报表(Cro***eport),也是根据 localAddress 找到Local对象,然后根据 remoteAddress+remoteRole 找到 Remote 对象,进行统计。

接着,我们通过convertCrossInfo函数利用RpcClient的CrossInfo信息去生成服务提供者的CrossInfo信息,这里实际上是为了从RpcClient的视角去统计服务提供者的报表!

public class CrossAnalyzer extends AbstractMessageAnalyzer<Cro***eport> implements LogEnabled {
private void processTransaction(Cro***eport report, MessageTree tree, Transaction t) {
CrossInfo crossInfo = parseCorssTransaction(t, tree);

    if (crossInfo != null && crossInfo.validate()) {
        updateCro***eport(report, t, crossInfo);

        String targetDomain = crossInfo.getApp();

        if (m_serverConfigManager.isRpcClient(t.getType()) && !DEFAULT.equals(targetDomain)) {
            CrossInfo serverCrossInfo = convertCrossInfo(tree.getDomain(), crossInfo);

            if (serverCrossInfo != null) {
                Cro***eport serverReport = m_reportManager.getHourlyReport(getStartTime(), targetDomain, true);

                updateCro***eport(serverReport, t, serverCrossInfo);
            }
        } else {
            m_errorAppName++;
        }
    }
    ...
}

}
这里的 serverCrossInfo 被填充了什么数据:

localAddress : RpcClient 的 remoteAddress。

remoteAddress :RpcClient 的 localAddress + clientPort

app:RpcClient 的 Domain。

remoteRole:固定为 Pigeon.Caller, 表示远端角色为服务调用者。

detailType: 固定为 PigeonCall

 最后还是用CrossInfo信息来更新报表(Cro***eport)。

最后我们看看我们生成了哪些报表数据,3个报表数据,分别是服务调用方 RpcClient和 RpcClient2,以及服务提供方RpcService。

接下来我们看看服务提供方的remotes数据信息,一共3条数据,第1条记录是站在RpcService角度统计服务器完成这2次服务所耗费的时间、资源等,后面2条记录则是站在RpcClient视角去统计自己从发出请求到得到结果所需的时长、资源等等。

第1条记录 duration 为 0.154ms, 第2,3条记录 duration 分别为 1072.62ms、1506.38ms, 两者巨大的时间差一般就是网络 IO 所需的时间,事实上大多数的服务时间的消耗都是在各种IO上。这类服务统称为IO密集型。

StorageAnalyzer --数据库/缓存分析

StorageAnalyzer主要分析一段时间内数据库、Cache访问情况:各种操作访问次数、响应时间、错误次数、长时间访问量等等,当客户端消息过来,StorageAnalyzer首先会分析事务属于数据库操作还是缓存操作,然后进行不同的处理,消息类型如果是SQL则是数据库操作,如果以Cache.memcached开头则认为是缓存操作。

我们首先看看数据库操作的分析过程,下面源码是客户端的案例,这是一个获取cat库config表全部数据的sql查询,我们将数据库操作所有信息都放在一个type="SQL" 的子事务消息中。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {br/>@Test
public void simulateHierarchyTransaction() throws Exception {
...
Transaction sqlT = cat.newTransaction("SQL", "Select");

    //do your SQL query

    cat.logEvent("SQL.Database", "jdbc:mysql://192.168.20.67:3306/cat");
    cat.logEvent("SQL.Method", "select");
    cat.logEvent("SQL.Statement", "SELECT", SUCCESS, "select * from cat.config");
    sqlT.complete();
    ...
}

}
上面消息上报到服务端之后,分析器将SQL类型子事务取出,调用processSQLTransaction去处理,将结果写入报表StorageReport

processSQLTransaction 首先通过DatabaseParser提取数据库的IP和数据库名称,该信息由type="SQL.Database"的Event子消息提供,该Event消息上报的是数据库连接的URL。

接着我们会获取数据库操作名,type="SQL.Method" 的Event子消息提供,数据库操作分4类,分别是select, update, delete, insert,如果不上报,分析器默认客户端在做select查询。

最后我们会为周期内的每个数据库创建一个Storage报表。并将提取信息放入StorageUpdateParam对象,然后将对象交给StorageReportUpdater来更新Storage报表。

public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {br/>@Inject
private DatabaseParser m_databaseParser;

@Inject
private StorageReportUpdater m_updater;

private void processSQLTransaction(MessageTree tree, Transaction t) {
    String databaseName = null;
    String method = "select";
    String ip = null;
    String domain = tree.getDomain();
    List<Message> messages = t.getChildren();

    for (Message message : messages) {
        if (message instanceof Event) {
            String type = message.getType();

            if (type.equals("SQL.Method")) {
                method = message.getName().toLowerCase();
            }
            if (type.equals("SQL.Database")) {
                Database database = m_databaseParser.queryDatabaseName(message.getName());

                if (database != null) {
                    ip = database.getIp();
                    databaseName = database.getName();
                }
            }
        }
    }
    if (databaseName != null && ip != null) {
        String id = querySQLId(databaseName);
        StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
        StorageUpdateParam param = new StorageUpdateParam();

        param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
              .setThreshold(LONG_SQL_THRESHOLD);// .setSqlName(sqlName).setSqlStatement(sqlStatement);
        m_updater.updateStorageReport(report, param);
    }
}

}
数据库与缓存的报表更新逻辑相同,不同ip地址的数据库/缓存的统计信息在不同Machine里面,同时也可能会有不同的Domain访问同一个数据库/缓存,每个Domain的访问都会被单独统计,每个Domain对数据库/缓存不同的操作会统计在不同Operation里,除了当前小时周期的统计汇总外,我们还会用Segment记录每分钟的汇总数据。访问时间超过1秒的数据库操作(缓存是50ms) 会被认为是长时间访问记录。

缓存操作

接下来我们看下缓存的案例,获取memcached中key="uid_1234567"的值,Storage分析器会判断Type是否以"Cache.memcached"开头,如果是,则认为这是一个缓存操作,(这里代码我认为有些稍稍不合理,如果我用的是Redis缓存,我希望上报的Type="Cache.Redis",所以我这里讲源码稍稍做了修改,判断Type如果以"Cache."开头,就认为是缓存)。

@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
br/>@Test
public void simulateHierarchyTransaction() throws Exception {
...
Transaction cacheT = cat.newTransaction("Cache.memcached", "get:uid_1234567");

    //do your cache operation

    cat.logEvent("Cache.memcached.server", "192.168.20.67:6379");
    cacheT.complete();
    ...
}

}
接下来我们看下Storage分析器的处理逻辑,processCacheTransaction负责分析消息, 事务类型"Cache.memcached"的“Cache.”后面部分将会被提取作为缓存类型,分析器会为每个类型的缓存都创建一个报表,事务名称":"前面部分会被提取作为操作名称,一般缓存有 add,get,hGet,mGet,remove等操作,缓存地址将由type="Cache.memcached.server"的Event子消息提供,最后我们还是将domain、ip、method、事务、阈值等消息放入StorageUpdateParam交由StorageReportUpdater来更新报表,更新逻辑与数据库一致。
public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {
br/>@Inject
private StorageReportUpdater m_updater;

private void processCacheTransaction(MessageTree tree, Transaction t) {
    String cachePrefix = "Cache.";
    String ip = "Default";
    String domain = tree.getDomain();
    String cacheType = t.getType().substring(cachePrefix.length());
    String name = t.getName();
    String method = name.substring(name.lastIndexOf(":") + 1);
    List<Message> messages = t.getChildren();

    for (Message message : messages) {
        if (message instanceof Event) {
            String type = message.getType();

            if (type.equals("Cache.memcached.server")) {
                ip = message.getName();
                int index = ip.indexOf(":");

                if (index > -1) {
                    ip = ip.substring(0, index);
                }
            }
        }
    }
    String id = queryCacheId(cacheType);
    StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
    StorageUpdateParam param = new StorageUpdateParam();

    param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
          .setThreshold(LONG_CACHE_THRESHOLD);
    m_updater.updateStorageReport(report, param);
}

}
StateAnalyzer

主要是分析CAT服务器自身的异常,他在周期任务运行中,不搜集任何数据,而是在周期结束后,对CAT的整体状况做一个汇总后生成报表,他的报表结构如下。

HeartbeatAnalyzer

分析器HeartbeatAnalyzer用于上报的心跳数据的分析。我们先看看客户端的收集逻辑,CAT客户端在初始化CatClientModule的时候,会开启一个StatusUpdateTask的线程任务,每隔一分钟去收集客户端的心跳状态,通过 Heartbeat 消息上报到客户端,心跳数据以xml格式存在于Heartbeat消息中。

public class CatClientModule extends AbstractModule {br/>@Override
protected void execute(final ModuleContext ctx) throws Exception {
...
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

        Threads.forGroup("cat").start(statusUpdateTask);
        ...
    }
}

}

Cat客户端会为Heartbeat消息创建一个System类型事务消息,然后将 Heartbeat 消息放入该事务,信息的收集靠StatusInfoCollector来完成,StatusInfoCollector将收集的数据写入StatusInfo对象,然后StatusUpdateTask将StatusInfo转化成xml之后放到Heartbeat消息数据段上报。

public class StatusUpdateTask implements Task, Initializable {br/>@Override
public void run() {
//创建类目录, 上报CAT客户端启动信息
...

    while (m_active) {
        long start = MilliSecondTimer.currentTimeMillis();

        if (m_manager.isCatEnabled()) {
            Transaction t = cat.newTransaction("System", "Status");
            Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
            StatusInfo status = new StatusInfo();

            t.addData("dumpLocked", m_manager.isDumpLocked());
            try {
                StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

                status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

                buildExtensionData(status);
                h.addData(status.toString());
                h.setStatus(Message.SUCCESS);
            } catch (Throwable e) {
                h.setStatus(e);
                cat.logError(e);
            } finally {
                h.complete();
            }
            t.setStatus(Message.SUCCESS);
            t.complete();
        }

        //sleep 等待下一次心跳上报
        ...
    }
}

}
我们上报的XML到底包含哪些数据,我们看下StatusInfo的结构,StatusInfo除了包含上报时间戳之外,还有哪些系统状态信息、附加扩展信息(Extension)会被StatusInfoCollector收集?

1、运行时数据RuntimeInfo:JAVA版本 java.version、用户名user.name、项目目录user.dir、java类路径等等。

2、操作系统信息 OsInfo,同时创建System附加扩展信息。

3、磁盘信息DiskInfo,磁盘的总量、空闲与使用情况,同时创建Disk附加扩展信息

4、内存使用情况MemoryInfo,同时创建垃圾回收扩展信息、JAVA虚拟机堆附加扩展信息

CAT跨语言服务加拿大28平台搭建链监控(七)消息分析器与报表

原文地址:http://blog.51cto.com/13855531/2137024

时间: 2024-07-30 15:45:59

CAT跨语言服务加拿大28平台搭建链监控(七)消息分析器与报表的相关文章

深入详解美团点评CAT跨语言服务监控(一) CAT简介与部署

前言: CAT是一个实时和接近全量的监控系统,它侧重于对Java应用的监控,除了与点评RPC组件融合的很好之外,他将会能与Spring.MyBatis.Dubbo 等框架以及Log4j 等结合,支持PHP.C++.Go等多语言应用,基本接入了美团点评上海侧所有核心应用.目前在中间件(MVC.RPC.数据库.缓存等)框架中得到广泛应用,为美团点评各业务线提供系统的性能指标.健康状况.监控告警等,在微服务监控领域也是非常有用的一套组件.支撑这美团每天450亿的消息,50TB的数据监控,应用于 700

Apache Thrift 跨语言服务开发框架

Apache Thrift 是一种支持多种编程语言的远程服务调用框架,由 Facebook 于 2007 年开发,并于 2008 年进入 Apache 开源项目管理.Apache Thrift 通过 IDL 来定义 RPC 的接口和数据类型,然后通过代码生成工具来生成针对不同编程语言的代码,目前支持 C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCa

【转】Apache Thrift - 可伸缩的跨语言服务开发框架

Apache Thrift - 可伸缩的跨语言服务开发框架 Apache Thrift 是 Facebook 实现的一种高效的.支持多种编程语言的远程服务调用的框架.本文将从 Java 开发人员角度详细介绍 Apache Thrift 的架构.开发和部署,并且针对不同的传输协议和服务类型给出相应的 Java 实例,同时详细介绍 Thrift 异步客户端的实现,最后提出使用 Thrift 需要注意的事项. 12 评论 黄 晓军, 实习生, IBM 张 静, 软件工程师, IBM 张 凯, 高级软件

thrift框架总结,可伸缩的跨语言服务开发框架

thrift框架总结,可伸缩的跨语言服务开发框架 前言: 目前流行的服务调用方式有很多种,例如基于 SOAP 消息格式的 Web Service,基于 JSON 消息格式的 RESTful 服务等.其中所用到的数据传输方式包括 XML,JSON 等,然而 XML 相对体积太大,传输效率低,JSON 体积较小,新颖,但还不够完善.本文将介绍由 Facebook 开发的远程服务调用框架 Apache Thrift,它采用接口描述语言定义并创建服务,支持可扩展的跨语言服务开发,所包含的代码生成引擎可以

DoTwe幸运28平台搭建下载en全解析(入门篇)

DoTween,Itw幸运28平台搭建下载[征途源码论坛zhengtuwl.com]联系方式:QQ:2747044651幸运28平台搭建下载een,这些名字作为一个Unity开发人员听起来并不陌生,它们在动画方面表现出了令人折服的能力,今天我带着大家来一起认识一下这款插件. [征途源码论坛zhengtuwl.com]联系方式:QQ:2747044651幸运28平台搭建下载 原文地址:http://blog.51cto.com/13978873/2177158

第八章 跨语言服务治理方案 Service Mesh

8.1 Service Mesh 概述 新兴的下一代微服务架构,被称为下一代微服务,同时也是云原生技术栈的代表技术之一. 8.1.1 Service Mesh的由来 从2016年到2018年,service mesh经历了从无到有的过程 8.1.2 Service Mesh的定义 服务网格是一个基础设施层,用于处理服务间通信.现代云原生应用有着复杂的服务拓扑结构,服务网格负责在这些拓扑结构中实现请求的可靠传递.实践中,服务网格通常被实现为一组轻量级网络代理,它们与应用程序部署在一起,对应用程序透

Dubbo服务合买平台搭建出售发布之服务暴露&amp;心跳机制&amp;服务注册

Dubbo服务发布 Dubbo合买平台搭建出售 dsluntan.com Q:3393756370 VX:17061863513服务发布影响流程的主要包括三个部分,依次是: 服务暴露 心跳 服务注册 服务暴露是对外提供服务及暴露端口,以便消费端可以正常调通服务.心跳机制保证服务器端及客户端正常长连接的保持,服务注册是向注册中心注册服务暴露服务的过程. Dubbo服务暴露 此处只记录主要代码部分以便能快速定位到主要的核心代码: ServiceConfig.java中代码 if (registryU

零基础学Android开发之Java语言学习02-JAVA开发平台搭建

window系统安装java 下载JDK 首先我们需要下载java开发工具包JDK,下载地址:http://www.oracle.com/technetwork/java/javase/downloads/index.html,点击如下下载按钮: 在下载页面中你需要选择接受许可,并根据自己的系统选择对应的版本(每个人的操作系统是不一样的),本文以 Window 64位系统为例: 下载后JDK的安装根据提示进行,还有安装JDK的时候也会安装JRE,一并安装就可以了. 安装JDK,安装过程中可以自定

linux监控平台搭建(1)监控平台介绍、zabbix监控介绍、安装zabbix、忘记Admin密码

            (一)监控平台介绍 (二)zabbix监控介绍 (三)安装zabbix 实验需要两台机器:服务端(133)和客户端(134) 1.首先在服务端安装Zabbix的yum扩展源. 下载地址: www.zabbix.com/download wget http://repo.zabbix.com/zabbix/3.2/rhel/7/x86_64/zabbix-release-3.2-1.el7.noarch.rpm 2.安装一下这个rpm包 rpm -ivh zabbix-re