数据挖掘:基于Spark+HanLP实现影视评论关键词抽取(1)

1. 背景

近日项目要求基于爬取的影视评论信息,抽取影视的关键字信息。考虑到影视评论数据量较大,因此采用Spark处理框架。关键词提取的处理主要包含分词+算法抽取两部分。目前分词工具包较为主流的,包括哈工大的LTP以及HanLP,而关键词的抽取算法较多,包括TF-IDF、TextRank、互信息等。本次任务主要基于LTP、HanLP、Ac双数组进行分词,采用TextRank、互信息以及TF-IDF结合的方式进行关键词抽取。

说明:本项目刚开始接触,因此效果层面需迭代调优。

2. 技术选型

(1) 词典

1) 基于HanLP项目提供的词典数据,具体可参见HanLP的github

2) 考虑到影视的垂直领域特性,引入腾讯的嵌入的汉语词,参考该地址

(2) 分词

1) LTP分词服务:基于Docker Swarm部署多副本集服务,通过HTTP协议请求,获取分词结果(部署方法可百度); 也可以直接在本地加载,放在内存中调用,效率更高(未尝试)

2) AC双数组:基于AC双数组,采用最长匹配串,采用HanLP中的AC双数组分词器

(3) 抽取

1) 经典的TF-IDF:基于词频统计实现

2) TextRank:借鉴于PageRank算法,基于HanLP提供的接口

3) 互信息:基于HanLP提供的接口

3. 实现代码

(1) 代码结构

1) 代码将分词服务进行函数封装,基于不同的名称,执行名称指定的分词

2) TextRank、互信息、LTP、AC双数组等提取出分词或短语,最后均通过TF-IDF进行统计计算

(2) 整体代码

1) 主体代码:细节层面与下载的原始评论数据结构有关,因此无需过多关注,只需关注下主体流程即可

  1
  2 def extractFilmKeyWords(algorithm: String): Unit ={
  3     // 测试
  4 	println(HanLPSpliter.getInstance.seg("如何看待《战狼2》中的爱国情怀?"))
  5
  6     val sc = new SparkContext(new SparkConf().setAppName("extractFileKeyWords").set("spark.driver.maxResultSize", "3g"))
  7
  8     val baseDir = "/work/ws/video/parse/key_word"
  9
 10     import scala.collection.JavaConversions._
 11     def extractComments(sc: SparkContext, inputInfo: (String, String)): RDD[(String, List[String])] = {
 12       sc.textFile(s"$baseDir/data/${inputInfo._2}")
 13         .map(data => {
 14           val json = JSONObjectEx.fromObject(data.trim)
 15           if(null == json) ("", List())
 16           else{
 17             val id = json.getStringByKeys("_id")
 18             val comments: List[String] = json.getArrayInfo("comments", "review").toList
 19             val reviews: List[String] = json.getArrayInfo("reviews", "review").toList
 20             val titles: List[String] = json.getArrayInfo("reviews", "title").toList
 21             val texts = (comments ::: reviews ::: titles).filter(f => !CleanUtils.isEmpty(f))
 22             (IdBuilder.getSourceKey(inputInfo._1, id), texts)
 23           }
 24         })
 25     }
 26
 27     // 广播停用词
 28     val filterWordRdd = sc.broadcast(sc.textFile(s"$baseDir/data/stopwords.txt").map(_.trim).distinct().collect().toList)
 29
 30     def formatOutput(infos: List[(Int, String)]): String ={
 31       infos.map(info => {
 32         val json = new JSONObject()
 33         json.put("status", info._1)
 34         try{
 35           json.put("res", info._2)
 36         } catch {
 37           case _ => json.put("res", "[]")
 38         }
 39         json.toString.replaceAll("[\\s]+", "")
 40       }).mkString(" | ")
 41     }
 42
 43     def genContArray(words: List[String]): JSONArray ={
 44       val arr = new JSONArray()
 45       words.map(f => {
 46         val json = new JSONObject()
 47         json.put("cont", f)
 48         arr.put(json)
 49       })
 50       arr
 51     }
 52
 53 	// 基于LTP分词服务
 54     def splitWordByLTP(texts: List[String]): List[(Int, String)] ={
 55       texts.map(f => {
 56         val url = "http://dev.content_ltp.research.com/ltp"
 57         val params = new util.HashMap[String, String]()
 58         params.put("s", f)
 59         params.put("f", "json")
 60         params.put("t", "ner")
 61         // 调用LTP分词服务
 62         val result = HttpPostUtil.httpPostRetry(url, params).replaceAll("[\\s]+", "")
 63         if (CleanUtils.isEmpty(result)) (0, f) else {
 64           val resultArr = new JSONArray()
 65
 66           val jsonArr = try { JSONArray.fromString(result) } catch { case _ => null}
 67           if (null != jsonArr && 0 < jsonArr.length()) {
 68             for (i <- 0 until jsonArr.getJSONArray(0).length()) {
 69               val subJsonArr = jsonArr.getJSONArray(0).getJSONArray(i)
 70               for (j <- 0 until subJsonArr.length()) {
 71                 val subJson = subJsonArr.getJSONObject(j)
 72                 if(!filterWordRdd.value.contains(subJson.getString("cont"))){
 73                   resultArr.put(subJson)
 74                 }
 75               }
 76             }
 77           }
 78           if(resultArr.length() > 0) (1, resultArr.toString) else (0, f)
 79         }
 80       })
 81     }
 82
 83 	// 基于AC双数组搭建的分词服务
 84     def splitWordByAcDoubleTreeServer(texts: List[String]): List[(Int, String)] ={
 85       texts.map(f => {
 86         val splitResults = SplitQueryHelper.splitQueryText(f)
 87           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
 88         if (0 == splitResults.size) (0, f) else (1, genContArray(splitResults).toString)
 89       })
 90     }
 91
 92 	// 内存加载AC双数组
 93     def splitWordByAcDoubleTree(texts: List[String]): List[(Int, String)] ={
 94       texts.map(f => {
 95         val splitResults =  HanLPSpliter.getInstance().seg(f)
 96           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
 97         if (0 == splitResults.size) (0, f) else (1, genContArray(splitResults).toString)
 98       })
 99     }
100
101 	// TextRank
102     def splitWordByTextRank(texts: List[String]): List[(Int, String)] ={
103       texts.map(f => {
104         val splitResults = HanLP.extractKeyword(f, 100)
105           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
106         if (0 == splitResults.size) (0, f) else {
107           val arr = genContArray(splitResults)
108           if(0 == arr.length()) (0, f) else (1, arr.toString)
109         }
110       })
111     }
112
113 	// 互信息
114     def splitWordByMutualInfo(texts: List[String]): List[(Int, String)] ={
115       texts.map(f => {
116         val splitResults = HanLP.extractPhrase(f, 50)
117           .filter(f => !CleanUtils.isEmpty(f) && !filterWordRdd.value.contains(f.toLowerCase)).toList
118         if (0 == splitResults.size) (0, f) else {
119           val arr = genContArray(splitResults)
120           if(0 == arr.length()) (0, f) else (1, arr.toString)
121         }
122       })
123     }
124
125     // 提取分词信息
126     val unionInputRdd = sc.union(
127 	  extractComments(sc, SourceType.DB -> "db_review.json"),
128       extractComments(sc, SourceType.MY -> "my_review.json"),
129       extractComments(sc, SourceType.MT -> "mt_review.json"))
130       .filter(_._2.nonEmpty)
131
132     unionInputRdd.cache()
133
134     unionInputRdd.map(data => {
135       val splitResults = algorithm match {
136         case "ltp" => splitWordByLTP(data._2)
137         case "acServer" => splitWordByAcDoubleTreeServer(data._2)
138         case "ac" => splitWordByAcDoubleTree(data._2)
139         case "textRank" => splitWordByTextRank(data._2)
140         case "mutualInfo" => splitWordByMutualInfo(data._2)
141       }
142
143       val output = formatOutput(splitResults)
144       s"${data._1}\t$output"
145     }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/wordSplit/$algorithm"))
146
147     val splitRDD = sc.textFile(s"$baseDir/result/wordSplit/$algorithm/part*", 30)
148       .flatMap(data => {
149         if(data.split("\\t").length < 2) None
150         else{
151           val sourceKey = data.split("\\t")(0)
152           val words = data.split("\\t")(1).split(" \\| ").flatMap(f => {
153             val json = JSONObjectEx.fromObject(f.trim)
154             if (null != json && "1".equals(json.getStringByKeys("status"))) {
155               val jsonArr = try { JSONArray.fromString(json.getStringByKeys("res")) } catch { case _ => null }
156               var result: List[(String, String)] = List()
157               if (jsonArr != null) {
158                 for (j <- 0 until jsonArr.length()) {
159                   val json = jsonArr.getJSONObject(j)
160                   val cont = json.getString("cont")
161                   result ::= (cont, cont)
162                 }
163               }
164               result.reverse
165             } else None
166           }).toList
167           Some((sourceKey, words))
168         }
169       }).filter(_._2.nonEmpty)
170
171     splitRDD.cache()
172
173     val totalFilms = splitRDD.count()
174
175     val idfRdd = splitRDD.flatMap(result => {
176       result._2.map(_._1).distinct.map((_, 1))
177     }).groupByKey().filter(f => f._2.size > 1).map(f => (f._1, Math.log(totalFilms * 1.0 / (f._2.sum + 1))))
178
179     idfRdd.cache()
180     idfRdd.map(f => s"${f._1}\t${f._2}").saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/idf/$algorithm"))
181
182     val idfMap = sc.broadcast(idfRdd.collectAsMap())
183     // 计算TF
184     val tfRdd = splitRDD.map(result => {
185       val totalWords = result._2.size
186       val keyWords = result._2.groupBy(_._1)
187         .map(f => {
188           val word = f._1
189           val tf = f._2.size * 1.0 / totalWords
190           (tf * idfMap.value.getOrElse(word, 0D), word)
191         }).toList.sortBy(_._1).reverse.filter(_._2.trim.length > 1).take(50)
192       (result._1, keyWords)
193     })
194
195     tfRdd.cache()
196     tfRdd.map(f => {
197       val json = new JSONObject()
198       json.put("_id", f._1)
199
200       val arr = new JSONArray()
201       for (keyWord <- f._2) {
202         val subJson = new JSONObject()
203         subJson.put("score", keyWord._1)
204         subJson.put("word", keyWord._2)
205         arr.put(subJson)
206       }
207       json.put("keyWords", arr)
208       json.toString
209     }).saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/withScore"))
210
211     tfRdd.map(f => s"${f._1}\t${f._2.map(_._2).toList.mkString(",")}")
212       .saveAsTextFile(HDFSFileUtil.clean(s"$baseDir/result/keyword/$algorithm/noScore"))
213
214     tfRdd.unpersist()
215
216     splitRDD.unpersist()
217     idfMap.unpersist()
218     idfRdd.unpersist()
219
220     unionInputRdd.unpersist()
221     filterWordRdd.unpersist()
222     sc.stop()
223   }

2) 基于HanLP提供的AC双数组封装

  1
  2 import com.google.common.collect.Lists;
  3 import com.hankcs.hanlp.HanLP;
  4 import com.hankcs.hanlp.seg.Segment;
  5 import com.hankcs.hanlp.seg.common.Term;
  6 import org.slf4j.Logger;
  7 import org.slf4j.LoggerFactory;
  8
  9 import java.io.Serializable;
 10 import java.util.List;
 11
 12 public class HanLPSpliter implements Serializable{
 13     private static Logger logger = LoggerFactory.getLogger(Act.class);
 14
 15     private static HanLPSpliter instance = null;
 16
 17     private static Segment segment = null;
 18
 19     private static final String PATH = "conf/tencent_word_act.txt";
 20
 21     public static HanLPSpliter getInstance() {
 22         if(null == instance){
 23             instance = new HanLPSpliter();
 24         }
 25         return instance;
 26     }
 27
 28     public HanLPSpliter(){
 29         this.init();
 30     }
 31
 32     public void init(){
 33         initSegment();
 34     }
 35
 36     public void initSegment(){
 37         if(null == segment){
 38             addDict();
 39             HanLP.Config.IOAdapter = new HadoopFileIOAdapter();
 40             segment = HanLP.newSegment("dat");
 41             segment.enablePartOfSpeechTagging(true);
 42             segment.enableCustomDictionaryForcing(true);
 43         }
 44     }
 45
 46     public List<String> seg(String text){
 47         if(null == segment){
 48             initSegment();
 49         }
 50
 51         List<Term> terms = segment.seg(text);
 52         List<String> results = Lists.newArrayList();
 53         for(Term term : terms){
 54             results.add(term.word);
 55         }
 56         return results;
 57     }
 58 }

3) HanLP加载HDFS中的自定义词典

  1 import com.hankcs.hanlp.corpus.io.IIOAdapter;
  2 import org.apache.hadoop.conf.Configuration;
  3 import org.apache.hadoop.fs.FileSystem;
  4 import org.apache.hadoop.fs.Path;
  5
  6 import java.io.IOException;
  7 import java.io.InputStream;
  8 import java.io.OutputStream;
  9 import java.net.URI;
 10
 11 public class HadoopFileIOAdapter implements IIOAdapter{
 12     @Override
 13     public InputStream open(String path) throws IOException {
 14         Configuration conf = new Configuration();
 15         FileSystem fs = FileSystem.get(URI.create(path), conf);
 16         return fs.open(new Path(path));
 17     }
 18
 19     @Override
 20     public OutputStream create(String path) throws IOException {
 21         Configuration conf = new Configuration();
 22         FileSystem fs = FileSystem.get(URI.create(path), conf);
 23         OutputStream out = fs.create(new Path(path));
 24         return out;
 25     }
 26 }

4. 采坑总结

(1) Spark中实现HanLP自定义词典的加载

由于引入腾讯的嵌入词,因此使用HanLP的自定义词典功能,参考的方法如下:

a. 《基于hanLP的中文分词详解-MapReduce实现&自定义词典文件》,该方法适用于自定义词典的数量较少的情况,如果词典量较大,如腾讯嵌入词820W+,理论上jar包较为臃肿

b. 《Spark中使用HanLP分词》,该方法的好处在于无需手工构件词典的bin文件,操作简单

切记:如果想让自定义词典生效,需先将data/dictionary/custom中的bin文件删除。通过HanLP源码得知,如果存在bin文件,则直接该bin文件,否则会将custom中用户自定义的词典重新加载,在指定的环境中(如本地或HDFS)中自动生成bin文件。

腾讯820W词典,基于HanLP生成bin文件的时间大概为30分钟。

(2) Spark异常

Spark执行过程中的异常信息:

1) 异常1

a. 异常信息:

Job aborted due to stage failure: Total size of serialized results of 3979 tasks (1024.2 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)

b. 解决:通过设置spark.driver.maxResultSize=4G,参考:《Spark排错与优化

2) 异常2

a. 异常信息:java.lang.OutOfMemoryError: Java heap space

b. 解决:参考https://blog.csdn.net/guohecang/article/details/52088117

如有问题,请留言回复!

原文地址:https://www.cnblogs.com/mengrennwpu/p/9902276.html

时间: 2024-10-11 04:09:26

数据挖掘:基于Spark+HanLP实现影视评论关键词抽取(1)的相关文章

31页PPT:基于Spark的移动大数据挖掘

31页PPT:基于Spark的移动大数据挖掘 数盟11.16 Data Science Meetup(DSM北京)分享:基于Spark的移动大数据挖掘分享嘉宾:张夏天(TalkingData首席数据科学家) @张夏天_机器学习 内容提要: TalkingData移动数据服务现状和挑战 为什么选择Spark TalkingData移动大数据挖掘 应用.系统和算法 Spark不是全部 以下为详细内容:

大数据实时处理-基于Spark的大数据实时处理及应用技术培训

随着互联网.移动互联网和物联网的发展,我们已经切实地迎来了一个大数据 的时代.大数据是指无法在一定时间内用常规软件工具对其内容进行抓取.管理和处理的数据集合,对大数据的分析已经成为一个非常重要且紧迫的需求.目前对大数据的分析工具,首选的是Hadoop/Yarn平台,但目前对大数据的实时分析工具,业界公认最佳为Spark.Spark是基于内存计算的大数据并行计算框架,Spark目前是Apache软件基金会旗下,顶级的开源项目,Spark提出的DAG作为MapReduce的替代方案,兼容HDFS.H

飞谷云六期第三组——基于Spark的机器学习

项目正式开始时间:2015.10.15. 随笔内容:本次项目的主题是基于Spark的ML.对于ML的学习有大概半年了,正好在网上关注到了由上海交通大学所主办的这个飞谷云的大数据项目,我所报名的这期已经是飞谷云的第六期了,在网上和群里了解了一段时间后大算报名参与一次,毕竟之前没有参与过真正的项目开发,也刚好趁着在学习ML的这个时间通过项目把理论和实践都加强.在这篇随笔中,我打算把这次项目的每个过程都写进来,一是为了给正在进行的项目提供一个全程记录:二是给自己一个留念,毕竟是自己独立完成的一个ML方

京东基于Spark的风控系统架构实践和技术细节

京东基于Spark的风控系统架构实践和技术细节 时间 2016-06-02 09:36:32  炼数成金 原文  http://www.dataguru.cn/article-9419-1.html 主题 Spark软件架构 1.背景 互联网的迅速发展,为电子商务兴起提供了肥沃的土壤.2014年,中国电子商务市场交易规模达到13.4万亿元,同比增长31.4%.其中,B2B电子商务市场交易额达到10万亿元,同比增长21.9%.这一连串高速增长的数字背后,不法分子对互联网资产的觊觎,针对电商行业的恶

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

基于Spark的机器学习实践 (九) - 聚类算法

0 相关源码 1 k-平均算法(k-means clustering)概述 1.1 回顾无监督学习 ◆ 分类.回归都属于监督学习 ◆ 无监督学习是不需要用户去指定标签的 ◆ 而我们看到的分类.回归算法都需要用户输入的训练数据集中给定一个个明确的y值 1.2 k-平均算法与无监督学习 ◆ k-平均算法是无监督学习的一种 ◆ 它不需要人为指定一个因变量,即标签y ,而是由程序自己发现,给出类别y ◆ 除此之外,无监督算法还有PCA,GMM等 源于信号处理中的一种向量量化方法,现在则更多地作为一种聚类

基于Spark MLlib平台的协同过滤算法---电影推荐系统

基于Spark MLlib平台的协同过滤算法---电影推荐系统 又好一阵子没有写文章了,阿弥陀佛...最近项目中要做理财推荐,所以,回过头来回顾一下协同过滤算法在推荐系统中的应用. 说到推荐系统,大家可能立马会想到协同过滤算法.本文基于Spark MLlib平台实现一个向用户推荐电影的简单应用.其中,主要包括三部分内容: 协同过滤算法概述 基于模型的协同过滤应用---电影推荐 实时推荐架构分析     一.协同过滤算法概述 本人对算法的研究,目前还不是很深入,这里简单的介绍下其工作原理. 通常,

基于spark排序的一种更廉价的实现方案-附基于spark的性能测试

排序可以说是很多日志系统的硬指标(如按照时间逆序排序),如果一个大数据系统不能进行排序,基本上是这个系统属于不可用状态,排序算得上是大数据系统的一个"刚需",无论大数据采用的是hadoop,还是spark,还是impala,hive,总之排序是必不可少的,排序的性能测试也是必不可少的. 有着计算奥运会之称的Sort Benchmark全球排序每年都会举行一次,每年巨头都会在排序上进行巨大的投入,可见排序速度的高低有多么重要!但是对于大多数企业来说,动辄上亿的硬件投入,实在划不来.甚至远

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现

UserView--第二种方式(避免第一种方式Set饱和),基于Spark算子的java代码实现 测试数据 java代码 1 package com.hzf.spark.study; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import org.apache.spark.SparkConf; 7 import org.apache.spark.api.java.JavaPairRDD; 8 import org.apache.s