阿里云RocketMQ的消费者简单实现

业务场景之类的请看另一篇生产者的实现;

package com.ttt.eee;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.nio.charset.Charset;
import java.util.Properties;
import java.util.Scanner;

public class MQTestConsumer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 您在控制台创建的 Group ID,其实就是网上说的groupName
        properties.put(PropertyKeyConst.GROUP_ID, "eee");
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "sss");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "bbb");
        // 设置 TCP 接入域名,到控制台的实例基本信息中查看
        properties.put(PropertyKeyConst.NAMESRV_ADDR,
                       "http://ttt.mq-internet-access.mq-internet.aliyuncs.com:80");
        // 集群订阅方式 (默认)
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
        // 广播订阅方式
        // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
        Consumer consumer = ONSFactory.createConsumer(properties);
        // TODO tag如果是*表示订阅所有的tag消息,注意在producer里是叫tags,这里却叫subExpresion
        // *表示订阅所有Tag,TagA||TagB表示订阅 TagA和TagB
        consumer.subscribe("xxx-change", "*", (message, context) -> {
            // context的用处暂时不知道
            System.out.println("Receive: " + message);
            System.out.println("具体消息为:" + new String(message.getBody(), Charset.forName("UTF-8")));
            // 正常消费返回这个,如果消费消息后业务处理出现问题一般返回:Action.ReconsumeLater表示这条消息晚点处理;
            return Action.CommitMessage;
        });
        //订阅另外一个 Topic
        // TODO 一个Consumer可以订阅多个topic ??,不过既然是官网的例子应该是可以的
        /*consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() { //订阅全部 Tag
            public Action consume(Message message, ConsumeContext context) {
                System.out.println("Receive: " + message);
                return Action.CommitMessage;
            }
        });*/
        consumer.start();
        System.out.println("Consumer Started");

        var scanner = new Scanner(System.in);
        scanner.next();

        consumer.shutdown();
        System.out.println("closed producer conn.");
    }

}

集合到Spring里是:

@Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean xxxNotify(XxxNotifyListener xxxNotifyListener) {
        return this.getConsumer(gid, topic, xxxNotifyListener);
    }

    private ConsumerBean getConsumer(String gid, String topic, MessageListener messageListener) {
        Properties properties = new Properties();
        properties.setProperty("addr", addr);
        properties.setProperty("AccessKey", accessKey);
        properties.setProperty("SecretKey", secretKey);
        properties.setProperty("GROUP_ID", gid);

        Map<Subscription, MessageListener> subscriptionTable = new HashMap<>();
        Subscription subscription = new Subscription();
        subscription.setTopic(topic);
        // 这里还可以设置subExpression来描述tag
        subscriptionTable.put(subscription, messageListener);

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(properties);
        consumer.setSubscriptionTable(subscriptionTable);

        return consumer;
    }

MessageListener里是用来实现消费这个消息后的具体业务逻辑的;

原文地址:https://www.cnblogs.com/silentdoer/p/11101736.html

时间: 2024-10-04 15:00:33

阿里云RocketMQ的消费者简单实现的相关文章

阿里云RocketMQ的性能测试(一、本地测试)

因业务需要,需要进行阿里云RocketMQ的性能测试. 环境,一台windows系统CPU:I7,内存:8G,64位操作系统. 测试两种场景,为了保证订阅关系一致性(可以去阿里云官网了解订阅关系一致性),消费分为两种模式. 1.按Tag订阅,订阅所有Tag,测试使用3个消费者,3个生产者,每个生产者发送一万条数据,放到同一个Tag里,对应同一个ShardingKey,保证顺序消费. 测试结果: 2.按Tag订阅,每个消费者订阅一个Tag,每个Tag在一个分组里面. 测试结果: 其中要特别注意第二

阿里云api调用做简单的cmdb

阿里云api调用做简单的cmdb 1 步骤 事实上就是调用阿里api.获取可用区,比方cn-hangzhou啊等等.然后在每一个区调用api 取ecs的状态信息,最好写到一个excel里面去.方便排序排版. 2 示意图 3 源代码 https://github.com/gqdw/cmdb/tree/master

基于阿里云RocketMQ的分片顺序消费+监听器自动启动的Springboot实验

发送消息RocketMqProducerService package com.jane.rocketmq.service; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.order.OrderProducer; import org.springframework.s

在阿里云上搭建一个简单的node服务器

一.阿里云服务器以及node环境的搭建 服务器可以去阿里云官网购买一个ECS云服务器,价格还是有点小贵的,如果想使用免费的阿里云服务器,那么阿里云官网每天也是有抢免费的服务器的,每天上午十点,新人能抢到为期半年的服务器. 然后有了服务器以后,首先搭建一个node的运行环境,保证node 能正常使用,这个不是本文的内容就不多加赘述了. 如果没有搭建的小伙伴可以参考这里. 二.远程服务器上的代码管理 在阿里云服务器上可以安装一个 git 版本控制器,将自己的代码放置在自己的 GitHub 上,然后在

nginx+uwsgi阿里云ubuntu服务器上简单部署flask RESTful风格项目

ubuntu16.04上利用Nginx代理uwsgi处理Flask web应用 1.环境要求 ubuntu16.04  ----  阿里云的服务器 Nginx python2 uwsgi 2.简单介绍Nginx nginx是一个高性能的http和反向代理的服务器,Nginx采采用的epoll的机制,而没有使用select和poll,虽然,在用户活跃数量比较高的时候,epoll性能不如select,但是,我们用Nginx来作为web服务器还是很不错的.nginx是一个轻量级的web服务器,他占用内

CentOS7 配置阿里云yum源,非常之简单

1.进入yum的文件夹 命令:cd   /etc/yum.repos.d/ 2.下载wget 命令:yum -y install wget 3.删除yum文件夹所有yum源 命令:rm -rf    /etc/yum.repos.d/*.repo 4.利用wget下载阿里云repo文件 命令:wget  http://mirrors.aliyun.com/repo/Centos-7.repo 5.执行yum源更新命令 命令:yum clean all 命令:yum makecache 注意:依次

阿里云OSS,PHP简单上传文件并返回地址【转】

需要下载php 的sdk包,注意路径问题 up.php <!DOCTYPE html> <html> <head> <meta charset="utf-8"> </head> <body> <form action="oss.php" method="post" enctype="multipart/form-data"> <!--用户

(转)云存储:阿里云OSS 、又拍云和 七牛 的比较

阿里OSS:好处就是,那是一套完整的体系,存储,数据库,CDN,服务器,阿里都可以给你全包.缺点,费用对于没有盈利的网站来说太高了,好像定位就是给那些高端客户使用的,而且CDN,OSS的流量是分开收费,带宽(2倍成本,呵呵).又拍云:算是老牌静态存储服务商,自带有CDN.存储空间可以弹性增加(不知道可不可以弹性减少,我只是免费使用了一下).费用计算公式(空间和流量),请求次数是免费.可免费试用7天.开源的程序(DZ,PW,WP)都有插件,也可以直接使用FTP,对于技术上要求不是太高就可以使用.七

.net MVC4.0项目发布到阿里云虚拟主机中遇到的问题。

正所谓学以致用,今天本来想做个bootstrap的demo发到服务器上看一下效果,结果服务器搞了半天,最终太晚了没能学到什么东西. 首先写好页面之后我做了一个MVC4.0的网站项目,然后把Bootstrap包装进去, 再然后用FTP上传到阿里云的虚拟主机,本来虚拟主机是Linux我放了一个wordpress的博客网站,但因为我学的是.Net就给改成了windowserver2008,其实阿里云换系统挺简单的几下就搞定了. 点下一步,提交就行了. 注意:提交后需要重置FTP的密码: 因为我已经换了