spring的RabbitTemplate 发送Message源码导读

1,首先业务方法调用RabbitTemplate的convertAndSend方法:(RabbitTemplate继承RabbitAccessor,实现了RabbitOperations和MessageListener接口

@Override
public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
              convertAndSend(exchange, routingKey, object, (CorrelationData) null);
}

2,convertAndSend调用自己的重载方法:

	public void convertAndSend(String exchange, String routingKey, final Object message,
			final MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException {
		Message messageToSend = convertMessageIfNecessary(message);
		messageToSend = messagePostProcessor.postProcessMessage(messageToSend);
		send(exchange, routingKey, messageToSend, correlationData);
	}

3,convertAndSend调用send方法:

	public void send(final String exchange, final String routingKey,
			final Message message, final CorrelationData correlationData)
			throws AmqpException {
		execute(new ChannelCallback<Object>() {

			@Override
			public Object doInRabbit(Channel channel) throws Exception {
				doSend(channel, exchange, routingKey, message, correlationData);
				return null;
			}
		});
	}

4,send调用excute方法进行消息发送:

	@Override
	public <T> T execute(final ChannelCallback<T> action) {
		if (this.retryTemplate != null) {
			try {
				return this.retryTemplate.execute(new RetryCallback<T, Exception>() {

					@Override
					public T doWithRetry(RetryContext context) throws Exception {
						return RabbitTemplate.this.doExecute(action);
					}

				});
			}
			catch (Exception e) {
				if (e instanceof RuntimeException) {
					throw (RuntimeException) e;
				}
				throw RabbitExceptionTranslator.convertRabbitAccessException(e);
			}
		}
		else {
			return this.doExecute(action);
		}
	}

5,execute方法调用doExecute

	private <T> T doExecute(ChannelCallback<T> action) {
		Assert.notNull(action, "Callback object must not be null");
		RabbitResourceHolder resourceHolder = getTransactionalResourceHolder();
		Channel channel = resourceHolder.getChannel();
		if (this.confirmCallback != null || this.returnCallback != null) {
			addListener(channel);
		}
		try {
			if (logger.isDebugEnabled()) {
				logger.debug("Executing callback on RabbitMQ Channel: " + channel);
			}
			return action.doInRabbit(channel);
		}
		catch (Exception ex) {
			if (isChannelLocallyTransacted(channel)) {
				resourceHolder.rollbackAll();
			}
			throw convertRabbitAccessException(ex);
		}
		finally {
			ConnectionFactoryUtils.releaseResources(resourceHolder);
		}
	}

6,doExcute调用execute方法

	public void send(final String exchange, final String routingKey,
			final Message message, final CorrelationData correlationData)
			throws AmqpException {
		execute(new ChannelCallback<Object>() {

			@Override
			public Object doInRabbit(Channel channel) throws Exception {
				doSend(channel, exchange, routingKey, message, correlationData);
				return null;
			}
		});
	}

7,execute调用doSend方法

	protected void doSend(Channel channel, String exchange, String routingKey, Message message,
			CorrelationData correlationData) throws Exception {
		if (logger.isDebugEnabled()) {
			logger.debug("Publishing message on exchange [" + exchange + "], routingKey = [" + routingKey + "]");
		}

		if (exchange == null) {
			// try to send to configured exchange
			exchange = this.exchange;
		}

		if (routingKey == null) {
			// try to send to configured routing key
			routingKey = this.routingKey;
		}
		if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
			PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
			publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
					new PendingConfirm(correlationData, System.currentTimeMillis()));
		}
		boolean mandatory = this.returnCallback != null && this.mandatory;
		MessageProperties messageProperties = message.getMessageProperties();
		if (mandatory) {
			messageProperties.getHeaders().put(PublisherCallbackChannel.RETURN_CORRELATION, this.uuid);
		}
		BasicProperties convertedMessageProperties = this.messagePropertiesConverter
				.fromMessageProperties(messageProperties, encoding);
		channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
		// Check if commit needed
		if (isChannelLocallyTransacted(channel)) {
			// Transacted channel created by this template -> commit.
			RabbitUtils.commitIfNecessary(channel);
		}
	}

8,调用CachingConnectionFactory的invoke方法

public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
			String methodName = method.getName();
			if (methodName.equals("txSelect") && !this.transactional) {
				throw new UnsupportedOperationException("Cannot start transaction on non-transactional channel");
			}
			if (methodName.equals("equals")) {
				// Only consider equal when proxies are identical.
				return (proxy == args[0]);
			}
			else if (methodName.equals("hashCode")) {
				// Use hashCode of Channel proxy.
				return System.identityHashCode(proxy);
			}
			else if (methodName.equals("toString")) {
				return "Cached Rabbit Channel: " + this.target;
			}
			else if (methodName.equals("close")) {
				// Handle close method: don't pass the call on.
				if (active) {
					synchronized (this.channelList) {
						if (!RabbitUtils.isPhysicalCloseRequired() && this.channelList.size() < getChannelCacheSize()) {
							logicalClose((ChannelProxy) proxy);
							// Remain open in the channel list.
							return null;
						}
					}
				}

				// If we get here, we're supposed to shut down.
				physicalClose();
				return null;
			}
			else if (methodName.equals("getTargetChannel")) {
				// Handle getTargetChannel method: return underlying Channel.
				return this.target;
			}
			else if (methodName.equals("isOpen")) {
				// Handle isOpen method: we are closed if the target is closed
				return this.target != null && this.target.isOpen();
			}
			try {
				if (this.target == null || !this.target.isOpen()) {
					this.target = null;
				}
				synchronized (targetMonitor) {
					if (this.target == null) {
						this.target = createBareChannel(theConnection, transactional);
					}
					return method.invoke(this.target, args);
				}
			}
			catch (InvocationTargetException ex) {
				if (this.target == null || !this.target.isOpen()) {
					// Basic re-connection logic...
					this.target = null;
					if (logger.isDebugEnabled()) {
						logger.debug("Detected closed channel on exception.  Re-initializing: " + target);
					}
					synchronized (targetMonitor) {
						if (this.target == null) {
							this.target = createBareChannel(theConnection, transactional);
						}
					}
				}
				throw ex.getTargetException();
			}
		}

9,调用rabbitmq的ChannelN类的basicPublish方法(继承AMQChannel,实现接口com.rabbitmq.client.Channel)

 public void basicPublish(String exchange, String routingKey,
                             boolean mandatory, boolean immediate,
                             BasicProperties props, byte[] body)
        throws IOException
    {
        if (nextPublishSeqNo > 0) {
            unconfirmedSet.add(getNextPublishSeqNo());
            nextPublishSeqNo++;
        }
        BasicProperties useProps = props;
        if (props == null) {
            useProps = MessageProperties.MINIMAL_BASIC;
        }
        transmit(new AMQCommand(new Basic.Publish.Builder()
                                    .exchange(exchange)
                                    .routingKey(routingKey)
                                    .mandatory(mandatory)
                                    .immediate(immediate)
                                .build(),
                                useProps, body));
    }

10,调用AMQChannel的transmit方法

    public void transmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            ensureIsOpen();
            quiescingTransmit(c);
        }
    }

11,调用quiescingTransmit方法

    public void quiescingTransmit(AMQCommand c) throws IOException {
        synchronized (_channelMutex) {
            if (c.getMethod().hasContent()) {
                while (_blockContent) {
                    try {
                        _channelMutex.wait();
                    } catch (InterruptedException e) {}

                    // This is to catch a situation when the thread wakes up during
                    // shutdown. Currently, no command that has content is allowed
                    // to send anything in a closing state.
                    ensureIsOpen();
                }
            }
            c.transmit(this);
        }
    }

12,调用AMQCommand的transmit方法

 public void transmit(AMQChannel channel) throws IOException {
        int channelNumber = channel.getChannelNumber();
        AMQConnection connection = channel.getConnection();

        synchronized (assembler) {
            Method m = this.assembler.getMethod();
            connection.writeFrame(m.toFrame(channelNumber));
            if (m.hasContent()) {
                byte[] body = this.assembler.getContentBody();

                connection.writeFrame(this.assembler.getContentHeader()
                        .toFrame(channelNumber, body.length));

                int frameMax = connection.getFrameMax();
                int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
                        - EMPTY_FRAME_SIZE;

                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                    int remaining = body.length - offset;

                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                            : bodyPayloadMax;
                    Frame frame = Frame.fromBodyFragment(channelNumber, body,
                            offset, fragmentLength);
                    connection.writeFrame(frame);
                }
            }
        }

        connection.flush();
    }

13,调用AMQConnection的writeFrame方法送数据

 public void writeFrame(Frame f) throws IOException {
        _frameHandler.writeFrame(f);
        _heartbeatSender.signalActivity();
    }

14,调用SocketFrameHandler的方法writeFrame的方法,完成发送

    /**
     * Public API - writes this Frame to the given DataOutputStream
     */
    public void writeTo(DataOutputStream os) throws IOException {
        os.writeByte(type);
        os.writeShort(channel);
        if (accumulator != null) {
            os.writeInt(accumulator.size());
            accumulator.writeTo(os);
        } else {
            os.writeInt(payload.length);
            os.write(payload);
        }
        os.write(AMQP.FRAME_END);
    }

附图:

时间: 2024-10-22 20:51:05

spring的RabbitTemplate 发送Message源码导读的相关文章

spring的RabbitTemplate 接收Message源码导读

1,首先调用类SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法.内部类的主要属性如下 private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException; 2,内部类的run方法如下

spring 事件模式 源码导读

一,jdk 事件对象基类 package java.util; import java.io.Serializable; public class EventObject implements Serializable { protected transient Object source; public Object getSource() { return this.source; } public EventObject(Object paramObject) { if (paramObj

Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏玄机

文章目录 1. 开箱即用,内藏玄机 2. 总结 3. 源代码 Spring Boot提供了很多"开箱即用"的依赖模块,那么,Spring Boot 如何巧妙的做到开箱即用,自动配置的呢? 开箱即用,内藏玄机 Spring Boot提供了很多"开箱即用"的依赖模块,都是以spring-boot-starter-xx作为命名的.例如,之前提到的 spring-boot-starter-redis.spring-boot-starter-data-mongodb.spri

Spring Boot 揭秘与实战 源码分析 - 工作原理剖析

文章目录 1. EnableAutoConfiguration 帮助我们做了什么 2. 配置参数类 – FreeMarkerProperties 3. 自动配置类 – FreeMarkerAutoConfiguration4. 扩展阅读 3.1. 核心注解 3.2. 注入 Bean 结合<Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏玄机>一文,我们再来深入的理解 Spring Boot 的工作原理. 在<Spring Boot 揭秘与实战 源码分析 - 开箱即用,内藏

Spring Cloud Eureka服务注册源码分析

Eureka是怎么work的 那eureka client如何将本地服务的注册信息发送到远端的注册服务器eureka server上.通过下面的源码分析,看出Eureka Client的定时任务调用Eureka Server的Reset接口,而Eureka接收到调用请求后会处理服务的注册以及Eureka Server中的数据同步的问题. 服务注册 源码分析,看出服务注册可以认为是Eureka client自己完成,不需要服务本身来关心. Eureka Client的定时任务调用Eureka Se

做一个合格的程序猿之浅析Spring AOP源码(十八) Spring AOP开发大作战源码解析

其实上一篇文章价值很小,也有重复造轮子的嫌疑,网上AOP的实例很多,不胜枚举,其实我要说的并不是这个,我想要说的就是上一节中spring的配置文件: 我们这边并没有用到我们上几节分析的哪几个AOP的主要实现类:ProxyFactoryBean.java , ProxyFactory.java ,AspectJProxyFactory.java ,在我们这个配置文件中,根本没有显示的去配置这些类,那么spring到底是怎么做到的呢? 大家可以这么想,spring到底是怎么去杀害目标对象的呢?真正的

【Spring】Spring&amp;WEB整合原理及源码分析

表现层和业务层整合: 1. Jsp/Servlet整合Spring: 2. Spring MVC整合SPring: 3. Struts2整合Spring: 本文主要介绍Jsp/Servlet整合Spring原理及源码分析. 一.整合过程 Spring&WEB整合,主要介绍的是Jsp/Servlet容器和Spring整合的过程,当然,这个过程是Spring MVC或Strugs2整合Spring的基础. Spring和Jsp/Servlet整合操作很简单,使用也很简单,按部就班花不到2分钟就搞定了

分布式内存网格Hazelcast源码导读

去年项目需要看了hazelcast源码,当时记录的笔记. Node是节点的抽象,里面包含节点引擎.客户端引擎.分区服务.集群服务.组播服务.连接管理.命令管理.组播属性.节点配置.本地成员.tcp地址.组播地址.连接者.节点初始化器.管理中心.安全上下文. Config类,包含GroupConfig.NetworkConfig.MapConfig.TopicConfig.QueueConfig.MultiMapConfig.ListConfig.SetConfig.ExecutorConfig.

【SSH进阶之路】Spring的IOC逐层深入——源码解析之IoC的根本BeanFactory(五)

我们前面的三篇博文,简单易懂的介绍了为什么要使用IOC[实例讲解](二).和Spring的IOC原理[通俗解释](三)以及依赖注入的两种常用实现类型(四),这些都是刚开始学习Spring IoC容器时的基础内容,当然只有有了这些基础,我们才能走到今天更加详细的解析Spring的源码,深入理解IOC. 这篇我先简单的复习一下IoC,然后根据实例介绍IoC最基本的原理.废话少说,下面我们开始这篇博文的话题: 什么是IoC IoC容器,最主要的就是完成对象的创建以及维护对象的依赖关系等. 所谓控制反转