Spark-再接着上次的Lamda架构

日志分析

单机日志分析,适用于小数据量的。(最大10G),awk/grep/sort/join等都是日志分析的利器。

例子:

1、shell得到Nginx日志中访问量最高的前十个IP

cat access.log.10 | awk ‘(a[$1]++) END (for(b in a) print b"\t"a[b])‘ | sort -k2 -r | head -n 10

2、python 统计每个IP的地址点击数

 import re
 import sys
 contents=sys.argv[1]
 def NginxIpHit(logfile_path):
     ipadd = r‘\.‘.join([r‘\d{1,3}‘]*4)
     re_ip = re.compile(ipadd)
     iphitlisting = {}
     for line in open(contents):
     match = re_ip.match(line)
     if match:
        ip = match.group()
        iphitlisting[ip]=iphitlisting.get(ip,0)+1
     print iphitlisting
 NginxIpHit(contents)

**大规模的日志处理,日志分析指标:

PV、UV、PUPV、漏斗模型和准化率、留存率、用户属性

最终用UI展示各个指标的信息。**

架构

  • 1、实时日志处理流线

数据采集:采用Flume NG进行数据采集

数据汇总和转发:用Flume 将数据转发和汇总到实时消息系统Kafka

数据处理:采用spark streming 进行实时的数据处理

结果显示:flask作为可视化工具进行结果显示

  • 2、离线日志处理流线

数据采集:通过Flume将数据转存到HDFS

数据处理:使用spark sql进行数据的预处理

结果呈现:结果汇总到mysql上,最后使用flask进行结果的展现

Lamda架构:低响应延迟的组合数据传输环境。

查询过程:一次流处理、一次批处理。对应着实时和离线处理。

项目流程

安装flume

Flume进行日志采集,web端的日志一般Nginx、IIS、Tomcat等。Tomcat的日志在var/data/log

安装jdk

安装Flume

wget http://mirrors.cnnic.cn/apache/flume/1.5.0/apache-flume-1.5.0-bin.tar.gz
tar –zxvf  apache-flume-1.5.0-bin.tar.gz
mv apache-flume-1.5.0 –bin  apache-flume-1.5.0
ln   -s  apache-flume-1.5.0   fiume 

环境变量配置

Vim  /etc/profile
Export JAVA_HOME=/usr/local/jdk
Export CLASS_PATH = .:$ JAVA_HOME/lib/dt.jar: $ JAVA_HOME/lib/tools.jar
Export PATH=$ PATH:$ JAVA_HOME/bin
Export FlUME_HOME=/usr/local/flume
Export FlUME_CONF_DIR=$ FlUME_HOME/conf
Export PATH=$ PATH:$ FlUME_HOME /bin
Souce  /etc/profile 

创建agent配置文件将数据输出到hdfs上,修改flume.conf:

a1.sources = r1
a1.sinks = k1
a1.channels =c1
描述和配置sources
第一步:配置数据源
a1.sources.r1.type =exec
a1.sources.r1.channels =c1
配置需要监控的日志输出目录
a1.sources.r1.command=tail  –f  /va/log/data
第二步:配置数据输出
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.177:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.writeFormat=Text
a1.sink.k1.hdfs.rollInterval =60
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
配置数据通道
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
第四步:将三者级联
a1.souces.r1.channels =c1
a1.sinks.k1.channel =c1

启动Flume Agent

cd  /usr/local/flume
nohup bin/flume-ng  agent  –n  conf  -f  conf/flume-conf.properties
&

已经将flume整合到了hdfs中

  • 整合Flume、kafka、hhdfs
#hdfs输出端
a1.sink.k1.type =hdfs
a1.sink.k1.channels =c1
a1.sink.k1.hdfs.useLocalTimeStamp=true
a1.sink.k1.hdfs.path =hdfs://192.168.11.174:9000/flume/events/%Y/%m/%d/%H/%M
a1.sink.k1.hdfs.filePrefix =cmcc-%H
a1.sink.k1.hdfs.minBlockReplicas=1
a1.sink.k1.hdfs.fileType =DataStream
a1.sink.k1.hdfs.rollInterval =3600
a1.sink.k1.hdfs.rollSize =0
a1.sink.k1.hdfs.rollCount=0
a1.sink.k1.hdfs.idleTimeout =0
#kafka输出端 为了提高性能使用内存通道
a1.sink.k2.type =com.cmcc.chiwei.Kafka.CmccKafkaSink
a1.sink.k2.channels =c2
a1.sink.k2.metadata.broker.List=192.168.11.174:9002;192.168.11.175:9092; 192.168.11.174:9092
a1.sink.k2.partion.key =0
a1.sink.k2.partioner.class= com.cmcc.chiwei.Kafka.Cmcc Partion
a1.sink.k2.serializer.class= kafka. Serializer.StringEncoder
a1.sink.k2.request.acks=0
a1.sink.k2.cmcc.encoding=UTF-8
a1.sink.k2.cmcc.topic.name=cmcc
a1.sink.k2.producer.type =async
a1.sink.k2.batchSize =100

a1.sources.r1.selector.type=replicating

a1.sources = r1
a1.sinks = k1 k2
a1.channels =c1 c2

#c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/home/flume/flumeCheckpoint
a1.channels.c1.dataDir=/home/flume/flumeData, /home/flume/flumeDataExt
a1.channels.c1.capacity=2000000
a1.channels.c1.transactionCapacity=100
#c2
a1.channels.c2.type=memory
a1.channels.c2.capacity=2000000
a1.channels.c2.transactionCapacity=100

用Kafka将日志汇总

1.4 Tar –zxvf  kafka_2.10-0.8.1.1.tgz
1.5 配置kafka和zookeeper文件
配置zookeeper.properties
dataDir=/tmp/zookeeper
client.Port=2181
maxClientCnxns = 0
initLimit = 5
syncLimit = 2
##
server.43 = 10.190.182.43:2888:3888
server.38 = 10.190.182.38:2888:3888

server.33 = 10.190.182.33:2888:3888

配置zookeeper myid

在每个服务器dataDir 创建 myid文件 写入本机id
//server.43   myid  本机编号43
echo “43” >  /tmp/ zookeeper/myid
配置kafka文件, config/server.properties
每个节点根据不同主机名配置
broker.id :43
host.name:10.190.172.43
zookeeper.connect=10.190.172.43:2181, 10.190.172.33:2181,10.190.172.38:2181

启动zookeeper

kafka通过zookeeper存储元数据,先启动它,提供kafka相应的连接地址

Kafka自带的zookeeper

在每个节点 bin/zookeeper-server-start.sh config/zookeeper. properties

启动Kafka

Bin/Kafka-server-start.sh

创建和查看topic

Topic和flume中的要一致,spark streming 也用的这个

Bin/ Kafka-topics.sh  --create  --zookeeper 10.190.172.43:2181
 --replication-factor  1  -- partions   1  --topic  KafkaTopic

查看下:

Bin/ Kafka-topics.sh   --describe   -- zookeeper  10.190.172.43:2181

整合kafka sparkstreming

Buid.sbt
Spark-core
Spark-streming
Spark-streamng-kafka
kafka
  • Spark streming 实时分析

    数据收集和中转已经好了,kafka给sparkstreming

  • Spark sql 离线分析
  • Flask可视化

代码

移步: github.com/jinhang

时间: 2024-09-12 19:06:52

Spark-再接着上次的Lamda架构的相关文章

spark再总结

1.Spark是什么? UCBerkeley AMPlab所开源的类HadoopMapReduce的通用的并行计算框架. dfsSpark基于mapreduce算法实现的分布式计算,拥有HadoopMapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法. 2.Spark与Hadoop的对比(Spark的优势) 1.Spark的中间数据放到内存

【转载】Spark系列之运行原理和架构

参考 http://www.cnblogs.com/shishanyuan/p/4721326.html 1. Spark运行架构 1.1 术语定义 lApplication:Spark Application的概念和Hadoop MapReduce中的类似,指的是用户编写的Spark应用程序,包含了一个Driver 功能的代码和分布在集群中多个节点上运行的Executor代码: lDriver:Spark中的Driver即运行上述Application的main()函数并且创建SparkCon

Spark学习(三): 基本架构及原理

Apache Spark是一个围绕速度.易用性和复杂分析构建的大数据处理框架,最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一,与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势: Spark提供了一个全面.统一的框架用于管理各种有着不同性质(文本数据.图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求 官方资料介绍Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍

再理下系统分层架构模式

MVC是横向分工:三层架构,是纵向分层. 在大多数架构图中,MVC被画成三角形,View是使用者能够看到并操作的层(可能是Web页面,也可能是GUI),Controller是处理各种操作请求的核心,Model是用于保存各种数据,并且会影响View的显示结果. 而三层架构,通常会被画成上中下三层,上层是前端:中层是业务逻辑:底层是数据存储. 从这个介绍来看,MVC与三层架构的确似乎存在对应关系.在很多简单项目中,我们可以将业务逻辑写在Controller里,而保存数据的逻辑也无非是数据库的表与Mo

再谈PG索引-存储架构

1.索引的基本架构 PG的索引是B+树,B+树是为磁盘或其他直接存取辅助设备而设计的一种平衡查找树,在B+树中,所有记录节点都是按键值的大小顺序存放在同一层的叶节点中,各叶节点指针进行连接: meta page | root page(8kb,一个记录占32个bit,那么就能存256个branch page,超过了就需要扩充一级branch page来存储leaf page) | branch page … | | | branch page branch page branch page …

再看内核的frace架构, tracepoint宏扩展(二)

ftrace接口中是时间都id是啥意思,还有format,enable的时候发生了啥 id, enable, filter相关的函数接口全部都在 kernel/trace/trace_events.c event_create_dir <--__trace_add_new_event <-- trace_add_event_dirs <-- event_trace_add_tracer <--- trace_event_call (include/linux/trace_event

3.spark streaming Job 架构和容错解析

一.Spark streaming Job 架构 SparkStreaming框架会自动启动Job并每隔BatchDuration时间会自动触发Job的调用. Spark Streaming的Job 分为两大类: 每隔BatchInterval时间片就会产生的一个个Job,这里的Job并不是Spark Core中的Job,它只是基于DStreamGraph而生成的RDD的DAG而已:从Java角度讲相当于Runnable接口的实现类,要想运行Job需要将Job提交给JobScheduler,在J

再议Citrix PVS架构与机制

一.PVS是存储架构和网络计算架构 我们说Citrix PVS架构本质上是一个存储架构,是因为在Citrix PVS架构下,实现了计算和存储的分离.首先在传统的计算机上,计算资源和存储资源是同处于相同计算机内部的通过高速总线连接起来的组件,而在存储设备上,服务器本地的存储资源不在用于存储数据文件,存储数据文件的存储空间通过网络(TCP/IP or FC)传送到专门的存储控制器,由存储控制器来分配和管理这些元数据和IO,并最终将数据落地到硬盘空间存储起来.所以从原理出发,Citrix PVS就相当

Spark学习笔记5:Spark集群架构

Spark的一大好处就是可以通过增加机器数量并使用集群模式运行,来扩展计算能力.Spark可以在各种各样的集群管理器(Hadoop YARN , Apache Mesos , 还有Spark自带的独立集群管理器)上运行,所以Spark应用既能够适应专用集群,又能用于共享的云计算环境. Spark运行时架构 Spark在分布式环境中的架构如下图: 在分布式环境下,Spark集群采用的是主/从结构.在Spark集群,驱动器节点负责中央协调,调度各个分布式工作节点.执行器节点是工作节点,作为独立的Ja