CrossAnalyzer-调用链加拿大28平台搭建论坛:haozbbs.com Q1446595067分析
在分布式环境中,应用是运行在独立的进程中的,有可能是不同的机器,或者不同的服务器进程。那么他们如果想要彼此联系在一起,形成一个调用链,在Cat中,CrossAnalyzer会统计不同服务之间调用的情况,包括服务的访问量,错误量,响应时间,QPS等,这里的服务主要指的是 RPC 服务,在微服务监控中,这是核心。
在讲 CrossAnalyzer 的处理逻辑之前,我们先看下客户端的埋点的一个模拟情况。
一般情况下不同服务会通过几个ID进行串联。这种串联的模式,基本上都是一样的。在Cat中,我们需要3个ID:
RootId,用于标识唯一的一个调用链
ParentId,父Id是谁?谁在调用我
ChildId,我在调用谁?
那么我们如何传递这些ID?Cat为我们提供了一个内部接口 Cat.Context,但是我们需要自己实现Context,在下面代码中我们首先在before函数中实现了Context 上下文,然后在rpcClient中开启消息事务,并调用 Cat.logRemoteCallClient(context) 去填充Context的这3个MessageID。当然,该函数还记录了一个RemoteCall类型的Event消息。
随后我们用rpcService函数中开启新线程模拟远程RPC服务,并将context上传到 RPC 服务器,在真实环境中,Context是需要跨进程网络传输,因此需要实现序列化接口。
在rpcService中,我们会调用 Cat.logRemoteCallServer(context) 将从rpcClient传过来的Context设置到自己的 Transaction 当中。
随着业务处理逻辑的结束, rpcServer 和 rpcClient 都会分别将自己的消息树上传到CAT服务器分析。
需要注意的是,Service的client和app需要和Call的server以及app对应上,要不然图表是分析不出东西的!
@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {
public Map<String, String> maps = new HashMap<String, String>();
public Cat.Context context;
@Before
public void before() {
context = new Cat.Context() {
@Override
public void addProperty(String key, String value) { maps.put(key, value); }
@Override
public String getProperty(String key) { return maps.get(key); }
};
}
@Test
public void simulateHierarchyTransaction() throws Exception {
...
//RPC调用开始
rpcClient();
rpcClient2();
...
}
protected void rpcClient() {
//客户端埋点,Domain为RpcClient,调用服务端提供的Echo服务
Transaction parent = Cat.newTransaction("Call", "CallServiceEcho");
Cat.getManager().getThreadLocalMessageTree().setDomain("RpcClient");
Cat.logEvent("Call.server","localhost");
Cat.logEvent("Call.app","RpcService");
Cat.logEvent("Call.port","8888");
Cat.logRemoteCallClient(context, "RpcClient");
//开启新线程模拟远程RPC服务,将context上传到 RPC 服务器
rpcService(context);
parent.complete();
}
protected void rpcClient2() {
...
//模拟另外一个RpcClient调用Echo服务
rpcService(context, "RpcClient2");
...
}
protected void rpcService(final Cat.Context context, final String clientDomain) {
Thread thread = new Thread() {
@Override
public void run() {
//服务器埋点,Domain为 RpcService 提供Echo服务
Transaction child = Cat.newTransaction("Service", "Echo");
Cat.getManager().getThreadLocalMessageTree().setDomain("RpcService");
Cat.logEvent("Service.client", localhost); //填客户端地址
Cat.logEvent("Service.app", clientDomain);
Cat.logRemoteCallServer(context);
//to do your business
child.complete();
}
};
thread.start();
try {
thread.join();
} catch (InterruptedException e) {
}
}
}
接下来我们看看CAT服务器端CrossAnalyzer的逻辑。
我们依然会为每个周期时间内的每个Domain创建一张报表(Cro***eport),然后不同的IP会分配不同的Local对象统计,每个IP又可能会接收来自不同Remote端的调用。
由于这里一个完整的调用链会涉及多个端的多个消息树,我们首先会根据Transaction的类型来判断是RpcService还是RpcClient,如果Type等于PigeonService或Service则该消息来自RpcService,如果Type等于 PigeonCall或Call则来自RpcClient。
先来看看RpcService端消息树的上报处理逻辑,CAT会调用 parsePigeonServerTransaction 函数去填充 CrossInfo 信息,CrossInfo包含的具体内容如下:
localAddress : RpcService的IP地址
remoteAddress : 服务调用者(RpcClient)的IP地址,由type="Service.client" 的Event子消息提供,注意,在处理RpcClient的上报时,我们会根据上报信息中的remoteAddress再次统计该RpcService数据,大家可能会疑惑这里是不是重复统计,事实上他们所处的视角是不一样的,前者是站在服务提供者的视角来统计我完成这次服务所耗费的时间、资源等,而后者则是站在RpcClient视角去统计自己从发出请求到得到结果所需的时长、资源等等,比如这中间就包含网络IO的消耗,这些在后续的报表中会有体现。
app:客户端的Domain, 由type="Service.app"的Event子消息提供。
remoteRole:固定为 Pigeon.Client , 表示远端角色为 Rpc 客户端。
detailType: 固定为 PigeonService , 表示自己角色为 Rpc 服务端。
最后,我们将用CrossInfo信息来更新报表(Cro***eport),我们首先根据 localAddress 即 RpcService的 IP 找到或创建 Local对象,然后根据 remoteAddress+remoteRole 找到或创建 Remote 对象,然后统计服务的访问量,错误量,处理时间,QPS。
RpcService提供不只一个服务,不同的服务我们按名字分别统计在不同的Name对象里,比如上面案例,RpcService提供的是Echo服务。
我们再来看看RpcClient端上报处理逻辑,CAT调用parsePigeonClientTransaction函数填充CrossInfo信息,具体如下:
localAddress : RpcClient的IP地址
remoteAddress :服务提供者(RpcService)的地址,由 type="Call.server" 的Event子消息提供。
app:服务提供者的Domain,由type="Call.app" 的Event子消息提供,在统计完RpcClient端数据之后,会通过该属性获取服务提供者的CrossInfo。从RpcClient的视角再次统计RpcService的数据。
port:客户端端口,由 type="Call.port" 的Event子消息提供。
remoteRole:固定为 Pigeon.Server, 表示远端角色为服务提供者。
detailType: 固定为 PigeonCall , 表示自己角色为服务调用者。
然后,我们将用CrossInfo信息来更新报表(Cro***eport),也是根据 localAddress 找到Local对象,然后根据 remoteAddress+remoteRole 找到 Remote 对象,进行统计。
接着,我们通过convertCrossInfo函数利用RpcClient的CrossInfo信息去生成服务提供者的CrossInfo信息,这里实际上是为了从RpcClient的视角去统计服务提供者的报表!
public class CrossAnalyzer extends AbstractMessageAnalyzer<Cro***eport> implements LogEnabled {
private void processTransaction(Cro***eport report, MessageTree tree, Transaction t) {
CrossInfo crossInfo = parseCorssTransaction(t, tree);
if (crossInfo != null && crossInfo.validate()) {
updateCro***eport(report, t, crossInfo);
String targetDomain = crossInfo.getApp();
if (m_serverConfigManager.isRpcClient(t.getType()) && !DEFAULT.equals(targetDomain)) {
CrossInfo serverCrossInfo = convertCrossInfo(tree.getDomain(), crossInfo);
if (serverCrossInfo != null) {
Cro***eport serverReport = m_reportManager.getHourlyReport(getStartTime(), targetDomain, true);
updateCro***eport(serverReport, t, serverCrossInfo);
}
} else {
m_errorAppName++;
}
}
...
}
}
这里的 serverCrossInfo 被填充了什么数据:
localAddress : RpcClient 的 remoteAddress。
remoteAddress :RpcClient 的 localAddress + clientPort
app:RpcClient 的 Domain。
remoteRole:固定为 Pigeon.Caller, 表示远端角色为服务调用者。
detailType: 固定为 PigeonCall
最后还是用CrossInfo信息来更新报表(Cro***eport)。
最后我们看看我们生成了哪些报表数据,3个报表数据,分别是服务调用方 RpcClient和 RpcClient2,以及服务提供方RpcService。
接下来我们看看服务提供方的remotes数据信息,一共3条数据,第1条记录是站在RpcService角度统计服务器完成这2次服务所耗费的时间、资源等,后面2条记录则是站在RpcClient视角去统计自己从发出请求到得到结果所需的时长、资源等等。
第1条记录 duration 为 0.154ms, 第2,3条记录 duration 分别为 1072.62ms、1506.38ms, 两者巨大的时间差一般就是网络 IO 所需的时间,事实上大多数的服务时间的消耗都是在各种IO上。这类服务统称为IO密集型。
StorageAnalyzer --数据库/缓存分析
StorageAnalyzer主要分析一段时间内数据库、Cache访问情况:各种操作访问次数、响应时间、错误次数、长时间访问量等等,当客户端消息过来,StorageAnalyzer首先会分析事务属于数据库操作还是缓存操作,然后进行不同的处理,消息类型如果是SQL则是数据库操作,如果以Cache.memcached开头则认为是缓存操作。
我们首先看看数据库操作的分析过程,下面源码是客户端的案例,这是一个获取cat库config表全部数据的sql查询,我们将数据库操作所有信息都放在一个type="SQL" 的子事务消息中。
@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {br/>@Test
public void simulateHierarchyTransaction() throws Exception {
...
Transaction sqlT = cat.newTransaction("SQL", "Select");
//do your SQL query
cat.logEvent("SQL.Database", "jdbc:mysql://192.168.20.67:3306/cat");
cat.logEvent("SQL.Method", "select");
cat.logEvent("SQL.Statement", "SELECT", SUCCESS, "select * from cat.config");
sqlT.complete();
...
}
}
上面消息上报到服务端之后,分析器将SQL类型子事务取出,调用processSQLTransaction去处理,将结果写入报表StorageReport
processSQLTransaction 首先通过DatabaseParser提取数据库的IP和数据库名称,该信息由type="SQL.Database"的Event子消息提供,该Event消息上报的是数据库连接的URL。
接着我们会获取数据库操作名,type="SQL.Method" 的Event子消息提供,数据库操作分4类,分别是select, update, delete, insert,如果不上报,分析器默认客户端在做select查询。
最后我们会为周期内的每个数据库创建一个Storage报表。并将提取信息放入StorageUpdateParam对象,然后将对象交给StorageReportUpdater来更新Storage报表。
public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {br/>@Inject
private DatabaseParser m_databaseParser;
@Inject
private StorageReportUpdater m_updater;
private void processSQLTransaction(MessageTree tree, Transaction t) {
String databaseName = null;
String method = "select";
String ip = null;
String domain = tree.getDomain();
List<Message> messages = t.getChildren();
for (Message message : messages) {
if (message instanceof Event) {
String type = message.getType();
if (type.equals("SQL.Method")) {
method = message.getName().toLowerCase();
}
if (type.equals("SQL.Database")) {
Database database = m_databaseParser.queryDatabaseName(message.getName());
if (database != null) {
ip = database.getIp();
databaseName = database.getName();
}
}
}
}
if (databaseName != null && ip != null) {
String id = querySQLId(databaseName);
StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
StorageUpdateParam param = new StorageUpdateParam();
param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
.setThreshold(LONG_SQL_THRESHOLD);// .setSqlName(sqlName).setSqlStatement(sqlStatement);
m_updater.updateStorageReport(report, param);
}
}
}
数据库与缓存的报表更新逻辑相同,不同ip地址的数据库/缓存的统计信息在不同Machine里面,同时也可能会有不同的Domain访问同一个数据库/缓存,每个Domain的访问都会被单独统计,每个Domain对数据库/缓存不同的操作会统计在不同Operation里,除了当前小时周期的统计汇总外,我们还会用Segment记录每分钟的汇总数据。访问时间超过1秒的数据库操作(缓存是50ms) 会被认为是长时间访问记录。
缓存操作
接下来我们看下缓存的案例,获取memcached中key="uid_1234567"的值,Storage分析器会判断Type是否以"Cache.memcached"开头,如果是,则认为这是一个缓存操作,(这里代码我认为有些稍稍不合理,如果我用的是Redis缓存,我希望上报的Type="Cache.Redis",所以我这里讲源码稍稍做了修改,判断Type如果以"Cache."开头,就认为是缓存)。
@RunWith(JUnit4.class)
public class AppSimulator extends CatTestCase {br/>@Test
public void simulateHierarchyTransaction() throws Exception {
...
Transaction cacheT = cat.newTransaction("Cache.memcached", "get:uid_1234567");
//do your cache operation
cat.logEvent("Cache.memcached.server", "192.168.20.67:6379");
cacheT.complete();
...
}
}
接下来我们看下Storage分析器的处理逻辑,processCacheTransaction负责分析消息, 事务类型"Cache.memcached"的“Cache.”后面部分将会被提取作为缓存类型,分析器会为每个类型的缓存都创建一个报表,事务名称":"前面部分会被提取作为操作名称,一般缓存有 add,get,hGet,mGet,remove等操作,缓存地址将由type="Cache.memcached.server"的Event子消息提供,最后我们还是将domain、ip、method、事务、阈值等消息放入StorageUpdateParam交由StorageReportUpdater来更新报表,更新逻辑与数据库一致。
public class StorageAnalyzer extends AbstractMessageAnalyzer<StorageReport> implements LogEnabled {br/>@Inject
private StorageReportUpdater m_updater;
private void processCacheTransaction(MessageTree tree, Transaction t) {
String cachePrefix = "Cache.";
String ip = "Default";
String domain = tree.getDomain();
String cacheType = t.getType().substring(cachePrefix.length());
String name = t.getName();
String method = name.substring(name.lastIndexOf(":") + 1);
List<Message> messages = t.getChildren();
for (Message message : messages) {
if (message instanceof Event) {
String type = message.getType();
if (type.equals("Cache.memcached.server")) {
ip = message.getName();
int index = ip.indexOf(":");
if (index > -1) {
ip = ip.substring(0, index);
}
}
}
}
String id = queryCacheId(cacheType);
StorageReport report = m_reportManager.getHourlyReport(getStartTime(), id, true);
StorageUpdateParam param = new StorageUpdateParam();
param.setId(id).setDomain(domain).setIp(ip).setMethod(method).setTransaction(t)
.setThreshold(LONG_CACHE_THRESHOLD);
m_updater.updateStorageReport(report, param);
}
}
StateAnalyzer
主要是分析CAT服务器自身的异常,他在周期任务运行中,不搜集任何数据,而是在周期结束后,对CAT的整体状况做一个汇总后生成报表,他的报表结构如下。
HeartbeatAnalyzer
分析器HeartbeatAnalyzer用于上报的心跳数据的分析。我们先看看客户端的收集逻辑,CAT客户端在初始化CatClientModule的时候,会开启一个StatusUpdateTask的线程任务,每隔一分钟去收集客户端的心跳状态,通过 Heartbeat 消息上报到客户端,心跳数据以xml格式存在于Heartbeat消息中。
public class CatClientModule extends AbstractModule {br/>@Override
protected void execute(final ModuleContext ctx) throws Exception {
...
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("cat").start(statusUpdateTask);
...
}
}
}
Cat客户端会为Heartbeat消息创建一个System类型事务消息,然后将 Heartbeat 消息放入该事务,信息的收集靠StatusInfoCollector来完成,StatusInfoCollector将收集的数据写入StatusInfo对象,然后StatusUpdateTask将StatusInfo转化成xml之后放到Heartbeat消息数据段上报。
public class StatusUpdateTask implements Task, Initializable {br/>@Override
public void run() {
//创建类目录, 上报CAT客户端启动信息
...
while (m_active) {
long start = MilliSecondTimer.currentTimeMillis();
if (m_manager.isCatEnabled()) {
Transaction t = cat.newTransaction("System", "Status");
Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
StatusInfo status = new StatusInfo();
t.addData("dumpLocked", m_manager.isDumpLocked());
try {
StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);
status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));
buildExtensionData(status);
h.addData(status.toString());
h.setStatus(Message.SUCCESS);
} catch (Throwable e) {
h.setStatus(e);
cat.logError(e);
} finally {
h.complete();
}
t.setStatus(Message.SUCCESS);
t.complete();
}
//sleep 等待下一次心跳上报
...
}
}
}
我们上报的XML到底包含哪些数据,我们看下StatusInfo的结构,StatusInfo除了包含上报时间戳之外,还有哪些系统状态信息、附加扩展信息(Extension)会被StatusInfoCollector收集?
1、运行时数据RuntimeInfo:JAVA版本 java.version、用户名user.name、项目目录user.dir、java类路径等等。
2、操作系统信息 OsInfo,同时创建System附加扩展信息。
3、磁盘信息DiskInfo,磁盘的总量、空闲与使用情况,同时创建Disk附加扩展信息
4、内存使用情况MemoryInfo,同时创建垃圾回收扩展信息、JAVA虚拟机堆附加扩展信息
CAT跨语言服务加拿大28平台搭建链监控(七)消息分析器与报表
原文地址:http://blog.51cto.com/13855531/2137024