RocketMQ生产者示例程序

  转载请注明出处:http://www.cnblogs.com/xiaodf/

  本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。

  程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下:

<?xml version="1.0" encoding="UTF-8"?>
<operator>
	<parameters>
		<parameter>
			<key>rocketmq.nameserver.list</key>
			<value>172.16.8.106:9876</value>
		</parameter>
		<parameter>
			<key>rocketmq.group.id</key>
			<value>test006</value>
		</parameter>
		<parameter>
			<key>rocketmq.topic</key>
			<value>TopicTest2</value>
		</parameter>
		<parameter>
			<key>rocketmq.tags</key>
			<value>*</value>
		</parameter>
		<parameter>
			<key>rocketmq.message.key</key>
			<value>OrderID0034</value>
		</parameter>
		<parameter>
			<key>schemaStr</key>
			<value>col1:string,col2:double</value>
		</parameter>
		<parameter>
			<key>filePath</key>
			<value>/home/test/rocketmq/input.txt</value>
		</parameter>
	</parameters>
</operator>

  

生产者示例程序如下:

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.scistor.datavision.operator.common.AvroUtils;
import com.scistor.datavision.operator.common.OperatorConfiguration;
import org.apache.avro.Schema;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.schema.HCatSchema;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RocketProducer {

    // parameters
    private String nameserver;
    private String rocketmqTopic;
    private String tags;
    private String key;
    private String schemaStr;
    private String filePath;

    public RocketProducer configure(OperatorConfiguration conf) {
        this.nameserver = conf.get("rocketmq.nameserver.list");
        this.rocketmqTopic = conf.get("rocketmq.topic");
        this.tags = conf.get("rocketmq.tags");
        this.key = conf.get("rocketmq.message.key");
        this.schemaStr = conf.get("schemaStr");
        this.filePath = conf.get("filePath");
        return this;
    }

    public int run() {
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameserver);
        producer.setInstanceName("RocketProducer");
        /**
         * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
         * 注意:切记不可以在每次发送消息时,都调用start方法
         */
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

        HCatSchema hcatSchema = null;
        Schema schema = null;
        SchemaUtil schemaUtil = new SchemaUtil();
        try {
            hcatSchema = schemaUtil.createHCatSchema(schemaStr);
            schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema);
        } catch (HCatException e) {
            e.printStackTrace();
        }

        List<String> content = RocketProducer.readFileByLines(filePath);

        /**
         * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
         * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
         * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
         * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
         */
        for (int i = 0; i < content.size(); i++) {
            try {
                {
                    String[] fields = content.get(i).split(",");
                    Object[] record = AvroUtils.convert(schema, fields);
                    byte[] bytes = AvroUtils.serialize(schema, record);
                    Message msg = new Message(rocketmqTopic,// topic
                            tags,// tag
                            key,// key
                            bytes);// body
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            //TimeUnit.MILLISECONDS.sleep(10);
        }

        /**
         * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
         * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法
         */
        producer.shutdown();
        return 0;
    }

    public static List<String> readFileByLines(String fileName) {
        List<String> list = new ArrayList<String>();
        File file = new File(fileName);
        BufferedReader reader = null;
        try {
            System.out.println("以行为单位读取文件内容,一次读一整行:");
            reader = new BufferedReader(new FileReader(file));
            String tempString = null;
            int line = 1;
            // 一次读入一行,直到读入null为文件结束
            while ((tempString = reader.readLine()) != null) {
                // 显示行号
                list.add(tempString);
                System.out.println("line " + line + ": " + tempString);
                line++;
            }
            reader.close();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException e1) {
                }
            }
        }
        return list;
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录");
            System.exit(-1);
        }
        OperatorConfiguration conf = new OperatorConfiguration(args[0]);
        RocketProducer trainer = new RocketProducer();
        System.exit(trainer.configure(conf).run());
    }
}

  

程序运行输出打印到控制台:

[[email protected] rocketmq]# ./produce.sh
以行为单位读取文件内容,一次读一整行:
line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1
line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2
line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3
line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4
line 5: hdfs:///user/xdf/streaming/file-web/file,1
line 6: /home/xdf/workflow/file-web/file/1.html,1
line 7: /home/xdf/workflow/file-web/file/2.html,2
line 8: /home/xdf/workflow/file-web/file/3.html,3
line 9: /home/xdf/workflow/file-web/file/4.html,4
line 10: /home/xdf/workflow/file-web/file,1
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629]
SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711]
SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]

  

时间: 2024-10-28 19:39:30

RocketMQ生产者示例程序的相关文章

Android cocos2dx游戏开发——示例程序HelloCpp源码分析

本文通过分析cocos2dx提供的示例程序HelloCpp来分析cocos2dx的启动过程. 我们从HelloCpp.java开始: [java] view plaincopyprint? package org.cocos2dx.hellocpp; import org.cocos2dx.lib.Cocos2dxActivity; import android.os.Bundle; public class HelloCpp extends Cocos2dxActivity{ protecte

winpcap示例程序采用VC6编译时出错error C2054: expected &#39;(&#39; to follow &#39;_W64&#39;

e:\vehiclesecurity\wpdpack_4_1_2\include\pcap-stdinc.h(80) : error C2054: expected '(' to follow '_W64' e:\vehiclesecurity\wpdpack_4_1_2\include\pcap-stdinc.h(80) : error C2085: 'uintptr_t' : not in formal parameter list e:\vehiclesecurity\wpdpack_4_

Windows Socket编程示例-TCP示例程序

前面一部分是介绍,后面有示例 1.网络中进程之间如何通信? 首要解决的问题是如何唯一标识一个进程,否则通信无从谈起!在本地可以通过进程PID来唯一标识一个进程,但是在网络中这是行不通的.其实TCP/IP协议族已经帮我们解决了这个问题,网络层的"ip地址"可以唯一标识网络中的主机,而传输层的"协议+端口"可以唯一标识主机中的应用程序(进程).这样利用三元组(ip地址,协议,端口)就可以标识网络的进程了,网络中的进程通信就可以利用这个标志与其它进程进行交互. 使用TCP

创建ArcGIS API for JavaScript的第一个示例程序

原文:创建ArcGIS API for JavaScript的第一个示例程序 在上一篇博客中已经介绍了如何搭建ArcGIS API for JavaScript开发环境,如果您还没有搭建好开发环境的话,参考博客:http://blog.csdn.net/zdw_wym/article/details/48678913. 如果开发环境搭建好了的话,那么今天我们继续来搭建我们的第一个ArcGIS API for JavaScript应用程序. 下面首先将代码贴出来,复制到VS2012中新建的html

.NET跨平台:在CentOS上编译dnx并运行ASP.NET 5示例程序

在之前的博文中我们在 Ubuntu 上成功编译出了 dnx ,并且用它成功运行了 ASP.NET 5 示例程序.在这篇博文中我们将 Ubuntu 换成 CentOS. 目前 dnx 的编译需要用到 mono,所以先要安装 mono,而且最好是用最新的 mono 源代码进行编译并安装. 我们实际成功编译的操作步骤如下(假设将 mono 安装到 /data/mono_build 目录): mkdir /data/mono_build PATH=/data/mono_build/bin:$PATH g

ABP示例程序-使用AngularJs,ASP.NET MVC,Web API和EntityFramework创建N层的单页面Web应用

本片文章翻译自ABP在CodeProject上的一个简单示例程序,网站上的程序是用ABP之前的版本创建的,模板创建界面及工程文档有所改变,本文基于最新的模板创建.通过这个简单的示例可以对ABP有个更深入的了解,每个工程里应该写什么样的代码,代码如何组织以及ABP是如何在工程中发挥作用的. 源文档地址:https://www.codeproject.com/Articles/791740/Using-AngularJs-ASP-NET-MVC-Web-API-and-EntityFram 源码可以

Hadoop示例程序wordcount分析

wordcount作为Hadoop的示例程序,其思想很简洁,但也值得去理解 尤其是作为Hadoop菜鸟的我 wordcount程序如下: package com.lcy.hadoop.examples; import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import

如何编译ReactNative示例程序Examples

通过示例程序可以看到一些基本组件的使用,对于学习ReactNative是很有帮助的. 编译示例程序需要将整个项目导入到androidStudio中,androidStudio导入项目时选择react-native/ReactAndroid目录. 由于项目依赖ndk因此如果要编译Examples还需要安装配置ndk目录,下载ndk后是一个自解压程序,会释放ndk的目录.然后需要设置环境变量或者在react-native根目录下新建local.properties文件,文件内容如下:sdk.dir=

JSON API描述以及示例程序

JSON部分API说明: 示例程序: #include <stdio.h> #include <stdlib.h> #include <stddef.h> #include <string.h> #include "json.h" int main(int argc, char **argv) { struct json_tokener *tok; struct json_object *my_string, *my_int, *my_o