zookeeper和metaq客户端请自行搜索....
使用方式如下:
metaq maven依赖
<!-- MetaQ (Metamorphosis) Java client, pinned to version 1.4.6.2 -->
<dependency>
    <groupId>com.taobao.metamorphosis</groupId>
    <artifactId>metamorphosis-client</artifactId>
    <version>1.4.6.2</version>
</dependency>
metaq 配置
# ZooKeeper address resolved by the ${metaq.registry.address} placeholder (zkConnect) in the Spring config
metaq.registry.address=127.0.0.1:2181
<?xml version="1.0" encoding="UTF-8"?>
<!-- Spring wiring for the MetaQ client: one shared session factory, a producer template,
     and a listener container that binds topics to their listeners. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <!-- metaq producer -->
    <!-- Session factory shared by the producer template and the listener container;
         zkConnect is read from the metaq.registry.address property. -->
    <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
        <property name="zkConnect" value="${metaq.registry.address}" />
        <property name="zkSessionTimeoutMs" value="30000" />
        <property name="zkConnectionTimeoutMs" value="30000" />
        <property name="zkSyncTimeMs" value="5000" />
    </bean>

    <!-- Default message body converter; the producer and the consumer must use the same class.
         Replaced by the custom converter defined below. -->
    <!-- <bean class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter" /> -->
    <bean id="messageBodyConverter" class="com.pay.utils.MetaqMessageConverter" />

    <!-- Template used by application code to send messages -->
    <bean id="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
        <property name="messageSessionFactory" ref="sessionFactory" />
        <property name="messageBodyConverter" ref="messageBodyConverter" />
    </bean>

    <!-- metaq consumer -->
    <bean id="topic1" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">
        <!-- Only one consumer exists within the same group (cluster included) -->
        <property name="group" value="group1" />
        <property name="topic" value="test" />
        <property name="maxBufferSize" value="16384" />
    </bean>

    <!-- Listener bound to topic1 in the container below -->
    <bean id="topic1Listener" class="com.pay.metaq.MetaqTestListener">
        <property name="processThreads" value="10" />
    </bean>

    <!-- metaq container -->
    <!-- Maps each MetaqTopic bean (key) to the listener bean that handles it (value) -->
    <bean id="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">
        <property name="messageSessionFactory" ref="sessionFactory" />
        <property name="messageBodyConverter" ref="messageBodyConverter" />
        <property name="subscribers">
            <map>
                <entry key-ref="topic1" value-ref="topic1Listener" />
                <!-- ... -->
            </map>
        </property>
    </bean>
</beans>
metaq consumer
package com.pay.metaq;

import com.alibaba.fastjson.JSON;
import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;

/**
 * Demo listener for String-bodied messages: dumps every received message to
 * standard output, first as JSON and then as the raw body.
 */
public class MetaqTestListener extends DefaultMessageListener<String> {

    /**
     * Prints the whole message serialized as JSON, followed by its body.
     *
     * @param msg the received message
     */
    @Override
    public void onReceiveMessages(MetaqMessage<String> msg) {
        final String body = msg.getBody();
        final String messageAsJson = JSON.toJSONString(msg);

        System.out.println(messageAsJson);
        System.out.println(body);
    }
}
metaq producer
package com.pay.tests.combine;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Date;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.taobao.metamorphosis.client.extension.spring.MessageBuilder;
import com.taobao.metamorphosis.client.extension.spring.MetaqTemplate;

import api.pay.combine.dto.PayItemReqDto;

/**
 * Interactive producer demo: publishes one message to topic {@code "test"} for
 * every line typed on standard input, using the {@link MetaqTemplate} from the
 * Spring context at {@code /notenv/applicationContext.xml}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/notenv/applicationContext.xml")
public class MetaqTest {

    @Autowired
    private MetaqTemplate metaqTemplate;

    /**
     * Reads lines from standard input until end of stream and sends each one as
     * the {@code spDetail} of a {@link PayItemReqDto}.
     *
     * @throws Exception if reading from standard input or sending is interrupted or fails
     * @throws AssertionError if a send does not report success
     */
    @Test
    public void test01() throws Exception {
        // A single DTO is reused; its fields are overwritten before every send.
        PayItemReqDto dto = new PayItemReqDto();
        // The reader is deliberately not closed: closing it would also close System.in.
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));

        String line;
        while ((line = reader.readLine()) != null) {
            dto.setOrderNo("order001");
            dto.setTotalFee(1);
            dto.setSpName("spname001");
            dto.setTimeOut(new Date());
            dto.setSpDetail(line);

            // Check the send result so a failed delivery fails the test instead of being ignored.
            if (!metaqTemplate.send(MessageBuilder.withTopic("test").withBody(dto)).isSuccess()) {
                throw new AssertionError("MetaQ send failed for input line: " + line);
            }
        }
    }
}
时间: 2024-10-31 17:17:41