转载请注明出处:http://www.cnblogs.com/xiaodf/
本示例展示了一个RocketMQ producer的简单实现,通过解析文本文件获取输入数据,将数据经过Avro序列化后发送到RocketMQ。
程序通过stdin.xml配置文件获取主要参数值,stdin.xml文件内容如下:
<?xml version="1.0" encoding="UTF-8"?> <operator> <parameters> <parameter> <key>rocketmq.nameserver.list</key> <value>172.16.8.106:9876</value> </parameter> <parameter> <key>rocketmq.group.id</key> <value>test006</value> </parameter> <parameter> <key>rocketmq.topic</key> <value>TopicTest2</value> </parameter> <parameter> <key>rocketmq.tags</key> <value>*</value> </parameter> <parameter> <key>rocketmq.message.key</key> <value>OrderID0034</value> </parameter> <parameter> <key>schemaStr</key> <value>col1:string,col2:double</value> </parameter> <parameter> <key>filePath</key> <value>/home/test/rocketmq/input.txt</value> </parameter> </parameters> </operator>
生产者示例程序如下:
import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.scistor.datavision.operator.common.AvroUtils; import com.scistor.datavision.operator.common.OperatorConfiguration; import org.apache.avro.Schema; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.schema.HCatSchema; import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class RocketProducer { // parameters private String nameserver; private String rocketmqTopic; private String tags; private String key; private String schemaStr; private String filePath; public RocketProducer configure(OperatorConfiguration conf) { this.nameserver = conf.get("rocketmq.nameserver.list"); this.rocketmqTopic = conf.get("rocketmq.topic"); this.tags = conf.get("rocketmq.tags"); this.key = conf.get("rocketmq.message.key"); this.schemaStr = conf.get("schemaStr"); this.filePath = conf.get("filePath"); return this; } public int run() { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr(nameserver); producer.setInstanceName("RocketProducer"); /** * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br> * 注意:切记不可以在每次发送消息时,都调用start方法 */ try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } HCatSchema hcatSchema = null; Schema schema = null; SchemaUtil schemaUtil = new SchemaUtil(); try { hcatSchema = schemaUtil.createHCatSchema(schemaStr); schema = schemaUtil.createSchema("com.scistor.rocketmq.producer", rocketmqTopic, hcatSchema); } catch (HCatException e) { e.printStackTrace(); } List<String> content = RocketProducer.readFileByLines(filePath); /** * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。 * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br> * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br> * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。 */ for (int i = 0; i < content.size(); i++) { try { { String[] fields = content.get(i).split(","); Object[] record = AvroUtils.convert(schema, fields); byte[] bytes = AvroUtils.serialize(schema, record); Message msg = new Message(rocketmqTopic,// topic tags,// tag key,// key bytes);// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } //TimeUnit.MILLISECONDS.sleep(10); } /** * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己 * 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法 */ producer.shutdown(); return 0; } public static List<String> readFileByLines(String fileName) { List<String> list = new ArrayList<String>(); File file = new File(fileName); BufferedReader reader = null; try { System.out.println("以行为单位读取文件内容,一次读一整行:"); reader = new BufferedReader(new FileReader(file)); String tempString = null; int line = 1; // 一次读入一行,直到读入null为文件结束 while ((tempString = reader.readLine()) != null) { // 显示行号 list.add(tempString); System.out.println("line " + line + ": " + tempString); line++; } reader.close(); } catch (IOException e) { e.printStackTrace(); } finally { if (reader != null) { try { reader.close(); } catch (IOException e1) { } } } return list; } public static void main(String[] args) { if (args.length < 1) { System.err.println("需要: 参数配置文件<stdin.xml>所在的hdfs目录"); System.exit(-1); } OperatorConfiguration conf = new OperatorConfiguration(args[0]); RocketProducer trainer = new RocketProducer(); System.exit(trainer.configure(conf).run()); } }
程序运行输出打印到控制台:
[[email protected] rocketmq]# ./produce.sh 以行为单位读取文件内容,一次读一整行: line 1: hdfs:///user/xdf/streaming/file-web/file/1.html,1 line 2: hdfs:///user/xdf/streaming/file-web/file/2.html,2 line 3: hdfs:///user/xdf/streaming/file-web/file/3.html,3 line 4: hdfs:///user/xdf/streaming/file-web/file/4.html,4 line 5: hdfs:///user/xdf/streaming/file-web/file,1 line 6: /home/xdf/workflow/file-web/file/1.html,1 line 7: /home/xdf/workflow/file-web/file/2.html,2 line 8: /home/xdf/workflow/file-web/file/3.html,3 line 9: /home/xdf/workflow/file-web/file/4.html,4 line 10: /home/xdf/workflow/file-web/file,1 SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00A36, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18710] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00AED, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18700] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00BA4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=2], queueOffset=18668] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00C5B, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=3], queueOffset=18663] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197504, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=0], queueOffset=18649] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E1975B4, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=1], queueOffset=18633] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197663, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=2], queueOffset=18629] SendResult [sendStatus=SEND_OK, msgId=AC10086B00002A9F000000001E197712, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-b, queueId=3], queueOffset=18626] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00D12, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=0], queueOffset=18711] SendResult [sendStatus=SEND_OK, msgId=AC10086A00002A9F000000001FB00DC1, messageQueue=MessageQueue [topic=TopicTest2, brokerName=broker-a, queueId=1], queueOffset=18701]
时间: 2024-10-28 19:39:30