hornetq 入门(1)

Hornetq 版本2.4.0final  需要JDK7及以上

Hornetq官网

Hornetq2.1中文手册

step1.启动服务端

  1.1准备配置文件(配置说明参考官网手册)

  hornetq-configuration.xml

  


<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-configuration.xsd">
<!--
<broadcast-groups> <broadcast-group name="my-broadcast-group">
<local-bind-address>192.168.0.215</local-bind-address>
<local-bind-port>11212</local-bind-port>
<group-address>255.255.255.0</group-address>
<group-port>9876</group-port>
<broadcast-period>2000</broadcast-period> </broadcast-group>
</broadcast-groups>
-->
<name>HornetQ.main.config</name>

<bindings-directory>F:/hornetq/data/messaging/bindings</bindings-directory>

<large-messages-directory>F:/hornetq/data/messaging/largemessages</large-messages-directory>

<paging-directory>F:/hornetq/data/messaging/paging</paging-directory>

<!--离线消息固化到文件-->
<journal-directory>F:/hornetq/journal</journal-directory>
<journal-min-files>10</journal-min-files>
<!-- 缓存大小 -->
<id-cache-size>9000</id-cache-size>
<jmx-management-enabled>true</jmx-management-enabled>
<!-- 消息计数器 -->
<message-counter-enabled>true</message-counter-enabled>
<!-- keep history for a week -->
<message-counter-max-day-history>7</message-counter-max-day-history>
<!-- sample the queues every minute (60000ms) -->
<message-counter-sample-period>60000</message-counter-sample-period>
<persistence-enabled>true</persistence-enabled>
<!-- 通知将从JMS话题 "notificationsTopic"上接收
<management-notification-address>jms.queue.notificationsQueue</management-notification-address>
-->
   <!--不配置这个的话会有一个安全警告-->
<cluster-user>HORNETQ.CLUSTER.ADMIN.USER</cluster-user>
<cluster-password>test65525</cluster-password>

<!-- Connectors -->
<connectors>
<connector name="connector-netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory
</factory-class>
<param key="use-nio" value="true" />
<param key="host" value="localhost"/>
<param key="port" value="11212" />
</connector>

<!-- SSL connector -->
<connector name="netty-ssl-connector">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyConnectorFactory</factory-class>
<param key="host" value="localhost"/>
<param key="port" value="5500"/>
<param key="ssl-enabled" value="true"/>
<param key="key-store-path" value="F:/ssl/keystore"/>
<param key="key-store-password" value="test"/>
</connector>
</connectors>

<!-- Acceptors -->
<acceptors>
<acceptor name="netty">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory
</factory-class>
<!-- -->
<param key="use-nio" value="true" />
<param key="host" value="0.0.0.0,127.0.0.1,localhost"></param>
<param key="port" value="11212" />
</acceptor>

<!-- SSL connector -->
<acceptor name="netty-ssl-acceptor">
<factory-class>org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory</factory-class>
<param key="host" value="localhost"/>
<param key="port" value="5500"/>
<param key="ssl-enabled" value="true"/>
<param key="key-store-path" value="F:/ssl/keystore"/>
<param key="key-store-password" value="test"/>
<param key="trust-store-path" value="F:/ssl/truststore"/>
<param key="trust-store-password" value="test"/>
</acceptor>
</acceptors>

<!-- Other config -->
<address-settings>
<address-setting match="jms.queue.#">
<redelivery-delay>5000</redelivery-delay>
<!-- 没有导致远程queue查找不到 -->
<expiry-address>jms.queue.expiryQueue</expiry-address>
<!-- 没有导致远程queue查找不到 -->
<last-value-queue>true</last-value-queue>
<max-size-bytes>100000</max-size-bytes>
<page-size-bytes>20000</page-size-bytes>
<redistribution-delay>0</redistribution-delay>
<address-full-policy>PAGE</address-full-policy>
<!-- 死信地址-->
<send-to-dla-on-no-route>true</send-to-dla-on-no-route>
<dead-letter-address>jms.queue.deadLetterQueue</dead-letter-address>
<max-delivery-attempts>3</max-delivery-attempts>
</address-setting>
</address-settings>

<security-settings>
<!--security for example queue-->
<security-setting match="jms.queue.#">
<permission type="createDurableQueue" roles="guest" />
<permission type="deleteDurableQueue" roles="guest" />
<permission type="createNonDurableQueue" roles="guest" />
<permission type="deleteNonDurableQueue" roles="guest" />
<permission type="consume" roles="guest" />
<permission type="send" roles="guest" />
</security-setting>
</security-settings>

</configuration>

hornetq-jms.xml


<configuration xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:hornetq /schema/hornetq-jms.xsd">

<connection-factory name="ConnectionFactory">
<connectors>
<!--对应hornetq-configuration.xml 里面的connectors-->
<connector-ref connector-name="connector-netty" />
</connectors>
<entries>
<entry name="ConnectionFactory" />
<entry name="/ConnectionFactory" />
<entry name="XAConnectionFactory" />
<entry name="/XAConnectionFactory" />
<entry name="java:/ConnectionFactory"/>
<entry name="java:/XAConnectionFactory"/>
</entries>

<retry-interval>1000</retry-interval>
<retry-interval-multiplier>1.5</retry-interval-multiplier>
<max-retry-interval>60000</max-retry-interval>
<reconnect-attempts>1000</reconnect-attempts>
<confirmation-window-size>1048576</confirmation-window-size>
</connection-factory>

<!--jms address-->
<queue name="notificationsQueue">
<entry name="/queue/notificationsQueue"></entry>
</queue>
<queue name="testQueue">
<entry name="/queue/testQueue"/>
<selector string="color=‘red‘"/>
<durable>true</durable>
</queue>
<!-- the dead letter queue where dead messages will be sent-->
<queue name="deadLetterQueue">
<entry name="/queue/deadLetterQueue"/>
</queue>

</configuration>

1.2 启动hornetq服务


public static void startHornetqServer(){
try {
//config hornetq-configuration.xml
FileConfiguration config = new FileConfiguration();
config.start();
//HornetQServer
HornetQServer server=HornetQServers.newHornetQServer(config);
//JNPServer
StandaloneNamingServer standalone=new StandaloneNamingServer(server);
standalone.setBindAddress("0.0.0.0");
standalone.setRmiBindAddress("0.0.0.0");
standalone.start();
//JMSServer hornetq-jms.xml
jmsServer=new JMSServerManagerImpl(server);
jmsServer.start();
//start hornetq core server
server.start();
System.out.println(jmsServer.isStarted());
} catch (Exception e) {
e.printStackTrace();
}
}

step2.发送消息客户端


/**
* @param args
*/
public static void main(String[] args) {
try {
Properties prop = new Properties();
prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");

Context ctx = new InitialContext(prop);
System.out.println("+++++++1111ssssssss");
//查找目标地址
Destination destination = (Destination)ctx.lookup("/queue/notificationsQueue");
System.out.println("+++++++2222"+destination);

//根据上下文查找一个连接工厂 QueueConnectionFactory 。
//该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它;
//ConnectionFactory 对应hornetq-jms.xml里面的 connection-factory name="ConnectionFactory"
ConnectionFactory factory = (ConnectionFactory)ctx.lookup("ConnectionFactory");
System.out.println("+++++++3333"+factory);
//从连接工厂得到一个连接 create QueueConnection
Connection conn = factory.createConnection();
System.out.println("+++++++4444"+conn);
conn.start();

//通过连接来建立一个会话(Session);
javax.jms.Session session = conn.createSession(true,Session.AUTO_ACKNOWLEDGE);

//根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口)
MessageProducer producer = session.createProducer(destination);
TextMessage msg = session.createTextMessage("ffffffffffffffffffffffffffffffffffffffffffffff小心呈现出");
BytesMessage byteMessage=session.createBytesMessage();
byteMessage.writeBytes("testddddddddd".getBytes("utf-8"));
producer.send(msg);
producer.send(byteMessage);
System.out.println("send over !!!!!");
session.close();
conn.close();
System.out.println("send down===");
} catch (Exception e) {
e.printStackTrace();
}

}

step3.接受消息客户端


public MessageReceive(String ...destinationJNDI){

QueueConnectionFactory factory=(QueueConnectionFactory)getJNDIRemoteObj("ConnectionFactory");
try {
if(factory==null)
return;
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
for (int i = 0; i < destinationJNDI.length; i++) {
destination = (Queue) getJNDIRemoteObj(destinationJNDI[i]);
if(destination==null)
continue;
producer = session.createConsumer(destination);
//接受消息
producer.setMessageListener(new ReceiveMessage());
}
} catch (JMSException e) {
e.printStackTrace();
}
}


public static Object getJNDIRemoteObj(String jndiName) {
try {
Properties prop = new Properties();
prop.setProperty("java.naming.factory.initial","org.jnp.interfaces.NamingContextFactory");
prop.setProperty("java.naming.provider.url", "jnp://127.0.0.1:1099");
prop.setProperty("java.naming.factory.url.pkgs","org.jboss.naming:org.jnp.interfaces");
prop.setProperty(Context.SECURITY_PRINCIPAL,"guest");
prop.setProperty(Context.SECURITY_CREDENTIALS, "guest");
Context context = new InitialContext(prop);
return context.lookup(jndiName);
} catch (NamingException e) {
e.printStackTrace();
}
return null;
}


public class ReceiveMessage implements MessageListener {

@SuppressWarnings("deprecation")
@Override
public void onMessage(Message message) {
System.out.println("Received notification:"+new Date().toLocaleString());
try
{
// Enumeration propertyNames = message.getPropertyNames();
// while (propertyNames.hasMoreElements())
// {
// String propertyName = (String)propertyNames.nextElement();
// System.err.format(" %s: %s\n", propertyName, message.getObjectProperty(propertyName));
// }
HornetQDestination des=(HornetQDestination) message.getJMSDestination();
if(message instanceof TextMessage){
TextMessage mesg=(TextMessage)message;
System.out.println(des.getAddress()+"==received:"+mesg.getText());
}else if(message instanceof BytesMessage){
BytesMessage mesg=(BytesMessage)message;
ByteArrayOutputStream out=new ByteArrayOutputStream(((Long)mesg.getBodyLength()).intValue());
try {
byte[] r=new byte[2048];
int i=0;
while((i=mesg.readBytes(r))!=-1)
out.write(r,0,i);
System.out.println(des.getClass()+"==received:"+new String(out.toByteArray(),"utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(message instanceof HornetQObjectMessage){
HornetQObjectMessage object=(HornetQObjectMessage)message;
Object msgObj=object.getObject();
if(msgObj instanceof ErrorMessageBO){
ErrorMessageBO messageBO=(ErrorMessageBO)msgObj;
String msg=messageBO.getMessageContent();
System.err.println("error:==>"+msg);
}
}

}
catch (JMSException e)
{
e.printStackTrace();
}
System.out.println("----------end--------------");

}

hornetq 入门(1)

时间: 2024-10-10 23:46:18

hornetq 入门(1)的相关文章

R语言快速上手入门

R语言快速上手入门 课程学习网址:http://www.xuetuwuyou.com/course/196 课程出自学途无忧网:http://www.xuetuwuyou.com 课程简介 本教程深入浅出地讲解如何使用R语言玩转数据.课程中涵盖R语言编程的方方面面,内容涉及R对象的类型.R的记号体系和环境系统.自定义函数.if else语句.for循环.S3类R的包系统以及调试工具等.本课程还通过示例演示如何进行向量化编程,从而对代码进行提速并尽可能地发挥R的潜能.本课程适合立志成为数据科学家的

笔记:Spring Cloud Zuul 快速入门

Spring Cloud Zuul 实现了路由规则与实例的维护问题,通过 Spring Cloud Eureka 进行整合,将自身注册为 Eureka 服务治理下的应用,同时从 Eureka 中获取了所有其他微服务的实例信息,这样的设计非常巧妙的将服务治理体系中维护的实例信息利用起来,使得维护服务实例的工作交给了服务治理框架自动完成,而对路由规则的维护,默认会将通过以服务名作为 ContextPath 的方式来创建路由映射,也可以做一些特别的配置,对于签名校验.登录校验等在微服务架构中的冗余问题

linux入门基础知识及简单命令介绍

linux入门基础知识介绍 1.计算机硬件组成介绍 计算机主要由cpu(运算器.控制器),内存,I/O,外部存储等构成. cpu主要是用来对二进制数据进行运算操作,它从内存中取出数据,然后进行相应的运算操作.不能从硬盘中直接取数据. 内存从外部存储中取出数据供cpu运存.内存的最小单位是字节(byte) 备注:由于32的cpu逻辑寻址能力最大为32内存单元.因此32位cpu可以访问的最大内存空间为:4GB,算法如下: 2^32=2^10*2^10*2^10*2^2 =1024*1024*1024

JAVA通信系列二:mina入门总结

一.学习资料 Mina入门实例(一) http://www.cnblogs.com/juepei/p/3939119.html Mina入门教程(二)----Spring4 集成Mina http://www.cnblogs.com/juepei/p/3940396.html Apache Mina 入门实例--创建一个MINA时间服务http://loftor.com/archives/apache-mina-quick-start-guide.html MINA2.0用户手册中文版--系列文

Storm入门(四)WordCount示例

Storm API文档网址如下: http://storm.apache.org/releases/current/javadocs/index.html 一.关联代码 使用maven,代码如下. pom.xml  和Storm入门(三)HelloWorld示例相同 RandomSentenceSpout.java /** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor lice

浅谈Ubuntu PowerShell——小白入门教程

早在去年八月份PowerShell就开始开源跨平台了,但是一直没有去尝试,叫做PowerShell Core. 这里打算简单介绍一下如何安装和简单使用,为还不知道PowerShell Core on Ubuntu的同学们提供一点小小的入门帮助,谢谢大家支持~ PowerShell Core是由Microsoft开发的运行在.Net Core上的开源跨平台的任务自动化和配置管理系统. 1.   在Ubuntu 16.04上安装PowerShell Core a)         导入公共存储库GP

2.vue.js 入门环境搭建

原文链接:http://blog.csdn.net/luckylqh/article/details/52863026?locationNum=2&fps=1 vue这个新的工具,确实能够提高效率,在经历的一段时间的摧残之后,终于能够有一个系统的认识了,下面就今天的收获做一个总结,也是vue入门的精髓: 1.要使用vue来开发前端框架,首先要有环境,这个环境要借助于node,所以要先安装node,借助于node里面的npm来安装需要的依赖等等. 这里有一个小技巧:如果在cmd中直接使用npm来安

学习mysql的笔记:mysql十大基本入门语句

学习mysql数据库,从最简单的十条入门语句开始.刚开始学习mysql,老师让我们用cmd控制台输入语句,而不是选择用工具输入,这是为了我们能够先熟悉mysql语句. 首先要先进入mysql,语句为:mysql -hlocalhost -uroot -p  然后回车,输入密码. C:\Users\Administrator>mysql -hlocalhost -uroot -p Enter password: ****** 成功进入的话会出现 Welcome to the MySQL monit

Java - Java入门(2-1am)

第一讲.Java入门 1. 计算机语言是人和计算机进行交互的一种工具,人们通过使用计算机语言编写程序来向计算机施令,计算机则执行程序,并把结果输出给用户. 2. 机器语言:由0.1序列构成的指令码组成 如:10000000  加        10010000减          11110000 打印 3. 汇编语言:用助记符号描述的指令系统 如: Mov   A, #10; Mov   B, #20; Add   A,  B; 4. 高级语言: 面向过程的高级语言:程序设计的基本单元为函数