storm实时计算实例(socket实时接入)

介绍

实现了一个简单的从实时日志文件监听,写入socket服务器,再接入Storm计算的一个流程。

源码

日志监听实时写入socket服务器

[java] view plain copy

  1. package socket;
  2. import java.io.BufferedReader;
  3. import java.io.File;
  4. import java.io.IOException;
  5. import java.io.InputStreamReader;
  6. import java.io.PrintWriter;
  7. import java.io.RandomAccessFile;
  8. import java.net.Socket;
  9. import java.util.concurrent.Executors;
  10. import java.util.concurrent.ScheduledExecutorService;
  11. import java.util.concurrent.TimeUnit;
  12. /*
  13. * 监测数据,通过socket远程发送到另外服务器 ,见MyServerMulti
  14. * ClientRead再通过服务器从socket里读
  15. *
  16. */
  17. public class LogViewToSocket {
  18. private long lastTimeFileSize = 0;  //上次文件大小
  19. /**
  20. * 实时输出日志信息
  21. * @param logFile 日志文件
  22. * @throws IOException
  23. */
  24. public String getNewFile(File file)
  25. {
  26. File[] fs=file.listFiles();
  27. long maxtime=0;
  28. String newfilename="";
  29. for (int i=0;i<fs.length;i++)
  30. {
  31. if (fs[i].lastModified()>maxtime)
  32. {
  33. maxtime=fs[i].lastModified();
  34. newfilename=fs[i].getAbsolutePath();
  35. }
  36. }
  37. return newfilename;
  38. }
  39. RandomAccessFile randomFile=null;
  40. String newfile=null;
  41. String thisfile=null;
  42. public void realtimeShowLog(final File logFile,final PrintWriter out) throws IOException{
  43. newfile=getNewFile(logFile);
  44. //指定文件可读可写
  45. randomFile = new RandomAccessFile(new File(newfile),"r");
  46. //启动一个线程每1秒钟读取新增的日志信息
  47. ScheduledExecutorService exec =
  48. Executors.newScheduledThreadPool(1);
  49. exec.scheduleWithFixedDelay(new Runnable(){
  50. public void run() {
  51. try {
  52. //获得变化部分的
  53. randomFile.seek(lastTimeFileSize);
  54. String tmp = "";
  55. while( (tmp = randomFile.readLine())!= null) {
  56. System.out.println(new String(tmp.getBytes("ISO8859-1")));
  57. out.println(new String(tmp.getBytes("ISO8859-1")));
  58. out.flush();
  59. }
  60. thisfile=getNewFile(logFile);
  61. if(!thisfile.equals(newfile))
  62. {
  63. randomFile = new RandomAccessFile(new File(newfile),"r");
  64. lastTimeFileSize=0;
  65. }
  66. else
  67. lastTimeFileSize = randomFile.length();
  68. } catch (IOException e) {
  69. throw new RuntimeException(e);
  70. }
  71. }
  72. }, 0, 1, TimeUnit.SECONDS);
  73. }
  74. public static void main(String[] args) throws Exception {
  75. LogViewToSocket view = new LogViewToSocket();
  76. Socket socket=new Socket("192.168.27.100",5678);
  77. PrintWriter out=new PrintWriter(socket.getOutputStream());
  78. final File tmpLogFile = new File("/home/hadoop/test");
  79. view.realtimeShowLog(tmpLogFile,out);
  80. // socket.close();
  81. }
  82. }

socket服务器处理

[java] view plain copy

  1. import java.io.BufferedReader;
  2. import java.io.IOException;
  3. import java.io.InputStreamReader;
  4. import java.io.PrintWriter;
  5. import java.net.ServerSocket;
  6. import java.net.Socket;
  7. import java.net.SocketAddress;
  8. import java.util.*;
  9. public class MyServerMulti {
  10. private static Socket socket1;
  11. public static void main(String[] args) throws IOException {
  12. ServerSocket server = new ServerSocket(5678);
  13. int i=0;
  14. ArrayList<PrintWriter> outs=new ArrayList<PrintWriter>();
  15. /*
  16. * 一个client socket发送数据过来, server端再发到其他client socket端
  17. *
  18. */
  19. Socket socket1=null;
  20. while (true) {
  21. Socket socket = server.accept();
  22. i++;
  23. System.out.println(i);
  24. System.out.println(socket.getInetAddress());
  25. PrintWriter out= new PrintWriter(socket.getOutputStream());
  26. outs.add(out);
  27. if(i==1)
  28. socket1=socket;
  29. if(i==2)
  30. invoke(socket1,outs);
  31. }
  32. }
  33. private static void invoke(final Socket client, final ArrayList<PrintWriter> outs) throws IOException {
  34. new Thread(new Runnable() {
  35. public void run() {
  36. BufferedReader in = null;
  37. PrintWriter out = null;
  38. PrintWriter out1 = null;
  39. try {
  40. in = new BufferedReader(new InputStreamReader(client.getInputStream()));
  41. out = new PrintWriter(client.getOutputStream());
  42. while (true) {
  43. String msg = in.readLine();
  44. System.out.println(msg);
  45. out.println("Server received " + msg);
  46. out.flush();
  47. /*数据转发送到多个client*/
  48. for(int i=0;i<outs.size();i++)
  49. {
  50. out1=outs.get(i);
  51. System.out.println(i);
  52. System.out.println("send msg:"+msg);
  53. out1.println(msg);
  54. out1.flush();
  55. }
  56. System.out.println(client.getInetAddress());
  57. if (msg.equals("bye")) {
  58. break;
  59. }
  60. }
  61. } catch(IOException ex) {
  62. ex.printStackTrace();
  63. } finally {
  64. try {
  65. in.close();
  66. } catch (Exception e) {}
  67. try {
  68. out.close();
  69. } catch (Exception e) {}
  70. try {
  71. client.close();
  72. } catch (Exception e) {}
  73. }
  74. }
  75. }).start();
  76. }
  77. }

storm topology

[java] view plain copy

  1. import java.io.BufferedReader;
  2. import java.io.BufferedWriter;
  3. import java.io.File;
  4. import java.io.FileNotFoundException;
  5. import java.io.FileOutputStream;
  6. import java.io.FileReader;
  7. import java.io.FileWriter;
  8. import java.io.IOException;
  9. import java.io.InputStreamReader;
  10. import java.io.OutputStreamWriter;
  11. import java.io.PrintWriter;
  12. import java.io.RandomAccessFile;
  13. import java.net.Socket;
  14. import java.net.UnknownHostException;
  15. import java.util.Map;
  16. //import mytest.ThroughputTest.GenSpout;
  17. import backtype.storm.Config;
  18. import backtype.storm.LocalCluster;
  19. import backtype.storm.StormSubmitter;
  20. import backtype.storm.generated.AlreadyAliveException;
  21. import backtype.storm.generated.InvalidTopologyException;
  22. import backtype.storm.spout.SpoutOutputCollector;
  23. import backtype.storm.task.OutputCollector;
  24. import backtype.storm.task.TopologyContext;
  25. import backtype.storm.topology.BasicOutputCollector;
  26. import backtype.storm.topology.OutputFieldsDeclarer;
  27. import backtype.storm.topology.TopologyBuilder;
  28. import backtype.storm.topology.base.BaseBasicBolt;
  29. import backtype.storm.topology.base.BaseRichBolt;
  30. import backtype.storm.topology.base.BaseRichSpout;
  31. import backtype.storm.tuple.Fields;
  32. import backtype.storm.tuple.Tuple;
  33. import backtype.storm.tuple.Values;
  34. import backtype.storm.utils.Utils;
  35. /*
  36. *
  37. *
  38. *  storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true
  39. *
  40. */
  41. public class SocketProcess {
  42. public static class  SocketSpout extends BaseRichSpout {
  43. /**
  44. */
  45. static Socket sock=null;
  46. static BufferedReader in=null;
  47. String str=null;
  48. private static final long serialVersionUID = 1L;
  49. private SpoutOutputCollector _collector;
  50. private BufferedReader br;
  51. private String dataFile;
  52. private BufferedWriter bw2;
  53. RandomAccessFile randomFile;
  54. private long lastTimeFileSize = 0;
  55. int cnt=0;
  56. //定义spout文件
  57. SocketSpout(){
  58. }
  59. //定义如何读取spout文件
  60. @Override
  61. public void open(Map conf, TopologyContext context,
  62. SpoutOutputCollector collector) {
  63. // TODO Auto-generated method stub
  64. _collector = collector;
  65. try {
  66. sock=new Socket("192.168.27.100",5678);
  67. in=
  68. new BufferedReader(new InputStreamReader(sock.getInputStream()));
  69. } catch (UnknownHostException e) {
  70. // TODO Auto-generated catch block
  71. e.printStackTrace();
  72. } catch (IOException e) {
  73. // TODO Auto-generated catch block
  74. e.printStackTrace();
  75. }
  76. }
  77. //获取下一个tuple的方法
  78. @Override
  79. public void nextTuple() {
  80. // TODO Auto-generated method stub
  81. if(sock==null){
  82. try {
  83. sock=new Socket("192.168.27.100",5678);
  84. in=
  85. new BufferedReader(new InputStreamReader(sock.getInputStream()));
  86. } catch (UnknownHostException e) {
  87. // TODO Auto-generated catch block
  88. e.printStackTrace();
  89. } catch (IOException e) {
  90. // TODO Auto-generated catch block
  91. e.printStackTrace();
  92. }
  93. }
  94. while(true){
  95. try {
  96. str = in.readLine();
  97. } catch (IOException e) {
  98. // TODO Auto-generated catch block
  99. e.printStackTrace();
  100. }
  101. System.out.println(str);
  102. _collector.emit(new Values(str));
  103. if(str.equals("end")){
  104. break;
  105. }
  106. }
  107. }
  108. @Override
  109. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  110. // TODO Auto-generated method stub
  111. declarer.declare(new Fields("line"));
  112. }
  113. }
  114. public static class Process extends BaseRichBolt{
  115. private String _seperator;
  116. private String _outFile;
  117. PrintWriter pw;
  118. private OutputCollector _collector;
  119. private BufferedWriter bw;
  120. public Process(String outFile) {
  121. this._outFile   = outFile;
  122. }
  123. //把输出结果保存到外部文件里面。
  124. @Override
  125. public void prepare(Map stormConf, TopologyContext context,
  126. OutputCollector collector) {
  127. // TODO Auto-generated method stub
  128. this._collector = collector;
  129. File out = new File(_outFile);
  130. try {
  131. //                                  br = new BufferedWriter(new FileWriter(out));
  132. bw = new BufferedWriter(new OutputStreamWriter(
  133. new FileOutputStream(out, true)));
  134. } catch (IOException e1) {
  135. // TODO Auto-generated catch block
  136. e1.printStackTrace();
  137. }
  138. }
  139. //blot计算单元,把tuple中的数据添加一个bkeep和回车。然后保存到outfile指定的文件中。
  140. @Override
  141. public void execute(Tuple input) {
  142. // TODO Auto-generated method stub
  143. String line = input.getString(0);
  144. //                         System.out.println(line);
  145. //     String[] str = line.split(_seperator);
  146. //   System.out.println(str[2]);
  147. try {
  148. bw.write(line+",bkeep"+"\n");
  149. bw.flush();
  150. } catch (IOException e) {
  151. // TODO Auto-generated catch block
  152. e.printStackTrace();
  153. }
  154. _collector.emit(new Values(line));
  155. }
  156. @Override
  157. public void declareOutputFields(OutputFieldsDeclarer declarer) {
  158. // TODO Auto-generated method stub
  159. declarer.declare(new Fields("line"));
  160. }
  161. }
  162. public static void main(String[] argv) throws AlreadyAliveException, InvalidTopologyException{
  163. String outFile   = argv[0]; //输出文件
  164. boolean distribute = Boolean.valueOf(argv[1]);       //本地模式还是集群模式
  165. TopologyBuilder builder = new TopologyBuilder();  //build一个topology
  166. builder.setSpout("spout", new  SocketSpout(), 1);   //指定spout
  167. builder.setBolt("bolt", new Process(outFile),1).shuffleGrouping("spout");  //指定bolt,包括bolt、process和grouping
  168. Config conf = new Config();
  169. if(distribute){
  170. StormSubmitter.submitTopology("SocketProcess", conf, builder.createTopology());
  171. }else{
  172. LocalCluster cluster = new LocalCluster();
  173. cluster.submitTopology("SocketProcess", conf, builder.createTopology());
  174. }
  175. }
  176. }

最后执行

[java] view plain copy

  1. storm jar stormtest.jar socket.SocketProcess /home/hadoop/out_socket.txt true

spout接受从socket服务器实时发送过来的数据,经过topology处理,最终将数据写入out_socket.txt文件

转:http://blog.csdn.net/u011750989/article/details/18547015

时间: 2024-08-04 00:14:20

storm实时计算实例(socket实时接入)的相关文章

实时计算,流数据处理系统简介与简单分析

转自:http://www.csdn.net/article/2014-06-12/2820196-Storm 摘要:实时计算一般都是针对海量数据进行的,一般要求为秒级.实时计算主要分为两块:数据的实时入库.数据的实时计算.今天这篇文章详细介绍了实时计算,流数据处理系统简介与简单分析. 编者按:互联网领域的实时计算一般都是针对海量数据进行的,除了像非实时计算的需求(如计算结果准确)以外,实时计算最重要的一个需求是能够实时响应计算结果,一般要求为秒级.实时计算的今天,业界都没有一个准确的定义,什么

实时计算平台

实时计算平台中的弹性集群资源管理 本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用

实时计算平台中的弹性集群资源管理

本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用Kafka作为日志收集组件与实时应

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

实时计算框架之二:Storm之入门实例

预备.开火.瞄准-- 1 总结与提升 自1月份来,可谓是浮浮荡荡,一波三折呀. 先是参加了公司组织的创意马拉松大赛,虽说24小时内完成了作品,但是自己感觉上效果很差,自然成绩也是不高.通过这24小时持续的奋斗以及后来的各种产品描述等环节,发现了开发上的许多缺点.首先,对我们的产品进行了深入的认识和了解,也在产品之上,发现了更多可以发展走向成功的点子,这是我觉得最棒的一点:其次,短时间内和队员进行协作交流,生成产品,这之间的沟通非常重要:第三,选择C++作为24小时创作的语言,开发效率相对而言是非

Storm实时计算:流操作入门编程实践

转自:http://shiyanjun.cn/archives/977.html Storm实时计算:流操作入门编程实践 Storm是一个分布式是实时计算系统,它设计了一种对流和计算的抽象,概念比较简单,实际编程开发起来相对容易.下面,简单介绍编程实践过程中需要理解的Storm中的几个概念: Topology Storm中Topology的概念类似于Hadoop中的MapReduce Job,是一个用来编排.容纳一组计算逻辑组件(Spout.Bolt)的对象(Hadoop MapReduce中一

storm消费kafka实现实时计算

大致架构 * 每个应用实例部署一个日志agent * agent实时将日志发送到kafka * storm实时计算日志 * storm计算结果保存到hbase storm消费kafka 创建实时计算项目并引入storm和kafka相关的依赖 <dependency> <groupId>org.apache.storm</groupId> <artifactId>storm-core</artifactId> <version>1.0.

实时计算storm流程架构总结

hadoop一般用在离线的分析计算中,而storm区别于hadoop,用在实时的流式计算中,被广泛用来进行实时日志处理.实时统计.实时风控等场景,当然也可以用在对数据进行实时初步的加工,存储到分布式数据库中如HBase,便于后续的查询. 面对的大批量的数据的实时计算,storm实现了一个可扩展的.低延迟.可靠性和容错的分布式计算平台. 1.对象介绍 tuple:表示流中一个基本的处理单元,可以包括多个field,每个filed表示一个属性 topology:一个拓扑是一个个计算节点组成的图,每个

大数据学习之Storm实时计算概述及安装部署33

一:Storm概述 网址:http://storm.apache.org/ ApacheStorm是一个免费的开源分布式实时计算系统.Storm可以轻松可靠地处理无限数据流,实现Hadoop对批处理所做的实时处理.Storm非常简单,可以与任何编程语言一起使用,并且使用起来很有趣! Storm有许多用例:实时分析,在线机器学习,连续计算,分布式RPC,ETL等.风暴很快:一个基准测试表示每个节点每秒处理超过一百万个元组.它具有可扩展性,容错性,可确保您的数据得到处理,并且易于设置和操作. Sto