Spark Java API 计算 Levenshtein 距离

Spark Java API 计算 Levenshtein 距离

上一篇文章中,完成了Spark开发环境的搭建,最终的目标是对用户昵称信息做聚类分析,找出违规的昵称。聚类分析需要一个距离,用来衡量两个昵称之间的相似度。这里采用levenshtein距离。现在就来开始第一个小目标,用Spark JAVA API 计算字符串之间的Levenshtein距离。

1. 数据准备

样本数据如下:

{"name":"Michael", "nick":"Mich","age":50}

{"name":"Andy", "nick":"Anc","age":30}

{"name":"Anch", "nick":"MmAc","age":19}

把数据保存成文件并上传到hdfs上:./bin/hdfs dfs -put levestein.json /user/panda

2. 代码实现

定义一个类表示样本数据:

    public static class User{
        private String name;
        private String nick;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getNick() {
            return nick;
        }

        public void setNick(String nick) {
            this.nick = nick;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

创建SparkSession

SparkSession sparkSession = SparkSession.builder()
                .appName("levenshtein example")
                .master("spark://172.25.129.170:7077")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();

在Spark命令行./bin/pyspark启动Spark时,会默认创建一个名称为 spark 的SparkSession。而这里是写代码,也需要创建SparkSession对象。

The SparkSession instance is the way Spark executes user-defined

manipulations across the cluster. There is a one-to-one correspondence between a SparkSession and

a Spark Application.

定义数据类型

Encoder<User> userEncoder = Encoders.bean(User.class);

JAVA里面定义了一套数据类型,比如java.util.String是字符串类型;类似地,Spark也有自己的数据类型,因此Encoder就定义了如何将Java对象映射成Spark里面的对象。

Used to convert a JVM object of type T to and from the internal Spark SQL representation.

To efficiently support domain-specific objects, an Encoder is required. The encoder maps the domain specific type T to Spark‘s internal type system. For example, given a class Person with two fields, name (string) and age (int), an encoder is used to tell Spark to generate code at runtime to serialize the Person object into a binary structure. This binary structure often has much lower memory footprint as well as are optimized for efficiency in data processing (e.g. in a columnar format). To understand the internal binary representation for data, use the schema function.

构建Dataset:

Dataset<User> userDataset = sparkSession.read().json(path).as(userEncoder);

说明一下Dataset与DataFrame区别,Dataset是针对Scala和JAVA特有的。Dataset是有类型的,Dataset的每一行是某种类型的数据,比如上面的User类型。

A Dataset is a strongly typed collection of domain-specific objects that can be transformed in parallel using functional or relational operations. Each Dataset also has an untyped view called a DataFrame, which is a Dataset of Row.

而DataFrame的每一行的类型是Row(看官方文档,我就这样理解了,哈哈。。)

DataFrame is represented by a Dataset of Row。While, in Java API, users need to use Dataset<Row> to represent a DataFrame.

这个图就很好地解释了DataFrame和Dataset的区别。

计算levenshtein距离,将之 transform 成一个新DataFrame中:

Column lev_res = functions.levenshtein(userDataset.col("name"), userDataset.col("nick"));
Dataset<Row> leveDataFrame = userDataset.withColumn("distance", lev_res);

完整代码

import org.apache.spark.sql.*;

public class LevenstenDistance {
    public static class User{
        private String name;
        private String nick;
        private int age;

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getNick() {
            return nick;
        }

        public void setNick(String nick) {
            this.nick = nick;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }

    public static void main(String[] args) {
        SparkSession sparkSession = SparkSession.builder()
                .appName("levenshtein example")
                .master("spark://172.25.129.170:7077")
                .config("spark.some.config.option", "some-value")
                .getOrCreate();
        String path = "hdfs://172.25.129.170:9000/user/panda/levestein.json";
        Encoder<User> userEncoder = Encoders.bean(User.class);
        Dataset<User> userDataset = sparkSession.read().json(path).as(userEncoder);
        userDataset.show();

        Column lev_res = functions.levenshtein(userDataset.col("name"), userDataset.col("nick"));
        Dataset<Row> leveDataFrame = userDataset.withColumn("distance", lev_res);
//        userDataset.show();
        leveDataFrame.show();
        System.out.println(lev_res.toString());
    }
}

原来的Dataset:

计算Levenshtein距离后的得到的DataFrame:

根据上面的示例,下面来演示一下一个更实际点的例子:计算昵称和签名之间的levenshtein距离,若levenshtein距离相同,就代表该用户的 昵称 和 签名 是相同的:

数据格式如下:

{"nick":"赖求","uid":123456}

{"details":"时尚是一种态度,时尚第一品牌。看我的。","nick":"冰冷世家@蹦蹦","signature":"轻装时代看我的。艾莱依时尚羽绒服。。","uid":123456}

{"nick":"[潗團軍-6]明 明『招 募』","signature":"我是来擂人的,擂死人不偿命!","uid":123456}

  1. 加载数据

            Dataset<Row> dataset = spark.read().format("json")
                    .option("header", "false")
                    .load("hdfs://172.25.129.170:9000/user/panda/profile_noempty.json");

    ?

  2. 取出昵称和签名
            //空字符串 与 null 是不同的
            Dataset<Row> nickSign = dataset.filter(col("nick").isNotNull())
                    .filter(col("signature").isNotNull())
                    .select(col("nick"), col("signature"), col("uid"));

    ?

  3. 计算昵称和签名的Levenshtein距离
    Column lev_distance = functions.levenshtein(nickSign.col("nick"), nickSign.col("signature"));
            Dataset<Row> nickSignDistance = nickSign.withColumn("distance", lev_distance);

    ?

  4. 按距离进行过滤
    Dataset<Row> sameNickSign = nickSignDistance.filter("distance = 0");

    ?

这样就能找出昵称和签名完全一样的用户了。

原文:https://www.cnblogs.com/hapjin/p/9954191.html

原文地址:https://www.cnblogs.com/hapjin/p/9954191.html

时间: 2024-10-19 04:54:01

Spark Java API 计算 Levenshtein 距离的相关文章

Spark Java API 之 CountVectorizer

Spark Java API 之 CountVectorizer 由于在Spark中文本处理与分析的一些机器学习算法的输入并不是文本数据,而是数值型向量.因此,需要进行转换.而将文本数据转换成数值型的向量有很多种方法,CountVectorizer是其中之一. A CountVectorizer converts a collection of text documents into a vector representing the word count of text documents.

java文本相似度计算(Levenshtein Distance算法(中文翻译:编辑距离算法))----代码和详解

算法代码实现: package com.util; public class SimFeatureUtil { private static int min(int one, int two, int three) { int min = one; if (two < min) { min = two; } if (three < min) { min = three; } return min; } public static int ld(String str1, String str2)

Spark实时流计算Java案例

现在,网上基于spark的代码基本上都是Scala,很多书上也都是基于Scala,没办法,谁叫spark是Scala写出来的了,但是我现在还没系统的学习Scala,所以只能用java写spark程序了,spark支持java,而且Scala也基于JVM,不说了,直接上代码 这是官网上给出的例子,大数据学习中经典案例单词计数 在linux下一个终端 输入 $ nc -lk 9999 然后运行下面的代码 package com.tg.spark.stream; import java.util.Ar

Spark基础与Java Api介绍

原创文章,转载请注明: 转载自http://www.cnblogs.com/tovin/p/3832405.html  一.Spark简介 1.什么是Spark 发源于AMPLab实验室的分布式内存计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足. 相比于MapReduce,Spark能充分利用内存资源提高计算效率. 2.Spark计算框架 Driver程序启动很多workers,然后workers在(分布式)文件系统中读取数据后转化为RDD(弹性分布式数据集),最后对RD

利用SparkLauncher 类以JAVA API 编程的方式提交Spark job

一.环境说明和使用软件的版本说明: hadoop-version:hadoop-2.9.0.tar.gz spark-version:spark-2.2.0-bin-hadoop2.7.tgz java-version:jdk1.8.0_151 集群环境:单机伪分布式环境. 二.适用背景 在学习Spark过程中,资料中介绍的提交Spark Job的方式主要有两种(我所知道的):第一种是通过命令行的方式提交Job,使用spark 自带的spark-submit工具提交,官网和大多数参考资料都是已这

计算字符串距离

描述: Levenshtein 距离,又称编辑距离,指的是两个字符串之间,由一个转换成另一个所需的最少编辑操作次数.许可的编辑操作包括将一个字符替换成另一个字符,插入一个字符,删除一个字符.编辑距离的算法是首先由俄国科学家Levenshtein提出的,故又叫Levenshtein Distance. Ex: 字符串A:abcdefg 字符串B: abcdef 通过增加或是删掉字符”g”的方式达到目的.这两种方案都需要一次操作.把这个操作所需要的次数定义为两个字符串的距离. 要求: 给定任意两个字

Spark(十) -- Spark Streaming API编程

本文测试的Spark版本是1.3.1 Spark Streaming编程模型: 第一步: 需要一个StreamingContext对象,该对象是Spark Streaming操作的入口 ,而构建一个StreamingContext对象需要两个参数: 1.SparkConf对象:该对象是配置Spark 程序设置的,例如集群的Master节点,程序名等信息 2.Seconds对象:该对象设置了StreamingContext多久读取一次数据流 第二步: 构建好入口对象之后,直接调用该入口的方法读取各

ElasticSearch Java api 详解_V1.0

原英文文档:http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/index.html (各个版本的api) Search部分: 查询有两种方法:query Java API和filter Java API 首先看一个例子: import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.Se

spark2.x由浅入深深到底系列六之RDD java api详解一

以下对RDD的三种创建方式.单类型RDD基本的transformation api.采样Api以及pipe操作进行了java api方面的阐述 一.RDD的三种创建方式 从稳定的文件存储系统中创建RDD,比如local fileSystem或者hdfs等,如下: //从hdfs文件中创建 JavaRDD<String> textFileRDD = sc.textFile("hdfs://master:9999/users/hadoop-twq/word.txt"); //从