RocketMq入门之消息发送和接收

一、消息产生、发送

 1 public class Producer {
 2 public static void main(String[] args) throws MQClientException {
 3   DefaultMQProducer producer = new DefaultMQProducer("rmq-group");
 4   producer.setNamesrvAddr("172.18.4.114:9876");
 5   producer.setInstanceName("producer");
 6   producer.start();
 7   try {
 8     for (int i = 0; i < 10; i++) {
 9     Thread.sleep(5000); //每5秒发送一次MQ
10     Message msg = new Message("TopicA-test",// topic
11       "TagA",// tag
12       (new Date() + " Hello RocketMQ ,QuickStart" + i)
13       .getBytes()// body
14       );
15     SendResult sendResult = producer.send(msg);
16     }
17   } catch (Exception e) {
18     e.printStackTrace();
19   }
20   producer.shutdown();
21   }
22 }

二、消息接收、消费

 1 import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
 2 import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 3 import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 4 import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 5 import com.alibaba.rocketmq.client.exception.MQClientException;
 6 import com.alibaba.rocketmq.common.message.MessageExt;
 7
 8 import java.util.List;
 9
10
11 public class Consumer {
12     public static void main(String[] args) throws MQClientException {
13         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("rmq-group");
14
15         consumer.setNamesrvAddr("172.18.4.114:9876");
16         consumer.setInstanceName("consumer");
17         consumer.subscribe("TopicA-test", "TagA");
18
19         consumer.registerMessageListener(new MessageListenerConcurrently() {
20                 @Override
21                 public ConsumeConcurrentlyStatus consumeMessage(
22                     List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
23                     for (MessageExt msg : msgs) {
24                         System.out.println(new String(msg.getTopic()));
25                         System.out.println(new String(msg.getTags()));
26                         System.out.println("=== " + new String(msg.getBody()));
27                     }
28
29                     return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
30                 }
31             });
32         consumer.start();
33         System.out.println("Consumer Started.");
34     }
35 }
时间: 2024-10-10 07:54:16

RocketMq入门之消息发送和接收的相关文章

Chromium的IPC消息发送、接收和分发机制分析

由于Chromium采用多进程架构,因此会涉及到进程间通信问题.通过前面一文的学习,我们知道Browser进程在启动Render进程的过程中会建立一个以UNIX Socket为基础的IPC通道.有了IPC通道之后,接下来Browser进程与Render进程就以消息的形式进行通信.我们将这种消息称为IPC消息,以区别于线程消息循环中的消息.本文就分析Chromium的IPC消息发送.接收和分发机制. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! Chrom

DICOM医学图像处理:DIMSE消息发送与接收“大同小异”之DCMTK fo-dicom mDCM

背景: 从DICOM网络传输一文开始,相继介绍了C-ECHO.C-FIND.C-STORE.C-MOVE等DIMSE-C服务的简单实现,博文中的代码给出的实例都是基于fo-dicom库来实现的,原因只有一个:基于C#的fo-dicom库具有高封装性.对于初学者来说实现大多数的DIMSE-C.DIMSE-N服务几乎都是"傻瓜式"操作--构造C-XXX-RQ.N-XXX-RQ然后绑定相应的OnResponseReceived处理函数即可.本博文希望在前几篇预热的基础上,对比DCMTK.fo

TeamTalk Android代码分析(业务流程篇)---消息发送和接收的整体逻辑说明

第一次纪录东西,也没有特别的顺序,想到哪里就随手画了一下,后续会继续整理- 6.2消息页面动作流程 6.2.1 消息页面初始化的总体思路 1.页面数据的填充更新直接由页面主线程从本地数据库请求 2.数据库数据的填充由后台线程异步方式从网络请求 3.前台线程每次按照18条记录读取数据库数据,后台线程按照每次18*3从网络请求数据 4.后台线程数据的请求由主线程满足一定的条件后发送总线事件,在 oneventbackgroudthread 中处理,具体条件(或的关系)如下: 1>第一次请求 2>本

Bluemix结合RabbitMq实现消息发送与接收实例

什么是RabbitMq? MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术.排队指的是应用程序通过 队列来通信.队列的使用除去了接收和发送应用程序同时执行的要求. 什么是Bluemix? BlueMix 是 IBM 基于 Cloud Foundr

cocos2dx 消息发送与接收

     cocos2dx有个自定义事件可以实现消息的发送和接收,叫EventListenerCustom.它是通过一个字符串来标识事件名称的.下面介绍下,我实现的这个消息的发送和接收.      首先,我们定义2个类,一个消息接收类,一个消息发送类.代码如下:       //消息接收 class cMsgReceiver { public: virtual void RecMsg(int iMsgId, void* pInfo, int iSize) { } }; //消息发送 class

windows消息发送与接收

Windows开发中一个重要的概念就是消息.能搞清楚消息的传递和处理,相信可以使我们对Windows程序有更深的理解. 先把消息划分为3类:发送消息(Incomingsent message).投递消息(Post message).输入消息(Input message).其中发送消息是非队列消息,而后两种是队列消息.在线程的消息队列中并不包括非队列消息,而只有队列消息才会在线程的消息队列中. 由上面的分类也可以知道为什么不能通过PostMessage函数来模拟输入动作.因为投递消息将进入到投递消

ActiveMQ消息发送与接收

推荐文章:ActiveMQ讯息传送机制以及ACK机制 ActiveMQ发送消息 1:创建链接工厂ConnectionFactory 2:创建链接Connection 3:启动session 4:创建消息发送目的地 5:创建生产者 6:发送消息 消息发送类: package com.apt.study.util.activemq; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; impor

linux 系统之间,网络编程,消息发送与接收

[email protected]:~/udp$ sudo apt-get update [email protected]:~/udp$ sudo apt-get install build-essential [email protected]:~/udp$ sudo apt-get install make [email protected]:~/udp$ ll -rw-rw-r-- 1 chunli chunli  279 May 15 10:36 makefile -rw-rw-r--

NTCPMSG 开源高性能TCP消息发送组件

https://www.cnblogs.com/eaglet/archive/2013/01/07/2849010.html 目前的.net 架构下缺乏高效的TCP消息发送组件,而这种组件是构建高性能分布式应用所必需的.为此我结合多年的底层开发经验开发了一个.net 下的高效TCP消息发送组件.这个组件在异步发送时可以达到每秒160万包,而相同大小的数据包用WCF的TCP模式OneWay 方式发送每秒只能达到5.6万包. 项目首页 http://ntcpmsg.codeplex.com/ 功能介