Strom小实例,大小写转换

结构:

RandomWordSpout:

  1. package cn.itcast.stormdemo;
  2. import java.util.Map;
  3. import java.util.Random;
  4. import backtype.storm.spout.SpoutOutputCollector;
  5. import backtype.storm.task.TopologyContext;
  6. import backtype.storm.topology.OutputFieldsDeclarer;
  7. import backtype.storm.topology.base.BaseRichSpout;
  8. import backtype.storm.tuple.Fields;
  9. import backtype.storm.tuple.Values;
  10. import backtype.storm.utils.Utils;
  11. public class RandomWordSpout extends BaseRichSpout{
  12. private SpoutOutputCollector collector;
  13. //模拟一些数据
  14. String[] words = {"iphone","xiaomi","mate","sony","sumsung","moto","meizu"};
  15. //不断地往下一个组件发送tuple消息
  16. //这里面是该spout组件的核心逻辑
  17. @Override
  18. public void nextTuple() {
  19. //可以从kafka消息队列中拿到数据,简便起见,我们从words数组中随机挑选一个商品名发送出去
  20. Random random = new Random();
  21. int index = random.nextInt(words.length);
  22. //通过随机数拿到一个商品名
  23. String godName = words[index];
  24. //将商品名封装成tuple,发送消息给下一个组件
  25. collector.emit(new Values(godName));
  26. //每发送一个消息,休眠500ms
  27. Utils.sleep(500);
  28. }
  29. //初始化方法,在spout组件实例化时调用一次
  30. @Override
  31. public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
  32. this.collector = collector;
  33. }
  34. //声明本spout组件发送出去的tuple中的数据的字段名
  35. @Override
  36. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  37. declarer.declare(new Fields("orignname"));
  38. }
  39. }

SuffixBolt :

  1. package cn.itcast.stormdemo;
  2. import java.io.FileWriter;
  3. import java.io.IOException;
  4. import java.util.Map;
  5. import java.util.UUID;
  6. import backtype.storm.task.TopologyContext;
  7. import backtype.storm.topology.BasicOutputCollector;
  8. import backtype.storm.topology.OutputFieldsDeclarer;
  9. import backtype.storm.topology.base.BaseBasicBolt;
  10. import backtype.storm.tuple.Tuple;
  11. public class SuffixBolt extends BaseBasicBolt{
  12. FileWriter fileWriter = null;
  13. //在bolt组件运行过程中只会被调用一次
  14. @Override
  15. public void prepare(Map stormConf, TopologyContext context) {
  16. try {
  17. fileWriter = new FileWriter("/home/hadoop/stormoutput/"+UUID.randomUUID());
  18. } catch (IOException e) {
  19. throw new RuntimeException(e);
  20. }
  21. }
  22. //该bolt组件的核心处理逻辑
  23. //每收到一个tuple消息,就会被调用一次
  24. @Override
  25. public void execute(Tuple tuple, BasicOutputCollector collector) {
  26. //先拿到上一个组件发送过来的商品名称
  27. String upper_name = tuple.getString(0);
  28. String suffix_name = upper_name + "_itisok";
  29. //为上一个组件发送过来的商品名称添加后缀
  30. try {
  31. fileWriter.write(suffix_name);
  32. fileWriter.write("\n");
  33. fileWriter.flush();
  34. } catch (IOException e) {
  35. throw new RuntimeException(e);
  36. }
  37. }
  38. //本bolt已经不需要发送tuple消息到下一个组件,所以不需要再声明tuple的字段
  39. @Override
  40. public void declareOutputFields(OutputFieldsDeclarer arg0) {
  41. }
  42. }

TopoMain :

  1. package cn.itcast.stormdemo;
  2. import backtype.storm.Config;
  3. import backtype.storm.StormSubmitter;
  4. import backtype.storm.generated.AlreadyAliveException;
  5. import backtype.storm.generated.InvalidTopologyException;
  6. import backtype.storm.generated.StormTopology;
  7. import backtype.storm.topology.TopologyBuilder;
  8. /**
  9. * 组织各个处理组件形成一个完整的处理流程,就是所谓的topology(类似于mapreduce程序中的job)
  10. * 并且将该topology提交给storm集群去运行,topology提交到集群后就将永无休止地运行,除非人为或者异常退出
  11. * @author [email protected]
  12. *
  13. */
  14. public class TopoMain {
  15. public static void main(String[] args) throws Exception {
  16. TopologyBuilder builder = new TopologyBuilder();
  17. //将我们的spout组件设置到topology中去
  18. //parallelism_hint :4 表示用4个excutor来执行这个组件
  19. //setNumTasks(8) 设置的是该组件执行时的并发task数量,也就意味着1个excutor会运行2个task
  20. builder.setSpout("randomspout", new RandomWordSpout(), 4).setNumTasks(8);
  21. //将大写转换bolt组件设置到topology,并且指定它接收randomspout组件的消息
  22. //.shuffleGrouping("randomspout")包含两层含义:
  23. //1、upperbolt组件接收的tuple消息一定来自于randomspout组件
  24. //2、randomspout组件和upperbolt组件的大量并发task实例之间收发消息时采用的分组策略是随机分组shuffleGrouping
  25. builder.setBolt("upperbolt", new UpperBolt(), 4).shuffleGrouping("randomspout");
  26. //将添加后缀的bolt组件设置到topology,并且指定它接收upperbolt组件的消息
  27. builder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
  28. //用builder来创建一个topology
  29. StormTopology demotop = builder.createTopology();
  30. //配置一些topology在集群中运行时的参数
  31. Config conf = new Config();
  32. //这里设置的是整个demotop所占用的槽位数,也就是worker的数量
  33. conf.setNumWorkers(4);
  34. conf.setDebug(true);
  35. conf.setNumAckers(0);
  36. //将这个topology提交给storm集群运行
  37. StormSubmitter.submitTopology("demotopo", conf, demotop);
  38. }
  39. }

UpperBolt :

  1. package cn.itcast.stormdemo;
  2. import backtype.storm.topology.BasicOutputCollector;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseBasicBolt;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. public class UpperBolt extends BaseBasicBolt{
  9. //业务处理逻辑
  10. @Override
  11. public void execute(Tuple tuple, BasicOutputCollector collector) {
  12. //先获取到上一个组件传递过来的数据,数据在tuple里面
  13. String godName = tuple.getString(0);
  14. //将商品名转换成大写
  15. String godName_upper = godName.toUpperCase();
  16. //将转换完成的商品名发送出去
  17. collector.emit(new Values(godName_upper));
  18. }
  19. //声明该bolt组件要发出去的tuple的字段
  20. @Override
  21. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  22. declarer.declare(new Fields("uppername"));
  23. }
  24. }

来自为知笔记(Wiz)

时间: 2024-12-17 04:34:28

Strom小实例,大小写转换的相关文章

css实现的将英文单词进行大小写转换代码实例

css实现的将英文单词进行大小写转换代码实例:本章节分享一段代码实例,它实现了利用CSS将英文单词进行大小写转换效果.代码非常的简单,相关属性的更多内容可以自行在本站查询.代码实例如下: <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <meta name="author" content="http://www.51texiao.cn/&quo

一起talk C栗子吧(第一百六十六回:C语言实例--大小写字符转换)

各位看官们,大家好,上一回中咱们说的是C语言中常量的例子,这一回咱们说的例子是:大小写字符转换.闲话休提,言归正转.让我们一起talk C栗子吧! 看官们,在程序中经常会用到英文字符,有时候需要把英文字符进行大小写转换.如何进行转换呢?接下来我们一起看看具体的转换方法. 大家都知道每个字符都有相应的ASIIC码,如果把一个字符的ASIIC码加上一个数值就变成了另外一个字符的ASIIC码.比如给字符a的ASIIC码加上1后就成了字符字符b的ASIIC码.基于这个原理,我们可以在小写字符的ASIIC

Excel大小写转换函数

Excel中的大小写转换函数 (1).转换为全部小写字母:lower函数 (2).转换为全部大写字母:upper函数 (3).转换为首字母大写,其余小写字母:proper函数 三种函数的使用方式,如下图所示: 流程操作,如下图所示: 小知识,简而记之. 蓝的成长记系列: 原创作品,出自 "深蓝的blog" 博客 蓝的成长记--追逐DBA(1):奔波于路上,挺进山东 蓝的成长记--追逐DBA(2):安装!安装!久违的记忆,引起我对DBA的重新认知 蓝的成长记--追逐DBA(3):古董上操

C++大小写转换和性能(C语言,C++,API,STL一共4种方法)

大小写转换和性能 前言 本文主要讨论最基本的一些大小写转换函数和API,不讨论一些常见的字符串程序库里面的大小写转换接口,另外本文的落脚点是这些转换函数的性能和日常开发中遇到的一些问题. 不考虑范围 其实ctype.h里面有定义一套宏,就是不考虑字符是否落在A-Z,a-z范围,直接计算(直接用加减法或者使用位与或计算,差别不是很大).显然这样的效率是最高的,但是使用可能是有问题的,遇到中文或者其他友邦的一些字符,可能就转换错了,当然如果已经提前确认过输入会落在A-Z,a-z范围,则是可以使用这种

黄聪:PHP字符串操作(string替换、删除、截取、复制、连接、比较、查找、包含、大小写转换、切割成数组等)

一.字符串替换 str_replace("iwind", "kiki", "i love iwind, iwind said"); 将输出 "i love kiki, kiki said" str_replace(find,replace,string,count)参数 描述 find 必需.规定要查找的值. replace 必需.规定替换 find 中的值的值. string 必需.规定被搜索的字符串. count 可选.一

【C++】【一日一练】读写文件小实例【20140510】

需要反省,说好一日一练的,昨天周五就偷懒出去逛了逛...吾当一日三省吾身... 今天的练习是写一个小程序,需要用到文件的读写,不过越写越长,又用到了很多乱七八糟的内置类什么的,就当是复习了吧,大概涉及到以下几个知识: enum fstream stringstream vector 渣程序如下: #include <iostream> #include <fstream> #include <string> #include <sstream> // str

从一个简单的小实例分析JSP+Servelt与JSP+Struts2框架的区别

最近在学struts2,struts2相比以前的JSP+Servlet,在处理流程上的更简单,我们就一个小实例来具体分析一下. 实例内容如下: 实现一个简单的注册页面包括:用户名.密码.重复密码.年龄.出生日期.毕业日期 要求如下:用户名的长度在4-6之间密码的长度在4-6之间重复密码与密码相等年龄在10-50之间出生日期在毕业日期之前 输入错误返回原页面,并在原页面的文本框后面显示具体的错误信息.正确输入则跳入下个页面将信息显示出来. 1.JSP+Servlet 1)我们编写注册页面regis

C++大小写转换和性能

大小写转换和性能 前言 本文主要讨论最基本的一些大小写转换函数和API,不讨论一些常见的字符串程序库里面的大小写转换接口,另外本文的落脚点是这些转换函数的性能和日常开发中遇到的一些问题. 不考虑范围 其实ctype.h里面有定义一套宏,就是不考虑字符是否落在A-Z,a-z范围,直接计算(直接用加减法或者使用位与或计算,差别不是很大).显然这样的效率是最高的,但是使用可能是有问题的,遇到中文或者其他友邦的一些字符,可能就转换错了,当然如果已经提前确认过输入会落在A-Z,a-z范围,则是可以使用这种

vim大小写转换 以及区块选择 复制粘贴

一  文本选择: 矩形区块选择: [Ctrl]+v 区块选择,可以用长方形的 方式选择 v 字符选择,会将光标经过的地方反白选择! V(大写) 行选择,会将光标经过的行反白选择! 选择好以后命令行模式下使用d删除,y复制,光标移动到另一个地方,在命令行模式用p粘贴到光标下方,P粘贴到光标上方. 二 vim中大小写转化的命令是 gu或者gU 形象一点的解释就是小u意味着转为小写:大U意味着转为大写. 1.整篇文章大写转化为小写 打开文件后,无须进入命令行模式.键入:ggguG 解释一下:ggguG