1,首先调用类SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法。内部类的主要属性如下
private final BlockingQueueConsumer consumer; private final CountDownLatch start; private volatile FatalListenerStartupException startupException;
2,内部类的run方法如下
boolean aborted = false; int consecutiveIdles = 0; int consecutiveMessages = 0; try { try { SimpleMessageListenerContainer.this.redeclareElementsIfNecessary(); this.consumer.start(); this.start.countDown(); } catch (QueuesNotAvailableException e) { if (SimpleMessageListenerContainer.this.missingQueuesFatal) { throw e; } else { this.start.countDown(); handleStartupFailure(e); throw e; } } catch (FatalListenerStartupException ex) { throw ex; } catch (Throwable t) { this.start.countDown(); handleStartupFailure(t); throw t; } if (SimpleMessageListenerContainer.this.transactionManager != null) { /* * Register the consumer's channel so it will be used by the transaction manager * if it's an instance of RabbitTransactionManager. */ ConsumerChannelRegistry.registerConsumerChannel(consumer.getChannel(), getConnectionFactory()); } // Always better to stop receiving as soon as possible if // transactional boolean continuable = false; while (isActive(this.consumer) || continuable) { try { // Will come back false when the queue is drained continuable = receiveAndExecute(this.consumer) && !isChannelTransacted(); if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { if (continuable) { consecutiveIdles = 0; if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { considerAddingAConsumer(); consecutiveMessages = 0; } } else { consecutiveMessages = 0; if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); consecutiveIdles = 0; } } } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw } catch (AmqpRejectAndDontRequeueException rejectEx) { /* * These will normally be wrapped by an LEFE if thrown by the * listener, but we will also honor it if thrown by an * error handler. */ } } } catch (InterruptedException e) { logger.debug("Consumer thread interrupted, processing stopped."); Thread.currentThread().interrupt(); aborted = true; } catch (QueuesNotAvailableException ex) { if (SimpleMessageListenerContainer.this.missingQueuesFatal) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; } } catch (FatalListenerStartupException ex) { logger.error("Consumer received fatal exception on startup", ex); this.startupException = ex; // Fatal, but no point re-throwing, so just abort. aborted = true; } catch (FatalListenerExecutionException ex) { logger.error("Consumer received fatal exception during processing", ex); // Fatal, but no point re-throwing, so just abort. aborted = true; } catch (ShutdownSignalException e) { if (RabbitUtils.isNormalShutdown(e)) { if (logger.isDebugEnabled()) { logger.debug("Consumer received Shutdown Signal, processing stopped: " + e.getMessage()); } } else { this.logConsumerException(e); } } catch (AmqpIOException e) { if (e.getCause() instanceof IOException && e.getCause().getCause() instanceof ShutdownSignalException && e.getCause().getCause().getMessage().contains("in exclusive use")) { logger.warn(e.getCause().getCause().toString()); } else { this.logConsumerException(e); } } catch (Error e) { logger.error("Consumer thread error, thread abort.", e); aborted = true; } catch (Throwable t) { this.logConsumerException(t); } finally { if (SimpleMessageListenerContainer.this.transactionManager != null) { ConsumerChannelRegistry.unRegisterConsumerChannel(); } } // In all cases count down to allow container to progress beyond startup start.countDown(); if (!isActive(consumer) || aborted) { logger.debug("Cancelling " + this.consumer); try { this.consumer.stop(); synchronized (consumersMonitor) { if (SimpleMessageListenerContainer.this.consumers != null) { SimpleMessageListenerContainer.this.consumers.remove(this.consumer); } } } catch (AmqpException e) { logger.info("Could not cancel message consumer", e); } if (aborted) { logger.error("Stopping container from aborted consumer"); stop(); } } else { logger.info("Restarting " + this.consumer); restart(this.consumer); }
3,run方法调用类BlockingQueueConsumer的start方法,BlockingQueueConsumer的属性如下
private final BlockingQueue<Delivery> queue; // When this is non-null the connection has been closed (should never happen in normal operation). private volatile ShutdownSignalException shutdown; private final String[] queues; private final int prefetchCount; private final boolean transactional; private Channel channel; private RabbitResourceHolder resourceHolder; private InternalConsumer consumer; private final AtomicBoolean cancelled = new AtomicBoolean(false); private volatile long shutdownTimeout; private final AtomicBoolean cancelReceived = new AtomicBoolean(false); private final AcknowledgeMode acknowledgeMode; private final ConnectionFactory connectionFactory; private final MessagePropertiesConverter messagePropertiesConverter; private final ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter; private final Map<String, Object> consumerArgs = new HashMap<String, Object>(); private final boolean exclusive; private final Set<Long> deliveryTags = new LinkedHashSet<Long>(); private final boolean defaultRequeuRejected; private final CountDownLatch suspendClientThread = new CountDownLatch(1); private final Collection<String> consumerTags = Collections.synchronizedSet(new HashSet<String>()); private final Set<String> missingQueues = Collections.synchronizedSet(new HashSet<String>()); private final long retryDeclarationInterval = 60000; private long lastRetryDeclaration;
4,调用ConnectionFactoryUtils获取connection
private static RabbitResourceHolder doGetTransactionalResourceHolder(ConnectionFactory connectionFactory, ResourceFactory resourceFactory) { Assert.notNull(connectionFactory, "ConnectionFactory must not be null"); Assert.notNull(resourceFactory, "ResourceFactory must not be null"); RabbitResourceHolder resourceHolder = (RabbitResourceHolder) TransactionSynchronizationManager .getResource(connectionFactory); if (resourceHolder != null) { Channel channel = resourceFactory.getChannel(resourceHolder); if (channel != null) { return resourceHolder; } } RabbitResourceHolder resourceHolderToUse = resourceHolder; if (resourceHolderToUse == null) { resourceHolderToUse = new RabbitResourceHolder(); } Connection connection = resourceFactory.getConnection(resourceHolderToUse); Channel channel = null; try { /* * If we are in a listener container, first see if there's a channel registered * for this consumer and the consumer is using the same connection factory. */ channel = ConsumerChannelRegistry.getConsumerChannel(connectionFactory); if (channel == null && connection == null) { connection = resourceFactory.createConnection(); resourceHolderToUse.addConnection(connection); } if (channel == null) { channel = resourceFactory.createChannel(connection); } resourceHolderToUse.addChannel(channel, connection); if (resourceHolderToUse != resourceHolder) { bindResourceToTransaction(resourceHolderToUse, connectionFactory, resourceFactory.isSynchedLocalTransactionAllowed()); } return resourceHolderToUse; } catch (IOException ex) { RabbitUtils.closeChannel(channel); RabbitUtils.closeConnection(connection); throw new AmqpIOException(ex); } }
5,调用类CachingConnectionFactory的createConnection
@Override public final Connection createConnection() throws AmqpException { synchronized (this.connectionMonitor) { if (this.cacheMode == CacheMode.CHANNEL) { if (this.connection == null) { this.connection = new ChannelCachingConnectionProxy(super.createBareConnection()); // invoke the listener *after* this.connection is assigned getConnectionListener().onCreate(connection); } return this.connection; } else if (this.cacheMode == CacheMode.CONNECTION) { ChannelCachingConnectionProxy connection = null; while (connection == null && !this.idleConnections.isEmpty()) { connection = this.idleConnections.poll(); if (connection != null) { if (!connection.isOpen()) { if (logger.isDebugEnabled()) { logger.debug("Removing closed connection '" + connection + "'"); } connection.notifyCloseIfNecessary(); this.openConnections.remove(connection); this.openConnectionNonTransactionalChannels.remove(connection); this.openConnectionTransactionalChannels.remove(connection); connection = null; } } } if (connection == null) { connection = new ChannelCachingConnectionProxy(super.createBareConnection()); getConnectionListener().onCreate(connection); if (logger.isDebugEnabled()) { logger.debug("Adding new connection '" + connection + "'"); } this.openConnections.add(connection); this.openConnectionNonTransactionalChannels.put(connection, new LinkedList<ChannelProxy>()); this.openConnectionTransactionalChannels.put(connection, new LinkedList<ChannelProxy>()); } else { if (logger.isDebugEnabled()) { logger.debug("Obtained connection '" + connection + "' from cache"); } } return connection; } } return null; }
6,调用类ConnectionFactory的newConnection方法
public Connection newConnection(ExecutorService executor, Address[] addrs) throws IOException { FrameHandlerFactory fhFactory = createFrameHandlerFactory(); ConnectionParams params = params(executor); if (isAutomaticRecoveryEnabled()) { // see com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addrs); conn.init(); return conn; } else { IOException lastException = null; for (Address addr : addrs) { try { FrameHandler handler = fhFactory.create(addr); AMQConnection conn = new AMQConnection(params, handler); conn.start(); return conn; } catch (IOException e) { lastException = e; } } throw (lastException != null) ? lastException : new IOException("failed to connect"); } }
7,调用AMQConnection的start方法
public void start() throws IOException { initializeConsumerWorkService(); initializeHeartbeatSender(); this._running = true; // Make sure that the first thing we do is to send the header, // which should cause any socket errors to show up for us, rather // than risking them pop out in the MainLoop AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation(); // We enqueue an RPC continuation here without sending an RPC // request, since the protocol specifies that after sending // the version negotiation header, the client (connection // initiator) is to wait for a connection.start method to // arrive. _channel0.enqueueRpc(connStartBlocker); try { // The following two lines are akin to AMQChannel's // transmit() method for this pseudo-RPC. _frameHandler.setTimeout(HANDSHAKE_TIMEOUT); _frameHandler.sendHeader(); } catch (IOException ioe) { _frameHandler.close(); throw ioe; } // start the main loop going MainLoop loop = new MainLoop(); final String name = "AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); // after this point clear-up of MainLoop is triggered by closing the frameHandler. AMQP.Connection.Start connStart = null; AMQP.Connection.Tune connTune = null; try { connStart = (AMQP.Connection.Start) connStartBlocker.getReply().getMethod(); _serverProperties = Collections.unmodifiableMap(connStart.getServerProperties()); Version serverVersion = new Version(connStart.getVersionMajor(), connStart.getVersionMinor()); if (!Version.checkVersion(clientVersion, serverVersion)) { throw new ProtocolVersionMismatchException(clientVersion, serverVersion); } String[] mechanisms = connStart.getMechanisms().toString().split(" "); SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms); if (sm == null) { throw new IOException("No compatible authentication mechanism found - " + "server offered [" + connStart.getMechanisms() + "]"); } LongString challenge = null; LongString response = sm.handleChallenge(null, this.username, this.password); do { Method method = (challenge == null) ? new AMQP.Connection.StartOk.Builder() .clientProperties(_clientProperties) .mechanism(sm.getName()) .response(response) .build() : new AMQP.Connection.SecureOk.Builder().response(response).build(); try { Method serverResponse = _channel0.rpc(method).getMethod(); if (serverResponse instanceof AMQP.Connection.Tune) { connTune = (AMQP.Connection.Tune) serverResponse; } else { challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge(); response = sm.handleChallenge(challenge, this.username, this.password); } } catch (ShutdownSignalException e) { Method shutdownMethod = e.getReason(); if (shutdownMethod instanceof AMQP.Connection.Close) { AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod; if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) { throw new AuthenticationFailureException(shutdownClose.getReplyText()); } } throw new PossibleAuthenticationFailureException(e); } } while (connTune == null); } catch (ShutdownSignalException sse) { _frameHandler.close(); throw AMQChannel.wrap(sse); } catch(IOException ioe) { _frameHandler.close(); throw ioe; } try { int channelMax = negotiateChannelMax(this.requestedChannelMax, connTune.getChannelMax()); _channelManager = instantiateChannelManager(channelMax, threadFactory); int frameMax = negotiatedMaxValue(this.requestedFrameMax, connTune.getFrameMax()); this._frameMax = frameMax; int heartbeat = negotiatedMaxValue(this.requestedHeartbeat, connTune.getHeartbeat()); setHeartbeat(heartbeat); _channel0.transmit(new AMQP.Connection.TuneOk.Builder() .channelMax(channelMax) .frameMax(frameMax) .heartbeat(heartbeat) .build()); _channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder() .virtualHost(_virtualHost) .build()); } catch (IOException ioe) { _heartbeatSender.shutdown(); _frameHandler.close(); throw ioe; } catch (ShutdownSignalException sse) { _heartbeatSender.shutdown(); _frameHandler.close(); throw AMQChannel.wrap(sse); } // We can now respond to errors having finished tailoring the connection this._inConnectionNegotiation = false; return; }
8,默认queue以及Channel的值
Queue [name=app_queue, durable=true, autoDelete=false, exclusive=false, arguments=null]
Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1)
9,调用RabbitAdmin完成相关通道和绑定等的创建
10,ConnectionFactoryUtils.getTransactionalResourceHolder完成后,开始建立消费者BlockingQueueConsumer的start方法中
this.consumer = new InternalConsumer(channel); this.deliveryTags.clear(); this.activeObjectCounter.add(this);
11,SimpleMessageListenerContainer的内部类AsyncMessageProcessingConsumer的run方法中调用
while (isActive(this.consumer) || continuable) { try { // Will come back false when the queue is drained continuable = receiveAndExecute(this.consumer) && !isChannelTransacted(); if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) { if (continuable) { consecutiveIdles = 0; if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) { considerAddingAConsumer(); consecutiveMessages = 0; } } else { consecutiveMessages = 0; if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) { considerStoppingAConsumer(this.consumer); consecutiveIdles = 0; } } } } catch (ListenerExecutionFailedException ex) { // Continue to process, otherwise re-throw } catch (AmqpRejectAndDontRequeueException rejectEx) { /* * These will normally be wrapped by an LEFE if thrown by the * listener, but we will also honor it if thrown by an * error handler. */ } }
12,后续调用业务onmessage见下图
注意:消费者默认属性
acknowledgeMode prefetchCount=1
AcknowledgeMode AutoAck
时间: 2024-10-29 10:48:14