Storm监控文件夹变化 统计文件单词数量

监控指定文件夹,读取文件(新文件动态读取)里的内容,统计单词的数量。

FileSpout.java,监控文件夹,读取新文件内容


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

package com.test.stormtest.wordcount;

import java.io.File;

import java.io.IOException;

import java.util.Collection;

import java.util.List;

import java.util.Map;

import org.apache.commons.io.FileUtils;

import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichSpout;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Values;

public class FileSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    

    private SpoutOutputCollector collector;

    private File target = new File("F:" + File.separator + "test");

    private Collection<File> cacheFiles = null;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {

        this.collector = collector;

        

        //启动的时候,将文件夹内的所有文件的内容发射出去

        cacheFiles = FileUtils.listFiles(target, nulltrue);

        for (File file : cacheFiles) {

            emitFileConent(file);

        }

    }

    public void nextTuple() {

        try {

            Thread.sleep(5000);

        catch (InterruptedException e1) {

            e1.printStackTrace();

        }

        

        //监控新文件,将新文件的内容发射出去

        Collection<File> files = FileUtils.listFiles(target, nulltrue);

        for (File file : files) {

            if(!cacheFiles.contains(file)) {

                emitFileConent(file);

            }

        }

        

        cacheFiles = files;

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("line"));

    }

    

    //将文件内容按行发射出去

    private void emitFileConent(File file) {

        try {

            List<String> lines = FileUtils.readLines(file);

            for (String line : lines) {

                this.collector.emit(new Values(line));

            }

        catch (IOException e) {

            e.printStackTrace();

        }

    }

}

SplitBolt.java,将行拆分成单词


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

package com.test.stormtest.wordcount;

import java.util.Map;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Fields;

import backtype.storm.tuple.Tuple;

import backtype.storm.tuple.Values;

public class SplitBolt extends BaseRichBolt {

    private static final long serialVersionUID = 1L;

    

    private OutputCollector collector = null;

    

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

    }

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        String[] words = line.split(" ");

        for (String word : words) {

            this.collector.emit(new Values(word));

        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

        declarer.declare(new Fields("word"));

    }

}

SumBolt.java 统计单词数量


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

package com.test.stormtest.wordcount;

import java.util.HashMap;

import java.util.Map;

import java.util.Map.Entry;

import java.util.Set;

import backtype.storm.task.OutputCollector;

import backtype.storm.task.TopologyContext;

import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.topology.base.BaseRichBolt;

import backtype.storm.tuple.Tuple;

public class SumBolt extends BaseRichBolt{

    private static final long serialVersionUID = 1L;

    private Map<String, Long> countMap = null;

    

    public void prepare(Map stormConf, TopologyContext context,

            OutputCollector collector) {

        countMap = new HashMap<String, Long>();

    }

    public void execute(Tuple input) {

        String word = input.getStringByField("word");

        Long count = countMap.get(word);

        if(count == null) {

            count = 0L;

        }

        countMap.put(word, ++count);

        

        System.out.println("-----------------------------------------------");

        Set<Entry<String, Long>> entries = countMap.entrySet();

        for (Entry<String, Long> entry : entries) {

            System.out.println(entry.getKey() + ": " + entry.getValue());

        }

    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

WordCountTopology.java 驱动类,本地模式提交topology


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

package com.test.stormtest.wordcount;

import backtype.storm.Config;

import backtype.storm.LocalCluster;

import backtype.storm.topology.TopologyBuilder;

import backtype.storm.tuple.Fields;

import backtype.storm.utils.Utils;

public class WordCountTopology {

    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        

        builder.setSpout("filespout"new FileSpout());

        builder.setBolt("splitbolt"new SplitBolt()).shuffleGrouping("filespout");

        builder.setBolt("sumtblot"new SumBolt()).fieldsGrouping("splitbolt"newFields("word"));

        

        LocalCluster cluster = new LocalCluster();

        Config config = new Config();

        config.setDebug(true);

        cluster.submitTopology("wordcount", config, builder.createTopology());

        

        Utils.sleep(20000);

        cluster.killTopology("wordcount");

        cluster.shutdown();

    }

}

时间: 2024-10-17 13:25:11

Storm监控文件夹变化 统计文件单词数量的相关文章

python 遍历文件夹并统计文件数量

使用python遍历文件夹下的子文件夹及文件,并统计出文件夹下文件的数量: 1 import os 2 count = 0 3 4 5 # 遍历文件夹 6 def walkFile(file): 7 for root, dirs, files in os.walk(file): 8 # root 表示当前正在访问的文件夹路径 9 # dirs 表示该文件夹下的子目录名list 10 # files 表示该文件夹下的文件list 11 12 # 遍历文件 13 for f in files: 14

Linux下的C程序,遍历文件夹并统计其中各个类型文件所占百分比

递归遍历一个目录下的所有文件和文件夹,统计各个类型文件所占的百分比 程序代码a.cpp(编译命令:g++ a.cpp -o a) #include <stdio.h> #include <sys/stat.h> #include <sys/types.h> #include <unistd.h> #include <stdlib.h> #include <dirent.h> #include <string.h> stru

键盘录入一个文件夹路径,统计该文件夹(包含子文件夹)中每种类型的文件及个数,注意:用文件类型(后缀名,不包含.(点),如:&quot;java&quot;,&quot;txt&quot;)作为key, 用个数作为value,放入到map集合中,遍历map集合

package cn.it.zuoye5; import java.io.File;import java.util.HashMap;import java.util.Iterator;import java.util.Map;import java.util.Scanner;import java.util.Set; /** 键盘录入一个文件夹路径,统计该文件夹(包含子文件夹)中每种类型的文件及个数,注意:用文件类型(后缀名,不包含.(点),如:"java","txt&qu

PHP 查看文件夹大小、复制文件夹、删除文件夹

PHP虽然提供了 filesize.copy.unlink 等文件操作的函数,但是没有提供 dirsize.copydir.rmdirs 等文件夹操作的函数(rmdir也只能删除空目录).所以只能手动编写这些函数,主要的技巧是通过递归将问题逐层分解,直到分解成可以直接解决的最小子问题. ====================查看文件夹大小===================== 由于文件夹是没有大小的,平常所说的文件夹尺寸准确的说应该是文件夹中所有文件的总大小.所以只需要将文件夹逐层查找,统

关于Android工程中的主要文件夹存放的文件种类

Android工程中的文件类型包括以下几种: 一.jar包:是为了解决软件的兼容问题.如果在低版本Android平台上开发一个应用程序,而应用程序又想使用高版本才拥有的功能,就需要使用Support库.Android private libraries 和 Android 4.2.2  .Referenced libraries 等.其实 Android private libraries 和Referenced libraries 两个文件夹中的都是通过configure build  pat

VBA读取文件夹下所有文件夹及文件内容,并以树形结构展示

Const TR_LEVEL_MARK = "+"Const TR_COL_INDEX = "A"Const TR_COL_LEVEL = "E"Const TR_COL_NAME = "C"Const TR_COL_COUNT = "D"Const TR_COL_TREE_START = "F"Const TR_ROW_HEIGHT = 23Const TR_COL_LINE_WIDT

shell命令查看某文件夹下的文件个数

shell命令查看某文件夹下的文件个数 2010-06-25 17:05:15|  分类: shell |字号 订阅 1.查看某文件夹下文件的个数: ls -l |grep "^-"|wc -l 或 find ./company -type f | wc -l 2.查看某文件夹下文件的个数,包括子文件夹里的: ls -lR|grep "^-"|wc -l 3.查看某文件夹下文件夹的个数,包括子文件夹里的: ls -lR|grep "^d"|wc

使用vb.net获取相应文件夹下相应文件的个数并显示

1. 前言 因为我们实验室是做机器视觉方向的, 经常需要拍图, 最近在做双相机的实验, 但是拍图的时候, 可能会有很小几率的丢帧现象, 就是一个相机拍到图了, 另一个相机没有, 这个很讨厌. 我们本来是通过观察 左下角的数字来确定文件夹中相应的图片的数量, 不过这个太费劲了, 并且数字也特别小, 于是我们就想写个小程序来处理这个问题. 2. 实现 这里我们使用vb.net 来实现, 为什么用这个呢, 其实自己也不清楚, 反正, 就是不想用MFC了, MFC 太冗余了. 2.1 需求分析 需求可以

Python---进阶---文件操作---获取文件夹下所有文件的数量和大小

一.####编写一个程序,统计当前目录下每个文件类型的文件数 ####思路: - 打开当前的文件夹 - 获取到当前文件夹下面所有的文件 - 处理我们当前的文件夹下面可能有文件夹的情况(也打印出来) - 做出统计 ------------------------------------- import os #获取到当前文件夹下面所有的文件all_files = os.listdir(os.curdir) #os.curdir 表示当前目录 curdir:currentdirectory type