应用监控CAT之cat-consumer源码阅读

  之前讲了 cat-client 进行cat埋点上报,那么上报给谁呢?以及后续故事如何?让我们来看看 cat-consumer 是如何接收处理的?

  由cat-client发送数据,cat-consumer进行接收请求处理,开始了处理问题之旅!

首先,让我们来回顾一下 TcpSocketSender 是如何发送数据的:

// TcpSocketSender 往channel中写入数据,此处有兴趣的同学可以延伸下 netty 的源码!
    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

        m_codec.encode(tree, buf);

        int size = buf.readableBytes();
        Channel channel = future.channel();

        // 以 ByteBuf 形式发送数据
        channel.writeAndFlush(buf);
        // 更新统计数据
        if (m_statistics != null) {
            m_statistics.onBytes(size);
        }
    }

// TcpSocketReceiver, 接收发送过来的数据,默认端口 2280, 注册服务,线上为分布式部署,即为接口调用式。

    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }

    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();

        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        // 添加处理handler, 进行请求逻辑处理
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                // 此处仅为一个解码器,实际功能在该解码器中完成
                pipeline.addLast("decode", new MessageDecoder());
            }
        });

        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }

// 消息解码器,并处理具体业务逻辑,先确认数据已上传完成,再进行逻辑处理
    public class MessageDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
            if (buffer.readableBytes() < 4) {
                return;
            }
            buffer.markReaderIndex();
            int length = buffer.readInt();
            buffer.resetReaderIndex();
            if (buffer.readableBytes() < length + 4) {
                return;
            }
            try {
                if (length > 0) {
                    ByteBuf readBytes = buffer.readBytes(length + 4);
                    readBytes.markReaderIndex();
                    readBytes.readInt();

                    DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);

                    readBytes.resetReaderIndex();
                    tree.setBuffer(readBytes);
                    // 交由handler处理实际逻辑
                    m_handler.handle(tree);
                    m_processCount++;

                    long flag = m_processCount % CatConstants.SUCCESS_COUNT;

                    if (flag == 0) {
                        m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
                    }
                } else {
                    // client message is error
                    buffer.readBytes(length);
                }
            } catch (Exception e) {
                m_serverStateManager.addMessageTotalLoss(1);
                m_logger.error(e.getMessage(), e);
            }
        }
    }
    

// handler 处理流程,由DefaultMessageHandler接手,安排后续工作。

    // DefaultMessageHandler, 接过处理器的第一棒, 交由另一实际的consumer(RealtimeConsumer) handler处理
    @Override
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            m_consumer = lookup(MessageConsumer.class);
        }

        try {
            m_consumer.consume(tree);
        } catch (Throwable e) {
            m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
        }
    }
    // RealtimeConsumer, 进行消费数据
    @Override
    public void consume(MessageTree tree) {
        String domain = tree.getDomain();
        String ip = tree.getIpAddress();

        // 进行权限检测,ip,domain
        if (!m_blackListManager.isBlack(domain, ip)) {
            long timestamp = tree.getMessage().getTimestamp();
            Period period = m_periodManager.findPeriod(timestamp);

            // 找到period, 再将消息分配过去,否则算作网络异常
            if (period != null) {
                period.distribute(tree);
            } else {
                m_serverStateManager.addNetworkTimeError(1);
            }
        } else {
            m_black++;

            if (m_black % CatConstants.SUCCESS_COUNT == 0) {
                Cat.logEvent("Discard", domain);
            }
        }
    }

// Period.distribute, 将消息依次取出,进行分发到队列

    public void distribute(MessageTree tree) {
        // 统计进行数进行加1
        m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
        boolean success = true;
        String domain = tree.getDomain();

        // 将各种类型的监控数据分别取出进行处理
        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            List<PeriodTask> tasks = entry.getValue();
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;

            if (manyTasks) {
                index = Math.abs(domain.hashCode()) % length;
            }
            PeriodTask task = tasks.get(index);
            // 如果有金条消息,将task重新入队
            boolean enqueue = task.enqueue(tree);

            if (enqueue == false) {
                if (manyTasks) {
                    task = tasks.get((index + 1) % length);
                    enqueue = task.enqueue(tree);

                    if (enqueue == false) {
                        success = false;
                    }
                } else {
                    success = false;
                }
            }
        }

        if (!success) {
            m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
        }
    }
    // PeriodTask.enqueue, 重新入队消息,让消费线程自行消费 LinkedBlockingQueue.offer(..)
    public boolean enqueue(MessageTree tree) {
        boolean result = m_queue.offer(tree);

        if (!result) { // trace queue overflow, 记录入队失败日志
            m_queueOverflow++;

            if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
                m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow);
            }
        }
        return result;
    }

到此,一条消费线路就完成了。

// PeriodTask 线程,作为第二个消费线路

    @Override
    public void run() {
        try {
            // 分析各消息数据,做后台消费处理
            m_analyzer.analyze(m_queue);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
    // 调用统一的抽象类的模板方法,由各类进行具体的 process 处理
    @Override
    public void analyze(MessageQueue queue) {
        while (!isTimeout() && isActive()) {
            MessageTree tree = queue.poll();

            if (tree != null) {
                try {
                    // 调用具体类的process
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            }
        }

        // 如果出现超时或者停止动作,则把剩余队列处理完成再退出线程
        while (true) {
            MessageTree tree = queue.poll();

            if (tree != null) {
                try {
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            } else {
                break;
            }
        }
    }
    // 超时规则,当前时间 > 开始时间+1小时+设置额外超时时间
    protected boolean isTimeout() {
        long currentTime = System.currentTimeMillis();
        long endTime = m_startTime + m_duration + m_extraTime;

        return currentTime > endTime;
    }
    

// 具体的 Anlalyzer示例: DumpAnlalyzer.process

// 具体的 Anlalyzer示例: DumpAnlalyzer.process
    @Override
    public void process(MessageTree tree) {
        String domain = tree.getDomain();

        if ("PhoenixAgent".equals(domain)) {
            return;
        } else {
            MessageId messageId = MessageId.parse(tree.getMessageId());

            if (messageId.getVersion() == 2) {
                // 计算出当前时间范围,
                long time = tree.getMessage().getTimestamp();
                long fixedTime = time - time % (TimeHelper.ONE_HOUR);
                long idTime = messageId.getTimestamp();
                long duration = fixedTime - idTime;

                if (duration == 0 || duration == ONE_HOUR || duration == -ONE_HOUR) {
                    m_bucketManager.storeMessage(tree, messageId);
                } else {
                    m_serverStateManager.addPigeonTimeError(1);
                }
            }
        }
    }
// 存储log消息到本地文件,并后续上传到hdfs
    @Override
    public void storeMessage(final MessageTree tree, final MessageId id) {
        boolean errorFlag = true;
        int hash = Math.abs((id.getDomain() + ‘-‘ + id.getIpAddress()).hashCode());
        int index = (int) (hash % m_gzipThreads);
        MessageItem item = new MessageItem(tree, id);
        LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1));
        boolean result = queue.offer(item);

        if (result) {
            errorFlag = false;
        } else {
            if (m_last.offer(item)) {
                errorFlag = false;
            }
        }

        if (errorFlag) {
            m_serverStateManager.addMessageDumpLoss(1);
        }
        logStorageState(tree);
    }
    // 每1000个消息添加一个messageDump=1000
    protected void logStorageState(final MessageTree tree) {
        String domain = tree.getDomain();
        int size = ((DefaultMessageTree) tree).getBuffer().readableBytes();

        m_serverStateManager.addMessageSize(domain, size);
        if ((++m_total) % CatConstants.SUCCESS_COUNT == 0) {
            m_serverStateManager.addMessageDump(CatConstants.SUCCESS_COUNT);
        }
    }

// EventAnalyzer.process 处理event消息

    @Override
    public void process(MessageTree tree) {
        String domain = tree.getDomain();

        if (m_serverFilterConfigManager.validateDomain(domain)) {
            EventReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
            Message message = tree.getMessage();
            String ip = tree.getIpAddress();

            if (message instanceof Transaction) {
                processTransaction(report, tree, (Transaction) message, ip);
            } else if (message instanceof Event) {
                processEvent(report, tree, (Event) message, ip);
            }
        }
    }
    // 循环处理多个transation
    private void processTransaction(EventReport report, MessageTree tree, Transaction t, String ip) {
        List<Message> children = t.getChildren();

        for (Message child : children) {
            if (child instanceof Transaction) {
                processTransaction(report, tree, (Transaction) child, ip);
            } else if (child instanceof Event) {
                processEvent(report, tree, (Event) child, ip);
            }
        }
    }
// StateAnalyzer.process 对cat的机器作展示
    @Override
    protected void process(MessageTree tree) {
        String domain = tree.getDomain();

        if (m_serverFilterConfigManager.validateDomain(domain)) {
            StateReport report = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
            String ip = tree.getIpAddress();
            Machine machine = report.findOrCreateMachine(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());

            machine.findOrCreateProcessDomain(domain).addIp(ip);
        }
    }

// 所有分析线程,由 Period 进行初始化启动所有的Analyzer备用

    public void start() {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(),
              df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));

        for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
            List<PeriodTask> taskList = tasks.getValue();

            for (int i = 0; i < taskList.size(); i++) {
                PeriodTask task = taskList.get(i);

                task.setIndex(i);

                Threads.forGroup("Cat-RealtimeConsumer").start(task);
            }
        }
    }

// 为保证高可用,使用 ChannelManager, 专门检查channel通道是否仍然存活,如果出问题,则发起重连。

    @Override
    public void run() {
        while (m_active) {
            // make save message id index asyc
            m_idfactory.saveMark();
            checkServerChanged();

            ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
            List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

            doubleCheckActiveServer(activeFuture);
            reconnectDefaultServer(activeFuture, serverAddresses);

            try {
                Thread.sleep(10 * 1000L); // check every 10 seconds
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
    

总结起来就几个东西:
  1. 使用netty开启高性能的接收服务;
  2. 使用队列进行保存消息;
  3. 使用单独线程检测channel有效性,保证高可用;
  4. 所有单小时的数据,保存在内存中,速度特别快;

等等,来个图展示下:。。。

task 运行过程

原文地址:https://www.cnblogs.com/yougewe/p/9494904.html

时间: 2024-10-13 02:55:39

应用监控CAT之cat-consumer源码阅读的相关文章

edwin报警和监控平台开源了(python源码)

简单介绍一下edwin edwin是一个报警和监控平台, 可以使用它监控任意东西, 如有异常(分为警告级和严重级), 可以发出报警. 可以自定义报警的通知方式, 比如邮件/短信/电话. 另外, 它提供一个web UI,  上, 能以dashboard形式展现监控指标的状态. edwin对于监控项目的组织形式, 由小到大是:  check item -> pagelet ->page -> dashboard.  另外,可以为 check item指定一个或多个 team 来负责.  这样

CI框架源码阅读笔记1 - 环境准备、基本术语和框架流程

最开始使用CI框架的时候,就打算写一个CI源码阅读的笔记系列,可惜虎头蛇尾,一直没有行动.最近项目少,总算是有了一些时间去写一些东西.于是准备将之前的一些笔记和经验记录下来,一方面权作备忘,另一方面时刻提醒自己:借鉴和学习才有出路,忘记过去意味着背叛! 基本术语说明 在本文开始之前,有必要对文中反复出现的术语做一个简单的说明,如果你对这一部分已经熟谙,完全可以略过.本文中反复出现和提及的术语包括: 前端控制器(Front Controller): 用于集中控制用户的所有请求的组件,将用户的请求发

【Java】【Fulme】Flume-NG源码阅读之SpoolDirectorySource

org.apache.flume.source.SpoolDirectorySource是flume的一个常用的source,这个源支持从磁盘中某文件夹获取文件数据.不同于其他异步源,这个源能够避免重启或者发送失败后数据丢失.flume可以监控文件夹,当出现新文件时会读取该文件并获取数据.当一个给定的文件被全部读入到通道中时,该文件会被重命名以标志已经完成.同时,该源需要一个清理进程来定期移除完成的文件. 通道可选地将一个完成路径的原始文件插入到每个事件的hearder域中.在读取文件时,sou

Android系统源码阅读(12):InputChannel的注册过程

Android系统源码阅读(12):InputChannel的注册过程 请对照AOSP版本:6.0.1_r50. InputManager可以获得输入事件并分发,Activity需要处理这些输入事件.那么,这两者之间如何建立的连接呢?这就需要InputChannel作为桥梁建立两者之间的通道. 1. ViewRootImpl创建InputChannel 这里ViewRoot类已经消失了,由ViewRootImpl替代.Activity在创建时会将自己的DecorView设置给对应的ViewRoo

《java.util.concurrent 包源码阅读》03 锁

Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了. 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码: class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition

Redis源码阅读(一)事件机制

Redis源码阅读(一)事件机制 Redis作为一款NoSQL非关系内存数据库,具有很高的读写性能,且原生支持的数据类型丰富,被广泛的作为缓存.分布式数据库.消息队列等应用.此外Redis还有许多高可用特性,包括数据持久化,主从模式备份等等,可以满足对数据完整有一定要求的场景. 而且Redis的源码结构简单清晰,有大量材料可以参阅:通过阅读Redis源码,掌握一些常用技术在Redis中的实现,相信会对个人编程水平有很大帮助.这里记录下我阅读Redis源码的心得.从我自己比较关心的几个技术点出发,

Redis源码阅读(二)高可用设计——复制

Redis源码阅读(二)高可用设计-复制 复制的概念:Redis的复制简单理解就是一个Redis服务器从另一台Redis服务器复制所有的Redis数据库数据,能保持两台Redis服务器的数据库数据一致. 使用场景:复制机制很实用,在客户端并发访问量很大,单台Redis扛不住的情况下,可以部署多台Redis复制相同的数据,共同对外提供服务,提高Redis并发访问处理能力.当然这种通过复制方式部署多台Redis以提高并发处理能力的方式只适用于客户端大部分访问为读数据请求的场景.此外,Redis从2.

【Dubbo源码阅读系列】服务暴露之远程暴露

引言 什么叫 远程暴露 ?试着想象着这么一种场景:假设我们新增了一台服务器 A,专门用于发送短信提示给指定用户.那么问题来了,我们的 Message 服务上线之后,应该如何告知调用方服务器,服务器 A 提供了 Message 功能?那么我们是不是可以把目前已提供的服务暴露在一个地方,让调用方知道某台机器提供了某个特定功能?带着这样的假设,我们今天就来聊聊 Dubbo 服务暴露之远程暴露!! 服务远程暴露 先回顾一下上篇文章,上篇文章我们聊到了 ServiceConfig 的 export() 方

JDK源码阅读(三):ArraryList源码解析

今天来看一下ArrayList的源码 目录 介绍 继承结构 属性 构造方法 add方法 remove方法 修改方法 获取元素 size()方法 isEmpty方法 clear方法 循环数组 1.介绍 一般来讲文章开始应该先介绍一下说下简介.这里就不介绍了 如果你不知道ArrayList是什么的话就没必要在看了.大致讲一下一些常用的方法 2.继承结构 ArrayList源码定义: ArrayList继承结构如下: Serializable 序列化接口 Cloneable 前面我们在看Object源

CI框架源码阅读笔记3 全局函数Common.php

从本篇开始,将深入CI框架的内部,一步步去探索这个框架的实现.结构和设计. Common.php文件定义了一系列的全局函数(一般来说,全局函数具有最高的加载优先权,因此大多数的框架中BootStrap引导文件都会最先引入全局函数,以便于之后的处理工作). 打开Common.php中,第一行代码就非常诡异: if ( ! defined('BASEPATH')) exit('No direct script access allowed'); 上一篇(CI框架源码阅读笔记2 一切的入口 index