首先我们需要下载 ActiveMQ:http://activemq.apache.org/。
启动 ActiveMQ 服务:解包下载的 ActiveMQ 》进去其bin 目录》双击 activemq.bat。
ActiveMQ 默认使用的是端口61616,可以在cmd中查看61616端口是否被占用,以确定ActiveMQ 服务是否正常启动。查看的命令如下:
netstat -nao | find "61616",如果服务启动则可以看到ActiveMQ所对应的的进程。
下面将以三种形式体验ActiveMQ 发送,接收消息的功能。
第一种是ActiveMQ 所支持的,但不是基于JNDI的,不是JMS标准所建议的。
发送消息的类如下,基于的端口是默认的61616,连续发送了5了消息,消息队列是xiaoyunduo。
import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Sender { public static void main(String[] srgs) throws JMSException, InterruptedException { factory; Connection connection = null; Session session = null; Destination destination = null; MessageProducer producer = null; // 创建连接工厂 factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 创建连接 connection = factory.createConnection(); // 建立连接 connection.start(); // 建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 指定消息队列 destination = session.createQueue("xiaoyunduo"); // 创建消息发生器 producer = session.createProducer(destination); for (int i = 0; i < 5; i++) { MapMessage message = session.createMapMessage(); message.setLong("mess", new Date().getTime()); Thread.sleep(1000); // 发送消息 producer.send(message); } session.commit(); session.close(); connection.close(); } }
接收消息的类如下,基于的端口是默认的61616,消息队列是xiaoyunduo。
import java.util.Date; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class Receiver { public static void main(String[] args) throws JMSException { ConnectionFactory factory = null; Connection connection = null; Session session = null; Destination destination = null; MessageConsumer consumer = null; // 创建连接工厂 factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 创建连接 connection = factory.createConnection(); // 建立连接 connection.start(); // 建立session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); // 指定消息队列 destination = session.createQueue("xiaoyunduo"); // 产生消费者 consumer = session.createConsumer(destination); for (int i = 0; i < 5; i++) { //获取jms server中的消息 MapMessage message = (MapMessage) consumer.receive(1000); session.commit(); System.out.println("收到消息:" + new Date(message.getLong("mess"))); } session.close(); connection.close(); } }
先启动发送消息的类,再启动接收消息的类,可以看到消息内容打印在消息接收端。
第二种是基于文件形式的JNDI,使用Sun自带的RefFSContextFactory来存储JNDI信息。需要引入fscontext.jar和providerutil.jar,这是进行测试的前提。
ConnectionFactory改从Context中读取,其余部分保持不变。发送消息和接收消息的类都需要进行对应的修改。
// 创建连接工厂 //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); Hashtable env = new Hashtable(5); env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.fscontext.RefFSContextFactory"); env.put(Context.PROVIDER_URL, "file:JNDI_REF"); try { Context ctx = new InitialContext(env); ActiveMQConnectionFactory mqFactory = new ActiveMQConnectionFactory(); mqFactory.setBrokerURL("tcp://localhost:61616"); mqFactory.setUserName(null); mqFactory.setPassword(null); //设置的参数少了某一项,只有sender发送等几秒后再去启动receive才没问题,否则接收不到消息 //ctx.bind("mqFactory", mqFactory);//只需要绑定一次 factory = (ConnectionFactory) ctx.lookup("mqFactory"); } catch (NamingException e) { e.printStackTrace(); }
第三种基于配置文件的方式,需要先有下面的一个配置文件jndi.properties,其中包含了ActiveMQ 的基本配置信息。
java.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory java.naming.provider.url=tcp://localhost:61616 java.naming.security.principal=system java.naming.security.credentials=manager connectionFactoryNames=con1,con2 ##queue.MyQueue=MyQueue topic.MyTopic=MyTopic topic.topic1=jms.topic1
还需要一个读取配置文件的工厂类,InitialContext利用properties信息进行初始化,将直接利用JNDI从InitialContext中读取信息。
import java.util.Properties; import javax.naming.Context; import javax.naming.InitialContext; import javax.naming.NamingException; public class JndiFactoryForJMS { protected Context context = null; public void initalize() throws NamingException { Properties props = new Properties(); try{ org.apache.activemq.jndi.ActiveMQInitialContextFactory af = new org.apache.activemq.jndi.ActiveMQInitialContextFactory(); props.load(this.getClass().getResourceAsStream("jndi.properties")); context = new InitialContext(props); }catch(Exception ex){ ex.printStackTrace(); } } public Context getJndiContext() throws NamingException { if(context == null){ initalize(); } return context; } }
和第二种方法类似,只需要改变第一个类的部分代码即可,发送端和接收端要同时修改。
// 创建连接工厂 //factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp:/ /localhost:61616"); try { JndiFactoryForJMS factoryForJMS = new JndiFactoryForJMS(); Context ctx = factoryForJMS.getJndiContext(); //获取连接工厂。 factory = (ConnectionFactory)ctx.lookup("con1"); } catch (NamingException e) { e.printStackTrace(); }