rabbitMQ应用,laravel生产广播消息,springboot消费消息

最近做一个新需求,用户发布了动态,前台需要查询,为了用户读取信息响应速度更快(MySQL很难实现或者说实现起来很慢),所以在用户动态发布成功后,利用消息机制异步构建 redis缓存 和 elasticsearch索引 。

开发环境

rabbitMQ服务端,docker安装

拉取rabbit-mq镜像
docker pull hub.c.163.com/library/rabbitmq:3.6.10-management

运行镜像
docker run -d --name rabbitmq --publish 5671:5671  --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672  hub.c.163.com/library/rabbitmq:3.6.10-management

后台地址:
http://192.168.1.8:15672

消息生产端(PHP):

composer 安装 rabbitmq客户端
composer require php-amqplib/php-amqplib

生产广播消息官方demo
https://github.com/php-amqplib/php-amqplib/blob/master/demo/amqp_publisher_fanout.php

应用中代码

<?php
/**
 * User: [email protected]
 * Date: 2018/6/18
 * Time: 下午1:54
 */

namespace App\ThirdParty\Message;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

class AmqpPublisher
{
    public function send($content)
    {
        $exchange = ‘message.fanout.exchange‘;
        // 创建连接
        $connection = new AMQPStreamConnection(
            config(‘app.mq_host‘),
            config(‘app.mq_port‘),
            config(‘app.mq_user‘),
            config(‘app.mq_pass‘),
            config(‘app.mq_vhost‘)
        );
        $channel = $connection->channel();
        /*
            name: $exchange
            type: fanout
            passive: false // don‘t check is an exchange with the same name exists
            durable: false // the exchange won‘t survive server restarts
            auto_delete: true //the exchange will be deleted once the channel is closed.
        */
        $channel->exchange_declare($exchange, ‘fanout‘, false, true, false);
        $messageBody = $content;
        $message = new AMQPMessage($messageBody, array(‘content_type‘ => ‘text/plain‘));
        $channel->basic_publish($message, $exchange);
        // 关闭通道
        $channel->close();
        // 关闭连接
        $connection->close();
    }
}

消息消费端(Java):

引入maven依赖

<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
    

配置广播队列信息

package cn.taxiong.release.config;

import cn.taxiong.release.constant.QueueConstants;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQFanout模式配置
 *
 * @author [email protected]
 * @create 2018-06-18 下午4:04
 **/
@Slf4j
@Configuration
public class RabbitMQFanoutConfig {

    @Bean
    public Queue createFanoutQueueCache() {
        log.info( "创建了FanoutQueue cache 缓存 队列" );
        return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME);
    }

    @Bean
    public Queue createFanoutQueueIndex() {
        log.info( "创建了FanoutQueue index 缓存 队列" );
        return new Queue(QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME);
    }

    @Bean
    public FanoutExchange fanoutExchangeRelease() {
        log.info( "创建了fanoutExchange交换机" );
        return new FanoutExchange( QueueConstants.MESSAGE_FANOUT_EXCHANGE);
    }

    @Bean
    public Binding fanoutExchangeCacheQueueBinding() {
        log.info( "将FanoutQueue cache 队列绑定到交换机fanoutExchange" );
        return BindingBuilder.bind( createFanoutQueueCache() ).to( fanoutExchangeRelease() );
    }

    @Bean
    public Binding fanoutExchangeIndexQueueBinding() {
        log.info( "将FanoutQueue index 队列绑定到交换机fanoutExchange" );
        return BindingBuilder.bind( createFanoutQueueIndex() ).to( fanoutExchangeRelease() );
    }
}

队列常量信息

package cn.taxiong.release.constant;

/**
 * 队列常量
 *
 * @author [email protected]
 * @create 2018-06-14 下午7:02
 **/
public interface QueueConstants {/**
     * 消息交换
     */
    String MESSAGE_FANOUT_EXCHANGE = "message.fanout.exchange";

    /**
     * 发布缓存消息队列名称
     */
    String MESSAGE_QUEUE_RELEASE_CACHE_NAME = "message.release.cache.queue";

    /**
     * 发布索引消息队列名称
     */
    String MESSAGE_QUEUE_RELEASE_INDEX_NAME = "message.release.index.queue";
}

缓存(cache)服务消费消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 消息消费
 *
 * @author [email protected]
 * @create 2018-06-14 下午7:14
 **/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_CACHE_NAME)
public class MessageConsumer {

    @Autowired
    private OperateReleaseService operateReleaseService;

    @RabbitHandler
    public void handler(@Payload String message) {
        // operateReleaseService.storeReleaseRedisCache(message);
        log.info("缓存cache消息消费1:{}", message);
    }
}

索引(index)服务消费消息:

package cn.taxiong.release.message;

import cn.taxiong.release.constant.QueueConstants;
import cn.taxiong.release.service.OperateReleaseService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * 消息消费
 *
 * @author [email protected]
 * @create 2018-06-14 下午7:14
 **/
@Slf4j
@Component
@RabbitListener(queues = QueueConstants.MESSAGE_QUEUE_RELEASE_INDEX_NAME)
public class MessageConsumer2 {

    @Autowired
    private OperateReleaseService operateReleaseService;

    @RabbitHandler
    public void handler(@Payload String message) {
        log.info("索引消息 index 消费2:{}", message);
    }
}

原文地址:https://www.cnblogs.com/liugx/p/9196067.html

时间: 2024-08-26 20:48:06

rabbitMQ应用,laravel生产广播消息,springboot消费消息的相关文章

kafka生产、存储、消费消息

Kafka架构组成:由producer(消息生产者).consumer(消息消费者).borker(kafka集群的server,负责处理消息读.写请求,存储消息,在kafka cluster这一层这里,其实里面是有很多个broker).topic(消息队列/分类相当于队列,里面有生产者和消费者模型).zookeeper(元数据信息存在zookeeper中,包括:存储消费偏移量,topic话题信息,partition信息) 这些部分组成. kafka里面的消息是有topic来组织的,简单的我们可

2.RABBITMQ 入门 - WINDOWS - 生产和消费消息 一个完整案例

关于安装和配置,见上一篇 1.RABBITMQ 入门 - WINDOWS - 获取,安装,配置 公司有需求,要求使用winform开发这个东西(消息中间件),另外还要求开发一个日志中间件,但是也是要求做成win form的,这明显不合理,因为之前,服务器上我已经放置了一个  短信的winform的服务.那么到后期的话,登录服务器之后,全是 一个个的窗体挂在那儿,这明显合不合常理,但是领导要求这么玩,也没办法, 因为卧虎要负责的是消费 消息,所以重点说明 消费端 该案例的接收端,源自网上的代码片段

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

1.引入maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>2.在application.yml的配置: spring: rabbitmq: host: 106.52.82.241 port: 5672 username: yang

RabbitMQ消息丢失问题和保证消息可靠性-消费端不丢消息和HA(二)

继续上篇文章解决RabbitMQ消息丢失问题和保证消息可靠性(一) 未完成部分,我们聊聊MQ Server端的高可用和消费端如何保证消息不丢的问题? 回归上篇的内容,我们知道消息从生产端到服务端,为了保证消息不丢,我们必须做哪些事情? 发送端采用Confirm模式,注意Server端没成功通知发送端,需要重发操作需要额外处理 消息的持久化处理 上面两个操作保证消息到服务端不丢,但是非高可用状态,如果节点挂掉,服务暂时不可用,需要重启后,消息恢复,消息不会丢失,因为有磁盘存储. 本文先从消费端讲起

RabbitMQ(python实现)学习之二:Producer发送消息至多个消息队列queue(广播消息)

1.1本部分内容简介 这部分我们将要发送一个消息到多个Consumer,这部分称之为"publish/subscribe" 我们实现的方式就是发送端,发送一个消息,与此同时,多个接收端将同时接收到消息并打印在屏幕上面. 1.2exchange简介 在前面的博文中,我们的讲解是:发送端发送消息至消息队列,接收端从消息队列获取消息.现在我们来介绍一下rabbitmq的完整消息传送模型. >Producer:用来发送消息的应用程序 >queue:用来存储消息的缓存 >Con

RabbitMQ下的生产消费者模式与订阅发布模式

??所谓模式,就是在某种场景下,一类问题及其解决方案的总结归纳.生产消费者模式与订阅发布模式是使用消息中间件时常用的两种模式,用于功能解耦和分布式系统间的消息通信,以下面两种场景为例: 数据接入 ??假设有一个用户行为采集系统,负责从App端采集用户点击行为数据.通常会将数据上报和数据处理分离开,即App端通过REST API上报数据,后端拿到数据后放入队列中就立刻返回,而数据处理则另外使用Worker从队列中取出数据来做,如下图所示. ??这样做的好处有:第一,功能分离,上报的API接口不关心

RabbitMQ消息确认机制—消息发送确认和 消息接收确认

/** * RabbitMQ消息确认机制 * 关于rabbit的生产和消费方的一些实用的操作: * producer的confirm和consumer的ack,这两者使用的模式都是用来保证数据完整性,防止数据丢失 */ /** * producer的confirm模式 * 业务场景描述: * 促销系统在做活动前,需要给用户的手机发送一条活动内容短信希望用户来参加, * 因为用户量有点大,所以通过往短信mq中插入数据方式,让短信服务来消费mq发短信: * 此时插入mq消息的服务为了保证给所有用户发

完了!生产事故!几百万消息在消息队列里积压了几个小时!

作者:中华石杉 来源:https://github.com/doocs/advanced-java/blob/master/docs/high-concurrency/mq-time-delay-and-expired-failure.md 一.面试题 如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,说说怎么解决? 二.面试官心里分析 你看这问法,其实本质针对的场景,都是说,可能你的消费端出了问题,不消费了,或者消费的极其极其慢.接着就坑爹了,可能

kafka centos安装发送消费消息

1. 请先下载安装文件,java环境需提前安装,解压到指定目录:tar -zxvf kafka_2.11-2.3.1.tgz -C /root/soft/ 从官网下载文件,上传到centos虚拟机指定路径下,当前download文件夹下面 解压到指定目录soft文件夹下面: 2. 创建zk日志目录.mkdir zklogs3. 查看是否启动成功,ps -ef | grep zookeeper 4. 启动zookeeper nohup bin/zookeeper-server-start.sh .