1 java客户端maven加载包
<dependency>
<groupId>com.taobao.metamorphosis</groupId>
<artifactId>metamorphosis-client</artifactId>
<version>1.4.6.2</version>
</dependency>
2 消息会话工厂类和生产者、消费者
<bean id="mqContext" class="com.liukunzhou.dpp.commons.mqclient.MQContextFactory"
init-method="start" factory-method="getMQContext" >
</bean>
以下是实现代码:
/**
* MQ上下文工厂
*/
public class MQContextFactory {
public static Map<String,MQConfigs> configMap=new HashMap<String, MQConfigs>();
public static Map<String,IMQContext> contextMap=new HashMap<String, IMQContext>();
public static IMQContext getMQContext(MQType type,String configName,String name){
String key=type.name()+"/"+configName;
if(configMap.get(key)==null){
MQConfigs conf=XmlParser.parse(configName);
configMap.put(key, conf);
}
String key2=key+"/"+name;
if(contextMap.get(key2)==null){
MQConfigs conf=configMap.get(key);
IMQContext context;
MQConfig config=conf.getConfig(name);
if(config==null){
return null;
}
if(type==MQType.METAQ){
context=new MetaQContext(config);
}else{
return null;
}
contextMap.put(key2, context );
}
return contextMap.get(key2);
}
public static IMQContext getMQContext(String configName,String name){
return getMQContext(MQType.METAQ, configName, name);
}
public static IMQContext getMQContext(String name){
return getMQContext("/mqclient_config.xml", name);
}
public static IMQContext getMQContext(){
return getMQContext("/mqclient_config.xml", "default");
}
public static enum MQType{
METAQ
}
}
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.liukunzhou.dpp.commons.mqclient.IMQContext;
import com.liukunzhou.dpp.commons.mqclient.IMsgListener;
import com.liukunzhou.dpp.commons.mqclient.IProducerHelper;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConsumerConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQProducerConfig;
import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQTopicConfig;
import com.taobao.metamorphosis.client.MessageSessionFactory;
import com.taobao.metamorphosis.client.MetaClientConfig;
import com.taobao.metamorphosis.client.MetaMessageSessionFactory;
import com.taobao.metamorphosis.client.consumer.ConsumerConfig;
import com.taobao.metamorphosis.client.consumer.MessageConsumer;
import com.taobao.metamorphosis.client.producer.MessageProducer;
import com.taobao.metamorphosis.exception.MetaClientException;
import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;
/**
* MetaQ实现
*/
public class MetaQContext implements IMQContext {
Logger logger=LoggerFactory.getLogger(getClass());
protected final MessageSessionFactory sessionFactory ;
protected final MetaQProducerHelper producerHelper;
protected final MQConfig config;
protected List<MessageConsumer> consumers=new ArrayList<MessageConsumer>();
protected List<MessageProducer> producers=new ArrayList<MessageProducer>();
public MetaQContext(MQConfig config) {
this.config = config;
sessionFactory=createSessionFactory(config.getUrl(),config.getZkRoot());
producerHelper=new MetaQProducerHelper(createProducer());
producers.add(producerHelper.getProducer());
}
@Override
public IProducerHelper getProducerHelper() {
return producerHelper;
}
/**
* 创建SessionFactory
* @param url
* @return
*/
protected MessageSessionFactory createSessionFactory(String url,String zkRoot) {
MessageSessionFactory sessionFactory = null;
try {
final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = url;
if(StringUtils.isNotEmpty(zkRoot)){
zkConfig.zkRoot=zkRoot;
}
metaClientConfig.setZkConfig(zkConfig);
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);
} catch (Exception e) {
logger.error("create SessionFactory error",e);
throw new RuntimeException(e);
}
return sessionFactory;
}
/**
* 获取生产者
* @return
*/
protected MessageProducer createProducer() {
return sessionFactory.createProducer();
}
/**
* 获取消费者
* @return
*/
protected MessageConsumer createConsumer(String group,int runnerNum,long delay) {
try {
ConsumerConfig config = new ConsumerConfig(group);
// 抓取线程数
config.setFetchRunnerCount(runnerNum);
config.setConsumeFromMaxOffset();
// 抓取间隔最大间隔时间(每次递增10%)
config.setMaxDelayFetchTimeInMills(delay);
return sessionFactory.createConsumer(config);
} catch (Exception e) {
logger.error("create Consumer exception", e);
throw new RuntimeException(e.getCause());
}
}
@Override
public void start() {
for(MQProducerConfig mc:config.getProducers()){
producerHelper.publish(mc.getTopic());
}
for(MQConsumerConfig mc:config.getConsumers()){
MetaQConsumerHelper consumer=new MetaQConsumerHelper(this.createConsumer(mc.getGroup(), mc.getRunnerNum(), mc.getDelay()));
for(MQTopicConfig tc:mc.getTopics()){
IMsgListener listener;
try {
listener = (IMsgListener)Class.forName(tc.getListener()).newInstance();
} catch (Exception e) {
logger.error("init listener ("+tc+") error",e);
continue;
}
consumer.subscribe(tc.getName(), tc.getMaxSize(), listener );
}
consumer.completeSubscribe();
consumers.add(consumer.getConsumer());
}
}
@Override
public void stop() {
try {
for(MessageConsumer c:consumers){
c.shutdown();
}
for(MessageProducer c:producers){
c.shutdown();
}
} catch (MetaClientException e) {
logger.error("stop exception", e);
}
}
}
3 配置文件
/**
* 配置文件解析器
*/
public class XmlParser {
public static MQConfigs parse(String configName){
Digester dig = new Digester();
// push 调用类
dig.push(new MQConfigs());
// 设置匹配规则处理类
dig.setRules(new ExtendedBaseRules());
// 遇到config结点时创建MQConfig对象 以下调用有顺序
dig.addObjectCreate("mqconfigs/config", MQConfig.class);
//处理属性
dig.addSetProperties("mqconfigs/config");
// 处理子结点
dig.addBeanPropertySetter("mqconfigs/config/?");
//注册生产主题
dig.addObjectCreate("mqconfigs/config/producer", MQProducerConfig.class);
dig.addSetProperties("mqconfigs/config/producer");
dig.addBeanPropertySetter("mqconfigs/config/producer/?");
//消费者
dig.addObjectCreate("mqconfigs/config/consumer", MQConsumerConfig.class);
dig.addSetProperties("mqconfigs/config/consumer");
dig.addBeanPropertySetter("mqconfigs/config/consumer/?");
//主题
dig.addObjectCreate("mqconfigs/config/consumer/topic", MQTopicConfig.class);
dig.addSetProperties("mqconfigs/config/consumer/topic");
dig.addBeanPropertySetter("mqconfigs/config/consumer/topic/?");
dig.addBeanPropertySetter("mqconfigs/config/consumer/topic/?");
dig.addSetNext("mqconfigs/config/consumer/topic", "addTopic");
dig.addSetNext("mqconfigs/config/consumer", "addConsumer");
dig.addSetNext("mqconfigs/config/producer", "addProducer");
// 遇到model结点时,调用XmlTest的addModel方法(参数为XmlModel)
dig.addSetNext("mqconfigs/config", "addConfig");
try {
return (MQConfigs) dig.parse(XmlParser.class.getResourceAsStream(configName));
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<mqconfigs>
<config name="158" url="192.168.1.158:2181" >
<consumer group="meta-commonBiz" runnerNum="6" delay="200">
<topic name="ORDERBIZ" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />
</consumer>
<producer topic="ORDERBIZ"/>
</config>
<config name="133" url="192.168.1.133:2181" zkRoot="/meta-order">
<consumer group="meta-commonBiz" runnerNum="6" delay="200">
<topic name="TEST" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />
</consumer>
<producer topic="TEST"/>
</config>
</mqconfigs>