第99课:使用Spark Streaming+Kafka实战对论坛网站动态行为的多维度分析及java.lang.NoClassDefFoundError问题解决完整内幕版本解密

第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析

/* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道68917580*/

/**

* *第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析

* 论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从

* Kafka中在线Pull到论坛或者网站的用户在线行为信息,进而进行多维度的在线分析

* 数据格式如下:

* date:日期,格式为yyyy-MM-dd

* timestamp:时间戳

* userID:用户ID

* pageID:页面ID

* chanelID:板块的ID

* action:点击和注册   */

生成的用户点击模拟数据如下:

生成模拟数据关键步骤:

启用一个线程,模拟producer生成 的用户点击行为的数据,发送给kakfa

二:我们计算在线不同模块的PV

三:启动hadoop、spark、zookeeper、kafka集群

1.启动hadoop

2.启动spark 集群

3.启动zookeeper

4.将 SparkStreamingDataManuallyProducerForKafka达成jar包文件,使用winscp从本地传到虚拟机

5.启动kafka集群

6.在linux上运行,运行SparkStreamingDataManuallyProducerForKafka的jar包,将生成的数据加载到kafka集群,测试验证kafka上生产者消费者的情况

第一步:kafka建立 topic

kafka-topics.sh --create --zookeeper master:2181,worker1:2181,worker2:2181 --replication-factor 1 --partitions 1 --topic UserLogs

kafka查看 topic

kafka-topics.sh --describe --zookeeper master:2181,worker1:2181,worker2:2181

第二步:运行SparkStreamingDataManuallyProducerForKafka,java.lang.NoClassDefFoundError的解决办法

在linux上运行,运行SparkStreamingDataManuallyProducerForKafka的jar包

当用java -jar SparkStreamingDataManuallyProducerForKafka.jar来运行一个应用程序时,会找不到第三方的jar包。 当使用-jar参数运行时,JVM会屏蔽所有的外部classpath,而只以内部的class作为类的寻找范围。

解决方法:BootStrap class扩展方案

7.kafka消费 topic

在master生产数据:

[email protected]:/usr/local/IMF_testdata# java -Xbootclasspath/a:/usr/local/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar:/usrocal/scala-2.10.4/lib/scala-library.jar:/usr/local/kafka_2.10-0.8.2.1/libs/log4j-1.2.16.jar:/usr/local/kafka_2.10-0.8.2.1/libs/metrics-core-2.2.0.jar:/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/kafka-clients-0.8.2.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/slf4j-log4j12-1.6.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/slf4j-api-1.7.6.jar
    -jar SparkStreamingDataManuallyProducerForKafka.jar

在work1上启动消费

[email protected]:~# kafka-console-consumer.sh --zookeeper master:2181,worker1:2181,worker2:2181 --from-beginning --topic UserLogs

8.将OnlineBBSUserLogs达成jar包文件,使用winscp从本地传到虚拟机

9. 为避免敲入命令写错了,写个脚本来执行数据的产生

[email protected]:/usr/local/IMF_testdata# cat ProducerForKafka.sh

java -Xbootclasspath/a:/usr/local/kafka_2.10-0.8.2.1/libs/kafka_2.10-0.8.2.1.jar:/usr/local/scala-2.10.4/lib/scala-library.jar:/usr/local/kafka_2.10-0.8.2.1/libs/log4j-1.2.16.jar:/usr/local/kafka_2.10-0.8.2.1/libs/metrics-core-2.2.0.jar:/usr/local/spark-1.6.1-bin-hadoop2.6/lib/spark-streaming_2.10-1.6.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/kafka-clients-0.8.2.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/slf4j-log4j12-1.6.1.jar:/usr/local/kafka_2.10-0.8.2.1/libs/slf4j-api-1.7.6.jar
    -jar /usr/local/IMF_testdata/SparkStreamingDataManuallyProducerForKafka.jar

[email protected]:/usr/local/IMF_testdata#

10.OnlineBBSUserLogs成功消费数据,并统计出数值,实验成功

附:

OnlineBBSUserLogs的源代码

知识点:

1、创建kafka的createDirectStream,返回JavaPairInputDStream类型的line值

org.apache.spark.streaming.kafka.createDirectStream 源代码

2、读取kafka的数据流的值以后,进行相关mapToPair、reduceByKey的操作

mapToPair-reduceByKey-PairFunction-Function2的源代码

附录生成模拟数据的源代码:

王家林老师 :DT大数据梦工厂创始人和首席专家。

联系邮箱:[email protected] 电话:18610086859 QQ:1740415547

微信号:18610086859 微博:http://weibo.com/ilovepains/

每天晚上20:00YY频道现场授课频道68917580

IMF Spark源代码版本定制班学员  :

上海-段智华  QQ:1036179833 mail:[email protected] 微信 18918561505

时间: 2024-10-13 23:26:47

第99课:使用Spark Streaming+Kafka实战对论坛网站动态行为的多维度分析及java.lang.NoClassDefFoundError问题解决完整内幕版本解密的相关文章

第82课:Spark Streaming第一课:案例动手实战并在电光石火间理解其工作原理

本期内容: 1.Spark Streaming 动手实战演示 2.闪电般理解Spark Streaming原理 案例动手实战并在电光石火间理解其工作原理 流(Streaming),在大数据时代为数据流处理,就像水流一样,是数据流:既然是数据流处理,就会想到数据的流入.数据的加工.数据的流出. 日常工作.生活中数据来源很多不同的地方.例如:工业时代的汽车制造.监控设备.工业设备会产生很多源数据:信息时代的电商网站.日志服务器.社交网络.金融交易系统.黑客攻击.垃圾邮件.交通监控等:通信时代的手机.

第4课:Spark Streaming的Exactly-Once的事务处理和不重复输出彻底掌握

前置知识: 1.事务的特征:1).处理且仅被处理一次:2).输出且只被输出一次 2.SparkStreaming进行事务处理有没有可能处理完全失败? 这个可能性不大,因为Spark是批处理的方式来进行流处理,在SparkStreaming应用程序启动的时候,已经为应用程序分配了相关的资源,而且在调度的过程中可以动态的分配资源,所以除非整个集群所有的硬件都奔溃了,否则一般情况下都会被处理的. 3.SparkStreaming写程序的时候是基于Driver和Executor两部分 SparkStre

第15课:Spark Streaming源码解读之No Receivers彻底思考

本期内容: Direct Access Kafka 前面有几期我们讲了带Receiver的Spark Streaming 应用的相关源码解读.但是现在开发Spark Streaming的应用越来越多的采用No Receivers(Direct Approach)的方式,No Receiver的方式的优势: 1. 更强的控制自由度 2. 语义一致性 其实No Receivers的方式更符合我们读取数据,操作数据的思路的.因为Spark 本身是一个计算框架,他底层会有数据来源,如果没有Receive

第4课:Spark Streaming的Exactly-One的事务处理

Spark Streaming的事务处理和关系型数据库的事务的概念有所不同,关系型数据库事务关注的是语句级别的一致性,例如银行转账.而Spark Streaming的事务关注的是某次job执行的一致性.也就是如何保证Job在处理数据的过程中做到如下两点: 不丢失数据 不重复处理数据 SparkStreaming程序执行架构大致如下: 一.我们先来说说丢失数据的情况: Receiver接收到数据后,首先会在Executor级别上保存数据(根据StorageLevel的设置),例如socketTex

第4课:Spark Streaming的Exactly Once的事务处理

本期内容: Exactly once 输出不重复 Exactly once 1,事务一定会被处理,且只被处理一次: 2,输出能够输出且只会被输出. Receiver:数据通过BlockManager写入内存+磁盘或者通过WAL来保证数据的安全性. WAL机制:写数据时先通过WAL写入文件系统然后存储的Executor(存储在内存和磁盘中,由StorageLevel设定),假设前面没有写成功后面一定不会存储在Executor,如不存在Executor中的话,汇报Driver数据一定不被处理.WAL

(版本定制)第4课:Spark Streaming事务处理彻底详解

本篇文章主要从二个方面展开: 一.Exactly Once 二.输出不重复 事务: 银行转帐为例,A用户转账给B用户,B用户可能收到多笔钱,如何保证事务的一致性,也就是说事务输出,能够输出且只会输出一次,即A只转一次,B只收一次. 从事务视角解密SparkStreaming架构: SparkStreaming应用程序启动,会分配资源,除非整个集群硬件资源奔溃,一般情况下都不会有问题.SparkStreaming程序分成而部分,一部分是Driver,另外一部分是Executor.Receiver接

spark streaming kafka example

// scalastyle:off println package org.apache.spark.examples.streaming import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.stream

第97课:Spark Streaming 结合Spark SQL 案例

代码如下: package com.dt.spark.streaming import org.apache.spark.sql.SQLContext import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.streaming.{StreamingContext, Duration} /**  * 使用SparkStreaming结合SparkSQL对日志进行分析.  * 假设电商网站点击日志格式(简化)

第4课 :Spark Streaming的Exactly-One的事务处理和不重复输出彻底掌握

/* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道68917580*/ Exactly Once的事务处理: 1,数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且通过WAL来保证数据安全: 2,Spark Streaming 1.3的时候为了避免WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka作为文件存储系