Spark2.0 特征提取、转换、选择之二:特征选择、文本处理,以中文自然语言处理(情感分类)为例

特征选择

RFormula

RFormula是一个很方便,也很强大的Feature选择(自由组合的)工具。 
输入string 进行独热编码(见下面例子country) 
输入数值型转换为double(见下面例子hour) 
label为string,也用StringIndexer进行编号

RFormula produces a vector column of features and a double or string
column of label. Like when formulas are used in R for linear regression,
string input columns will be one-hot encoded, and numeric columns will be
cast to doubles. If the label column is of type string, it will be first
transformed to double with StringIndexer. If the label column does not exist
in the DataFrame, the output label column will be created from the specified
response variable in the formula.
如输入数据为
+---+-------+----+------+-------+
| id|country|hour|salary|clicked|
+---+-------+----+------+-------+
|  7|     US|  18|2500.0|    1.0|
|  8|     CA|  12|1500.0|    0.0|
|  9|     NZ|  15|1250.0|    0.0|
| 10|     US|  10|3200.0|    1.0|
+---+-------+----+------+-------+
使用RFormula:
RFormula formula = new RFormula()
                  .setFormula("clicked ~ country + hour + salary")
                  //clicked作为label,~之后的三个为选择的特征
                  .setFeaturesCol("features")
                  .setLabelCol("label");
+---------------------+-----+
|features             |label|
+---------------------+-----+
|[1.0,0.0,18.0,2500.0]|1.0  |
|[0.0,0.0,12.0,1500.0]|0.0  |
|[0.0,1.0,15.0,1250.0]|0.0  |
|[1.0,0.0,10.0,3200.0]|1.0  |
+---------------------+-----+
[1.0,0.0]为"US"的独热编码,以此类推             

卡方独立检验

ChiSqSelector 
参考: 
http://www.blogjava.net/zhenandaci/archive/2008/08/31/225966.html 
里面举得例子很好理解(原文真的很通俗易懂,直接参考原文吧,瞬间明白的感觉)。

在Spark中似乎非常慢???

ChiSqSelector chiSqSelector=new ChiSqSelector()
                .setFeaturesCol("TF")
                .setOutputCol("features")
                .setLabelCol("label")
                .setNumTopFeatures(2);
Dataset<Row> wordsChiSq=chiSqSelector.fit(wordsTF).transform(wordsTF);

文本转换及特征提取

英文分词

中文分词

中文分词工具比较多,Java,Python版本都有,这里以IKAnalyzer2012+Java版本为例说明。 
     使用时参考IKAnalyzer2012自带的中文帮助文档(有比较详细的用法)。 
     IKAnalyzer2012它 的 安 装 部 署 十 分 简 单 , 将 IKAnalyzer2012.jar 部 署 于 项 目 的 lib 目 录 中 ;IKAnalyzer.cfg.xml 与 stopword.dic 文件放置在 class 根目录。 
     依赖Lucene的类org.wltea.analyzer.luceneorg.wltea.analyzer,分词主类。

//创建分词对象
Analyzer anal=new IKAnalyzer(true);
StringReader reader=new StringReader(row.getString(1));
//分词
TokenStream ts=anal.tokenStream("", reader);
CharTermAttribute term=(CharTermAttribute) ts
                    .getAttribute(CharTermAttribute.class);
                    //遍历分词数据
                    String words="";
                    while(ts.incrementToken()){
                        words+=(term.toString()+"|");
                    }  

正则表达式分词

word2vect

TF-IDF

去停用词

应用例子

下面是一个综合的实例子,用到了Spark的一些特征转换API。由于需要处理中文,还需要一个分词器。

准备语料

(I)首先准备一个数据集,谭松波老师收集的中文情感分析酒店评论语料 
从CSDN上可以下载:http://download.csdn.net/download/x_i_y_u_e/9273533 
1、-ChnSentiCorp-Htl-ba-2000: 平衡语料,正负类各1000篇。 
2、ChnSentiCorp-Htl-ba-4000: 平衡语料,正负类各2000篇。 
3、ChnSentiCorp-Htl-ba-6000: 平衡语料,正负类各3000篇。 
4、ChnSentiCorp-Htl-unba-10000: 非平衡语料,正类为7000篇。

(II) 在linux下是乱码的,需要转换: 
编码查看: 
%file -i neg.9.txt 
neg.9.txt: text/plain; charset=iso-8859-1 
需要转换为utf8 
%iconv -f gb18030 -t utf8 neg.9.txt -o neg.9.o.txt 
-f :from -t :to 
批量转如下: 
(1)复制文件目录 find ChnSen* -type d -exec mkdir -p utf/{} \; 
(2)转换 find ChnSen* -type f -exec iconv -f GBK -t UTF-8 {} -o utf/{} \; 
ChnSen*是单前文件夹下的目录,utf是输出目录

(III) python 处理文件,合并为一个文件,去掉一条评论中所有换行

# -*- coding: utf-8 -*-
#将所有评论读入到一个文件中,每个原始文件文件为一条评论,中间不要出现换行
#repalce("\r"," "),是将^M(window下产生的 linux不认识的换行)去掉
#输出csv格式

path_out="E:/data/utf/ChnSentiCorp_htl_ba_2000/all.csv"
fw=open(path_out,‘w+‘)

#负样本
for i in range(999):    

    path1="E:/data/utf/ChnSentiCorp_htl_ba_2000/neg."+str(i+1)+".txt"
    fr=open(path1)
    lines=fr.readlines()

    fw.write("0.0,")#label    

    for line in lines:
        #repalce("\r"," "),是将^M(window下产生的 linux不认识的换行)去掉
        #replace(",",""),是为输出CSV格式做准备
        line=line.replace("\r",‘‘).strip().replace(",","")
        fw.write(line)

    fw.write("\n")
    fr.close()

#正样本
for i in range(999):   

    path2="E:/data/utf/ChnSentiCorp_htl_ba_2000/pos."+str(i+1)+".txt"
    fr=open(path2)

    fw.write("1.0,")#label    

    lines=fr.readlines()
    for line in lines:
        line=line.replace("\r",‘‘).strip().replace(",","")
        fw.write(line)
    fw.write("\n")

    fr.close()

fw.close

完整流程

可参考论文(只是分词工具不同): 
基于 Spark 的文本情感分析 
http://www.ibm.com/developerworks/cn/cognitive/library/cc-1606-spark-seniment-analysis/index.html 
思路是一样的,不过我是用Java实现的,写起来远远不如Python简洁。

//初步完整的流程,还需要进一步优化
//IKAnalyzer2012分词->TF-IDF特征->NaiveBayes ML

package my.spark.ml.practice.classification;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.ml.classification.NaiveBayes;
import org.apache.spark.ml.classification.NaiveBayesModel;

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator;

import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.IDF;
import org.apache.spark.ml.feature.IDFModel;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
import java.io.StringReader;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

//引用IKAnalyzer2012的类
import org.wltea.analyzer.lucene.IKAnalyzer;

import my.spark.ml.practice.classification.LabelValue;;

//文本处理,酒店评论
public class myHotelTextClassifer3 {        

    public static void main(String[] args) throws IOException {

        SparkSession spark=SparkSession
                .builder()
                .appName("Chinese Text Processing")
                .master("local[4]")
                .config("spark.sql.warehouse.dir",
                        "file///:G:/Projects/Java/Spark/spark-warehouse" )
                .getOrCreate();  

        //屏蔽日志
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
        Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF);
        //--------------------------(0)读入数据,数据预处理--------------------------------
        //原始数据文件包含两行,一行是label,一行是sentence,csv格式的
        Dataset<Row> raw=spark.read().format("csv")
                .load("E:/data/utf/ChnSentiCorp_htl_ba_2000/all.csv");        

        //去掉空值,不然一定后面一定抛出nullpointer异常
        //distinct数据去重 ,并打乱数据的顺序:不然数据是先负样本后正样本有规律排列的,预测效果偏高
        Dataset<Row> rawFilterNaN=raw
                .filter(raw.col("_c0").isNotNull())
                .filter(raw.col("_c1").isNotNull())
                .distinct();              

        //--------------------------(1)分词----------------------------------------
        //为Map自定义了Class LabelValue,见后面
        Encoder<LabelValue> LongStringEncoder=Encoders.bean(LabelValue.class);
        Dataset<LabelValue> wordsDF=rawFilterNa.map(new MapFunction<Row,LabelValue>() {
            @Override
            public LabelValue call(Row row) throws Exception {
                if (row!=null) {
                    LabelValue ret = new LabelValue();
                    double Label=1.0;
                    if (row.getString(0).equals("0.0")) {
                        Label=0.0;
                    }else{
                        Label=1.0;
                    }
                    ret.setLabel(Label);

                    //-------------KAnalyzer分词--------------------
                    //创建分词对象
                    Analyzer anal=new IKAnalyzer(true);
                    StringReader reader=new StringReader(row.getString(1));
                    //分词
                    TokenStream ts=anal.tokenStream("", reader);
                    CharTermAttribute term=(CharTermAttribute) ts
                            .getAttribute(CharTermAttribute.class);
                    //遍历分词数据
                    String words="";
                    while(ts.incrementToken()){
                        words+=(term.toString()+"|");
                    }
                    ret.setValue(words);
                    reader.close();

                    return ret;
                }
                else {
                    return null;
                }
            }

        }, LongStringEncoder);        

        //--------------------------(1)-2 RegexTokenizer分词器-----------------------------
        RegexTokenizer regexTokenizer = new RegexTokenizer()
                      .setInputCol("value")
                      .setOutputCol("words")
                      .setPattern("\\|");

        Dataset<Row> wordsDF2 = regexTokenizer.transform(wordsDF); 

        //--------------------------(2) HashingTF训练词频矩阵---------------------------------       

        HashingTF tf=new HashingTF()
                .setInputCol("words")
                .setOutputCol("TF");
        Dataset<Row> wordsTF=tf.transform(wordsDF2).select("TF","label");
        wordsTF.show();wordsTF.printSchema();
        Dataset<Row> wordsTF2=wordsTF
                .filter(wordsTF.col("TF").isNotNull())
                .filter(wordsTF.col("label").isNotNull());        

        //------------------------- (4)计算 TF-IDF 矩阵--------------------------------------
        IDFModel idf=new IDF()
                .setInputCol("TF")
                .setOutputCol("features")
                .fit(wordsTF2);
        Dataset<Row> wordsTfidf=idf.transform(wordsTF2);         

       //----------------------------(5)NaiveBayesModel ML---------------------
        Dataset<Row>[] split=wordsTfidf.randomSplit(new double[]{0.6,0.4});
        Dataset<Row> training=split[0];
        Dataset<Row> test=split[1];          

        NaiveBayes naiveBayes=new NaiveBayes()
                .setLabelCol("label")
                .setFeaturesCol("features");
        NaiveBayesModel naiveBayesModel=naiveBayes.fit(training);

        Dataset<Row> predictDF=naiveBayesModel.transform(test);

        //自定义计算accuracy
        double total=(double) predictDF.count();
        Encoder<Double> doubleEncoder=Encoders.DOUBLE();

        Dataset<Double> accuracyDF=predictDF.map(new MapFunction<Row,Double>() {
            @Override
            public Double call(Row row) throws Exception {
                if((double)row.get(1)==(double)row.get(5)){return 1.0;}
                else {return 0.0;}
            }
        }, doubleEncoder);       

        accuracyDF.createOrReplaceTempView("view");
        double correct=(double) spark.sql("SELECT value FROM view WHERE value=1.0").count();
        System.out.println("accuracy "+(correct/total));

        //计算areaUnderRoc
        double areaUnderRoc=new BinaryClassificationEvaluator()
                    .setLabelCol("label")
                    .setRawPredictionCol("prediction")
                    .evaluate(predictDF);
        //(areaUnderROC|areaUnderPR) (default: areaUnderROC)
        System.out.println("areaUnderRoc "+areaUnderRoc);
   }
}

//结果分析
//accuracy 0.7957860615883307
//areaUnderRoc 0.7873761854583772
//应该还有提升的空间

//Class LabelValue
package my.spark.ml.practice.classification;

import java.io.Serializable;

public class LabelValue implements Serializable {
  private String value;
  private double label;

  public String getValue() {
    return value;
  }

  public void setValue(String value) {
    this.value = value;
  }

  public double getLabel() {
        return label;
      }

  public void setLabel(double label) {
        this.label = label;
      }
}

先使用word2vect,然后将词产生的向量作为特征,分别用随机森林,GBTC, 
LogisticRegression,其中GBTC效果最好。但是普遍不如Naive-Bayes,可能还需要某些地方进行改进。

//转换为词向量,并进行标准化
Word2Vec word2Vec=new Word2Vec()
                .setInputCol("words")
                .setOutputCol("vect")
                .setVectorSize(10);
        Dataset<Row> vect=word2Vec
                .fit(wordsDF2)
                .transform(wordsDF2);
        //vect.show();vect.printSchema();
        //正则化
        Dataset<Row> vect2=new MinMaxScaler()
                .setInputCol("vect")
                .setOutputCol("features")
                .setMax(1.0)
                .setMin(0.0)
                .fit(vect)
                .transform(vect);
        //vect2.show();vect2.printSchema();

//GBTC分类
GBTClassifier gbtc=new GBTClassifier()
                .setLabelCol("label")
                .setFeaturesCol("vect")
                .setMaxDepth(10)
                .setMaxIter(10)
                .setStepSize(0.1);
        Dataset<Row> predictDF=gbtc.fit(training0).transform(test0);
 //其余代码是一样的,可以尝试不同的参数组合。 

原文地址:https://www.cnblogs.com/itboys/p/8393638.html

时间: 2024-10-28 14:45:23

Spark2.0 特征提取、转换、选择之二:特征选择、文本处理,以中文自然语言处理(情感分类)为例的相关文章

文本情感分类(二):深度学习模型

在<文本情感分类(一):传统模型>一文中,笔者简单介绍了进行文本情感分类的传统思路.传统的思路简单易懂,而且稳定性也比较强,然而存在着两个难以克服的局限性:一.精度问题,传统思路差强人意,当然一般的应用已经足够了,但是要进一步提高精度,却缺乏比较好的方法:二.背景知识问题,传统思路需要事先提取好情感词典,而这一步骤,往往需要人工操作才能保证准确率,换句话说,做这个事情的人,不仅仅要是数据挖掘专家,还需要语言学家,这个背景知识依赖性问题会阻碍着自然语言处理的进步. 庆幸的是,深度学习解决了这个问

Spark2.0机器学习系列之2:基于Pipeline、交叉验证、ParamMap的模型选择和超参数调优

Spark中的CrossValidation Spark中采用是k折交叉验证 (k-fold cross validation).举个例子,例如10折交叉验证(10-fold cross validation),将数据集分成10份,轮流将其中9份做训练1份做验证,10次的结果的均值作为对算法精度的估计. 10折交叉检验最常见,是因为通过利用大量数据集.使用不同学习技术进行的大量试验,表明10折是获得最好误差估计的恰当选择,而且也有一些理论根据可以证明这一点.但这并非最终结论,争议仍然存在.而且似

IBM专家亲自解读 Spark2.0 操作指南

Spark 背景介绍 1.什么是Spark 在Apache的网站上,有非常简单的一句话,"Spark is a fast and general engine ",就是Spark是一个统一的计算引擎,而且突出了fast.那么具体是做什么的呢?是做large-scale的processing,即大数据的处理. "Spark is a fast and general engine for large-scale processing"这句话非常简单,但是它突出了Spa

Spark2.0机器学习系列之6:GBDT(梯度提升决策树)、GBDT与随机森林差异、参数调试及Scikit代码分析

概念梳理 GBDT的别称 GBDT(Gradient Boost Decision Tree),梯度提升决策树.     GBDT这个算法还有一些其他的名字,比如说MART(Multiple Additive Regression Tree),GBRT(Gradient Boost Regression Tree),Tree Net等,其实它们都是一个东西(参考自wikipedia – Gradient Boosting),发明者是Friedman. 研究GBDT一定要看看Friedman的pa

Spark2.0机器学习系列之8: 聚类分析(K-Means,Bisecting K-Means,LDA,高斯混合模型)

在写这篇文章之前,先说一些题外话. 许多机器学习算法(如后面将要提到的LDA)涉及的数学知识太多,前前后后一大堆,理解起来不是那么容易. 面对复杂的机器学习模型,尤其是涉及大量数学知识的模型,我们往往要花费大量的时间和精力去推导数学算法(公式),如果过分沉湎于此会忽略了很多背后也许更重要的东西,正所谓只见树木,不见森林,而这是缺乏远见,是迷茫的. 我们需要深入理解模型背后的逻辑和所蕴含的或简或繁的思想.某些思想甚至可能是很美的思想,很伟大的思想.这些理解,使得面对复杂的问题时候,面对陌生问题时,

学习Spark2.0中的Structured Streaming(一)

转载自:http://lxw1234.com/archives/2016/10/772.htm Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL).Structured Streaming顾名思义,它将数据源和计算结果都映射成一张"结构化"的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率. Sp

基于spark2.0整合spark-sql + mysql + parquet + HDFS

一.概述 spark 2.0做出的改变大家可以参考官网以及其他资料,这里不再赘述由于spark1.x的sqlContext在spark2.0中被整合到sparkSession,故而利用spark-shell客户端操作会有些许不同,具体如下文所述 二.spark额外配置 1. 正常配置不再赘述,这里如果需要读取MySQL数据,则需要在当前用户下的环境变量里额外加上JDBC的驱动jar包 例如我的是:mysql-connector-java-5.1.18-bin.jar 存放路径是$SPARK_HO

mongodb3.0 性能測试报告 二

mongodb3.0 性能測试报告 一 mongodb3.0 性能測试报告 二 mongodb3.0 性能測试报告 三 測试环境: 服务器:X86 pcserver   共6台 cpu:  单颗8核 内存:64G 磁盘: raid 10 操作系统 :centos 6.5 mongodb:3.0 java驱动:2.13.0 jdk:1.6 网络:千兆以太网 測试场景 : 单台monodb服务,一台同配置server作为压力server,数据量不超过内存大小. 库里背景为1亿条大小为10K的数据.

C#与C++数据类型比较及结构体转换(搜集整理二)

原文网址:http://www.blogjava.net/heting/archive/2010/03/20/315998.html C++ C# ===================================== WORD ushort DWORD uint UCHAR int/byte 大部分情况都可以使用int代替,而如果需要严格对齐的话则应该用bytebyte UCHAR* string/IntPtr unsigned char* [MarshalAs(UnmanagedType