luigi学习2-在hadoop上运行Top Artists

一、AggregateArtistsHadoop

class AggregateArtistsHadoop(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("data/artist_streams_%s.tsv" % self.date_interval)

    def requires(self):
        return [StreamsHdfs(date) for date in self.date_interval]

    def mapper(self, line):
        timestamp, artist, track = line.strip().split()
        yield artist, 1

    def reducer(self, key, values):
        yield key, sum(values)

实现的功能和AggregateArtists类似,需要注意的是:

luigi.contrib.hadoop.JobTask不需要你实现run方法,需要你实现mapper和reducer方法。mapper和combiner需要yield包含两个元素的tuple,这两个元素也可以是tuple类型的。

这个task是依赖StreamsHdfs类型task的。现在看看StreamsHdfs的内容吧:

二、StreamHdfs

class StreamsHdfs(Streams):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.date.strftime(‘data/streams_%Y_%m_%d_faked.tsv‘))

这个类和Stream类的工作是一样的,所以它继承了Stream类,并且重写了output方法,也就是说这个类最终的结果输出是在hdfs上。

三、执行AggregateArtistsHadoop

执行下面的命令,出现了报错,从错误信息中,我们可以看到NoSectionError,这是关于配置文件的错误,详情请参考luigi的配置文件,我的博客也给出了部分常用的配置

 PYTHONPATH=‘‘ luigi --module top_artists AggregateArtistsHadoop --local-scheduler --date-interval 2012-06
  File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 103, in get
    return self._get_with_default(ConfigParser.get, section, option, default, **kwargs)
  File "/root/miniconda2/envs/my_python_env/lib/python2.7/site-packages/luigi/configuration.py", line 93, in _get_with_default
    return method(self, section, option, **kwargs)
  File "/root/miniconda2/envs/my_python_env/lib/python2.7/ConfigParser.py", line 607, in get
    raise NoSectionError(section)
NoSectionError: No section: ‘hadoop‘
Exception AttributeError: "‘DefaultHadoopJobRunner‘ object has no attribute ‘tmp_dir‘" in <bound method DefaultHadoopJobRunner.__del__ of <luigi.contrib.hadoop.DefaultHadoopJobRunner object at 0x7fda9c9b38d0>> ignored

在当前工作目录创建luigi.cfg,并且写入下面的配置,然后重新执行该命令

(my_python_env)[[email protected] pythonCode]# cat luigi.cfg
[hadoop]
command=hadoop#提交hadoop作业的命令
python-executable=python#本机执行python脚本的命令
scheduler=fair#hadoop的调度器
streaming-jar=/usr/local/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar#streaming包的位置
version=apache1#hadoop的版本

四、find top Artists

如果你看到了这里,说明已经计算出了每个artist的出现次数,并且保存在本地或者HDFS上的文件中。现在我们将要找出前10个artist。这里我们选用普通的python程序来计算。

class Top10Artists(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        if self.use_hadoop:
            return AggregateArtistsHadoop(self.date_interval)
        else:
            return AggregateArtists(self.date_interval)

    def output(self):
        return luigi.LocalTarget("data/top_artists_%s.tsv" % self.date_interval)

    def run(self):
        top_10 = nlargest(10, self._input_iterator())
        with self.output().open(‘w‘) as out_file:
            for streams, artist in top_10:
                print >> out_file, self.date_interval.date_a, self.date_interval.date_b, artist, streams

    def _input_iterator(self):
        with self.input().open(‘r‘) as in_file:
            for line in in_file:
                artist, streams = line.strip().split()
                yield int(streams), int(artist)

运行下面命令,来完成top10的计算

PYTHONPATH=‘‘ luigi --module top_artists Top10Artists  --local-scheduler --date-interval 2012-06

最终会在data目录下产生新的文件:

(my_python_env)[[email protected] pythonCode]# ls data/
artist_streams_2012-06.tsv    streams_2012_06_06_faked.tsv  streams_2012_06_12_faked.tsv  streams_2012_06_18_faked.tsv  streams_2012_06_24_faked.tsv  streams_2012_06_30_faked.tsv
streams_2012_06_01_faked.tsv  streams_2012_06_07_faked.tsv  streams_2012_06_13_faked.tsv  streams_2012_06_19_faked.tsv  streams_2012_06_25_faked.tsv  top_artists_2012-06.tsv
streams_2012_06_02_faked.tsv  streams_2012_06_08_faked.tsv  streams_2012_06_14_faked.tsv  streams_2012_06_20_faked.tsv  streams_2012_06_26_faked.tsv
streams_2012_06_03_faked.tsv  streams_2012_06_09_faked.tsv  streams_2012_06_15_faked.tsv  streams_2012_06_21_faked.tsv  streams_2012_06_27_faked.tsv
streams_2012_06_04_faked.tsv  streams_2012_06_10_faked.tsv  streams_2012_06_16_faked.tsv  streams_2012_06_22_faked.tsv  streams_2012_06_28_faked.tsv
streams_2012_06_05_faked.tsv  streams_2012_06_11_faked.tsv  streams_2012_06_17_faked.tsv  streams_2012_06_23_faked.tsv  streams_2012_06_29_faked.tsv
(my_python_env)[[email protected] pythonCode]# cat data/top_artists_2012-06.tsv
2012-06-01    2012-07-01    858    47
2012-06-01    2012-07-01    594    47
2012-06-01    2012-07-01    248    47
2012-06-01    2012-07-01    164    46
2012-06-01    2012-07-01    846    45
2012-06-01    2012-07-01    776    44
2012-06-01    2012-07-01    75    44
2012-06-01    2012-07-01    345    44
2012-06-01    2012-07-01    195    44
2012-06-01    2012-07-01    750    43

五、将top10插入到msyql中

class ArtistToplistToMysql(luigi.Task):
    date_interval = luigi.DateIntervalParameter()
    use_hadoop = luigi.BoolParameter()

    def requires(self):
        return Top10Artists(self.date_interval,self.use_hadoop)

    def run(self):
        conn = MySQLdb.connect(host=‘localhost‘, port=3306, user=‘root‘, passwd=‘123456‘, db=‘test‘,charset=‘utf8‘, use_unicode=False)
        cursor = conn.cursor()
        with self.input().open(‘r‘) as in_file:
            for line in in_file:
                start_date,end_date,artist,count = line.strip().split()
                insert_sql = "insert into artist (startdate,enddate,artist,score) values(‘%s‘,‘%s‘,‘%s‘,‘%d‘)" % (start_date , end_date, artist, int(count))
                cursor.execute(insert_sql)
        conn.commit()
        conn.close()

执行下面的命令来完成插入:

PYTHONPATH=‘‘ luigi --module top_artists ArtistToplistToMysql  --local-scheduler --date-interval 2012-06

完成之后查看数据库内容:

所有的任务到此已经完成了调试

时间: 2024-10-20 15:27:53

luigi学习2-在hadoop上运行Top Artists的相关文章

在Hadoop上运行基于RMM中文分词算法的MapReduce程序

原文:http://xiaoxia.org/2011/12/18/map-reduce-program-of-rmm-word-count-on-hadoop/ 在Hadoop上运行基于RMM中文分词算法的MapReduce程序 23条回复 我知道这个文章标题很“学术”化,很俗,让人看起来是一篇很牛B或者很装逼的论文!其实不然,只是一份普通的实验报告,同时本文也不对RMM中文分词算法进行研究.这个实验报告是我做高性能计算课程的实验里提交的.所以,下面的内容是从我的实验报告里摘录出来的,当作是我学

在hadoop上运行java文件

hadoop 2.x版本 编译:javac -d . -classpath /usr/lib/hadoop/hadoop-common-2.2.0.2.0.6.0-102.jar TestGetPathMark.java 在com的同级目录上建立manifest.mf 在里面写上Main-Class: com.test.path.mark.TestGetPathMark d打包:然后保存并执行jar -cvfm test.jar manifest.mf com/ 然后执行hadoop jar t

深度学习之在iOS上运行CNN

1 引言 作为曾经的iOS开发者,在研究深度学习的时候,总有一个想法就是在iPhone上运行深度学习,不管是在手机上训练还是利用训练好的数据进行测试. 因为iOS的开发环境支持C++,因此,只要你的代码是C/C++,本质上就可以在iOS上运行. 怎么才能更快更好地在iOS上运行CNN呢? 2 方法1:通过Matlab转码 Matlab自带转成c的工具,如果你研究过UFLDL的深度学习教程,就知道如何在Matlab上使用CNN,那么,转换成c后,放到iOS的开发环境中,然后将参数存成txt格式再读

原生态在Hadoop上运行Java程序

第一种:原生态运行jar包1,利用eclipse编写Map-Reduce方法,一般引入Hadoop-core-1.1.2.jar.注意这里eclipse里没有安装hadoop的插件,只是引入其匝包,该eclipse可以安装在windows或者linux中,如果是在windows中安装的,且在其虚拟机安装的linux,可以通过共享文件夹来实现传递.2,编写要测试的数据,如命名为tempdata3,利用eclipse的export来打包已编写好的,在利用eclipse打包jar的时候,只需要选择sr

让python在hadoop上跑起来

duang~好久没有更新博客啦,原因很简单,实习啦-好吧,我过来这边上班表示觉得自己简直弱爆了.第一周,配置环境:第二周,将数据可视化,包括学习了excel2013的一些高大上的技能,例如数据透视表和mappower绘制3d地图,当然本来打算是在tkinter里面运用matplotlib制作一个交互式的图表界面,然而,画出来的图简直不是excel2013能比的,由于对界面和matplotlib研究的也不是很深,短时间是没法研究出来,上周真是多灾多难:现在,第三周,开始接触hadoop,虽说大多数

Hadoop学习总结之五:Hadoop的运行痕迹

Hadoop学习总结之五:Hadoop的运行痕迹 Hadoop 学习总结之一:HDFS简介 Hadoop学习总结之二:HDFS读写过程解析 Hadoop学习总结之三:Map-Reduce入门 Hadoop学习总结之四:Map-Reduce的过程解析 在使用hadoop的时候,可能遇到各种各样的问题,然而由于hadoop的运行机制比较复杂,因而出现了问题的时候比较难于发现问题. 本文欲通过某种方式跟踪Hadoop的运行痕迹,方便出现问题的时候可以通过这些痕迹来解决问题. 一.环境的搭建 为了能够跟

学习笔记:Caffe上配置和运行MNIST

学习笔记:Caffe上配置和运行MNIST MNIST,一个经典的手写数字库,包含60000个训练样本和10000个测试样本,图片大小28*28,在Caffe上配置的第一个案例 1首先,获取minist的数据包. 这个版本是四个数据包cd $CAFFE_ROOT./data/mnist/get_mnist.sh [html] view plaincopy #!/usr/bin/env sh # This scripts downloads the mnist data and unzips it

学习笔记:Caffe上配置和运行Cifar10的示例

学习笔记:Caffe上配置和运行Cifar10的示例 CIFAR-10数据集含有6万个32*32的彩色图像,共分为10种类型,由 Alex Krizhevsky, Vinod Nair和 Geoffrey Hinton收集而来.包含50000张训练图片,10000张测试图片 http://www.cs.toronto.edu/~kriz/cifar.html 数据集的数据存在一个10000*3072 的 numpy数组中,单位是uint8s,3072是存储了一个32*32的彩色图像.(3072=

hadoop学习笔记(七)——hadoop权威指南中天气数据运行

1) hdfs文件系统准备工作 a) # hadoop fs –ls /user/root #查看hdfs文件系统 b) # hadoop fs -rm /user/root/output02/part-r-00000 c) 删除文档,删除文件夹 d) # hadoop fs -rm –r /user/root/output02 e) # hadoop fs –mkdir –p input/ncdc f) 解压缩输入文件,hadoop无法识别.zip或者.rar g) # hadoop fs -