茄子快传数据分析之原理分析及数据清洗

茄子快传数据分析之原理分析及数据清洗

版权声明:闻道有先后,术业有专攻。 https://blog.csdn.net/wlk_328909605/article/details/82227410

需求:联想集团有一款app产品叫茄子快传(有上亿的活跃用户,集中在第三世界国家) 
现在需要开发一个数据分析系统,来对app的用户行为数据做各类分析;

原理: 
流程如下图: 

流程简单介绍: 
用户通过茄子的客户端产生数据, 
将使用时间,手机号,ip地址,手机的序列号,app的版本,app的下载渠道等重要信息上传到联想的web日志服务器上,服务器的后台系统打印出日志文件,通过flume(一种日志采集工具)将生成的日志上传到hdfs上,先进行数据清洗,将版本,渠道,用户等重要信息丢失的过滤掉,生成新的文件,数据加载到hive中,进行运算处理,处理后的结果通过sqoop(一种数据迁移工具)保存到关系型数据库中,比如MySql,再通过web服务器,将分析出的结果显示到浏览器上。 
预处理需求(mapreduce): 
1/ 请对app事件请求日志进行预处理: 
a) 过滤掉一些不合法数据(缺失device_id,app_ver_name,os_name,app_token,city,release_channel字段需要过滤) 
b) 将原格式json,解析成csv(逗号分隔的文本)格式,并去掉”events”字段 
c) 在原始数据中,追加一个字段user_id(如果是苹果,就用device_id,如果是android,就用android_id) 
数据预处理的时候,只需要map就可以完成,所以就不需要reduce了。 
处理要求:device_id,app_ver_name,os_name,app_token,city,release_channel 缺失则过滤 
代码如下:

package com.cleanLog;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

public class AppLogClean {

    public static class MapTask extends Mapper<LongWritable, Text, Text, NullWritable>{
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            ObjectMapper mapper = new ObjectMapper();
            JsonNode log = mapper.readTree(line);
            JsonNode header = log.get("header");

            if(StringUtils.isBlank(header.get("device_id").getTextValue())||
                    //也可以直接getString()
                    StringUtils.isBlank(header.get("app_ver_name").getTextValue())||
                    StringUtils.isBlank(header.get("os_name").getTextValue())||
                    StringUtils.isBlank(header.get("app_token").getTextValue())||
                    StringUtils.isBlank(header.get("city").getTextValue())||
                    StringUtils.isBlank(header.get("release_channel").getTextValue())) {
                return;
            }else {
                String user_id = "";
                if (header.get("device_id_type").getTextValue().equals("mac")) {
                    user_id = header.get("device_id").getTextValue();
                } else {
                    user_id = header.get("android_id").getTextValue();
                }

                StringBuilder sb = new StringBuilder();
                sb.append(header.get("cid_sn").getTextValue()).append(",");
                sb.append(header.get("mobile_data_type").getTextValue()).append(",");
                sb.append(header.get("os_ver").getTextValue()).append(",");
                sb.append(header.get("mac").getTextValue()).append(",");
                sb.append(header.get("resolution").getTextValue()).append(",");
                sb.append(header.get("commit_time").getTextValue()).append(",");
                sb.append(header.get("sdk_ver").getTextValue()).append(",");
                sb.append(header.get("device_id_type").getTextValue()).append(",");
                sb.append(header.get("city").getTextValue()).append(",");
                sb.append(header.get("android_id").getTextValue()).append(",");
                sb.append(header.get("device_model").getTextValue()).append(",");
                sb.append(header.get("carrier").getTextValue()).append(",");
                sb.append(header.get("promotion_channel").getTextValue()).append(",");
                sb.append(header.get("app_ver_name").getTextValue()).append(",");
                sb.append(header.get("imei").getTextValue()).append(",");
                sb.append(header.get("app_ver_code").getTextValue()).append(",");
                sb.append(header.get("pid").getTextValue()).append(",");
                sb.append(header.get("net_type").getTextValue()).append(",");
                sb.append(header.get("device_id").getTextValue()).append(",");
                sb.append(header.get("app_device_id").getTextValue()).append(",");
                sb.append(header.get("release_channel").getTextValue()).append(",");
                sb.append(header.get("country").getTextValue()).append(",");
                sb.append(header.get("time_zone").getTextValue()).append(",");
                sb.append(header.get("os_name").getTextValue()).append(",");
                sb.append(header.get("manufacture").getTextValue()).append(",");
                sb.append(header.get("commit_id").getTextValue()).append(",");
                sb.append(header.get("app_token").getTextValue()).append(",");
                sb.append(header.get("account").getTextValue()).append(",");
                sb.append(header.get("app_id").getTextValue()).append(",");
                sb.append(header.get("build_num").getTextValue()).append(",");
                sb.append(header.get("language").getTextValue()).append(",");
                sb.append(user_id);

                context.write(new Text(sb.toString()), NullWritable.get());
            }

        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(AppLogClean.class);
            job.setMapperClass(MapTask.class);
            job.setOutputKeyClass(Text.class);

            //设置reduce的数量为0
            job.setNumReduceTasks(0);

            FileInputFormat.addInputPath(job, new Path("D:\\data\\appuserdata\\input\\20170102"));//这里可以设置成参数args[0]
            FileOutputFormat.setOutputPath(job, new Path("D:\\data\\appuserdata\\output\\20170102"));

            boolean completion = job.waitForCompletion(true);//提交的时候也可以是submit,只不过这个是能看到过程。

            System.out.println(completion?"成功":"失败");

        }
    }
}

这样,数据就简单的清理了,只需要将生成的文件再放到集群上就可以用hive进行处理了。

原文地址:https://www.cnblogs.com/timxgb/p/10659952.html

时间: 2024-10-13 04:28:22

茄子快传数据分析之原理分析及数据清洗的相关文章

茄子快传数据分析(一)----数据清理

茄子快传数据分析(一)----数据清理 2018年09月03日 18:41:44 amin_hui 阅读数:117 茄子快传原理 流程图:  数据 “events”: “1473367236143\u00010\u0001connectByQRCode\u0001\u00010\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001\u0001160907223957000

从”茄子快传”看应用程序怎样获取手机已安装程序的apk文件

"茄子快传"是联想开发的一款近距离文件共享软件.它通过wifi-direct(速度飞快,不须要联网)或者普通的网络(速度慢)在不同手机间传递文件. 不知为何.它就火了起来,火的也飞快.当中.共享传输已安装程序文件apk这一功能引起了我强烈的兴趣. 我们知道android对每一个应用的权限做了非常苛刻的控制,每一个应用程序有自己的用户id,每一个应用程序仅仅能訪问自己的数据,比方程序com.android.calculator计算器程序仅仅被同意訪问/data/data/com.andr

从”茄子快传”看应用程序如何获取手机已安装程序的apk文件

"茄子快传"是联想开发的一款近距离文件共享软件,它通过wifi-direct(速度飞快,不需要联网)或者普通的网络(速度慢)在不同手机间传递文件.不知为何,它就火了起来,火的也飞快.其中,共享传输已安装程序文件apk这一功能引起了我强烈的兴趣. 我们知道android对每个应用的权限做了很苛刻的控制,每个应用程序有自己的用户id,每个应用程序只能访问自己的数据,比如程序com.android.calculator计算器程序只被允许访问/data/data/com.android.cal

Android如何实现茄子快传

Android如何实现茄子快传茄子快传是一款文件传输应用,相信大家都很熟悉这款应用,应该很多人用过用来文件的传输.它有两个核心的功能: 端到端的文件传输Web端的文件传输这两个核心的功能我们具体来分析一下! 端到端的文件传输所谓的端到端的文件传输是指应用端发送到应用端(这里的应用端指Android应用端),这种文件传输方式是文件发送端和文件接收端必须安装应用. 效果图文件发送方 文件接收方 简单的文件传输的话,我们可以用蓝牙,wifi直连,ftp这几种方式来进行文件的传输.但是: 蓝牙传输的话,

Hadoop之HDFS原理及文件上传下载源码分析(下)

上篇Hadoop之HDFS原理及文件上传下载源码分析(上)楼主主要介绍了hdfs原理及FileSystem的初始化源码解析, Client如何与NameNode建立RPC通信.本篇将继续介绍hdfs文件上传.下载源解析. 文件上传 先上文件上传的方法调用过程时序图: 其主要执行过程: FileSystem初始化,Client拿到NameNodeRpcServer代理对象,建立与NameNode的RPC通信(楼主上篇已经介绍过了) 调用FileSystem的create()方法,由于实现类为Dis

spi协议及工作原理分析

转自----http://blog.csdn.net/skyflying2012/article/details/11710801 一.概述. SPI, Serial Perripheral Interface, 串行外围设备接口, 是 Motorola 公司推出的一种同步串行接口技术. SPI 总线在物理上是通过接在外围设备微控制器(PICmicro) 上面的微处理控制单元 (MCU) 上叫作同步串行端口(Synchronous Serial Port) 的模块(Module)来实现的, 它允

深入理解HTTP协议、HTTP协议原理分析

深入理解HTTP协议.HTTP协议原理分析 目录(?)[+] http协议学习系列 1. 基础概念篇 1.1 介绍 HTTP是Hyper Text Transfer Protocol(超文本传输协议)的缩写.它的发展是万维网协会(World Wide Web Consortium)和Internet工作小组IETF(Internet Engineering Task Force)合作的结果,(他们)最终发布了一系列的RFC,RFC 1945定义了HTTP/1.0版本.其中最著名的就是RFC 26

BT下载原理分析

版权声明:本文为博主原创文章,未经博主允许不得转载. BitTorrent协议. BT全名为BitTorrent,是一个p2p软件,你在下载download的同时,也在为其他用户提供上传upload,因为大家是“互相帮助”,所以不会随着用户数的增加而降低下载速度. 下面是一般用ftp,http等分享流程: 下面是用BitTorrent分享的流程: 其实跟ED也十分相似,ED跟BT不同的地方有: ED--要连上一个固定server BT--没有固定server,只要分享者制作出该分享档案的.tor

Chromium网页CPU光栅化原理分析

Chromium除了支持网页分块GPU光栅化,还支持CPU光栅化.GPU光栅化的特点是快,缺点是硬件差异可能会导差异性,以及不是所有的绘图操作硬件都能很好地支持.CPU光栅化的特点是通用,以及能够支持所有的绘图操作,缺点是较慢,特别是在网页使用硬件加速渲染的情况下,CPU的光栅化结果还需要上传到GPU去渲染.本文接下来将详细分析CPU光栅化的原理,着重描述它是如何快速地光栅化结果上传到GPU去的. 老罗的新浪微博:http://weibo.com/shengyangluo,欢迎关注! 从前面Ch