cascading--wordcount

Running WordCount in Eclipse, using the Cascading wrapper.

Prerequisites: a CentOS system, JDK, Hadoop, Eclipse, the Cascading lib package (downloadable from the official site), the Cascading-wrapped WordCount source that ships with it, and the crawl data directory (data). All of these can be downloaded from the official site.

After downloading the materials from the Cascading website, I ran the example in Eclipse and was able to produce the test output.

Pitfall: your Cascading version may not match the WordCount example bundled on the site, in which case you have to adapt the code yourself; my Cascading version was not downloaded from the official site.
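For example, the listing below uses the Cascading 1.x package layout. If your jar is Cascading 2.x, several of these classes were moved into Hadoop-specific packages and the application jar is set through AppProps instead of FlowConnector. The mapping below is only a sketch from my own notes on the 2.x layout (the XML operations such as TagSoupParser were also split into a separate module, as far as I know); check it against the javadoc of the version you actually have:

// Cascading 1.x (used in the listing below)   // rough Cascading 2.x counterpart
import cascading.flow.FlowConnector;           // cascading.flow.hadoop.HadoopFlowConnector
import cascading.tap.Hfs;                      // cascading.tap.hadoop.Hfs
import cascading.tap.Lfs;                      // cascading.tap.hadoop.Lfs
import cascading.scheme.TextLine;              // cascading.scheme.hadoop.TextLine
import cascading.scheme.SequenceFile;          // cascading.scheme.hadoop.SequenceFile

// 1.x: FlowConnector.setApplicationJarClass( properties, WordCount.class );
// 2.x: cascading.property.AppProps.setApplicationJarClass( properties, WordCount.class );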

My run results (screenshot):

The complete code is as follows:

package com.zjf.cascading.example;

/*
 * WordCount example
 * zjf-pc
 * Copyright (c) 2007-2012 Concurrent, Inc. All Rights Reserved.
 * Project and contact information: http://www.concurrentinc.com/
 */

import java.util.Map;
import java.util.Properties;

import cascading.cascade.Cascade;
import cascading.cascade.CascadeConnector;
import cascading.cascade.Cascades;
import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.Identity;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexFilter;
import cascading.operation.regex.RegexGenerator;
import cascading.operation.regex.RegexReplace;
import cascading.operation.regex.RegexSplitter;
import cascading.operation.xml.TagSoupParser;
import cascading.operation.xml.XPathGenerator;
import cascading.operation.xml.XPathOperation;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.pipe.SubAssembly;
import cascading.scheme.SequenceFile;
import cascading.scheme.TextLine;
import cascading.tap.Tap;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tuple.Fields;

public class WordCount
  {
  @SuppressWarnings("serial")
  private static class ImportCrawlDataAssembly extends SubAssembly
    {
    public ImportCrawlDataAssembly( String name )
      {
      //split each text line into the "url" and "raw" fields
      RegexSplitter regexSplitter = new RegexSplitter( new Fields( "url", "raw" ) );
      Pipe importPipe = new Each( name, new Fields( "line" ), regexSplitter );
      //filter out all PDF documents
      importPipe = new Each( importPipe, new Fields( "url" ), new RegexFilter( ".*\\.pdf$", true ) );
      //replace ":nl:" with "\n" and discard the unneeded fields
      RegexReplace regexReplace = new RegexReplace( new Fields( "page" ), ":nl:", "\n" );
      importPipe = new Each( importPipe, new Fields( "raw" ), regexReplace, new Fields( "url", "page" ) );
      //required: registers the tail of this assembly
      setTails( importPipe );
      }
    }

  @SuppressWarnings("serial")
  private static class WordCountSplitAssembly extends SubAssembly
    {
    public WordCountSplitAssembly( String sourceName, String sinkUrlName, String sinkWordName )
      {
      //create a new assembly that counts words across all pages and within each page
      Pipe pipe = new Pipe(sourceName);
      //use TagSoup to convert the HTML into XHTML, keeping only the "url" and "xml" fields and dropping the rest
      pipe = new Each( pipe, new Fields( "page" ), new TagSoupParser( new Fields( "xml" ) ), new Fields( "url", "xml" ) );
      //apply an XPath (XML Path Language) expression to the "xml" field to extract the "body" element
      XPathGenerator bodyExtractor = new XPathGenerator( new Fields( "body" ), XPathOperation.NAMESPACE_XHTML, "//xhtml:body" );
      pipe = new Each( pipe, new Fields( "xml" ), bodyExtractor, new Fields( "url", "body" ) );
      //apply another XPath expression to strip all elements, keeping only text nodes and dropping text nodes inside "script" elements
      String elementXPath = "//text()[ name(parent::node()) != 'script']";
      XPathGenerator elementRemover = new XPathGenerator( new Fields( "words" ), XPathOperation.NAMESPACE_XHTML, elementXPath );
      pipe = new Each( pipe, new Fields( "body" ), elementRemover, new Fields( "url", "words" ) );
      //use a regex to break the document into individual words, emitting each word as a new tuple into the stream with the "url" and "word" fields
      RegexGenerator wordGenerator = new RegexGenerator( new Fields( "word" ), "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)" );
      pipe = new Each( pipe, new Fields( "words" ), wordGenerator, new Fields( "url", "word" ) );
      //group by "url"
      Pipe urlCountPipe = new GroupBy( sinkUrlName, pipe, new Fields( "url", "word" ) );
      urlCountPipe = new Every( urlCountPipe, new Fields( "url", "word" ), new Count(), new Fields( "url", "word", "count" ) );
      //group by "word"
      Pipe wordCountPipe = new GroupBy( sinkWordName, pipe, new Fields( "word" ) );
      wordCountPipe = new Every( wordCountPipe, new Fields( "word" ), new Count(), new Fields( "word", "count" ) );
      //required: registers the tails of this assembly
      setTails( urlCountPipe, wordCountPipe );
      }
    }

  public static void main( String[] args )
    {
      //set the jar that holds the application classes
      Properties properties = new Properties();
      FlowConnector.setApplicationJarClass( properties, WordCount.class );
      FlowConnector flowConnector = new FlowConnector( properties );
      /**
       * Set the program arguments in the Eclipse run configuration:
       * right-click Main.java, choose Run As > Run Configurations > Java Application > Main > Arguments,
       * and enter the following in the "Program arguments" box:
       *   data/url+page.200.txt output local
       * Explanation:
       * args[0] is data/url+page.200.txt; it sits under the application's own directory and must be a path
       * on the local file system.
       * On my machine it is /home/hadoop/app/workspace/HadoopApplication001/data/url+page.200.txt.
       * The path has to be created by hand, and the url+page.200.txt file must exist; it can be downloaded
       * from the official site.
       *
       * args[1] is the output directory, the second argument; it lives in the HDFS distributed file system.
       * My path is hdfs://s104:9000/user/hadoop/output, which also has to be created by hand.
       * After the program runs successfully, the output directory will contain three folders, pages, urls
       * and words, holding all of the pages, urls and words respectively.
       *
       * args[2] is local, the third argument; it lives on the local file system.
       * On my machine it is /home/hadoop/app/workspace/HadoopApplication001/local.
       * This folder does not need to be created in advance; it is generated automatically under the above
       * directory when the program succeeds, and it will contain two folders, urls and words, holding the
       * url counts and word counts.
       */
      String inputPath = args[ 0 ];
      String pagesPath = args[ 1 ] + "/pages/";
      String urlsPath = args[ 1 ] + "/urls/";
      String wordsPath = args[ 1 ] + "/words/";
      String localUrlsPath = args[ 2 ] + "/urls/";
      String localWordsPath = args[ 2 ] + "/words/";

    // import a text file with crawled pages from the local filesystem into a Hadoop distributed filesystem
    // the imported file will be a native Hadoop sequence file with the fields "page" and "url"
    // note this examples stores crawl pages as a tabbed file, with the first field being the "url"
    // and the second being the "raw" document that had all new line chars ("\n") converted to the text ":nl:".

    //create the pipe assembly that imports the crawl data, yielding the "url" and "page" fields
    Pipe importPipe = new ImportCrawlDataAssembly( "import pipe" );

    //create the tap instances
    Tap localPagesSource = new Lfs( new TextLine(), inputPath );
    Tap importedPages = new Hfs( new SequenceFile( new Fields( "url", "page" ) ), pagesPath );

    //connect the pipe assembly to the tap instances
    Flow importPagesFlow = flowConnector.connect( "import pages", localPagesSource, importedPages, importPipe );

    //split the wordcount pipe defined above into two new pipes, one keyed by url and one by word
    // these pipes could be retrieved via the getTails() method and added to new pipe instances
    SubAssembly wordCountPipe = new WordCountSplitAssembly( "wordcount pipe", "url pipe", "word pipe" );

    //create Hadoop SequenceFile taps to store the counted results
    Tap sinkUrl = new Hfs( new SequenceFile( new Fields( "url", "word", "count" ) ), urlsPath );
    Tap sinkWord = new Hfs( new SequenceFile( new Fields( "word", "count" ) ), wordsPath );

    //bind multiple pipes and taps; the keys given here are the pipe names
    Map<String, Tap> sinks = Cascades.tapsMap( new String[]{"url pipe", "word pipe"}, Tap.taps( sinkUrl, sinkWord ) );
    //wordCountPipe refers to the whole sub-assembly
    Flow count = flowConnector.connect( importedPages, sinks, wordCountPipe );

    //create an assembly that exports the Hadoop SequenceFiles to local text files
    Pipe exportPipe = new Each( "export pipe", new Identity() );
    Tap localSinkUrl = new Lfs( new TextLine(), localUrlsPath );
    Tap localSinkWord = new Lfs( new TextLine(), localWordsPath );

    //use the assembly above to connect the two sinks
    Flow exportFromUrl = flowConnector.connect( "export url", sinkUrl, localSinkUrl, exportPipe );
    Flow exportFromWord = flowConnector.connect( "export word", sinkWord, localSinkWord, exportPipe );

    //load the flows (order does not matter) and execute the cascade
    Cascade cascade = new CascadeConnector().connect( importPagesFlow, count, exportFromUrl, exportFromWord );
    cascade.complete();
    }
  }
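For reference, the core pattern inside WordCountSplitAssembly (an Each with a RegexGenerator, then a GroupBy followed by an Every with a Count) can be exercised on its own, without the crawl import and export steps. The sketch below assumes the same Cascading 1.x API as the listing above; the class name SimpleWordCount and the two argument paths are mine for illustration and are not part of the original example.

package com.zjf.cascading.example;

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexGenerator;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.TextLine;
import cascading.tap.Hfs;
import cascading.tap.Lfs;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tuple.Fields;

public class SimpleWordCount
  {
  public static void main( String[] args )
    {
    String docPath = args[ 0 ];  // plain text input on the local file system (placeholder)
    String wcPath = args[ 1 ];   // word-count output directory on HDFS (placeholder)

    // read each input line into a "line" field
    Tap source = new Lfs( new TextLine( new Fields( "line" ) ), docPath );
    // write "word" and "count" as tab-separated text, replacing any previous output
    Tap sink = new Hfs( new TextLine( new Fields( "word", "count" ) ), wcPath, SinkMode.REPLACE );

    // split each line into words, using the same regex as the crawl example above
    Pipe assembly = new Pipe( "simple wordcount" );
    String wordRegex = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";
    assembly = new Each( assembly, new Fields( "line" ), new RegexGenerator( new Fields( "word" ), wordRegex ) );

    // group by word and count the occurrences in each group
    assembly = new GroupBy( assembly, new Fields( "word" ) );
    assembly = new Every( assembly, new Fields( "word" ), new Count(), new Fields( "word", "count" ) );

    Properties properties = new Properties();
    FlowConnector.setApplicationJarClass( properties, SimpleWordCount.class );

    Flow flow = new FlowConnector( properties ).connect( "simple wordcount", source, sink, assembly );
    flow.complete();
    }
  }

It is run the same way as the main example, with a local text file as args[0] and an HDFS output directory as args[1].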