核心类:
ZorkaAsyncThread.java
protected BlockingQueue<T> submitQueue; /** * Processes single item from submit queue (if any). */ public void runCycle() { try { T obj = submitQueue.take(); if (obj != null) { List<T> lst = new ArrayList<T>(plen); lst.add(obj); if (plen > 1) { submitQueue.drainTo(lst, plen-1); } process(lst); flush(); } } catch (InterruptedException e) { log.error(ZorkaLogger.ZAG_ERRORS, "Cannot perform run cycle", e); } } /** * Submits object to a queue. * * @param obj object to be submitted */ public boolean submit(T obj) { try { return submitQueue.offer(obj, 1, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { return false; } }
主要分为两个关键流程:
- ZorkaAgent插桩之后将record提交到相应线程对象的队列submitQueue
- beanshell创建线程,线程启动后,从提交队列submitQueue里不断取记录进行处理并发送到目标终端(日志或者其他监控平台)。
一、ZorkaAgent将record提交到队列
插桩过程中的probe会调用MainSubmitter的traceEnter、traceReturn、traceError、submit四个方法,它们主要用于处理trace record,并提交到相应的队列。
在traceReturn、traceError底层会调用pop()、pop()方法底层调用ZorkaAsyncThread继承类里实现的submit方法,将record提交到队列。
二、从队列获取record并处理,发送到目标流。
tracer.bsh创建异步线程,线程启动后,自动从队列获取record对象,处理并发送。每个线程对应一种tracer方式。
ZorkaAsyncThread是一个抽象类,有很多继承,比如ZabbixTrapper、SnmpTrapper、ZicoTraceOutput等。
继承类里面实现的process方法用于讲最终的record提交到目的流。submit方法用于将record提交到待处理队列。
时间: 2024-10-16 07:29:22