Real-Time Brand Preference in a Flink User Profiling System

Tech stack: Spring Cloud + Kafka + HBase + MongoDB

1. Define the entity objects

     Browse-product behavior      ScanProductLog
     Favorite-product behavior    CollectProductLog
     Shopping-cart behavior       BuyCartProductLog
     Follow-product behavior      AttentionProductLog
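
The entity classes themselves are not listed in this post. For reference, here is a minimal sketch of ScanProductLog with only the two fields the brand-preference job below actually reads (userid and brand); the real class would carry more fields:

package com.yangwj.log;

// Minimal sketch of the browse-event POJO. Hypothetical reconstruction:
// only the fields used by BrandLikeMap (userid, brand) are shown.
public class ScanProductLog {
    private int userid;
    private String brand;

    public int getUserid() { return userid; }
    public void setUserid(int userid) { this.userid = userid; }
    public String getBrand() { return brand; }
    public void setBrand(String brand) { this.brand = brand; }
}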

2. Set up Kafka and create the topics. For a Kafka setup guide, see https://www.cnblogs.com/ywjfx/p/10305161.html ; for Spring Boot integration, see https://www.cnblogs.com/ywjfx/p/11197646.html

            bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scanProductLog

            bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic collectProductLog

            bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic buyCartProductLog

            bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic attentionProductLog
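
As a quick sanity check, the new topics should then show up when listing them against the same ZooKeeper (adjust the address to your environment):

            bin/kafka-topics.sh --list --zookeeper localhost:2181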

3. Add the Flink streaming dependencies

        <!-- The original used ${project.version} here, which only resolves inside
             Flink's own source tree; in your own project define the versions
             explicitly. The 0.10 connector implies a Flink 1.x release, e.g.: -->
        <properties>
            <flink.version>1.7.2</flink.version>
            <scala.binary.version>2.11</scala.binary.version>
        </properties>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

4. BrandLikeTask.java

package com.yangwj.task;

import com.yangwj.entity.BrandLike;
import com.yangwj.kafka.KafkaEvent;
import com.yangwj.map.BrandLikeMap;
import com.yangwj.reduce.BrandLikeReduce;
import com.yangwj.reduce.BrandLikeSink;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import com.yangwj.kafka.KafkaEventSchema;
import javax.annotation.Nullable;

/**
 * Created by li on 2019/1/6.
 */
public class BrandLikeTask {
    public static void main(String[] args) {
        // Arguments are hard-coded here for local testing; in a real deployment
        // they would come from the command line instead.
        args = new String[]{"--input-topic","scanProductLog","--bootstrap.servers","192.168.80.134:9092","--zookeeper.connect","192.168.80.134:2181","--group.id","yangwj"};
        final ParameterTool parameterTool = ParameterTool.fromArgs(args);

//        if (parameterTool.getNumberOfParameters() < 5) {
//            System.out.println("Missing parameters!\n" +
//                    "Usage: Kafka --input-topic <topic> --output-topic <topic> " +
//                    "--bootstrap.servers <kafka brokers> " +
//                    "--zookeeper.connect <zk quorum> --group.id <some id>");
//            return;
//        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<KafkaEvent> input = env
                .addSource(
                        new FlinkKafkaConsumer010<>(
                                parameterTool.getRequired("input-topic"),
                                new KafkaEventSchema(),
                                parameterTool.getProperties())
                                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())); // subscribe to the Kafka topic and read events
        DataStream<BrandLike> brandLikeMap = input.flatMap(new BrandLikeMap());

        DataStream<BrandLike> brandLikeReduce = brandLikeMap
                .keyBy("groupbyfield")
                // keyed 2-second window: timeWindowAll after keyBy would ignore the
                // key and collapse the stream into a single non-parallel window
                .timeWindow(Time.seconds(2))
                .reduce(new BrandLikeReduce());

        brandLikeReduce.addSink(new BrandLikeSink());

        try {
            env.execute("brandLike analysis");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {

        private static final long serialVersionUID = -742759155861320823L;

        private long currentTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
            // the inputs are assumed to be of format (message,timestamp)
            this.currentTimestamp = event.getTimestamp();
            return event.getTimestamp();
        }

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
        }
    }
}

5. KafkaEvent.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.yangwj.kafka;

/**
 * The event type consumed from Kafka in this job.
 *
 * <p>This is a Java POJO, which Flink recognizes and will allow "by-name" field referencing
 * when keying a {@link org.apache.flink.streaming.api.datastream.DataStream} of such a type.
 */
public class KafkaEvent {
    private final static String splitword = "##";
    private String word;
    private int frequency;
    private long timestamp;

    public KafkaEvent() {}

    public KafkaEvent(String word, int frequency, long timestamp) {
        this.word = word;
        this.frequency = frequency;
        this.timestamp = timestamp;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public int getFrequency() {
        return frequency;
    }

    public void setFrequency(int frequency) {
        this.frequency = frequency;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /**
     * Parses "word##frequency##timestamp". In this project the "word" part
     * carries the ScanProductLog JSON string.
     */
    public static KafkaEvent fromString(String eventStr) {
        String[] split = eventStr.split(splitword);
        return new KafkaEvent(split[0], Integer.valueOf(split[1]), Long.valueOf(split[2]));
    }

    @Override
    public String toString() {
        return word + splitword + frequency + splitword + timestamp;
    }
}
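
Putting toString and fromString together, each Kafka record travels as word##frequency##timestamp, where the word part carries the ScanProductLog JSON. As an illustration (not from the original post), a minimal producer that publishes one browse event in this format using the plain kafka-clients API; broker address and topic name match the job configuration above:

package com.yangwj.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

// Illustrative sketch: publish one browse event in the
// word##frequency##timestamp format that KafkaEventSchema (next step) expects.
public class ScanProductLogProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.80.134:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // the "word" field carries the event JSON
            String json = "{\"userid\":1,\"brand\":\"nike\"}";
            String value = new KafkaEvent(json, 1, System.currentTimeMillis()).toString();
            producer.send(new ProducerRecord<>("scanProductLog", value));
        }
    }
}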

6. KafkaEventSchema.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.yangwj.kafka;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * The serialization schema for the {@link KafkaEvent} type. This class defines how to transform a
 * Kafka record's bytes to a {@link KafkaEvent}, and vice-versa.
 */
public class KafkaEventSchema implements DeserializationSchema<KafkaEvent>, SerializationSchema<KafkaEvent> {

    private static final long serialVersionUID = 6154188370181669758L;

    @Override
    public byte[] serialize(KafkaEvent event) {
        // be explicit about the charset rather than relying on the platform default
        return event.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public KafkaEvent deserialize(byte[] message) throws IOException {
        return KafkaEvent.fromString(new String(message, StandardCharsets.UTF_8));
    }

    @Override
    public boolean isEndOfStream(KafkaEvent nextElement) {
        return false;
    }

    @Override
    public TypeInformation<KafkaEvent> getProducedType() {
        return TypeInformation.of(KafkaEvent.class);
    }
}

7. BrandLikeMap.java

Note: a FlatMapFunction is used because a single input event may emit two tags to the reduce step (a -1 for the previous favorite brand plus a +1 for the new one); if each event produced exactly one tag, a MapFunction would do.

package com.yangwj.map;

import com.alibaba.fastjson.JSONObject;
import com.yangwj.entity.BrandLike;
import com.yangwj.kafka.KafkaEvent;
import com.yangwj.log.ScanProductLog;
import com.yangwj.util.HbaseUtils;
import com.yangwj.utils.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by li on 2019/1/6.
 */
public class BrandLikeMap implements FlatMapFunction<KafkaEvent, BrandLike>  {

    @Override
    public void flatMap(KafkaEvent kafkaEvent, Collector<BrandLike> collector) throws Exception {
        String data = kafkaEvent.getWord();
        ScanProductLog scanProductLog = JSONObject.parseObject(data, ScanProductLog.class);
        int userid = scanProductLog.getUserid();
        String brand = scanProductLog.getBrand();
        String tablename = "userflaginfo";
        String rowkey = userid + "";
        String famliyname = "userbehavior";
        String colum = "brandlist"; // per-user map of brand -> view count
        String mapdata = HbaseUtils.getdata(tablename, rowkey, famliyname, colum);
        Map<String, Long> map = new HashMap<String, Long>();
        if (StringUtils.isNotBlank(mapdata)) {
            // copy the counts out explicitly: fastjson would otherwise
            // deserialize them as Integer, breaking the Long unboxing below
            JSONObject obj = JSONObject.parseObject(mapdata);
            for (String key : obj.keySet()) {
                map.put(key, obj.getLongValue(key));
            }
        }
        // the user's favorite brand before this event (highest count so far)
        String maxprebrand = MapUtils.getmaxbyMap(map);

        long prebrand = map.get(brand) == null ? 0L : map.get(brand);
        map.put(brand, prebrand + 1);
        String finalstring = JSONObject.toJSONString(map);
        HbaseUtils.putdata(tablename, rowkey, famliyname, colum, finalstring);

        String maxbrand = MapUtils.getmaxbyMap(map);
        // emit deltas only when the favorite brand actually changed; on the first
        // event maxprebrand is blank, which also counts as a change
        if (StringUtils.isNotBlank(maxbrand) && !maxbrand.equals(maxprebrand)) {
            if (StringUtils.isNotBlank(maxprebrand)) {
                // retire the old favorite with a -1 delta
                BrandLike oldLike = new BrandLike();
                oldLike.setBrand(maxprebrand);
                oldLike.setCount(-1L);
                oldLike.setGroupbyfield("==brandlik==" + maxprebrand);
                collector.collect(oldLike);
            }
            // register the new favorite with a +1 delta; the group-by key must
            // be set before collecting, or it would be null downstream
            BrandLike brandLike = new BrandLike();
            brandLike.setBrand(maxbrand);
            brandLike.setCount(1L);
            brandLike.setGroupbyfield("==brandlik==" + maxbrand);
            collector.collect(brandLike);
        }
        // store the user's current favorite brand back into HBase
        colum = "brandlike";
        HbaseUtils.putdata(tablename, rowkey, famliyname, colum, maxbrand);
    }

}
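
MapUtils.getmaxbyMap and HbaseUtils are referenced above but not listed in the original post. Plausible minimal sketches, consistent with how they are called (HBase 1.x client API; packages match the imports above, the ZooKeeper address is assumed, and connection handling is simplified):

package com.yangwj.utils;

import java.util.Map;

// Sketch: returns the key with the largest value, or "" for a null/empty map.
public class MapUtils {
    public static String getmaxbyMap(Map<String, Long> map) {
        String maxKey = "";
        long maxValue = Long.MIN_VALUE;
        if (map == null) {
            return maxKey;
        }
        for (Map.Entry<String, Long> entry : map.entrySet()) {
            if (entry.getValue() != null && entry.getValue() > maxValue) {
                maxValue = entry.getValue();
                maxKey = entry.getKey();
            }
        }
        return maxKey;
    }
}

package com.yangwj.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the get/put helpers; a real implementation should share the
// connection properly and handle exceptions rather than rethrowing them.
public class HbaseUtils {
    private static Connection connection;

    private static synchronized Connection conn() throws Exception {
        if (connection == null) {
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "192.168.80.134"); // assumed address
            connection = ConnectionFactory.createConnection(conf);
        }
        return connection;
    }

    public static String getdata(String tablename, String rowkey, String family, String column) throws Exception {
        try (Table table = conn().getTable(TableName.valueOf(tablename))) {
            Get get = new Get(Bytes.toBytes(rowkey));
            get.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));
            Result result = table.get(get);
            byte[] value = result.getValue(Bytes.toBytes(family), Bytes.toBytes(column));
            return value == null ? null : Bytes.toString(value);
        }
    }

    public static void putdata(String tablename, String rowkey, String family, String column, String data) throws Exception {
        try (Table table = conn().getTable(TableName.valueOf(tablename))) {
            Put put = new Put(Bytes.toBytes(rowkey));
            put.addColumn(Bytes.toBytes(family), Bytes.toBytes(column), Bytes.toBytes(data));
            table.put(put);
        }
    }
}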

8. BrandLikeReduce.java

package com.yangwj.reduce;

import com.yangwj.entity.BrandLike;
import org.apache.flink.api.common.functions.ReduceFunction;

/**
 * Created by li on 2019/1/6.
 */
public class BrandLikeReduce implements ReduceFunction<BrandLike> {
    @Override
    public BrandLike reduce(BrandLike brandLike, BrandLike t1) throws Exception {
        // both inputs share the same key (groupbyfield) and therefore the same
        // brand; summing merges the +1/-1 deltas collected within the window
        String brand = brandLike.getBrand();
        long count1 = brandLike.getCount();
        long count2 = t1.getCount();
        BrandLike brandLikefinal = new BrandLike();
        brandLikefinal.setBrand(brand);
        brandLikefinal.setCount(count1 + count2);
        return brandLikefinal;
    }
}
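
For intuition, the +1/-1 deltas for one brand net out inside a window. A tiny standalone check (illustrative only, not part of the job):

import com.yangwj.entity.BrandLike;
import com.yangwj.reduce.BrandLikeReduce;

// A user made "nike" their favorite and switched away within the same
// 2-second window, so the two deltas cancel.
public class BrandLikeReduceCheck {
    public static void main(String[] args) throws Exception {
        BrandLike plus = new BrandLike();
        plus.setBrand("nike");
        plus.setCount(1L);
        BrandLike minus = new BrandLike();
        minus.setBrand("nike");
        minus.setCount(-1L);
        BrandLike net = new BrandLikeReduce().reduce(plus, minus);
        System.out.println(net.getBrand() + " -> " + net.getCount()); // nike -> 0
    }
}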

9. BrandLikeSink.java

package com.yangwj.reduce;

import com.yangwj.entity.BrandLike;
import com.yangwj.util.MongoUtils;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.bson.Document;

/**
 * Created by li on 2019/1/6.
 */
public class BrandLikeSink implements SinkFunction<BrandLike> {
    @Override
    public void invoke(BrandLike value, Context context) throws Exception {
        String brand = value.getBrand();
        long count = value.getCount();
        // look up the running total for this brand and apply the window's delta
        Document doc = MongoUtils.findoneby("brandlikestatics", "portrait", brand);
        if (doc == null) {
            doc = new Document();
            doc.put("info", brand);
            doc.put("count", count);
        } else {
            Long countpre = doc.getLong("count");
            Long total = countpre + count;
            doc.put("count", total);
        }
        MongoUtils.saveorupdatemongo("brandlikestatics", "portrait", doc);
    }
}
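
MongoUtils is likewise not listed in the original. A minimal sketch against the synchronous MongoDB Java driver, matching the call signatures used above (collection name first, then database; the "info" field is treated as the unique key, and the connection string is assumed):

package com.yangwj.util;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

// Sketch of the Mongo helpers used by BrandLikeSink.
public class MongoUtils {
    private static final MongoClient client =
            MongoClients.create("mongodb://192.168.80.134:27017"); // assumed address

    public static Document findoneby(String collection, String database, String info) {
        MongoCollection<Document> coll = client.getDatabase(database).getCollection(collection);
        return coll.find(Filters.eq("info", info)).first();
    }

    public static void saveorupdatemongo(String collection, String database, Document doc) {
        MongoCollection<Document> coll = client.getDatabase(database).getCollection(collection);
        // upsert keyed on the "info" field
        coll.replaceOne(Filters.eq("info", doc.get("info")), doc, new ReplaceOptions().upsert(true));
    }
}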

10. BrandLike.java

package com.yangwj.entity;

/**
 * Created by li on 2019/1/6.
 */
public class BrandLike {
    private String brand;
    private long count;
    private String groupbyfield;

    public String getGroupbyfield() {
        return groupbyfield;
    }

    public void setGroupbyfield(String groupbyfield) {
        this.groupbyfield = groupbyfield;
    }

    public String getBrand() {
        return brand;
    }

    public void setBrand(String brand) {
        this.brand = brand;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }
}

Original article: https://www.cnblogs.com/ywjfx/p/12349496.html
