apollo 消息分发源码分析

1,MessageDispatch消息分发信息

    public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;

    protected ConsumerId consumerId;
    protected ActiveMQDestination destination;
    protected Message message;
    protected int redeliveryCounter;

    protected transient long deliverySequenceId;
    protected transient Object consumer;
    protected transient Runnable transmitCallback;
    protected transient Throwable rollbackCause;

2,自动确认,是再接受到消息,反馈给上层应用之前就给确认

在afterMessageIsConsumed方法中
   先deliveredMessages.clear();接着 session.sendAck(ack);

3,在从tcp取到消息后放到unconsumedMessages等待消费

4,在从unconsumedMessages取出消息预处理后,在beforeMessageIsConsumed方法加消息加到deliveredMessages 里面。 unconsumedMessages; 消费者从mq接受到消息存储的位置,还没有消费

5,receive内部消费之前

 private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
        md.setDeliverySequenceId(session.getNextDeliveryId());
        lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();
        if (!isAutoAcknowledgeBatch()) {
            synchronized(deliveredMessages) {
                deliveredMessages.addFirst(md);
            }
            if (session.getTransacted()) {
                if (transactedIndividualAck) {
                    immediateIndividualTransactedAck(md);
                } else {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
        }
    }

6,receive内部消费之后

private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    synchronized (deliveredMessages) {
                        if (!deliveredMessages.isEmpty()) {
                            if (optimizeAcknowledge) {
                                ackCounter++;

                                // AMQ-3956 evaluate both expired and normal msgs as
                                // otherwise consumer may get stalled
                                if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        ackCounter = 0;
                                        session.sendAck(ack);
                                        optimizeAckTimestamp = System.currentTimeMillis();
                                    }
                                    // AMQ-3956 - as further optimization send
                                    // ack for expired msgs when there are any.
                                    // This resets the deliveredCounter to 0 so that
                                    // we won't sent standard acks with every msg just
                                    // because the deliveredCounter just below
                                    // 0.5 * prefetch as used in ackLater()
                                    if (pendingAck != null && deliveredCounter > 0) {
                                        session.sendAck(pendingAck);
                                        pendingAck = null;
                                        deliveredCounter = 0;
                                    }
                                }
                            } else {
                                MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                if (ack!=null) {
                                    deliveredMessages.clear();
                                    session.sendAck(ack);
                                }
                            }
                        }
                    }
                    deliveryingAcknowledgements.set(false);
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            }
            else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
时间: 2024-10-06 06:16:58

apollo 消息分发源码分析的相关文章

Apollo配置中心源码分析

Apollo配置中心源码分析 1. apollo的核心代码分享 SpringApplication启动的关键步骤 在SpringApplication中,会加载所有实现了Init方法的类 protected void applyInitializers(ConfigurableApplicationContext context) { for (ApplicationContextInitializer initializer : getInitializers()) { Class<?> r

Tornado 高并发源码分析之三--- Application 对象

Application 对象主要工作: 服务器启动时: 1.在新建一个app的时候,根据设置好的 URL 和回调函数 Handler 封装成URLSpec 对象 服务器运行时: 2.在请求到来,将 HTTPServer 封装好的HTTPRequest 传入_RequestDispatcher对象,_RequestDispatcher对象根据传入的 HTTPRequest 使用URLSpec解析匹 match 正则匹配找到对应的 RequestHandler ,执行它的 _execute 方法 A

Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象

主要工作: 服务器启动的时候做的事: 1.把包含了各种配置信息的 application 对象封装到了 HttpServer 对象的 request_callback 字段中,等待被调用 2.TCPServer 通过 listen 方法启动端口监听, 封装_handle_connection回调函数,并注册到 IOLoop 中 服务器运行时做的事: 3.当有请求到来时,注册在 IOLoop 中的 _handle_connection 将会被调用, _handle_connection 方法将会调

Hybrid开发源码分析之safe-java-js-webview-bridge

周末抽空看了了下safe-java-js-webview-bridge的源码,整理了一份类之间的调用关系图. 该开源库的基本思路分三步: 1.在native端编写调用本地功能的class(如HostJsScope.java),在初始化WebviewChromeClient时根据该class(在JsCallJava构造函数中)反射动态生成js代码: 2.将动态生成的js代码通过webview.loadUrl触发的onProgressChanged注入到webview中,供前端可调用: 3.在前端调

拨云见日---android异步消息机制源码分析

做过windows GUI的同学应该清楚,一般的GUI操作都是基于消息机制的,应用程序维护一个消息队列,开发人员编写对应事件的回调函数就能实现我们想要的操作 其实android系统也和windows GUI一样,也是基于消息机制,今天让我们通过源码来揭开android消息机制的神秘面纱 谈起异步消息,就不能不提及Handler,在安卓中,由于主线程中不能做耗时操作,所以耗时操作必须让子线程执行,而且只能在主线程(即UI线程)中执行UI更新操作,通过Handler发送异步消息,我们就能更新UI,一

Handler消息机制源码分析

public static final Looper myLooper() { return (Looper)sThreadLocal.get(); } 先来个Handler执行过程的总结: 1. Looper.prepare()方法 为当前线程绑定looper, 在looper构造方法中创建一个messageQueue 2. 创建handler 重并写handleMessage方法 3. 使用handler发送消息,最终消息都会发送至messageQueue对象中,在messageQueue当

[Android]简略的Android消息机制源码分析

相关源码 framework/base/core/java/andorid/os/Handler.java framework/base/core/java/andorid/os/Looper.java framework/base/core/java/andorid/os/Message.java framework/base/core/java/andorid/os/MessageQueue.java libcore/luni/src/main/java/java/lang/ThreadLo

Android消息机制源码分析

本篇主要介绍Android中的消息机制,即Looper.Handler是如何协同工作的: Looper:主要用来管理当前线程的消息队列,每个线程只能有一个Looper Handler:用来将消息(Message)插入到当前线程的消息队列,并负责分发Looper中的消息,将消息发送到当前线程执行 具体关系图如下所示: 接下来我们来分析一下Looper和Handler的源码,了解一下其中的奥妙. 首先我们从一个程序运行的入口来分析,源码如下: public static void main(Stri

Tornado 高并发源码分析之二---Tornado启动和请求处理流程

Tornado 服务器启动流程 因为Tornado 里使用了很多传类的方式,也就是delegate,之所以要这么做,其实和 iOS 开发那样,也很多的 delegate, 如此来实现高度解耦,但是比较绕,所以建议: 1.先浏览一遍启动流程,再看源码 2.在看一遍请求到来时的处理流程,再看源码 备注: 流程图是xmind 编辑的,好像这里无法上传源文件,所以只能把图片下载下来看了,会没那么清晰