日活跃用户统计函数

题记:

  在做运营统计的时候,一个最常见的指标是日活跃用户数(DAU),它的一般性概念为当日所有用户的去重,但是在大部分情况下,我们获取到的数据中会有登录用户与有匿名用户,而这部分用户是会出现重叠的。常规的做法是利用cookie或者imei(移动端)进行自关联,然后算出有多少用户同时是登录用户和匿名用户,最终的 日活跃用户数 = 登录用户+匿名用户-匿名转登录用户。

  在实际操作中需要写复杂的HQL才能完成这部分工作,而且运行效率低下,为此需要开发一个UDAF函数进行处理。

首先说明一下函数的原理:

/**
 * 根据flag,uid和imei信息计算个数
 * -fla为1    : 将对应的UID存储在UID集合中,该集合代表登录用户
 * -flag不为1 : 将对应的imei|wyy存储在IMEI集合中,该集合代表匿名用户
 * 将imei|wyy存储一个Map当中,并且判断该imei|wyy对应的flag是否同时出现过0和1俩个值,如果是则map中对应的value = 2否则为flag
 * 参数原型:
 *      int itemcount(flag,uid,imei)
 * 参数说明:
 *      flag: 1或者不为1
 *      uid: 用户id
 *      imei: 用户的第二个参照标识(imei|wyy|cookie)
 *
 * 返回值:
 *      int类型,dau值
 *
 * 使用示例:
 *      > SELECT flag, uid, imei FROM test;
 *      1   uid1 imei1
 *      1   uid2 imei1
 *      0   uid3 imei3
 *
 *      > SELECT daucount(flag,uid,imei) FROM test;
 *      1
 */

  其中flag参数可以用其它udf函数进行替换,用以判断uid是否是登录用户。

下面是具体的代码块:

package yy.juefan.udaf;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentLengthException;
import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
import org.apache.hadoop.hive.serde2.objectinspector.StandardListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;

@Description (
        name = "dau_count",
        value = "_FUNC_(flag,uid,imei)"
        )
public class GenericDauCount extends AbstractGenericUDAFResolver {

    private static final boolean DEBUG = false;
    private static final boolean TRACE = false;

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
            throws SemanticException {

        if (parameters.length != 3) {
            throw new UDFArgumentLengthException(
                    "Exactly 3 argument is expected.");
        }

        if (((PrimitiveTypeInfo) parameters[0]).getPrimitiveCategory() != PrimitiveCategory.INT) {
            throw new UDFArgumentTypeException(0,
                    "Only int argument is accepted, but "
                            + parameters[0].getTypeName() + " is passed");
        }

        if (((PrimitiveTypeInfo) parameters[1]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(1,
                    "Only string argument is accepted, but "
                            + parameters[1].getTypeName() + " is passed");
        }

        if (((PrimitiveTypeInfo) parameters[2]).getPrimitiveCategory() != PrimitiveCategory.STRING) {
            throw new UDFArgumentTypeException(2,
                    "Only string argument is accepted, but "
                            + parameters[2].getTypeName() + " is passed");
        }

        return new GenericDauCountEvaluator();
    }

    public static class GenericDauCountEvaluator extends GenericUDAFEvaluator {
        // 封装接口
        StructField uidSetField;
        StructField imeiSetField;
        StructField imeiMapField;

        StructObjectInspector map2red;

        // for PARTIAL1 and COMPLETE
        IntObjectInspector flagIO;
        StringObjectInspector uidIO;
        StringObjectInspector imeiIO;

        // for PARTIAL2 and FINAL
        StandardListObjectInspector uidSetIO;
        StandardListObjectInspector imeiSetIO;
        StandardMapObjectInspector imeiMapIO;
        private static class DivideAB implements AggregationBuffer {
            Set<String> uidSet;
            Set<String> imeiSet;
            Map<String, Integer> imeiMap;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            DivideAB dab = new DivideAB();
            reset(dab);
            return dab;
        }

        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            dab.uidSet = new HashSet<String>();
            dab.imeiSet = new HashSet<String>();
            dab.imeiMap = new HashMap<String, Integer>();
        }

        boolean warned = false;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            super.init(m, parameters);

            // input
            if (m == Mode.PARTIAL1 || m == Mode.COMPLETE) { // for iterate
                assert (parameters.length == 3);
                flagIO = (IntObjectInspector) parameters[0];
                uidIO = (StringObjectInspector) parameters[1];
                imeiIO = (StringObjectInspector) parameters[2];
            } else { // for merge
                map2red = (StructObjectInspector) parameters[0];
                uidSetField = map2red.getStructFieldRef("uidSet");
                imeiSetField = map2red.getStructFieldRef("imeiSet");
                imeiMapField = map2red.getStructFieldRef("imeiMap");

                uidSetIO = (StandardListObjectInspector) uidSetField
                        .getFieldObjectInspector();
                imeiSetIO = (StandardListObjectInspector) imeiSetField
                        .getFieldObjectInspector();
                imeiMapIO = (StandardMapObjectInspector) imeiMapField
                        .getFieldObjectInspector();
            }
            if (m == Mode.PARTIAL1 || m == Mode.PARTIAL2) {

                ArrayList<ObjectInspector> foi = new ArrayList<ObjectInspector>();
                ArrayList<String> fname = new ArrayList<String>();

                foi.add(ObjectInspectorFactory
                        .getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory
                        .getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                foi.add(ObjectInspectorFactory
                        .getStandardMapObjectInspector(
                                PrimitiveObjectInspectorFactory.javaStringObjectInspector,
                                PrimitiveObjectInspectorFactory.javaIntObjectInspector));
                fname.add("uidSet");
                fname.add("imeiSet");
                fname.add("imeiMap");
                return ObjectInspectorFactory.getStandardStructObjectInspector(
                        fname, foi);
            } else {
                return PrimitiveObjectInspectorFactory.javaLongObjectInspector;
            }
        }

        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            if (parameters.length != 3) {
                return;
            }
            DivideAB dab = (DivideAB) agg;
            int check = PrimitiveObjectInspectorUtils.getInt(parameters[0],
                    flagIO);
            String uid = PrimitiveObjectInspectorUtils.getString(parameters[1],
                    uidIO);
            String imei = PrimitiveObjectInspectorUtils.getString(
                    parameters[2], imeiIO);
            if (check == 1) {// 登录用户
                dab.uidSet.add(uid);
            } else {// 匿名用户
                dab.imeiSet.add(imei);
            }
            if (dab.imeiMap.containsKey(imei)) {
                int flag = dab.imeiMap.get(imei);
                if (flag < 2 && flag != check) {
                    dab.imeiMap.put(imei, 2);
                }
            } else {
                dab.imeiMap.put(imei, check);
            }
        }

        @Override
        public Object terminatePartial(AggregationBuffer agg)
                throws HiveException {
            DivideAB myagg = (DivideAB) agg;
            // 存储中间结果
            Object[] partialResult = new Object[3];
            partialResult[0] = new ArrayList<String>(myagg.uidSet);
            partialResult[1] = new ArrayList<String>(myagg.imeiSet);
            partialResult[2] = new HashMap<String, Integer>(myagg.imeiMap);
            return partialResult;
        }

        @SuppressWarnings("unchecked")
        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            if (partial != null) {
                DivideAB dab = (DivideAB) agg;
                Object uidSet = map2red
                        .getStructFieldData(partial, uidSetField);
                Object imeiSet = map2red.getStructFieldData(partial,
                        imeiSetField);
                Object imeiMap = map2red.getStructFieldData(partial,
                        imeiMapField);

                List<Object> uidlist = (List<Object>) uidSetIO.getList(uidSet);
                System.err.println("uidList = " + uidlist.size());

                if (uidlist != null) {
                    System.err.println("uidSet = " + dab.uidSet.size());
                    for (Object obj : uidlist) {
                        dab.uidSet.add(obj.toString());
                    }
                }

                List<Object> imeilist = (List<Object>) uidSetIO
                        .getList(imeiSet);
                if (imeilist != null) {
                    for (Object obj : imeilist) {
                        dab.imeiSet.add(obj.toString());
                    }
                }

                Map<String, Integer> imeimap = (Map<String, Integer>) imeiMapIO
                        .getMap(imeiMap);
                for (Entry<?, ?> ele : imeimap.entrySet()) {
                    Object kobj = ele.getKey();
                    String key = kobj.toString();
                    Object vobj = ele.getValue();
                    Object val = vobj.toString();
                    if (dab.imeiMap.containsKey(key)) {
                        int flag = dab.imeiMap.get(key);
                        if (flag < 2
                                && flag != Integer.parseInt(val.toString())) {
                            dab.imeiMap.put(key, 2);
                        }
                    } else {
                        dab.imeiMap.put(key, Integer.parseInt(val.toString()));
                    }
                }
            }
        }

        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            DivideAB dab = (DivideAB) agg;
            int mix = 0;
            for (int val : dab.imeiMap.values()) {
                if (val == 2) {
                    mix++;
                }
            }
            return (long) (dab.uidSet.size() + dab.imeiSet.size() - mix);
        }
    }
}

又有工作要忙了,先把代码放上来,下次再写分析

日活跃用户统计函数

时间: 2024-10-11 22:03:47

日活跃用户统计函数的相关文章

redis 用setbit(bitmap)统计活跃用户

getspool.com的重要统计数据是实时计算的.Redis的bitmap让我们可以实时的进行类似的统计,并且极其节省空间.在模拟1亿2千8百万用户的模拟环境下,在一台MacBookPro上,典型的统计如“日用户数”(dailyunique users) 的时间消耗小于50ms, 占用16MB内存.Spool现在还没有1亿2千8百万用户,但是我们的方案可以应对这样的规模.我们想分享这是如何做到的,也许能帮到其它创业公司. Bitmap以及Redis Bitmaps快速入门(Crash Cour

支撑5亿用户、1.5亿活跃用户的Twitter最新架构详解及相关实现

如果你对项目管理.系统架构有兴趣,请加微信订阅号"softjg",加入这个PM.架构师的大家庭 摘要:Twitter出道之初只是个奋斗在RoR上的小站点,而如今已拥有1.5亿的活跃用户,系统日传输tweet更多达4亿条,并已完成了以服务为核心的系统架构蜕变. Twitter如今在世界范围内已拥有1.5亿的活跃用户,为了给用户生成timeline(时间轴)需支撑30万QPS,其firehose每秒同样生成22MB数据.整个系统每天传输tweet 4亿条,并且只需要5分钟就可以让一条twe

通向高可扩展性之路(推特篇) ---- 一个推特用来支撑1亿5千万活跃用户、30万QPS、22MB每秒Firehose、以及5秒内推送信息的架构

原文链接:http://highscalability.com/blog/2013/7/8/the-architecture-twitter-uses-to-deal-with-150m-active-users.html 写于2013年7月8日,译文如下: “可以解决推特所面临的挑战”的玩具般的方案是一个常用在扩展性上的比喻.每个人都觉得推特很容易实现.稍微具备一些系统架构的知识我们就可以构建一个推特,就这么简单.但是根据推特软件开发部门的VP Raffi Krikorian在 Timelin

用户频次分布、新增用户留存、活跃用户留存

select * from a WHERE a.field1 NOT IN (select field1 from b) select * from a WHERE NOT EXISTS (select 1 from b where a.field1 = b.field1) 表a的条件加在最后,表b的条件加在括弧中. select id from aa left join bb on aa.id=bb.id and bb.id is null select count(uid) as onl,d

用Redis bitmap统计活跃用户、留存

用Redis bitmap统计活跃用户.留存 Spool的开发者博客,描述了Spool利用Redis的bitmaps相关的操作,进行网站活跃用户统计工作. 原文:http://blog.getspool.com/2011/11/29/fast-easy-realtime-metrics-using-redis-bitmaps/ Redis支持对String类型的value进行基于二进制位的置位操作.通过将一个用户的id对应value上的一位,通过对活跃用户对应的位进行置位,就能够用一个value

微信小程序活跃用户破4亿 小程序靠什么能成为实体店引流神器?

2017年12月,以"跳一跳"为代表的微信小游戏让人们认识到了小程序的流量潜力.在微信的流量红利下,小程序生态发展迅速,成为开发者们中炙手可热的新风口.2018年初,小程序做到了日活1.7亿.月活4.3亿,参与开发者100多万. 近日,TalkingData发布2018微信小程序洞察报告--<场景+链接,数据视角下的小程序浪潮>.报告预测,2018年小程序数量将突破250万,数量超越AppStore应用总和. 小程序一季度活跃用户破4亿 游戏类别最"受宠"

5.2亿活跃用户“不活跃”?有点山寨的美图电商尴尬了

前不久,美图正式上线了独立的电商平台--"美铺". 根据美图公布的信息,美铺被定义为一个时尚分享购物社区.有别于传统的B2C或C2C模式,美铺采取的是B2C2C的买手模式,即通过包括买手.网红.KOL在内的时尚达人来连接品牌商和消费者. 文/张书乐(TMT行业观察者.游戏产业时评人,人民网.人民邮电报专栏作者) 刊载于<计算机应用文摘>2017年5月上旬刊 然而,这并没有多少新鲜感.或者说,这不过是微商式的KOL版. 亦步亦趋学网易?这个可能有 把潮自拍的定制功能单拿了出来

[PHP]基于Sort Set进行活跃用户统计

作者:zhanhailiang 日期:2014-12-14 参考文章: 使用Redis bitmap进行活跃用户统计 本文提供基于Sort Set进行活跃用户统计的PHP版本: https://github.com/billfeller/billfeller.github.io/blob/master/code/UserTj.php

QlikView报表显示连续若干个月内活跃用户的数量

之前有朋友提到了这样一个需求,要计算三年中每年都有销售记录的客户量,只有最近两年有销售纪录的客户量(假如某个用户2012年和2014年都有记录,在2013年没有则不计算在内),以及最近一年的新增客户数量.这里大概描述一下我的思路. Sales表有两个字段Id, Year,要求除上述结果需要Group by Id,然后算Count(DISTINCT Year)的值要等于Min(Year)和Max(Year)的差+1,比如下面关于记录: Year, Id 2014, 14 2013, 14 2012