spring的RabbitTemplate 接收Message源码导读

1,首先调用类SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法。内部类的主要属性如下

		private final BlockingQueueConsumer consumer;

		private final CountDownLatch start;

		private volatile FatalListenerStartupException startupException;

2,内部类的run方法如下

			boolean aborted = false;

			int consecutiveIdles = 0;

			int consecutiveMessages = 0;

			try {

				try {
					SimpleMessageListenerContainer.this.redeclareElementsIfNecessary();
					this.consumer.start();
					this.start.countDown();
				}
				catch (QueuesNotAvailableException e) {
					if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
						throw e;
					}
					else {
						this.start.countDown();
						handleStartupFailure(e);
						throw e;
					}
				}
				catch (FatalListenerStartupException ex) {
					throw ex;
				}
				catch (Throwable t) {
					this.start.countDown();
					handleStartupFailure(t);
					throw t;
				}

				if (SimpleMessageListenerContainer.this.transactionManager != null) {
					/*
					 * Register the consumer's channel so it will be used by the transaction manager
					 * if it's an instance of RabbitTransactionManager.
					 */
					ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory());
				}

				// Always better to stop receiving as soon as possible if
				// transactional
				boolean continuable = false;
				while (isActive(this.consumer) || continuable) {
					try {
						// Will come back false when the queue is drained
						continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
							if (continuable) {
								consecutiveIdles = 0;
								if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
									considerAddingAConsumer();
									consecutiveMessages = 0;
								}
							}
							else {
								consecutiveMessages = 0;
								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
									considerStoppingAConsumer(this.consumer);
									consecutiveIdles = 0;
								}
							}
						}
					}
					catch (ListenerExecutionFailedException ex) {
						// Continue to process, otherwise re-throw
					}
					catch (AmqpRejectAndDontRequeueException rejectEx) {
						/*
						 *  These will normally be wrapped by an LEFE if thrown by the
						 *  listener, but we will also honor it if thrown by an
						 *  error handler.
						 */
					}
				}

			}
			catch (InterruptedException e) {
				logger.debug("Consumer thread interrupted, processing stopped.");
				Thread.currentThread().interrupt();
				aborted = true;
			}
			catch (QueuesNotAvailableException ex) {
				if (SimpleMessageListenerContainer.this.missingQueuesFatal) {
					logger.error("Consumer received fatal exception on startup", ex);
					this.startupException = ex;
					// Fatal, but no point re-throwing, so just abort.
					aborted = true;
				}
			}
			catch (FatalListenerStartupException ex) {
				logger.error("Consumer received fatal exception on startup", ex);
				this.startupException = ex;
				// Fatal, but no point re-throwing, so just abort.
				aborted = true;
			}
			catch (FatalListenerExecutionException ex) {
				logger.error("Consumer received fatal exception during processing", ex);
				// Fatal, but no point re-throwing, so just abort.
				aborted = true;
			}
			catch (ShutdownSignalException e) {
				if (RabbitUtils.isNormalShutdown(e)) {
					if (logger.isDebugEnabled()) {
						logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage());
					}
				}
				else {
					this.logConsumerException(e);
				}
			}
			catch (AmqpIOException e) {
				if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException
						&& e.getCause().getCause().getMessage().contains("in exclusive use")) {
					logger.warn(e.getCause().getCause().toString());
				}
				else {
					this.logConsumerException(e);
				}
			}
			catch (Error e) {
				logger.error("Consumer thread error, thread abort.", e);
				aborted = true;
			}
			catch (Throwable t) {
				this.logConsumerException(t);
			}
			finally {
				if (SimpleMessageListenerContainer.this.transactionManager != null) {
					ConsumerChannelRegistry.unRegisterConsumerChannel();
				}
			}

			// In all cases count down to allow container to progress beyond startup
			start.countDown();

			if (!isActive(consumer) || aborted) {
				logger.debug("Cancelling " + this.consumer);
				try {
					this.consumer.stop();
					synchronized (consumersMonitor) {
						if (SimpleMessageListenerContainer.this.consumers != null) {
							SimpleMessageListenerContainer.this.consumers.remove(this.consumer);
						}
					}
				}
				catch (AmqpException e) {
					logger.info("Could not cancel message consumer", e);
				}
				if (aborted) {
					logger.error("Stopping container from aborted consumer");
					stop();
				}
			}
			else {
				logger.info("Restarting " + this.consumer);
				restart(this.consumer);
			}

3,run方法调用类BlockingQueueConsumer的start方法,BlockingQueueConsumer的属性如下

	private final BlockingQueue<Delivery> queue;

	// When this is non-null the connection has been closed (should never happen in normal operation).
	private volatile ShutdownSignalException shutdown;

	private final String[] queues;

	private final int prefetchCount;

	private final boolean transactional;

	private Channel channel;

	private RabbitResourceHolder resourceHolder;

	private InternalConsumer consumer;

	private final AtomicBoolean cancelled = new AtomicBoolean(false);

	private volatile long shutdownTimeout;

	private final AtomicBoolean cancelReceived = new AtomicBoolean(false);

	private final AcknowledgeMode acknowledgeMode;

	private final ConnectionFactory connectionFactory;

	private final MessagePropertiesConverter messagePropertiesConverter;

	private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter;

	private final Map<String, Object> consumerArgs = new HashMap<String, Object>();

	private final boolean exclusive;

	private final Set<Long> deliveryTags = new LinkedHashSet<Long>();

	private final boolean defaultRequeuRejected;

	private final CountDownLatch suspendClientThread = new CountDownLatch(1);

	private final Collection<String> consumerTags = Collections.synchronizedSet(new HashSet<String>());

	private final Set<String> missingQueues = Collections.synchronizedSet(new HashSet<String>());

	private final long retryDeclarationInterval = 60000;

	private long lastRetryDeclaration;

4,调用ConnectionFactoryUtils获取connection

	private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionFactory connectionFactory,
			ResourceFactory resourceFactory) {

		Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
		Assert.notNull(resourceFactory, "ResourceFactory must not be null");

		RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager
				.getResource(connectionFactory);
		if (resourceHolder != null) {
			Channel channel = resourceFactory.getChannel(resourceHolder);
			if (channel != null) {
				return resourceHolder;
			}
		}
		RabbitResourceHolder resourceHolderToUse = resourceHolder;
		if (resourceHolderToUse == null) {
			resourceHolderToUse = new RabbitResourceHolder();
		}
		Connection connection = resourceFactory.getConnection(resourceHolderToUse);
		Channel channel = null;
		try {
			/*
			 * If we are in a listener container, first see if there's a channel registered
			 * for this consumer and the consumer is using the same connection factory.
			 */
			channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory);
			if (channel == null && connection == null) {
				connection = resourceFactory.createConnection();
				resourceHolderToUse.addConnection(connection);
			}
			if (channel == null) {
				channel = resourceFactory.createChannel(connection);
			}
			resourceHolderToUse.addChannel(channel, connection);

			if (resourceHolderToUse != resourceHolder) {
				bindResourceToTransaction(resourceHolderToUse, connectionFactory,
						resourceFactory.isSynchedLocalTransactionAllowed());
			}

			return resourceHolderToUse;

		} catch (IOException ex) {
			RabbitUtils.closeChannel(channel);
			RabbitUtils.closeConnection(connection);
			throw new AmqpIOException(ex);
		}
	}

5,调用类CachingConnectionFactory的createConnection

	@Override
	public final Connection createConnection() throws AmqpException {
		synchronized (this.connectionMonitor) {
			if (this.cacheMode == CacheMode.CHANNEL) {
				if (this.connection == null) {
					this.connection = new ChannelCachingConnectionProxy(super.createBareConnection());
					// invoke the listener *after* this.connection is assigned
					getConnectionListener().onCreate(connection);
				}
				return this.connection;
			}
			else if (this.cacheMode == CacheMode.CONNECTION) {
				ChannelCachingConnectionProxy connection = null;
				while (connection == null && !this.idleConnections.isEmpty()) {
					connection = this.idleConnections.poll();
					if (connection != null) {
						if (!connection.isOpen()) {
							if (logger.isDebugEnabled()) {
								logger.debug("Removing closed connection '" + connection + "'");
							}
							connection.notifyCloseIfNecessary();
							this.openConnections.remove(connection);
							this.openConnectionNonTransactionalChannels.remove(connection);
							this.openConnectionTransactionalChannels.remove(connection);
							connection = null;
						}
					}
				}
				if (connection == null) {
					connection = new ChannelCachingConnectionProxy(super.createBareConnection());
					getConnectionListener().onCreate(connection);
					if (logger.isDebugEnabled()) {
						logger.debug("Adding new connection '" + connection + "'");
					}
					this.openConnections.add(connection);
					this.openConnectionNonTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
					this.openConnectionTransactionalChannels.put(connection, new LinkedList<ChannelProxy>());
				}
				else {
					if (logger.isDebugEnabled()) {
						logger.debug("Obtained connection '" + connection + "' from cache");
					}
				}
				return connection;
			}
		}
		return null;
	}

6,调用类ConnectionFactory的newConnection方法

    public Connection newConnection(ExecutorService executor, Address[] addrs)
        throws IOException
    {
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        ConnectionParams params = params(executor);

        if (isAutomaticRecoveryEnabled()) {
            // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs);
            conn.init();
            return conn;
        } else {
            IOException lastException = null;
            for (Address addr : addrs) {
                try {
                    FrameHandler handler = fhFactory.create(addr);
                    AMQConnection conn = new AMQConnection(params, handler);
                    conn.start();
                    return conn;
                } catch (IOException e) {
                    lastException = e;
                }
            }
            throw (lastException != null) ? lastException : new IOException("failed to connect");
        }
    }

7,调用AMQConnection的start方法

 public void start()
        throws IOException
    {
        initializeConsumerWorkService();
        initializeHeartbeatSender();
        this._running = true;
        // Make sure that the first thing we do is to send the header,
        // which should cause any socket errors to show up for us, rather
        // than risking them pop out in the MainLoop
        AMQChannel.SimpleBlockingRpcContinuation connStartBlocker =
            new AMQChannel.SimpleBlockingRpcContinuation();
        // We enqueue an RPC continuation here without sending an RPC
        // request, since the protocol specifies that after sending
        // the version negotiation header, the client (connection
        // initiator) is to wait for a connection.start method to
        // arrive.
        _channel0.enqueueRpc(connStartBlocker);
        try {
            // The following two lines are akin to AMQChannel's
            // transmit() method for this pseudo-RPC.
            _frameHandler.setTimeout(HANDSHAKE_TIMEOUT);
            _frameHandler.sendHeader();
        } catch (IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        // start the main loop going
        MainLoop loop = new MainLoop();
        final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
        mainLoopThread = Environment.newThread(threadFactory, loop, name);
        mainLoopThread.start();
        // after this point clear-up of MainLoop is triggered by closing the frameHandler.

        AMQP.Connection.Start connStart = null;
        AMQP.Connection.Tune connTune = null;
        try {
            connStart =
                (AMQP.Connection.Start) connStartBlocker.getReply().getMethod();

            _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());

            Version serverVersion =
                new Version(connStart.getVersionMajor(),
                            connStart.getVersionMinor());

            if (!Version.checkVersion(clientVersion, serverVersion)) {
                throw new ProtocolVersionMismatchException(clientVersion,
                                                           serverVersion);
            }

            String[] mechanisms = connStart.getMechanisms().toString().split(" ");
            SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);
            if (sm == null) {
                throw new IOException("No compatible authentication mechanism found - " +
                        "server offered [" + connStart.getMechanisms() + "]");
            }

            LongString challenge = null;
            LongString response = sm.handleChallenge(null, this.username, this.password);

            do {
                Method method = (challenge == null)
                    ? new AMQP.Connection.StartOk.Builder()
                                    .clientProperties(_clientProperties)
                                    .mechanism(sm.getName())
                                    .response(response)
                          .build()
                    : new AMQP.Connection.SecureOk.Builder().response(response).build();

                try {
                    Method serverResponse = _channel0.rpc(method).getMethod();
                    if (serverResponse instanceof AMQP.Connection.Tune) {
                        connTune = (AMQP.Connection.Tune) serverResponse;
                    } else {
                        challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();
                        response = sm.handleChallenge(challenge, this.username, this.password);
                    }
                } catch (ShutdownSignalException e) {
                    Method shutdownMethod = e.getReason();
                    if (shutdownMethod instanceof AMQP.Connection.Close) {
                        AMQP.Connection.Close shutdownClose =  (AMQP.Connection.Close) shutdownMethod;
                        if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {
                            throw new AuthenticationFailureException(shutdownClose.getReplyText());
                        }
                    }
                    throw new PossibleAuthenticationFailureException(e);
                }
            } while (connTune == null);
        } catch (ShutdownSignalException sse) {
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        } catch(IOException ioe) {
            _frameHandler.close();
            throw ioe;
        }

        try {
            int channelMax =
                negotiateChannelMax(this.requestedChannelMax,
                                    connTune.getChannelMax());
            _channelManager = instantiateChannelManager(channelMax, threadFactory);

            int frameMax =
                negotiatedMaxValue(this.requestedFrameMax,
                                   connTune.getFrameMax());
            this._frameMax = frameMax;

            int heartbeat =
                negotiatedMaxValue(this.requestedHeartbeat,
                                   connTune.getHeartbeat());

            setHeartbeat(heartbeat);

            _channel0.transmit(new AMQP.Connection.TuneOk.Builder()
                                .channelMax(channelMax)
                                .frameMax(frameMax)
                                .heartbeat(heartbeat)
                              .build());
            _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder()
                                      .virtualHost(_virtualHost)
                                    .build());
        } catch (IOException ioe) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw ioe;
        } catch (ShutdownSignalException sse) {
            _heartbeatSender.shutdown();
            _frameHandler.close();
            throw AMQChannel.wrap(sse);
        }

        // We can now respond to errors having finished tailoring the connection
        this._inConnectionNegotiation = false;

        return;
    }

8,默认queue以及Channel的值

Queue [name=app_queue, durable=true, autoDelete=false, exclusive=false, arguments=null]
Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)

9,调用RabbitAdmin完成相关通道和绑定等的创建

10,ConnectionFactoryUtils.getTransactionalResourceHolder完成后,开始建立消费者BlockingQueueConsumer的start方法中

		this.consumer = new InternalConsumer(channel);
		this.deliveryTags.clear();
		this.activeObjectCounter.add(this);

11,SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法中调用

while (isActive(this.consumer) || continuable) {
					try {
						// Will come back false when the queue is drained
						continuable = receiveAndExecute(this.consumer) && !isChannelTransacted();
						if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
							if (continuable) {
								consecutiveIdles = 0;
								if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
									considerAddingAConsumer();
									consecutiveMessages = 0;
								}
							}
							else {
								consecutiveMessages = 0;
								if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
									considerStoppingAConsumer(this.consumer);
									consecutiveIdles = 0;
								}
							}
						}
					}
					catch (ListenerExecutionFailedException ex) {
						// Continue to process, otherwise re-throw
					}
					catch (AmqpRejectAndDontRequeueException rejectEx) {
						/*
						 *  These will normally be wrapped by an LEFE if thrown by the
						 *  listener, but we will also honor it if thrown by an
						 *  error handler.
						 */
					}
				}

12,后续调用业务onmessage见下图

注意:消费者默认属性

acknowledgeMode  prefetchCount=1

AcknowledgeMode AutoAck

时间: 2024-10-29 10:48:14

spring的RabbitTemplate 接收Message源码导读的相关文章

spring的RabbitTemplate 发送Message源码导读

1,首先业务方法调用RabbitTemplate的convertAndSend方法:(RabbitTemplate继承RabbitAccessor,实现了RabbitOperations和MessageListener接口 @Override public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException { convertAndSend(exchan

spring 事件模式 源码导读

一,jdk 事件对象基类 package java.util; import java.io.Serializable; public class EventObject implements Serializable { protected transient Object source; public Object getSource() { return this.source; } public EventObject(Object paramObject) { if (paramObj

Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏玄机

文章目录 1. 开箱即用,内藏玄机 2. 总结 3. 源代码 Spring Boot提供了很多"开箱即用"的依赖模块,那么,Spring Boot 如何巧妙的做到开箱即用,自动配置的呢? 开箱即用,内藏玄机 Spring Boot提供了很多"开箱即用"的依赖模块,都是以spring-boot-starter-xx作为命名的.例如,之前提到的 spring-boot-starter-redis.spring-boot-starter-data-mongodb.spri

Spring Boot 揭秘与实战 源码分析 - 工作原理剖析

文章目录 1. EnableAutoConfiguration 帮助我们做了什么 2. 配置参数类 – FreeMarkerProperties 3. 自动配置类 – FreeMarkerAutoConfiguration4. 扩展阅读 3.1. 核心注解 3.2. 注入 Bean 结合<Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏玄机>一文,我们再来深入的理解 Spring Boot 的工作原理. 在<Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏

Spring Cloud Eureka服务注册源码分析

Eureka是怎么work的 那eureka client如何将本地服务的注册信息发送到远端的注册服务器eureka server上.通过下面的源码分析,看出Eureka Client的定时任务调用Eureka Server的Reset接口,而Eureka接收到调用请求后会处理服务的注册以及Eureka Server中的数据同步的问题. 服务注册 源码分析,看出服务注册可以认为是Eureka client自己完成,不需要服务本身来关心. Eureka Client的定时任务调用Eureka Se

做一个合格的程序猿之浅析Spring AOP源码(十八) Spring AOP开发大作战源码解析

其实上一篇文章价值很小,也有重复造轮子的嫌疑,网上AOP的实例很多,不胜枚举,其实我要说的并不是这个,我想要说的就是上一节中spring的配置文件: 我们这边并没有用到我们上几节分析的哪几个AOP的主要实现类:ProxyFactoryBean.java , ProxyFactory.java ,AspectJProxyFactory.java ,在我们这个配置文件中,根本没有显示的去配置这些类,那么spring到底是怎么做到的呢? 大家可以这么想,spring到底是怎么去杀害目标对象的呢?真正的

【Spring】Spring&amp;WEB整合原理及源码分析

表现层和业务层整合: 1. Jsp/Servlet整合Spring: 2. Spring MVC整合SPring: 3. Struts2整合Spring: 本文主要介绍Jsp/Servlet整合Spring原理及源码分析. 一.整合过程 Spring&WEB整合,主要介绍的是Jsp/Servlet容器和Spring整合的过程,当然,这个过程是Spring MVC或Strugs2整合Spring的基础. Spring和Jsp/Servlet整合操作很简单,使用也很简单,按部就班花不到2分钟就搞定了

分布式内存网格Hazelcast源码导读

去年项目需要看了hazelcast源码,当时记录的笔记. Node是节点的抽象,里面包含节点引擎.客户端引擎.分区服务.集群服务.组播服务.连接管理.命令管理.组播属性.节点配置.本地成员.tcp地址.组播地址.连接者.节点初始化器.管理中心.安全上下文. Config类,包含GroupConfig.NetworkConfig.MapConfig.TopicConfig.QueueConfig.MultiMapConfig.ListConfig.SetConfig.ExecutorConfig.

【SSH进阶之路】Spring的IOC逐层深入——源码解析之IoC的根本BeanFactory(五)

我们前面的三篇博文,简单易懂的介绍了为什么要使用IOC[实例讲解](二).和Spring的IOC原理[通俗解释](三)以及依赖注入的两种常用实现类型(四),这些都是刚开始学习Spring IoC容器时的基础内容,当然只有有了这些基础,我们才能走到今天更加详细的解析Spring的源码,深入理解IOC. 这篇我先简单的复习一下IoC,然后根据实例介绍IoC最基本的原理.废话少说,下面我们开始这篇博文的话题: 什么是IoC IoC容器,最主要的就是完成对象的创建以及维护对象的依赖关系等. 所谓控制反转