<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
最常见的几种消息通信模式主要有发布-订阅、点对点这两种
http://blog.csdn.net/woogeyu/article/details/51119101 集群
http://blog.csdn.net/column/details/rabbitmq.html Python版
http://www.cnblogs.com/LipeiNet/category/896408.html
http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Channel.html api
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现
其他公开标准(如 COBAR的 IIOP ,或者是 SOAP 等)
从Erlang的官网 http://www.erlang.org/download.html 下载最新的erlang安装包,Linux和MacOSX下载的版本是 http://www.erlang.org/download.html
1、安装依赖
# yum install build-essential m4
# yum install openssl
# yum install openssl-devel
# yum install unixODBC
# yum install unixODBC-devel
# yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
# yum install perl
2、配置并安装erlang
# tar -zxvf otp_src_20.1.tar
# cd otp_src_20.1
# ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll
./configure
--prefix=/usr/local/erlang --with-ssl --enable-threads
--enable-smp-support --enable-kernel-poll --enable-hipe --without-javac
./configure --prefix=/usr/local/erlang --enable-all (选这个)
make
make install
3、设置erlang环境变量
打开/etc/profile设置环境变量 查看PATH:echo $PATH 环境变量
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH
4、安装mq
http://www.rabbitmq.com/install-rpm.html
http://www.rabbitmq.com/releases/rabbitmq-server/ 下载rpm包
rabbitmq-server-3.6.14-1.el6.noarch
yum install -y socat(不需要)
ln -s /usr/local/erlang/bin/erl /usr/bin/erl 建立软连接 (不建可能会报错)
rpm -i --nodeps rabbitmq-server-3.6.5-1.noarch
执行结果 warning:rabbitmq-server-3.6.5-1.noarch.rpm: Header V4 RSA/SHA512 Signature, key ID 6026dfca: NOKEY
5、起停mq
/sbin/service rabbitmq-server stop/start/etc.
[[email protected] ~]#service rabbitmq-server start 启动服务
[[email protected] ~]#service rabbitmq-server etc 查看哪些命令可以使用
[[email protected] ~]#service rabbitmq-server stop 停止服务
[[email protected] ~]#service rabbitmq-server status 查看服务状态
启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
查看所有的队列:rabbitmqctl list_queues
清除所有的队列:rabbitmqctl reset
关闭应用:rabbitmqctl stop_app 不同于停止服务
启动应用:rabbitmqctl start_app
6、卸载等命令
#rpm -qa|grep rabbitmq
rabbitmq-server-3.6.5-1.noarch
卸载 mq
#rpm -e --nodeps rabbitmq-server-3.6.5-1.noarch
#rpm -qa|grep erlang
esl-erlang-18.3-1.x86_64
#rpm -e --nodeps esl-erlang-18.3-1.x86_64
7、访问后台
关闭防火墙 service iptables stop
安装启动后其实还不能在其它机器访问, rabbitmq默认的 guest 账号只能在本地机器访问, 如果想在其它机器访问必须配置其它账号
配置管理员账号:
rabbitmqctl add_user admin adminpasspord
rabbitmqctl set_user_tags admin administrator
启动rabbitmq内置web插件, 管理rabbitmq账号等信息
rabbitmq-plugins enable rabbitmq_management
访问 http://192.168.89.131:15672/#/users 为刚建的账号 set permission
http://127.0.0.1:15672
(1)添加用户
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
(2) 删除一个用户
rabbitmqctl delete_user Username
(3) 修改用户的密码
rabbitmqctl change_password Username Newpassword
(4) 查看当前用户列表
rabbitmqctl list_users
4、设置开机启动
chkconfig rabbitmq-server on
5、java客户端amqp-client版本号为3.6.5 与 rabbitmq-server-3.6.5-1.noarch服务版本号必须匹配
8、命令
http://www.linuxidc.com/Linux/2016-10/136493.htm
1、信息分发
向指定队列发送多条信息,多个消费者来获取该队列中信息
channel.basicQos(1);保证一次只分发一个,否则可能有些消费者获取较多消息有的消费者获取不到消息
可以使消费者采用手动答复,保证在一个消费者处理消息失败后(此时不答复)其他消费者还能继续获取并处理该消息
2、交换机
rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息,然后按照规则把它推送到对列中
fanout,表示分发,所有的消费者得到同样的队列信息,发布/订阅,channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
direct,发送信息到不同的路由,消费者根据不同路由获取不同信息
topics,发送信息到不同的路由,消费者根据模糊匹配获取某些路由的信息
1,单任务消息
生产者:根据一定的 QUEUE_NAME 生产单个消息
消费者:根据 QUEUE_NAME 获取消息
2,多任务分发
生产fenfa() 消费fenfa1()fenfa2()
生产者:根据一定的 QUEUE_NAME 生产多个消息
消费者:根据 QUEUE_NAME 获取消息(channel.basicQos(1)一次只分发一个,通过手动回复使多个消费者公平的获取和处理消息)
3,交换机
生产exchange() 消费exchange1()exchange2()
生产者:根据一定的 EXCHANGE_NAME,生产多个消息(fanout表示分发,所有的消费者得到同样的信息)
消费者:根据 EXCHANGE_NAME,每一个交换机都会获取一遍消息
service
package com.xmh.mq; import java.io.IOException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.MessageProperties; public class ServerTest { private static final String EXCHANGE_NAME = "logs"; public final static String QUEUE_NAME="rabbitMQ.test1"; public static void main(String[] args) throws Exception{ } public static void exchange(){ try{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME,"fanout");//fanout表示分发,所有的消费者得到同样的队列信息 //分发信息 for (int i=0;i<5;i++){ String message="Hello World"+i; channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes()); System.out.println("EmitLog Sent ‘" + message + "‘"); } channel.close(); connection.close(); }catch(Exception e){ e.printStackTrace(); } } /** * 多任务分发,两个以上客户端处理消息 */ public static void fenfa(){ try{ ConnectionFactory factory=new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection=factory.newConnection(); Channel channel=connection.createChannel(); channel.queueDeclare(QUEUE_NAME,true,false,true,null); //分发信息 for (int i=0;i<10;i++){ String message="Hello RabbitMQ"+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); System.out.println("NewTask send ‘"+message+"‘"); } channel.close(); connection.close(); }catch(Exception e){ e.printStackTrace(); } } /** * 注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、 * 第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数 注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体 声明队列后mq后台可看到该队列及存放的消息 */ public static void quene(){ try{ //创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ相关信息 factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); // factory.setVirtualHost("/"); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); // 声明一个队列 channel.queueDeclare(QUEUE_NAME, false, false, true, null); String message = "Hello RabbitMQ-1"; //发送消息到队列中 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Sendss +‘" + message + "‘"); //关闭通道和连接 channel.close(); connection.close(); }catch(Exception e){ e.printStackTrace(); } } }
注1:queueDeclare第一个参数表示队列名称、第二个参数为是否持久化(true表示是,队列将在服务器重启时生存)、第三个参数为是否是独占队列(创建者可以使用的私有队列,断开后自动删除)、第四个参数为当所有消费者客户端连接断开时是否自动删除队列、第五个参数为队列的其他参数
注2:basicPublish第一个参数为交换机名称、第二个参数为队列映射的路由key、第三个参数为消息的其他属性、第四个参数为发送信息的主体
package com.xmh.mq; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.QueueingConsumer; public class ConsumerTest { private final static String QUEUE_NAME = "rabbitMQ.test1"; private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { test_exchange(); test_exchange2(); } /** * 必须先订阅(先启动客户端,才能收到服务端消息) 发布/订阅 * rabbitMQ其实真正的思想是生产者不发送任何信息到队列,甚至不知道信息将发送到哪个队列。相反生产者只能发送信息到交换机,交换机接收到生产者的信息, * 然后按照规则把它推送到对列中,交换机发布/订阅 Fanout扇形交换机 其他有(Direct直连交换机、Topic主题交换机) */ public static void exchange1(){ try{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定 System.out.println("ReceiveLogs1 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs1 Received ‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer);//队列会自动删除 }catch(Exception e){ e.printStackTrace(); } } public static void exchange2(){ try{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //产生一个随机的队列名称 String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, "");//对队列进行绑定 System.out.println("ReceiveLogs2 Waiting for messages"); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("ReceiveLogs2 Received ‘" + message + "‘"); } }; channel.basicConsume(queueName, true, consumer);//队列会自动删除 }catch(Exception e){ e.printStackTrace(); } } /** * 对于分发信息的处理 * 注:channel.basicQos(1);保证一次只分发一个 。autoAck是否自动回复,如果为true的话,每次生产者只要发送信息就会从内存中删除,那么如果消费者程序异常退出, * 那么就无法获取数据,我们当然是不希望出现这样的情况,所以才去手动回复,每当消费者收到并处理信息然后在通知生成者。最后从队列中删除这条信息。如果消费者异常退出, * 如果还有其他消费者,那么就会把队列中的消息发送给其他消费者,如果没有,等消费者启动时候再次发送。 */ public static void fenfa1(){ try{ final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, true, null); System.out.println("Worker1 Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //envelope - packaging data for the message body - the message body (opaque, client-specific byte array) String message = new String(body, "UTF-8"); System.out.println("Worker1 Received ‘" + message + "‘"); try { doWork(message); channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages. .getDeliveryTag() 消息传递标记 System.out.println("Worker1 Done"); }catch (Exception e){ channel.abort();//终止渠道 }finally { } } }; boolean autoAck=false;//手动回复 //消息消费完成确认 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e){ e.printStackTrace(); } } //对于分发信息的处理 public static void fenfa2(){ try{ final ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.89.131"); factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, true, null); System.out.println("Worker2 Waiting for messages"); //每次从队列获取的数量 channel.basicQos(1); final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { //envelope - packaging data for the message body - the message body (opaque, client-specific byte array) String message = new String(body, "UTF-8"); System.out.println("Worker2 Received ‘" + message + "‘"); try { doWork(message); //Integer.parseInt("s"); channel.basicAck(envelope.getDeliveryTag(),false);//Acknowledge one or several received messages. .getDeliveryTag() 消息传递标记 System.out.println("Worker2 Done"); }catch (Exception e){ channel.abort();//终止渠道,另外一个客户端会继续获取消息 System.out.println("Worker2 客户端处理异常"); }finally { } } }; boolean autoAck=false; //消息消费完成确认 channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (Exception e){ e.printStackTrace(); } } private static void doWork(String task) { try { Thread.sleep(1000); // 暂停1秒钟 } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } /** * 手动关掉客户端后,该队列会自动删除(声明队列时设置) */ public static void test2(){ try{ // 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置RabbitMQ地址 factory.setHost("192.168.89.131");// 192.168.89.131 :15672 factory.setUsername("admin"); factory.setPassword("admin"); factory.setPort(5672); //创建一个新的连接 Connection connection = factory.newConnection(); //创建一个通道 Channel channel = connection.createChannel(); //声明要关注的队列 channel.queueDeclare(QUEUE_NAME, false, false, true, null); System.out.println("Customer Waiting Received messages"); //DefaultConsumer类实现了Consumer接口,通过传入一个频道, // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery,持续监听 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("Customer Received ‘" + message + "‘"); } }; //自动回复队列应答 -- RabbitMQ中的消息确认机制 channel.basicConsume(QUEUE_NAME, true, consumer); } catch (Exception e){ e.printStackTrace(); } } }
原文地址:https://www.cnblogs.com/xingminghui/p/8650394.html