实时分析之客户画像项目实践


客户画像的背景描写叙述

原来的互联网,以解决用户需求为目的。衍生出众多的网联网产品,以及产生呈数量级递增的海量数据。当用户需求基本得到满足的时候,须要分析这些海量的数据。得以达到最高效的需求实现,最智能的功能服务。以及最精准的产品推荐,最后提升产品的竞争力。简言之,产品由原来的需求驱动转换成数据驱动。

客户画像就是数据驱动的代表作之中的一个。详细点讲,客户画像就是用户的标签(使用该产品的群体),程序能自己主动调整、组合、生成这些标签,最后再通过这些标签。达到精准营销的目的。


当前流行的实时分析框架

首先一提到大数据,大家脑海中浮现的肯定是Hadoop。可是须要实时分析出结果的话,那Hadoop就力不从心了(先不讲数据多少,单单启动一个M/R就要几分钟的时间),假设没有实时性需求的产品分析则另当别论。

当下最流行的三大实时分析框架各自是Apache SparkApache SamzaApache Storm。以下是网上找到的三大框架的说明和对照:

三者的总体框架类似,仅仅是各个节点的名字和术语不一样罢了

Storm和Samza在消息发送处理的机制上是至少一次,而Spark是有且仅此一次,换句话讲。Storm和Samza可能存在反复发送数据的情况;在消息处理上,Spark是秒级的,而Storm和Samza是压秒级的(性能都不错,压秒级的也还是能够接受^_^);在语言支持上,这个Storm貌似多点。

另外,Storm开源的也比較早,社区比較活跃。版本号迭代的比較快,文档相对来说也比較多。Storm相对Spark也比較轻量级,上手简单,这就是作者选择Storm的原因,只是个人还是推荐Spark的。


环境准备、搭建和执行

以下是作者使用的软件版本号

1. kafka2.11

2. zookeeper3.5.1

3. storm0.9.5

JDK的环境。这个都不明确的人也不用继续看下去了。

作者在測试环境准备了4台虚拟机,改动每台虚拟机的/etc/hosts

172.16.2.235 master
172.16.2.231 slave1
172.16.2.236 slave2
172.16.2.241 slave3

235是主节点,其余三个是子节点,在主节点做好子节点免登录权限设置

主机执行

    ssh-keygen -t dsa -P ‘‘ -f ~/.ssh/id_dsa
    mv id_dsa.pub authorized_keys
    chmod 600 authorized_keys
    scp ~/.ssh/authorized_keys root@slave1:/root/.ssh/
    scp ~/.ssh/authorized_keys root@slave2:/root/.ssh/
    scp ~/.ssh/authorized_keys root@slave3:/root/.ssh/

(复制到各个从机上去)

每个从机都ssh进入一次 记录从机信息

  • zookeeper

zookeeper是大数据必备的框架之中的一个。它是一个分布式的。开放源代码的分布式应用程序协调服务,你能够理解成每个子节点的任务控制中心

解压

tar -zxvf zookeeper-3.5.1-alpha.tar.gz

配置

conf/zoo.cfg

initLimit=10
syncLimit=5
clientPort=2181
tickTime=2000
autopurge.purgeInterval=12
autopurge.snapRetainCount=3
dataDir=/home/zookeeper-3.5.1-alpha/data
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
server.3=slave3:2888:3888

注意:须要在/home/zookeeper-3.5.1-alpha/data文件夹下创建一个myid文件,写入该机的序列号,虚拟机就1。2累加下去

echo 0 >> /home/zookeeper-3.5.1-alpha/data/myid

启动

/home/zookeeper-3.5.1-alpha/bin/zkServer.sh start &

jps一下。列表中出现QuorumPeerMain进程则代表启动OK(各个子节点也启动起来,以下的服务都依赖zookeeper)。

  • kafka

kafka,中文名叫卡夫卡,是一种高吞吐量的分布式公布订阅消息系统,它能够处理消费者规模的站点中的全部动作流数据。

简言之,就是数据採集、发送器。

解压

tar -zxvf kafka_2.11-0.8.2.0.tgz

配置。改动

config/server.properties

broker.id=0
port=9092
num.network.threads=3
num.io.threads=8
host.name=master
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/kafka_2.11-0.8.2.0/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=master:2181,slave1:2181,slave2:2181,slave3:2181
zookeeper.connection.timeout.ms=6000
#真正删除topic
delete.topic.enable=true

注意:这里的broker.id在各个子节点也不能反复

启动

/home/kafka_2.11-0.8.2.0/bin/kafka-server-start.sh /home/kafka_2.11-0.8.2.0/config/server.properties &

jps一下,列表中出现Kafka进程则代表启动OK。

验证kafka集群执行是否正常:

订阅日志

在log服务器上安装kafka,仅仅解压就好了。不须要配置,然后订阅log

tail -0f /home/bigdata/logs/analytics.log | /home/kafka_2.11-0.8.2.0/bin/kafka-console-producer.sh --broker-list master:9092,slave1:9092,slave2:9092,slave3:9092 --topic bigdata_app_logs &

将最新一行的日志文件传输到kafka集群。消息队列叫做bigdata_app_logs(这个ID在kafka集群中唯一)

再查询队列列表

./kafka-topics.sh --list --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181

将会出现刚刚订阅的topic:bigdata_app_logs

./kafka-console-consumer.sh --zookeeper master:2181,slave1:2181,slave2:2181,slave3:2181 --topic topic:bigdata_app_logs --from-beginning

将会实时同步log服务器上面的日志。这样,kafka集群环境就搭建OK了

以下是作者自己整理的kafka流程图:

这里日志採集有两种方式。一种是站点程序通过log4j记录的log文件,然后再客户端执行,也就是上面介绍的那种。

还有一种就是通过KafkaLog4jAppender之间讲日志传输到kafka集群,须要引入一个jar包

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.8.2.0</version>
        </dependency>

在log4j的两种配置配置

log4j.logger.com.jjshome.bigdata.controller.CommonController=INFO,KAFKA_HIVE_AUDIT
log4j.appender.KAFKA_HIVE_AUDIT=kafka.producer.KafkaLog4jAppender
log4j.appender.KAFKA_HIVE_AUDIT.BrokerList=master:9092,slave1:9092,slave2:9092,slave3:9092
log4j.appender.KAFKA_HIVE_AUDIT.Topic=bigdata_app_logs
log4j.appender.KAFKA_HIVE_AUDIT.layout=org.apache.log4j.PatternLayout
log4j.appender.KAFKA_HIVE_AUDIT.layout.ConversionPattern=%m%n
log4j.appender.KAFKA_HIVE_AUDIT.ProducerType=async
    <!-- kafka -->
    <appender name="KAFKA_HIVE_AUDIT" class="kafka.producer.KafkaLog4jAppender">
        <param name="DatePattern" value="‘.‘yyyy-MM-dd"/>
        <param name="BrokerList" value="master:9092,slave1:9092,slave2:9092,slave3:9092"/>
        <param name="Topic" value="jjs-fang-web-bigDatas"/>
        <param name="ProducerType" value="async"/>
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %x - %m%n"/>
        </layout>
    </appender>

个人建议使用另外一种。可是要做好服务器之间的容错机制,作者前期就吃过亏,在採集日志的时候,直接影响了业务流程。

  • storm

这里就不介绍了

解压

tar -zxvf apache-storm-0.9.5.tar.gz

配置

conf/storm.yaml

 storm.zookeeper.servers:
     - "master"
     - "slave1"
     - "slave2"
     - "slave3"

 storm.local.dir: "/home/storm/data"
 nimbus.host: "master"
 supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703
 ui.port: 80

子节点配置都一样。直接丢过去就好了

启动

作者是在主节点启动nimbus和ui、supervisor,其它的三个节点启动supervisor

主节点

storm nimbus &
storm ui &
storm supervisor &

jps后出现nimbus和core、supervisor的进程,或者直接訪问http://master就可以(端口配置的是80)

:这里作者配置了环境变量。所以能够直接storm

子节点分别都执行

storm supervisor &

以下是作者画的storm结构图

后面的数据落地,是结合业务。将数据存储起来

好了,到此环境以及准备完毕。

若是要关闭各种进程。直接jps后直接kill掉。


Topology开发

topology是storm中job的别名,它的工作流程大概如图:

这里spout消息发送源,bolt是数据处理节点,计算出来的记过能够多次使用

项目准备:

storm-lib.zip

[big-data-client]

[big-data-storm]

第一个作者开发的Topology须要的lib包,将该lib替换到全部storm集群的storm/lib下

第二个作者开发环境须要的中间件,第三个storm项目。

项目中有两个案例。一个TopN案例。一个客户画像案例(针对自自有业务的客户画像)

bolt是工作节点,remote是外部调用的数据接口,spout是消息源,topology是job主文件夹。

以下是客户画像的

Topology

package com.jjshome.storm.topology;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import backtype.storm.Config;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

import com.google.common.collect.ImmutableList;
import com.jjshome.storm.bolt.house.BoltFCWSplit;
import com.jjshome.storm.bolt.house.BoltLogFormat;
import com.jjshome.storm.bolt.house.BoltLogFormat4App;
import com.jjshome.storm.bolt.house.BoltSave;
import com.jjshome.storm.bolt.house.BoltThreshold;
import com.jjshome.storm.utils.CommonConstant;
import com.jjshome.storm.utils.StormRunner;

/**
 * @功能描写叙述: 用户行为分析的Topology
 * @项目版本号: 1.0.0
 * @项目名称: 大数据
 * @相对路径: com.jjshome.storm.topology.UserLogTopology.java
 * @创建作者: 欧阳文斌
 * @问题反馈: [email protected]
 * @创建日期: 2015年12月7日 上午10:20:27
 */
public class UserLogTopology {
    private static Logger logger = LoggerFactory.getLogger(UserLogTopology.class);
    /** 本地调试执行时间单位(秒) */
    private static final int DEFAULT_RUNTIME_IN_SECONDS = 60*30;
    /** kafka集群 */
    private static final String kafka_zookeeper_local = "master:2181,slave1:2181,slave2:2181,slave3:2181";
    private static final String kafka_zookeeper_online = "bigdata-99-51-master.jjshome.com:2181,bigdata-99-52-slave.jjshome.com:2181,bigdata-99-53-slave.jjshome.com:2181,bigdata-99-54-slave.jjshome.com:2181";
    /** Storm集群列表 */
    private static final List<String> zk_servers_local = ImmutableList.of("master","slave1", "slave2", "slave3");
    private static final List<String> zk_servers_online = ImmutableList.of("bigdata-99-51-master.jjshome.com","bigdata-99-52-slave.jjshome.com", "bigdata-99-53-slave.jjshome.com", "bigdata-99-54-slave.jjshome.com");

    private static Config createTopologyConfiguration() {
        Config conf = new Config();
        //是否是本地模式
        conf.setDebug(CommonConstant.IS_LOCAL?

true:false);
        //设置工作机数量
        conf.setNumWorkers(CommonConstant.IS_LOCAL?4:16);
        return conf;
    }

    /**
     * @功能描写叙述: 获取KafkaConfig
     * @创建作者: 欧阳文斌
     * @创建日期: 2015年12月11日 下午2:08:36
     * @return
     */
    private static KafkaSpout getKafkaSpout(){
        // 房产网 bigdata日志的消息
        String kafkaZookeeper = CommonConstant.IS_LOCAL?kafka_zookeeper_local:kafka_zookeeper_online;
        BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
        SpoutConfig kafka_config_fang = new SpoutConfig(brokerHosts,
                "jjs-fang-web-bigDatas", "/jjs-fang-web-bigDatas", "jjs-fang-web-bigDatas");
        kafka_config_fang.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafka_config_fang.zkServers = CommonConstant.IS_LOCAL?zk_servers_local:zk_servers_online;
        kafka_config_fang.zkPort = 2181;
        return new KafkaSpout(kafka_config_fang);
    }

    /**
     * @功能描写叙述: 获取KafkaConfig
     * @创建作者: 欧阳文斌
     * @创建日期: 2015年12月11日 下午2:08:36
     * @return
     */
    private static KafkaSpout getKafkaSpout_App(){
        // 房产网 bigdata日志的消息
        String kafkaZookeeper = CommonConstant.IS_LOCAL?kafka_zookeeper_local:kafka_zookeeper_online;
        BrokerHosts brokerHosts = new ZkHosts(kafkaZookeeper);
        SpoutConfig kafka_config_fang = new SpoutConfig(brokerHosts,
                "bigdata_app_logs", "/bigdata_app_logs", "bigdata_app_logs");
        kafka_config_fang.scheme = new SchemeAsMultiScheme(new StringScheme());
        kafka_config_fang.zkServers = CommonConstant.IS_LOCAL?zk_servers_local:zk_servers_online;
        kafka_config_fang.zkPort = 2181;
        return new KafkaSpout(kafka_config_fang);
    }

    public static void main(String[] args) {
        //Topology构造器
        TopologyBuilder builder = new TopologyBuilder();
        String topologyName = "UserLogTopology";
        //配置器
        Config topologyConfig = createTopologyConfiguration();
        int runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;

        final String app_index = "s_app";
        final String pc_index = "s_pc";
        final String fcwsplit_index = "b_fcwsplit";
        final String logformat_index = "b_logformat";
        final String logformatapp_index = "b_logformatapp";
        //final String mongodb_index = "b_mongodb";
        final String threshold_index = "b_threshold";
        final String save_index = "b_save";
        //设置 手机app log日志源
        builder.setSpout(app_index, getKafkaSpout_App(), 4).setNumTasks(4);
        //设置 房产网日志源
        builder.setSpout(pc_index, getKafkaSpout(), 8).setNumTasks(8);

        //房产网日志分割和过滤
        builder.setBolt(fcwsplit_index, new BoltFCWSplit(), 8).setNumTasks(8).shuffleGrouping(pc_index);

        //日志格式化
        builder.setBolt(logformat_index, new BoltLogFormat(), 4).setNumTasks(4).shuffleGrouping(fcwsplit_index);
        //手机日志格式化
        builder.setBolt(logformatapp_index, new BoltLogFormat4App(), 4).setNumTasks(4).shuffleGrouping(app_index);

        //存储 _USER_INTENTION 到mongoDB
        /*builder.setBolt(mongodb_index, new BoltMongo(), 2)
        .shuffleGrouping(logformat_index)
        .shuffleGrouping(logformatapp_index);*/

        //数据 阀 控制
        builder.setBolt(threshold_index, new BoltThreshold(2,60), 6).setNumTasks(6)
        .fieldsGrouping(logformat_index, new Fields("ip"))
        .fieldsGrouping(logformatapp_index, new Fields("ip"));

        //数据落地
        builder.setBolt(save_index, new BoltSave(), 4).setNumTasks(4).fieldsGrouping(threshold_index, new Fields("ip"));
        try {
            StormRunner.runTopologyLocally(builder.createTopology(), topologyName,
                    topologyConfig, runtimeInSeconds);
        } catch (Exception e) {
            logger.error("[email protected]", e);
        }
    }
}

builder的整个构建过程,实际上也就是数据流的加工过程。

kafka的spout是引用第三方的jar,pom中有配置。

bolt

package com.jjshome.storm.bolt.house;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.jjshome.bigdata.entity.log._JJS_Log;
import com.jjshome.bigdata.util.SystemConstant;

/**
 * @功能描写叙述: 房产网日志解析
 * @项目版本号: 1.0.0
 * @项目名称: 大数据
 * @相对路径: com.jjshome.storm.bolt.BoltFCWSplit.java
 * @创建作者: 欧阳文斌
 * @问题反馈: [email protected]
 * @创建日期: 2015年12月11日 下午2:20:07
 */
public class BoltFCWSplit implements IRichBolt {
    private static final long serialVersionUID = 1L;
    private Logger logger = LoggerFactory.getLogger(BoltFCWSplit.class);
    private OutputCollector collector;

    /** 用户行为分析的LOG正则 */
    private static Pattern s = Pattern.compile(""
            //时间
            + "(.*?),.*"
            //类别
            + "(YslHouseController|EsfHouseController|ZfHouseController|AgentInfoController|YywtController).*"
            //ip
            + "ip=(.*?

),.*"
            //cityCode
            + "cityCode=(.*?),.*"
            //userId
            + "userId=(.*?),.*"
            //phone
            + "phone=(.*?

),.*"
            //refererAddress
            + "refererAddress=(.*?),.*"
            //accessAddress
            + "accessAddress=(.*?),.*"
            //tags
            + "tags=(.*?),.*"
            //keyWord
            + "keyWord=(.*?),.*"
            //cookiesId
            + "cookiesId=(.*?

),.*");

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("object"));
    }

    @SuppressWarnings("rawtypes")
    @Override
    public void prepare(Map stormConf, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        String msg = "NOTHING";
        try {
            //获取消息流
            msg = input.getString(0);
            //异常日志推断
            if(msg!=null&&msg.length()<1000){
                //正则匹配
                Matcher sm = s.matcher(msg);
                if(sm.find()){
                    //LOG日志格式转换这对象
                    _JJS_Log jjsLog = new _JJS_Log();
                    log2entity(sm, jjsLog);
                    if (jjsLog.getUrl_type() == 5) {
                        if (jjsLog.getNew_url() != null
                                && jjsLog.getNew_url().indexOf("saveReserveOrderInfo") > -1
                                && !"".equals(jjsLog.getUserId())
                                && null != jjsLog.getUserId()) {
                            //发送消息到下一个bolt
                            collector.emit(new Values(jjsLog));
                        }
                    } else {
                        //发送消息到下一个bolt
                        collector.emit(new Values(jjsLog));
                    }
                }
            }
        } catch (Exception e) {
            //错误记录做记录  不须要反复发送
            logger.error("[email protected] "+msg, e);
        } finally {
            //消息处理成功
            collector.ack(input);
        }
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    /**
     * @功能描写叙述: log日志转化
     * @创建作者: 欧阳文斌
     * @创建日期: 2015年12月15日 上午11:34:45
     * @param sm
     * @param jjsLog
     */
    private void log2entity(Matcher sm, _JJS_Log jjsLog){
        if(sm!=null&&jjsLog!=null){
            int i=0;
            jjsLog.setS_date(sm.group(++i));
            jjsLog.setType(SystemConstant.FCW_INDEX);
            String type = sm.group(++i);
            if(StringUtils.isNotEmpty(type)){
                if(type.equals("YslHouseController")){
                    jjsLog.setUrl_type(1);
                }else if(type.equals("EsfHouseController")){
                    jjsLog.setUrl_type(2);
                }else if(type.equals("ZfHouseController")){
                    jjsLog.setUrl_type(3);
                }else if(type.equals("AgentInfoController")){
                    jjsLog.setUrl_type(4);
                } else if(type.equals("YywtController")){
                    jjsLog.setUrl_type(5);
                }
            }
            jjsLog.setIp(sm.group(++i));
            jjsLog.setCityCode(sm.group(++i));
            jjsLog.setUserId(sm.group(++i));
            jjsLog.setTel_num(sm.group(++i));
            jjsLog.setOld_url(sm.group(++i));
            jjsLog.setNew_url(sm.group(++i));
            jjsLog.setTags(sm.group(++i));
            jjsLog.setKeyWord(sm.group(++i));
            jjsLog.setCookies(sm.group(++i));
        }
    }
}

bolt中就是数据的逻辑处理,关键的方法是input.getString(0);获取数据,collector.emit(new Values(jjsLog));发送数据,collector.ack(input);告诉前一个发送者,信息处理成功。

在topology的grouping策略就是在Spout与Bolt、Bolt与Bolt之间传递Tuple的方式。总共同拥有七种方式:

1)shuffleGrouping(随机分组)

2)fieldsGrouping(依照字段分组,在这里即是同一个单词仅仅能发送给一个Bolt)

3)allGrouping(广播发送。即每个Tuple。每个Bolt都会收到)

4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)

5)noneGrouping(随机分派)

6)directGrouping(直接分组,指定Tuple与Bolt的相应发送关系)

7)Local or shuffle Grouping

8)customGrouping (自己定义的Grouping)

经常使用的也就是随机分组、按字段分组以及全局分组。

在自己Topology开发完毕后。能够讲执行模型改动成本地,然后执行Topology,方便进行调试。若是要公布到进群环境中。则将Storm项目打包,maven install(作者是maven项目),将打好的jar上传到nimbus服务器。

storm jar storm-kafka-topology.jar com.jjshome.storm.topology.UserLogTopology

在jar的根文件夹上传jar到storm集群中。后面的类名是一个带main的topology,也就是上面的客户画像的topology。

公布成功后,能够在UI界面看到topology的执行情况,各个节点的日志处理数量,延迟时间

topology执行起来后,能够在各个数据存储的节点中。获取storm实时分析的结果。通过分析的结构,得到各个用户实时的各种标签,最后通过这些标签。在产品库中筛选最匹配的产品。

以下是作者的客户画像架构图

数据流程

1.用户操作产生日志

2.kafka收集日志

3.Storm分析处理日志

1)日志详情存储到mongoDB

2)半小时外意向模型存储到mongoDB

3)半小时内意向模型存储到redis

4)假设用户登录后的操作。则唤醒mongodb中全部的半小时意向模型,又一次组装模型更新到mysql热表中

5)监控日志,假设发生预警事件操作,则触发意向模型以及精准推荐的生成

模型构建

在生成各种标签集合时。要增加权重因子(可变),针对不同产品。构建不同标签,再对各种操作以及权重因子。来生产用户标签。

深度分析能够考虑增加机器学习在里面。


开发问题和运维问题的分析和解决

Q:在搭建集群的时候,通过UI看到各个节点的主机名一样。都是localhost,导致topology全然不工作。

A:检測各个虚拟机的hostname,保持和hosts中配置的一致,再重新启动zookeeper和storm集群


Q:在公布topology到集群上后。在UI界面中看到各种class找不到的错误

A:将storm项目中的lib打包统一都放到storm中lib,这里要注意jar包冲突和版本号问题


Q:在日志累加的时候。fail的日志越来越多,导致延迟越来越大

A:这个问题跟业务处理有关系,检查出现故障的bolt,通过删剪法,反复提交測试,找出有问题的代码


Q:发现设置的works节点不生效,实际的比设置的少非常多

A:检查topology的配置器,是不是本地模式。


Q:数据实时处理,怎么才干高效的让数据落地

A:作者这里用了滚筒模式,累积半小时的数据。再统一存储。半小时以内的。直接存放在redis集群中


Q:在使用kafka的producer命令监控日志的时候,老是出现日志终端的现象

A:看看log4j是否配置了日志时间戳,由于开启了时间戳,日志将会定时或不定时的将文件重命名,然后新开硬盘地址做存储。这样kafka是没有办法获取新的log硬盘地址。

解决的方法:换用KafkaLog4jAppender方式,或者让log文件不替换,每天定时清理一次就好了


Q:kafka集群服务器硬盘空间满了

A:在没有什么设定的操作下,kafka收到的日志会存储在硬盘中,终究有一天。硬盘会满掉。解决的方法:在各个节点增加crontab计划

0 6 * * * /home/zookeeper/bin/zkCleanup.sh -n 3
时间: 2024-10-22 02:21:17

实时分析之客户画像项目实践的相关文章

《Java项目实践》:简单聊天程序

<Java项目实践>:简单聊天程序 由于这个简单程序,还是涉及到很多的知识点,下面我们就一点一点的来完成. 我们熟悉的QQ聊天,就有一个界面,是吧,我们自己做一个简单的聊天程序,因此我们也就需要为Client写一个界面.因此,这就是我们第一步需要完成的任务. 第一步:为Client端写一个界面 完成一个界面有两种方法,一种是使用Frame对象来完成,另外一种是继承JFrame类来完成.本项目使用第二种. 第二种继承JFrame类完成的界面的程序如下: public class ChatClie

Django项目实践4 - Django网站管理(后台管理员)

http://blog.csdn.net/pipisorry/article/details/45079751 上篇:Django项目实践3 - Django模型 Introduction 对于某一类站点, 管理界面 是基础设施中很重要的一部分. 这是以网页和有限的可信任管理者为基础的界面,它能够让你加入,编辑和删除站点内容. 常见的样例: 你能够用这个界面公布博客,后台的站点管理者用它来润色读者提交的内容,你的客户用你给他们建立的界面工具更新新闻并公布在站点上.这些都是使用管理界面的样例. 创

走在大数据的边缘 基于Spark的机器学习-智能客户系统项目实战(项目实战)

38套大数据,云计算,架构,数据分析师,Hadoop,Spark,Storm,Kafka,人工智能,机器学习,深度学习,项目实战视频教程 视频课程包含: 38套大数据和人工智能精品高级课包含:大数据,云计算,架构,数据挖掘实战,实时推荐系统实战,电视收视率项目实战,实时流统计项目实战,离线电商分析项目实战,Spark大型项目实战用户分析,智能客户系统项目实战,Linux基础,Hadoop,Spark,Storm,Docker,Mapreduce,Kafka,Flume,OpenStack,Hiv

个推用户画像的实践与应用

"以用户为核心"的概念在互联网时代深入人心,然而要真正了解用户懂得用户,就不得不提到"用户画像". 随着大数据技术的深入研究与应用,借助用户画像,企业或APP可以深入挖掘用户需求,从而实现精细化运营以及为精准营销打下坚实基础.本文将重点介绍何为用户画像,用户画像的构建流程以及应用场景. 用户画像,本质是数据能力的体现 用户画像,即用户信息的标签化,而从本质上来说,用户画像是数据的标签化.常见的用户画像体系有三种:结构化体系.非结构化体系和半结构化体系.非结构化体系没

Hangfire项目实践

Hangfire项目实践分享 Hangfire项目实践分享 目录 Hangfire项目实践分享 目录 什么是Hangfire Hangfire基础 基于队列的任务处理(Fire-and-forget jobs) 延迟任务执行(Delayed jobs) 定时任务执行(Recurring jobs) 延续性任务执行(Continuations) 与quartz.net对比 Hangfire扩展 Hangfire Dashborad日志查看 Hangfire Dashborad授权 IOC容器之Au

linux驱动开发重点关注内容--摘自《嵌入式Linux驱动模板精讲与项目实践》

本文摘自本人拙著 <嵌入式Linux驱动模板精讲与项目实践> 初步看起来Linux设备驱动开发涉及内容非常多,而须要实现驱动的设备千差万别.事实上做一段时间驱动之后回首看来主要就是下面几点: (1)对驱动进行分类.先归纳为哪个类型的驱动.归类正确再利用内核提供的子系统进行开发,往往会发现事实上非常多通用的事情内核已经帮我们做了,一个优秀的驱动project师应该最大程度上利用内核的资源.内核已经实现的毕竟稳定性强.可移植性高. (2)找到内核的提供的子系统.接下来就是要制作该子系统对该类设备提

第六周作业:《人月神话》对我做项目实践的启示(一)

<人月神话>这本书有两个老师都有给我们推荐,第一个老师推荐时不以为然,第二个老师也推荐时,自己感觉应该是挺重要的吧,于是去图书馆借了这本书来看,刚借回来时,总觉得时间不够.作业很多,也没来的及看,就一直搁置在了那里,直到上周,在我们的项目实践开始近三周,但进度却一直赶不上来的情况下,看到了这本书,才拿起来看.目前还没看完,先写一点儿领悟到的东西. 作者从焦油坑,提出项目失败的表现,把过去几十年的大型系统开发比作一个炼焦坑,各种团队一个个地淹没在焦油坑,他们都试图解决面对的问题,但他们都必须去了

MVC项目实践,在三层架构下实现SportsStore-02,DbSession层、BLL层

SportsStore是<精通ASP.NET MVC3框架(第三版)>中演示的MVC项目,在该项目中涵盖了MVC的众多方面,包括:使用DI容器.URL优化.导航.分页.购物车.订单.产品管理.图像上传......是不错的MVC实践项目,但该项目不是放在多层框架下开发的,离真实项目还有一段距离.本系列将尝试在多层框架下实现SportsStore项目,并用自己的方式实现一些功能. 本篇为系列第二篇,包括: ■ 4.三层架构设计    □ 4.2 创建DbSession层 数据访问层的统一入口   

Linux开源模块移植概述暨交叉编译跨平台移植总结--摘自《嵌入式Linux驱动模板精讲与项目实践》

本文摘自<嵌入式Linux驱动模板精讲与项目实践>一书中的"开发与调试技巧". Linux的强大威力就在于有很多开源项目可以使用,通常很多需求可以通过寻找相关的开源模块做为快速解决方案.要把这些开源模块应用到嵌入式中,其中一个关键点就是要使用交叉编译工具对开源项目进行交叉编译. 根据具体情况,下载的开源项目在组织上有很多情况,在此对各种情况进行归类介绍. 1. 下载的开源软件包找不到Makefile 对于这种开源包通常是采用configure的方式组织的,那么第一步就是使用