metaq实例

1 java客户端maven加载包

<dependency>

<groupId>com.taobao.metamorphosis</groupId>

<artifactId>metamorphosis-client</artifactId>

<version>1.4.6.2</version>

</dependency>

2 消息会话工厂类和生产者、消费者

<bean id="mqContext" class="com.liukunzhou.dpp.commons.mqclient.MQContextFactory"
init-method="start" factory-method="getMQContext" >

</bean>

以下是实现代码:

/**

* MQ上下文工厂

*/

public class MQContextFactory {

public static Map<String,MQConfigs> configMap=new HashMap<String, MQConfigs>();

public static Map<String,IMQContext> contextMap=new HashMap<String, IMQContext>();

public static IMQContext getMQContext(MQType type,String configName,String name){

String key=type.name()+"/"+configName;

if(configMap.get(key)==null){

MQConfigs conf=XmlParser.parse(configName);

configMap.put(key, conf);

}

String key2=key+"/"+name;

if(contextMap.get(key2)==null){

MQConfigs conf=configMap.get(key);

IMQContext context;

MQConfig config=conf.getConfig(name);

if(config==null){

return null;

}

if(type==MQType.METAQ){

context=new MetaQContext(config);

}else{

return null;

}

contextMap.put(key2, context );

}

return contextMap.get(key2);

}

public static IMQContext getMQContext(String configName,String name){

return getMQContext(MQType.METAQ, configName, name);

}

public static IMQContext getMQContext(String name){

return getMQContext("/mqclient_config.xml", name);

}

public static IMQContext getMQContext(){

return getMQContext("/mqclient_config.xml", "default");

}

public static enum MQType{

METAQ

}

}

import java.util.ArrayList;

import java.util.List;

import org.apache.commons.lang.StringUtils;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.liukunzhou.dpp.commons.mqclient.IMQContext;

import com.liukunzhou.dpp.commons.mqclient.IMsgListener;

import com.liukunzhou.dpp.commons.mqclient.IProducerHelper;

import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConfig;

import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQConsumerConfig;

import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQProducerConfig;

import com.liukunzhou.dpp.commons.mqclient.config.MQConfigs.MQTopicConfig;

import com.taobao.metamorphosis.client.MessageSessionFactory;

import com.taobao.metamorphosis.client.MetaClientConfig;

import com.taobao.metamorphosis.client.MetaMessageSessionFactory;

import com.taobao.metamorphosis.client.consumer.ConsumerConfig;

import com.taobao.metamorphosis.client.consumer.MessageConsumer;

import com.taobao.metamorphosis.client.producer.MessageProducer;

import com.taobao.metamorphosis.exception.MetaClientException;

import com.taobao.metamorphosis.utils.ZkUtils.ZKConfig;

/**

* MetaQ实现

*/

public class MetaQContext implements IMQContext {

Logger logger=LoggerFactory.getLogger(getClass());

protected final MessageSessionFactory sessionFactory ;

protected final MetaQProducerHelper producerHelper;

protected final MQConfig config;

protected List<MessageConsumer> consumers=new ArrayList<MessageConsumer>();

protected List<MessageProducer> producers=new ArrayList<MessageProducer>();

public MetaQContext(MQConfig config) {

this.config = config;

sessionFactory=createSessionFactory(config.getUrl(),config.getZkRoot());

producerHelper=new MetaQProducerHelper(createProducer());

producers.add(producerHelper.getProducer());

}

@Override

public IProducerHelper getProducerHelper() {

return producerHelper;

}

/**

* 创建SessionFactory

* @param url

* @return

*/

protected MessageSessionFactory createSessionFactory(String url,String zkRoot) {

MessageSessionFactory sessionFactory = null;

try {

final MetaClientConfig metaClientConfig = new MetaClientConfig();

final ZKConfig zkConfig = new ZKConfig();

zkConfig.zkConnect = url;

if(StringUtils.isNotEmpty(zkRoot)){

zkConfig.zkRoot=zkRoot;

}

metaClientConfig.setZkConfig(zkConfig);

sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

} catch (Exception e) {

logger.error("create SessionFactory error",e);

throw new RuntimeException(e);

}

return sessionFactory;

}

/**

* 获取生产者

* @return

*/

protected MessageProducer createProducer() {

return sessionFactory.createProducer();

}

/**

* 获取消费者

* @return

*/

protected MessageConsumer createConsumer(String group,int runnerNum,long delay) {

try {

ConsumerConfig config = new ConsumerConfig(group);

// 抓取线程数

config.setFetchRunnerCount(runnerNum);

config.setConsumeFromMaxOffset();

// 抓取间隔最大间隔时间(每次递增10%)

config.setMaxDelayFetchTimeInMills(delay);

return sessionFactory.createConsumer(config);

} catch (Exception e) {

logger.error("create Consumer exception", e);

throw new RuntimeException(e.getCause());

}

}

@Override

public void start() {

for(MQProducerConfig mc:config.getProducers()){

producerHelper.publish(mc.getTopic());

}

for(MQConsumerConfig mc:config.getConsumers()){

MetaQConsumerHelper consumer=new MetaQConsumerHelper(this.createConsumer(mc.getGroup(), mc.getRunnerNum(), mc.getDelay()));

for(MQTopicConfig tc:mc.getTopics()){

IMsgListener listener;

try {

listener = (IMsgListener)Class.forName(tc.getListener()).newInstance();

} catch (Exception e) {

logger.error("init listener ("+tc+") error",e);

continue;

}

consumer.subscribe(tc.getName(), tc.getMaxSize(), listener );

}

consumer.completeSubscribe();

consumers.add(consumer.getConsumer());

}

}

@Override

public void stop() {

try {

for(MessageConsumer c:consumers){

c.shutdown();

}

for(MessageProducer c:producers){

c.shutdown();

}

} catch (MetaClientException e) {

logger.error("stop exception", e);

}

}

}

3 配置文件

/**

* 配置文件解析器

*/

public class XmlParser {

public static MQConfigs parse(String configName){

Digester dig = new Digester();

// push 调用类

dig.push(new MQConfigs());

// 设置匹配规则处理类

dig.setRules(new ExtendedBaseRules());

// 遇到config结点时创建MQConfig对象 以下调用有顺序

dig.addObjectCreate("mqconfigs/config", MQConfig.class);

//处理属性

dig.addSetProperties("mqconfigs/config");

// 处理子结点

dig.addBeanPropertySetter("mqconfigs/config/?");

//注册生产主题

dig.addObjectCreate("mqconfigs/config/producer", MQProducerConfig.class);

dig.addSetProperties("mqconfigs/config/producer");

dig.addBeanPropertySetter("mqconfigs/config/producer/?");

//消费者

dig.addObjectCreate("mqconfigs/config/consumer", MQConsumerConfig.class);

dig.addSetProperties("mqconfigs/config/consumer");

dig.addBeanPropertySetter("mqconfigs/config/consumer/?");

//主题

dig.addObjectCreate("mqconfigs/config/consumer/topic", MQTopicConfig.class);

dig.addSetProperties("mqconfigs/config/consumer/topic");

dig.addBeanPropertySetter("mqconfigs/config/consumer/topic/?");

dig.addBeanPropertySetter("mqconfigs/config/consumer/topic/?");

dig.addSetNext("mqconfigs/config/consumer/topic", "addTopic");

dig.addSetNext("mqconfigs/config/consumer", "addConsumer");

dig.addSetNext("mqconfigs/config/producer", "addProducer");

// 遇到model结点时,调用XmlTest的addModel方法(参数为XmlModel)

dig.addSetNext("mqconfigs/config", "addConfig");

try {

return (MQConfigs) dig.parse(XmlParser.class.getResourceAsStream(configName));

} catch (Exception e) {

e.printStackTrace();

throw new RuntimeException(e);

}

}

}

<?xml version="1.0" encoding="UTF-8"?>

<mqconfigs>

<config name="158" url="192.168.1.158:2181" >

<consumer group="meta-commonBiz" runnerNum="6" delay="200">

<topic name="ORDERBIZ" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />

</consumer>

<producer topic="ORDERBIZ"/>

</config>

<config name="133" url="192.168.1.133:2181" zkRoot="/meta-order">

<consumer group="meta-commonBiz" runnerNum="6" delay="200">

<topic name="TEST" maxSize="2048" listener="com.liukunzhou.dpp.commons.mqclient.t.MyMsgListener" />

</consumer>

<producer topic="TEST"/>

</config>

</mqconfigs>

时间: 2024-11-01 15:37:02

metaq实例的相关文章

MetaQ 实例之三

六.Message Consumer 一.配置消费者:每个Java的消费者都需要一个ConsumerConfig的配置实例. 二.消费者分组 在MetaQ里,消费者被认为是一个集群,也就是说认为是有一组的机器在共同分担消费一个topic.因此消费者配置ConsumerConfig中最重要的配置是group,每个消费者都必须告诉MetaQ它属于哪个group,然后MetaQ会找出这个group下所有注册上来的消费者,在他们之间做负载均衡,共同消费一个或多个topic.注意,不同group之间可以认

metaq安装实例

下载metaq: http://fnil.net/downloads/index.html 安装metaq: [[email protected] software]# pwd /export/software [[email protected] software]# tar -zxvf metaq-server-1.4.6.2.tar.gz [[email protected] software]# cd taobao [[email protected] taobao]# ls COPYI

Nagios 监控实例部署

Nagios是一款企业级开源软件,专注于监控服务器上服务是否正常,不生成图形,提供报警机制,邮件或者短信发送监控状态,它通过各种插件实现不同的功能. Nagios        监控平台主程序 Nagios-plugins     必选插件 NRPE         监控远程服务器的主机资源 NSClient++      用于监控Windows主机 NDOUtils       将数据写入数据库 实例应用: 1 监控快速部署 监控需要安装http php nagios nagios-plugi

阿里中间件——消息中间件Notify和MetaQ

3.1.Notify Notify是淘宝自主研发的一套消息服务引擎,是支撑双11最为核心的系统之中的一个,在淘宝和支付宝的核心交易场景中都有大量使用.消息系统的核心作用就是三点:解耦,异步和并行.以下让我以一个实际的样例来说明一下解耦异步和并行分别所代表的详细意义吧: 如果我们有这么一个应用场景,为了完毕一个用户注冊淘宝的操作,可能须要将用户信息写入到用户库中,然后通知给红包中心给用户发新手红包,然后还须要通知支付宝给用户准备相应的支付宝账号,进行合法性验证,告知sns系统给用户导入新的用户等1

linux下tomcat shutdown后 java进程依旧存在 -- 阿里MetaQ篇

此篇文章描述的症状和上一篇文章一致(即执行tomcat ./shutdown.sh 后,虽然tomcat服务不能正常访问了,但是ps -ef | grep java 后,发现tomcat对应的java进程未随web容器关闭而销毁,进而存在僵尸java进程),但是处理的过程不一致,所有又单开了一篇blog来写. 我在另外一个项目中使用到了阿里的MetaQ消息中间件,然后shutdown tomcat 发现java进程依旧存在,沿用上一篇文章的思路,我最开始以为是本地代码中scheduledExec

中间件、MetaQ入门学习

目录 1. 中间件技术 2. MetaQ中间件 3. MetaQ编程实践 1. 中间件技术 0x1: 中间件简介 中间件(Middleware)是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务.面向服务的体系结构等中应用比较广泛,如: 1. 数据库 2. Apache的Tomcat 3. IBM公司的WebSphere 4. BEA公司的WebLogic[[应用服务器] 5. 东方通公司的Tong系列中

metaq入门部署到实战

初识metaq zookeeper部署,这里单机zk为例. wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.5/zookeeper-3.4.6.tar.gz tar -zxvf zookeeper-3.4.6.tar.gz cd zookeeper-3.4.6 mkdir data cp conf/zoo_sample.cfg conf/zoo.cfg 改动zk的配置文件.将data路径dataDir的值设置为zookee

solr分布式索引【实战一、分片配置读取:工具类configUtil.java,读取配置代码片段,配置实例】

1 private static Properties prop = new Properties(); 2 3 private static String confFilePath = "conf" + File.separator + "config.properties";// 配置文件目录 4 static { 5 // 加载properties 6 InputStream is = null; 7 InputStreamReader isr = null;

Spring事务管理(详解+实例)

写这篇博客之前我首先读了<Spring in action>,之后在网上看了一些关于Spring事务管理的文章,感觉都没有讲全,这里就将书上的和网上关于事务的知识总结一下,参考的文章如下: Spring事务机制详解 Spring事务配置的五种方式 Spring中的事务管理实例详解 1 初步理解 理解事务之前,先讲一个你日常生活中最常干的事:取钱. 比如你去ATM机取1000块钱,大体有两个步骤:首先输入密码金额,银行卡扣掉1000元钱:然后ATM出1000元钱.这两个步骤必须是要么都执行要么都