基于loghub的消息消费延迟监控

  我们可以把loghub当作一个消息中间件来使用。如果能知道当前的消费进度,自然好了,否则消费情况一无所知,总是有点慌!

  loghub消费分两种情况,一是普通消费,二是消费组消费;

  消费组消费,loghub服务端会记录消费情况,这时可以通过调用服务端API进行偏移信息查询。

  普通消费则不同,需要自行维护偏移量,即只有自己知道偏移信息,自己处理延迟。我们主要讨论这种情况。

一、 消费loghub数据的样例如下:

    // 普通消费
    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                cursor = response.getNextCursor();
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }

  因为消费一直在进行,想要进行监控,就插入一些埋点。我们可以使用的 Map 来保存每个 shard 的消费延迟情况。用一个 LoghubCursorDelayTransformer 描述具体信息。

    /**
     * 消费偏移控制容器
     */
    public static final ConcurrentMap<Integer, LoghubCursorDelayTransformer> CONSUME_CURSOR_DELAY_TRANSFORMER = new ConcurrentHashMap<>();

/**
 * loghub 分区延迟管理器
 *
 * @author weiy
 * @date 2019/11/27
 */
public class LoghubCursorDelayTransformer {
    /**
     * 最后一次消费 loghub 数据的时间(大约)
     */
    private int lastConsumeDataTime;

    /**
     * 消费延迟 (s)
     */
    private int delay;

    /**
     * 分区 shard
     */
    private int shard;

    /**
     * 记录创建时间,如果创建时间已很久,说明该消费延迟应已失效
     */
    private long recordTime = System.currentTimeMillis();

    public LoghubCursorDelayTransformer(int lastConsumeDataTime, int delay, int shard) {
        this.lastConsumeDataTime = lastConsumeDataTime;
        this.delay = delay;
        this.shard = shard;
    }

    public int getLastConsumeDataTime() {
        return lastConsumeDataTime;
    }

    public int getDelay() {
        return delay;
    }

    public int getShard() {
        return shard;
    }

    public long getRecordTime() {
        return recordTime;
    }

}

二、 埋点插入监控数据

  只要在每次消费完成之后,进行一次消费延迟的记录就好了,具体记录可以视情况而定。比如,每消费一批次之后记录一次就是个不错的选择!

    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    // 没有更多数据,以当前系统时间作为最后消费时间(并不关心实际生产者是否有在产生旧数据)
                    metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1);
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                // 每批次消费完成后,记录一次消费延迟情况
                // 此处取 最后一个消息的时间作为批次时间点
                int lastestConsumeTime = logGroups.get(logGroups.size() -1).GetFastLogGroup().getLogs(0).getTime();
                metricConsumeDelay(lastestConsumeTime, shardId, null);
                cursor = response.getNextCursor();
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }
    /**
     * 记录消费延迟信息
     *
     * @param lastConsumeTime 最后消费时间(如果没有获取到数据,则使用系统时间代替),单位为 s秒
     * @param shard 分区id
     * @param calculatedDelay 已计算好的延时,为null时需要根据当前系统时间计算
     */
    public static void metricConsumeDelay(int lastConsumeTime, int shard, Integer calculatedDelay) {
        if(calculatedDelay == null) {
            calculatedDelay = (int)(System.currentTimeMillis() / 1000) - lastConsumeTime;
        }
        LoghubCursorDelayTransformer delayTransformer = new LoghubCursorDelayTransformer(
                lastConsumeTime, calculatedDelay, shard);
        CONSUME_CURSOR_DELAY_TRANSFORMER.put(shard, delayTransformer);
    }

  如上的延迟统计是不准确的,如果想准确统计,应使用 cursor 与 最后的偏移进行对比才行。如下:

    private static void consumeDataFromShard(int shardId) throws Exception {
        String cursor = client.GetCursor(project, logStore, shardId, new Date()).GetCursor();
        System.out.println("cursor = " +cursor);
        try {
            while (true) {
                PullLogsRequest request = new PullLogsRequest(project, logStore, shardId, 1000, cursor);
                PullLogsResponse response = client.pullLogs(request);
                List<LogGroupData> logGroups = response.getLogGroups();
                if (logGroups.isEmpty()) {
                    // 没有更多数据,以当前系统时间作为最后消费时间(并不关心实际生产者是否有在产生旧数据)
                    metricConsumeDelay((int)(System.currentTimeMillis() / 1000), shardId, -1);
                    return;
                }

                System.out.println(response.getCount());
                System.out.println("cursor = " + cursor + " next_cursor = " + response.getNextCursor());
                logGroups.forEach(rec1 -> {
                    // do your biz
                });
                cursor = response.getNextCursor();
                // 从loghub-api 换取具体时间,计算延迟,可能会导致性能下降厉害
                int lastestConsumeTime = exchangeTimeWithCursorFromApi(cursor, shardId);
                int delay = getMaxTimeOffsetFromApi(shardId) - lastestConsumeTime;
                metricConsumeDelay(lastestConsumeTime, shardId, delay);
                Thread.sleep(200);
            }
        }
        catch(LogException e) {
            System.out.println(e.GetRequestId() + e.GetErrorMessage());
        }
    }

    /**
     * 从loghub-api中获取对应cursor的时间
     *
     * @param cursor 指定游标(当前)
     * @param shardId 分区id
     * @return 数据时间
     * @throws LogException 查询异常时抛出
     */
    public static int exchangeTimeWithCursorFromApi(String cursor, int shardId) throws LogException {
        GetCursorTimeResponse cursorTimeResponse = client.GetCursorTime(project, logStore, shardId, cursor);
        return cursorTimeResponse.GetCursorTime();
    }

    /**
     * 从loghub-api中获取最大的时间偏移,以便精确计算消费延迟
     *
     * @param shardId 分区id
     * @return 最大时间
     * @throws LogException 查询异常时抛出
     */
    public static int getMaxTimeOffsetFromApi(int shardId) throws LogException {
        String cursor = client.GetCursor(project, logStore, shardId, Consts.CursorMode.END).GetCursor();
        return exchangeTimeWithCursorFromApi(cursor, shardId);
    }

三、 监控数据暴露

  通过prometheus进行数据暴露!

    /**
     * 暴露延迟信息数据,启动时调用即可
     */
    public static void exposeMetricData() {
        // 统计loghub消费延时
        CollectorRegistry.defaultRegistry.register(new Collector() {
            @Override
            public List<MetricFamilySamples> collect() {
                List<MetricFamilySamples> mfs = new ArrayList<>();
                final ConcurrentMap<Integer, LoghubCursorDelayTransformer> cursorHolder = CONSUME_CURSOR_DELAY_TRANSFORMER;
                // With lastest time labels
                GaugeMetricFamily consumeTimeGauge = new GaugeMetricFamily("my_shard_consume_lastest",
                        "last consume time watch help",
                        Collections.singletonList("shard"));
                // With delay labels
                GaugeMetricFamily delayGauge = new GaugeMetricFamily("my_shard_consume_delay",
                        "delay msg help",
                        Collections.singletonList("shard"));
                // todo: 注意优化消费长时间暂停情况
                for (LoghubCursorDelayTransformer delayTransformer : cursorHolder.values()) {
                    delayGauge.addMetric(
                            Collections.singletonList(delayTransformer.getShard() + ""),
                            delayTransformer.getDelay());
                    consumeTimeGauge.addMetric(Collections.singletonList("" + delayTransformer.getShard()), delayTransformer.getLastConsumeDataTime());
                }

                mfs.add(delayGauge);
                mfs.add(consumeTimeGauge);
                return mfs;
            }

        });
    }

  是不是很简单?自定义一个 Collector 就可以了。接入信息的其他细节可以参考之前的文章。

四、 消费组的监控?

  消费端实践

    private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
    private static String sProject = "ali-cn-hangzhou-sls-admin";
    private static String sLogstore = "sls_operation_log";
    private static String sConsumerGroup = "consumerGroupX";
    private static String sAccessKeyId = "";
    private static String sAccessKey = "";
    public static void groupConsume() throws LogHubClientWorkerException, InterruptedException {
        // 第二个参数是消费者名称,同一个消费组下面的消费者名称必须不同,可以使用相同的消费组名称,不同的消费者名称在多台机器上启动多个进程,来均衡消费一个Logstore,这个时候消费者名称可以使用机器ip来区分。第9个参数(maxFetchLogGroupSize)是每次从服务端获取的LogGroup数目,使用默认值即可,如有调整请注意取值范围(0,1000]。
        LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
        ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
        Thread thread = new Thread(worker);
        //Thread运行之后,Client Worker会自动运行,ClientWorker扩展了Runnable接口。
        thread.start();
        Thread.sleep(60 * 60 * 1000);
        //调用worker的Shutdown函数,退出消费实例,关联的线程也会自动停止。
        worker.shutdown();
        //ClientWorker运行过程中会生成多个异步的Task,Shutdown之后最好等待还在执行的Task安全退出,建议sleep 30s。
        Thread.sleep(30 * 1000);
    }
// 消费业务端样例
public class SampleLogHubProcessor implements ILogHubProcessor {
    private int shardId;
    // 记录上次持久化 checkpoint 的时间。
    private long mLastCheckTime = 0;

    public void initialize(int shardId) {
        this.shardId = shardId;
    }

    // 消费数据的主逻辑,这里面的所有异常都需要捕获,不能抛出去。
    public String process(List<LogGroupData> logGroups,
                          ILogHubCheckPointTracker checkPointTracker) {
        // 这里简单的将获取到的数据打印出来。
        for (LogGroupData logGroup : logGroups) {
            FastLogGroup flg = logGroup.GetFastLogGroup();
            System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                    flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
            System.out.println("Tags");
            for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                FastLogTag logtag = flg.getLogTags(tagIdx);
                System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
            }
            for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                FastLog log = flg.getLogs(lIdx);
                System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                    FastLogContent content = log.getContents(cIdx);
                    System.out.println(content.getKey() + "\t:\t" + content.getValue());
                }
            }
        }
        long curTime = System.currentTimeMillis();
        // 每隔 30 秒,写一次 checkpoint 到服务端,如果 30 秒内,worker crash,
        // 新启动的 worker 会从上一个 checkpoint 取消费数据,有可能有少量的重复数据。
        if (curTime - mLastCheckTime > 30 * 1000) {
            try {
                //参数true表示立即将checkpoint更新到服务端,为false会将checkpoint缓存在本地,后台默认隔60s会将checkpoint刷新到服务端。
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
            mLastCheckTime = curTime;
        }
        return null;
    }

    // 当 worker 退出的时候,会调用该函数,用户可以在此处做些清理工作。
    public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
        //将消费断点保存到服务端。
        try {
            checkPointTracker.saveCheckPoint(true);
        } catch (LogHubCheckPointException e) {
            e.printStackTrace();
        }
    }
}

class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
    public ILogHubProcessor generatorProcessor() {
        // 生成一个消费实例。
        return new SampleLogHubProcessor();
    }
}

  实现原理即定期向loghub中写入 checkpoint, 以便可以查询。既然数据都写入了 loghub 服务端,那么也能很容易在后台看到消费延迟了。

  不过我们也可以通过api获取消费情况,自行另外监控也行。(只是意义不大)

  可以通过如下方式获取当前消费情况,与最后的数据偏移做比较,就可以得到延迟情况了。

    List<ConsumerGroupShardCheckPoint> checkPoints = client.GetCheckPoint(project, sLogstore, sConsumerGroup).getCheckPoints();

五、 grafana 延迟监控配置

  前面通过prometheus获取到了延迟数据,接入到grafana后,就可以进行展示了。我们先来看下最终效果!

  配置本身是很简单的,有个注意的点是需要整合两个坐标数据,因为一个消费延迟数据,另一个是具体的消费时间,这样就可以同步查看了。

  配置右边的Y轴坐标需要使用 series override 选项,使用正则进行匹配如: /最后消费时间shard:.*/i

  时间选项需要乘以1000变为毫秒如: test_shard_consume_lastest * 1000

  监控思路可以扩展到以拉取模式进行消费的消息系统。

原文地址:https://www.cnblogs.com/yougewe/p/11959394.html

时间: 2024-11-04 10:06:12

基于loghub的消息消费延迟监控的相关文章

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

mysql主从同步(3)-percona-toolkit工具(数据一致性监测、延迟监控)使用梳理

转自:http://www.cnblogs.com/kevingrace/p/6261091.html 在mysql工作中接触最多的就是mysql replication mysql在复制方面还是会有一些常规问题: 比如主库宕机或者从库宕机有可能会导致复制中断,通常需要进行人为修复, 或者很多时候需要把一个从库提升为主库,但对从库和主库的数据一致性不能保证一样. 这种情况下就需要使用percona-toolkit工具的pt-table-checksum组件来检查主从数据的一致性:如果发现不一致的

基于JMS的消息传送

简单的介绍下基于JMS的消息传送 Java消息队列JMS整体设计结构 基本要素:生产者(producer),消费者(consumere),消息服务(broker) 交互模型: JMS两种消息传送模式 点对点(Point-to-Point):专门用于使用队列Queue传送消息: 发布/订阅(Publish/Subscribe):专门用于使用主题Topic传送消息 两种传送方式比较 基于队列Queue的点对点消息只能被一个消费者消费,如多个消费者都注册到同一个消息队列上,当生产者发送一条消息后,而只

关于RocketMQ消息消费与重平衡的一些问题探讨

其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结. 关于 push 模式下的消息循环拉取问题 之前发表了一篇关于重平衡的文章:「Kafka 重平衡机制」,里面有说到 RocketMQ 重平衡机制是每隔 20s 从任意一个 Broker 节点获取消费组的消费 ID 以及订阅信息,再根据这些订阅信息进行分配,然后将分配到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 队列中

消息消费队列和索引文件的更新

ConsumeQueue,IndexFile需要及时更新,否则无法及时被消费,根据消息属性查找消息也会出现较大延迟. mq通过开启一个线程ReputMessageService来准时转发commitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue,IndexFile文件 DefaultMessageStore#start ReputMessageService线程每执行一次任务推送休息1毫秒旧继续尝试推送消息到消息消费队列和索引文件. 返回reputFromOf

mq消息消费

消息消费以组的的模式开展: 一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题: 消费组之间有集群模式与广播模式两种消费模式:集群模式-主题下的同一条消息只允许被其中一个消费者消费.广播模式-主题下的同一条消息将被集群内的所有消费者消费一次.集群模式下消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消息消费者消费,一个消费者可以消费多个消息队列. 消息服务器与消费者之间的消息传送也有两种方式 推模式,拉模式:拉模式-消费端主动发起啦消息请求.推模式-消息到达服务器后,

Zabbix基于Proxy分布式部署实现Web监控

前言 在日常运维工作中,难免会遇到这样或那样的故障,如何能在第一时间发现故障,并及时定位故障原因,保证业务不受影响,我想这应该是做好一个运维必须要掌握的技能.但人力不可能实时掌控系统的变化,于是监控系统应运而生,监控便是运维的眼睛,把监控和性能管理做好后,运维就是一件很轻松的事情.目前比较流行的开源监控工具有Cacti.Nagios(Icinga).Zabbix等.本文带来的是Zabbix基于Proxy分布式部署实现Web监控. Zabbix 简介 Zabbix是一个基于Web界面提供分布式系统

自定义基于xmemcached协议消息队列的Spark Streaming 接收器

虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的.对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可.而Receiver的实现只需要简单地实现两个方法: 1.onStart():接收数据. 2.onStop():停止接收数据. 一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收.而onStop()方法负责确保这些接收数据的线程是停止的,在 Receiver 被关闭时调用了

转:使用基于Http的消息代替WebService的数据交互

http://blog.csdn.net/cyq1984/article/details/38041671 系统间交互的工作,随着信息化建设的发展,以及业界对SOA的认识及其带来的低TOC(总体拥有成本)等优势,越来越受到信息化水平较高的用户的重视. 这里先抛开SOA这样的架构规划,单纯就系统间整合的协议进行讨论. 系统间的交互或者成为整合(互联互通),早在信息化系统诞生的时候,就已经出现,只是并不明显,或者由于早期开发平台.开发语言等的单一性,这种需求并没有非常大的爆发出来. 随着信息化建设的