基于flink的协同过滤

最近flink较火,尝试使用flink做推荐功能试试,说干就干,话说flink-ml确实比较水,包含的算法较少,且只支持scala版本,以至flink1.9已经将flink-ml移除,看来是准备有大动作,但后期的实时推荐,flink能派上大用场。所幸基于物品的协同过滤算法相对简单,实现起来难度不大。先看目前推荐整体的架构。

先说一下用到的相似算法:
X=(x1, x2, x3, … xn),Y=(y1, y2, y3, … yn)
那么欧式距离为:

很明显,值越大,相似性越差,如果两者完全相同,那么距离为0。

第一步准备数据,数据的格式如下:

actionObject 是房屋的编号,actionType是用户的行为,包括曝光未点击,点击,收藏等。

下面的代码是从hdfs中获取数据,并将view事件的数据清除,其他的行为转化为分数


public static DataSet<Tuple2<Tuple2<String, String>, Float>> getData(ExecutionEnvironment env, String path) {
        DataSet<Tuple2<Tuple2<String, String>, Float>> res= env.readTextFile(path).map(new MapFunction<String, Tuple2<Tuple2<String, String>, Float>> (){

            @Override
            public Tuple2<Tuple2<String, String>, Float> map(String value) throws Exception {
                    JSONObject jj=JSON.parseObject(value);
                    if(RecommendUtils.getValidAction(jj.getString("actionType"))) {
                        return new Tuple2<>(new Tuple2<>(jj.getString("userId"),jj.getString("actionObject")),RecommendUtils.getScore(jj.getString("actionType")));
                    }else {
                        return null;
                    }

                }
            }).filter(new FilterFunction<Tuple2<Tuple2<String, String>, Float>>(){
                @Override
                public boolean filter(Tuple2<Tuple2<String, String>, Float> value) throws Exception {
                    return value!=null;
                }
            });

           return res;
    }

数据经过简单的清洗后变成如下的格式

按照前两列聚合,

groupBy(0).reduce(new ReduceFunction<Tuple2<Tuple2<String, String>, Float>>() { 

                @Override
                public Tuple2<Tuple2<String, String>, Float> reduce(Tuple2<Tuple2<String, String>, Float> value1,
                        Tuple2<Tuple2<String, String>, Float> value2) throws Exception {
                    // TODO Auto-generated method stub
                    return new Tuple2<>(new Tuple2<>(value1.f0.f0, value1.f0.f1),(value1.f1+value2.f1));
                }

            })

结构变成

此时,理论上BJCY56167779_03,BJCY56167779_04 的相似度为 (4-3) ^2+(5-2) ^2, 再开方,继续前进。

去掉第一列,格式如下

因为:
(x1-y1)^2+(x2-y2)^2=x1^2+y1^2-2x1y1+x2^2+y2^2-2x2y2=x1^2+y1^2+x2^2+y2^2-2(x1y1+x2y2), 所以我们先求x1^2+x2^2的值,并注册为item表


.map(new MapFunction<Tuple2<String, Float>, Tuple2<String, Float>>() {
                @Override
                public Tuple2<String, Float> map(Tuple2<String, Float> value) throws Exception {
                    return new Tuple2<>(value.f0, value.f1*value.f1);
                }
            }).
groupBy(0).reduce(new ReduceFunction<Tuple2<String, Float>>(){

                @Override
                public Tuple2<String, Float> reduce(Tuple2<String, Float> value1, Tuple2<String, Float> value2)
                        throws Exception {
                     Tuple2<String, Float> temp= new Tuple2<>(value1.f0, value1.f1 +  value2.f1);
                     return temp;
                }

}).map(new MapFunction<Tuple2<String, Float>, ItemDTO> (){

                @Override
                public ItemDTO map(Tuple2<String, Float> value) throws Exception {
                    ItemDTO nd=new ItemDTO();
                    nd.setItemId(value.f0);
                    nd.setScore(value.f1);
                    return nd;
                }

}); 

tableEnv.registerDataSet("item", itemdto); // 注册表信息

经过上面的转化,前半部分的值已经求出,下面要求出(x1y1+x2y2)的值

将上面的原始table再次转一下,变成下面的格式

代码如下:

.map(new MapFunction<Tuple2<String,List<Tuple2<String,Float>>>, List<Tuple2<Tuple2<String, String>, Float>>>() {

                @Override
                public List<Tuple2<Tuple2<String, String>, Float>> map(Tuple2<String,List<Tuple2<String,Float>>> value) throws Exception {
                    List<Tuple2<String, Float>> ll= value.f1;
                    List<Tuple2<Tuple2<String, String>, Float>> list = new ArrayList<>();
                    for (int i = 0; i < ll.size(); i++) {
                        for (int j = 0; j < ll.size(); j++) {
                            list.add(new Tuple2<>(new Tuple2<>(ll.get(i).f0, ll.get(j).f0),
                                    ll.get(i).f1 * ll.get(j).f1));
                        }
                    }
                    return list;
                }

            })

tableEnv.registerDataSet("item_relation", itemRelation); // 注册表信息

下面就是将整个公式连起来,完成最后的计算。

Table similarity=tableEnv.sqlQuery("select ta.firstItem,ta.secondItem,"
        + "(sqrt(tb.score + tc.score - 2 * ta.relationScore)) as similarScore from item tb " +
        "inner join item_relation ta  on tb.itemId = ta.firstItem and ta.firstItem <> ta.secondItem "+
        "inner join item tc on tc.itemId = ta.secondItem "
        );

DataSet<ItemSimilarDTO> ds=tableEnv.toDataSet(similarity, ItemSimilarDTO.class);

现在结构变成

感觉离终点不远了,上述结构依然不是我们想要的,我们希望结构更加清晰,如下格式

代码如下:

DataSet<RedisDataModel> redisResult= ds.map(new MapFunction<ItemSimilarDTO, Tuple2<String, Tuple2<String, Float>>> (){

            @Override
            public Tuple2<String, Tuple2<String, Float>> map(ItemSimilarDTO value) throws Exception {
                return new Tuple2<String, Tuple2<String, Float>>(value.getFirstItem(), new Tuple2<>(value.getSecondItem(), value.getSimilarScore().floatValue()));
            }
        }).groupBy(0).reduceGroup(new GroupReduceFunction<Tuple2<String, Tuple2<String, Float>> , Tuple2<String, List<RoomModel>>>() { 

            @Override
            public void reduce(Iterable<Tuple2<String, Tuple2<String, Float>>> values,
                    Collector<Tuple2<String, List<RoomModel>>> out) throws Exception {

                List<RoomModel> list=new ArrayList<>();
                String key=null;
                for (Tuple2<String, Tuple2<String, Float>> t : values) {
                    key=t.f0;
                    RoomModel rm=new RoomModel();
                    rm.setRoomCode(t.f1.f0);
                    rm.setScore(t.f1.f1);
                    list.add(rm);
                }       

                //升序排序
                Collections.sort(list,new Comparator<RoomModel>(){
                    @Override
                    public int compare(RoomModel o1, RoomModel o2) {
                        return o1.getScore().compareTo(o2.getScore());
                    }
                 });

                out.collect(new Tuple2<>(key,list));
            }

        }).map(new MapFunction<Tuple2<String, List<RoomModel>>, RedisDataModel>(){

            @Override
            public RedisDataModel map(Tuple2<String, List<RoomModel>> value) throws Exception {
                RedisDataModel m=new RedisDataModel();
                m.setExpire(-1);
                m.setKey(JobConstants.REDIS_FLINK_ITEMCF_KEY_PREFIX+value.f0);
                m.setGlobal(true);
                m.setValue(JSON.toJSONString(value.f1));
                return m;
            }

        });

最终将这些数据存入redis中,方便查询

RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat()
                    .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER))
                    .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS))
                    .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE)))
                    .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL)))
                    .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS)))
                    .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW)))
                    .finish();
            redisResult.output(redisOutput);

            env.execute("itemcf");

大功告成,其实没有想象中的那么难。当然这里只是一个demo,实际情况还要进行数据过滤,多表join优化等。

原文地址:https://blog.51cto.com/12597095/2433875

时间: 2024-10-18 03:54:27

基于flink的协同过滤的相关文章

Mahout实现基于用户的协同过滤算法

Mahout中对协同过滤算法进行了封装,看一个简单的基于用户的协同过滤算法. 基于用户:通过用户对物品的偏好程度来计算出用户的在喜好上的近邻,从而根据近邻的喜好推测出用户的喜好并推荐. 图片来源 程序中用到的数据都存在MySQL数据库中,计算结果也存在MySQL中的对应用户表中. package com.mahout.helloworlddemo; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.

基于用户的协同过滤算法(UserCF)

基于用户的协同过滤算法: 找到和目标用户相似的用户集合 找到这个集合中用户喜欢的但目标用户没有听过的物品 #encoding: utf-8 from Similarity import Person from Sort import select_sort file=open('user_bookmark','r') filew=open('user_bookRecommend','w') #加载训练集 trainSet={} while True: line=file.readline().s

基于用户的协同过滤推荐算法原理和实现

在推荐系统众多方法中,基于用户的协同过滤推荐算法是最早诞生的,原理也较为简单.该算法1992年提出并用于邮件过滤系统,两年后1994年被 GroupLens 用于新闻过滤.一直到2000年,该算法都是推荐系统领域最著名的算法. 本文简单介绍基于用户的协同过滤算法思想以及原理,最后基于该算法实现园友的推荐,即根据你关注的人,为你推荐博客园中其他你有可能感兴趣的人. 基本思想 俗话说"物以类聚.人以群分",拿看电影这个例子来说,如果你喜欢<蝙蝠侠>.<碟中谍>.&l

【推荐系统实战】:C++实现基于用户的协同过滤(UserCollaborativeFilter)

好早的时候就打算写这篇文章,但是还是参加阿里大数据竞赛的第一季三月份的时候实验就完成了,硬生生是拖到了十一假期,自己也是醉了...找工作不是很顺利,希望写点东西回顾一下知识,然后再攒点人品吧,只能如此了. 一.问题背景 二.基于用户的协同过滤算法介绍 三.数据结构和实验过程设计 四.代码 一.问题背景 首先介绍一下问题的背景,现在我有四个月的用户.品牌数据<user,brand>,即用户在这四个月中的某一天购买了某个品牌(当然为了简化算法模型,将购买时间省去,后面再说).即现在有这四个月的数据

基于用户的协同过滤

最近在看推荐系统.主要是看<智能web算法>和<推荐系统实战>这两本书.<智能web算法>中推荐系统只花一个章节来讲.<推荐系统实战>整本书都是在讲推荐的内容.有兴趣的朋友可以看看.在此慢慢写下笔记与诸位来宾交流交流 推荐系统应用广泛.推荐的方式也多种多样.比较常用的有三种方式.1.社会化推荐:2.协同过滤推荐:3.基于内容的推荐.而协同过滤推荐又可以分为基于用户的协同过滤推荐(UserCF)和基于物品的协同过滤推荐(ItemCF).本文写的是基于用户的协同

ItemCF_基于物品的协同过滤_MapReduceJava代码实现思路

2017年2月19日, 星期日 ItemCF_基于物品的协同过滤 1.    概念 2.    原理 如何给用户推荐? 给用户推荐他没有买过的物品--103 3.    java代码实现思路 数据集: 第一步:构建物品的同现矩阵 第二步:构建用户的得分矩阵 第三步:同现矩阵*评分矩阵 第四步:拿到最终结果,排序,得到给用户的推荐列表 问题一:物品同现矩阵和用户得分矩阵如何构建? 问题二:矩阵相乘如何来做?   六个MapReduce step1_第一个MapReduce: 目的-->去重去除数据

(数据挖掘-入门-5)基于内容的协同过滤与分类

1.动机 2.基于内容的分类器 3.python实现 一.动机 在前面的文章中介绍了基于用户和基于物品的协同过滤推荐方法,其实无论是基于用户还是基于物品,都是通过群体效应来进行推荐,因为衡量相似度的向量都是基于一定群体用户的评分,所以推荐出来的物品都是热门的流行的物品,对于一些冷门物品可能就无法收到亲睐. 而一个好的推荐系统,不仅能为用户发现热门流行的感兴趣物品,也能为用户提供自己也不了解的但也会感兴趣的物品,即冷门的物品,这样既有益于用户,也照顾了内容提供商. 因此,本文将介绍一种基于内容即物

(数据挖掘-入门)基于用户的协同过滤之最近邻

主要内容: 1.什么是基于用户的协同过滤 2.python实现 1.什么是基于用户协同过滤: 协同过滤:Collaborative Filtering,一般用于推荐系统,如京东,亚马逊等电商网站上的“购买该物品的用户还喜欢/购买”之类的栏目都是根据协同过滤推荐出来的. 基于用户的协同过滤:User-based CF,通过不同用户对item(物品)的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐. 这里介绍一种最简单的过滤方法:最近邻,即找到与某用户最相似的用户,将该用户喜欢的物品(而某

推荐算法之基于物品的协同过滤算法

基于物品的协同过滤算法(ItemCF)是业界应用最多的算法,主要思想是利用用户之前有过的行为,给用户推荐和之前物品类似的物品. 基于物品的协同过滤算法主要分为两步: 1)计算物品之间的相似度. 2)依据物品的相似度和用户的历史行为给用户生成推荐列表. 第一步的关键点在于计算物品之间的相似度,这里并不採用基于内容的相似性,而是去计算在喜欢物品i的用户中有多少是喜欢物品j的,这样计算的前提是用户的兴趣爱好通常是比較确定的,不easy变,那么当一个用户对两个物品都喜欢的时候,我们往往能够觉得这两个物品