简述大数据实时处理框架

现如今,我们来到了数据时代,数据信息化与我们的生活与工作息息相关。此篇文章简述利用大数据框架,实时处理数据的流程与相关框架的介绍,主要包括:

数据实时处理的概念和意义
数据实时处理能做什么
数据实时处理架构简介
数据实时处理代码演示
数据实时处理的概念和意义
什么是数据实时处理呢?我个人对数据实时处理的理解为:数据从生成->实时采集->实时缓存存储->(准)实时计算->实时落地->实时展示->实时分析。这一个流程线下来,处理数据的速度在秒级甚至毫秒级。

数据实时处理有什么意义呢?我们得到数据可以进行数据分析,利用数据统计方法,从错综复杂的数据关系中梳理出事物的联系,比如发展趋势、影响因素、因果关系等。甚至建立一些BI,对一些数据的有用信息进行可视化呈现,并形成数据故事。

数据实时处理能做什么
数据的实时计算
何为数据的实时计算?我们从数据源端拿到数据,可能不尽如人意,我们想对得到的数据进行 ETL 操作、或者进行关联等等,那么我们就会用到数据的实时计算。目前主流的实时计算框架有 spark,storm,flink 等。

数据的实时落地
数据的实时落地,意思是将我们的源数据或者计算好的数据进行实时的存储。在大数据领域,推荐使用 HDFS,ES 等进行存储。

数据的实时展示与分析
我们拿到了数据,要会用数据的价值。数据的价值体现在数据中相互关联关系,或与历史关联,或能预测未来。我们实时得到数据,不仅能够利用前端框架进行实时展示,还可以对其中的一些数据进行算法训练,预测未来走势等。

example:

淘宝双 11 大屏,每年的双 11 是淘宝粉丝疯狂的日子。马云会在双 11 的当天在阿里总部竖起一面大的电子屏幕,展示淘宝这一天的成绩。例如成交额,访问人数,订单量,下单量,成交量等等。这个电子大屏的背后,就是用到的我们所说的数据的实时处理。首先,阿里的服务器遍布全国各地,这些服务器收集PC端、手机端等日志,上报到服务器,在服务上部署数据采集工具。接下来,由于数据量庞大,需要做数据的缓存缓冲处理。下一步,对原始日志进行实时的计算,比如筛选出上面所述的各个指标。最后,通过接口或者其他形式,进行前端屏幕的实时展示。

数据实时处理架构简介
接下来是我们介绍的重点,先放一张数据流程图:

数据采集端,选用目前采集数据的主流控件 flume。
数据缓冲缓存,选用分布式消息队列 kafka。
数据实时计算,选用 spark 计算引擎。
数据存储位置,选用分布式数据存储 ES。
其他,指从 ES 中拿到数据后进行可视化展示,数据分析等。
下面将分别简单的介绍下各个组件:

flume
flume 是一个分布式的数据收集系统,具有高可靠、高可用、事务管理、失败重启、聚合和传输等功能。数据处理速度快,完全可以用于生产环境。

flume 的核心概念有:event,agent,source,channel,sink

event
flume 的数据流由事件 (event) 贯穿始终。event 是 flume 的基本数据单位,它携带日志数据并且携带数据的头信息,这些 event 由 agent 外部的 source 生成,当 source 捕获事件后会进行特定的格式化,然后 source 会把事件推入 channel 中。可以把 channel 看作是一个缓冲区,它将保存事件直到 sink 处理完该事件。sink 负责持久化日志或者把事件推向另一个 source。

agent
flume 的核心是 agent。agent 是一个 java 进程,运行在日志收集端,通过 agent 接收日志,然后暂存起来,再发送到目的地。 每台机器运行一个 agent。 agent 里面可以包含多个 source,channel,sink。

source
source 是数据的收集端,负责将数据捕获后进行特殊的格式化,将数据封装到 event 里,然后将事件推入 channel 中。flume 提供了很多内置的 source,支持 avro,log4j,syslog 等等。如果内置的 source 无法满足环境的需求,flume 还支持自定义 source。

channel
channel 是连接 source 和 sink 的组件,大家可以将它看做一个数据的缓冲区(数据队列),它可以将事件暂存到内存中也可以持久化到本地磁盘上, 直到 sink 处理完该事件。两个较为常用的 channel,MemoryChannel 和 FileChannel。

sink
sink 从 channel 中取出事件,然后将数据发到别处,可以向文件系统、数据库、hadoop、kafka,也可以是其他 agent 的 source。

flume 的可靠性与可恢复性
flume 的可靠性:当节点出现故障时,日志能够被传送到其他节点上而不会丢失。Flume 提供了可靠性保障,收到数据首先写到磁盘上,当数据传送成功后,再删除;如果数据发送失败,可以重新发送。
flume 的可恢复性:可恢复性是靠 channel。
口述抽象,上两张官网贴图:

单个 agent 收集数据流程图

多个 agent 协作处理数据流程图

kafka
Kafka 是一个高吞吐量的分布式发布-订阅消息系统。企业中一般使用 kafka 做消息中间件,做缓冲缓存处理。需要 zookeeper 分布式协调组件管理。

kafka 的设计目标:

提供优秀的消息持久化能力,对 TB 级以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的机器上也能做到每台机每秒 100000 条消息的传输。
支持 kafka server 间的消息分区,及分布式消费,同时保证每个 partition 内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
kafka 核心概念

broker:消息中间件处理结点,一个 kafka 节点就是一个 broker,多个 broker 可以组成一个 kafka 集群。
topic:主题,kafka 集群能够同时负责多个 topic 的分发。
partition:topic 物理上的分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
offset:每个 partition 都由一系列有序的、不可变的消息组成,这些消息被连续的追加到 partition 中。partition 中的每个消息都有一个连续的序列号叫做 offset,用于 partition 唯一标识一条消息。
producer:负责发布消息到 kafka broker。
consumer:消息消费者,向 kafka broker读取消息的客户端。
consumer group:每个 consumer 属于一个特定的 consumer group。
贴两张官网图

prodecer-broker-consumer

分区图

spark
spark 是一个分布式的计算框架,是我目前认为最火的计算框架。

spark,是一种"one stack to rulethem all"的大数据计算框架,期望使用一个技术栈就完美地解决大数据领域的各种计算任务。apache 官方,对 spark 的定义是:通用的大数据快速处理引擎(一“栈”式)。

spark组成
spark core 用于离线计算
spark sql 用于交互式查询
spark streaming,structed streaming 用于实时流式计算
spark MLlib 用于机器学习
spark GraphX 用于图计算
spark 特点
速度快:spar k基于内存进行计算(当然也有部分计算基于磁盘,比如 shuffle)。
容易上手开发:spark 的基于 rdd 的计算模型,比 hadoop 的基于 map-reduce 的计算模型要更加易于理解,更加易于上手开发,实现各种复杂功能。
通用性:spark 提供的技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。
与其他技术的完美集成:例如 hadoop,hdfs、hive、hbase 负责存储,yarn 负责资源调度,spark 负责大数据计算。
极高的活跃度:spark 目前是 apache 的顶级项目,全世界有大量的优秀工程师是 spark 的 committer,并且世界上很多顶级的 IT 公司都在大规模地使用 spark。
贴个spark架构图

数据实时处理代码演示
搭建好各个集群环境
需要搭建 flume 集群,kafka 集群,es 集群,zookeeper 集群,由于本例 spark 是在本地模式运行,所以无需搭建 spark 集群。

配置好组件之间整合的配置文件
搭建好集群后,根据集群组件直接的整合关系,配置好配置文件。其中主要的配置为 flume 的配置,如下图:

可以看到,我们的 agent 的 source 为 r1,channel 为 c1,sink 为 k1,source 为我本地 nc 服务,收集日志时,只需要打开 9999 端口就可以把日志收集。channel 选择为 memory 内存模式。sink 为 kafka 的 topic8 主题。

开启各个集群进程
开启 zookeeper 服务。其中 QuorumPeerMain 为 zookeeper 进程。
开启 kafka 服务。
开启 es 服务。
开启 flume 服务。其中 Application 为 flume 进程。

创建好 es 对应 table
创建好 es 对应的表,表有三个字段,对应代码里面的 case class(代码随后贴上)。

代码如下:

package run

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.spark.rdd.EsSpark

/**

  • @author wangjx
  • 测试kafka数据进行统计 kafka自身维护offset(建议使用自定义维护方式维护偏移量)
    */
    object SparkStreamingAutoOffsetKafka {
    //定义样例类 与es表对应
    case class people(name:String,country:String,age:Int)

    def main(args: Array[String]): Unit = {
    val logger = Logger.getLogger(this.getClass);
    //spark 配置
    val conf = new SparkConf().setAppName("SparkStreamingAutoOffsetKafka").setMaster("local[2]")
    conf.set("es.index.auto.create","true")
    conf.set("es.nodes","127.0.0.1")
    conf.set("es.port","9200")
    //spark streaming实时计算初始化 定义每10秒一个批次 准实时处理 企业一般都是准实时 比如每隔10秒统计近1分钟的数据等等
    val ssc = new StreamingContext(conf, Seconds(10))
    val spark = SparkSession.builder()
    .config(conf)
    .getOrCreate()
    spark.sparkContext.setLogLevel("WARN");
    //设置kafka参数
    val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "x:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "exactly-once",
    "auto.offset.reset" -> "latest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    //kafka主题
    val topic = Set("kafka8")
    //从kafka获取数据
    val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topic, kafkaParams)
    )
    //具体的业务逻辑
    val kafkaValue: DStream[String] = stream.flatMap(line=>Some(line.value()))
    val peopleStream = kafkaValue
    .map(_.split(":"))
    //形成people样例对象
    .map(m=>people(m(0),m(1),m(2).toInt))
    //存入ES
    peopleStream.foreachRDD(rdd =>{
    EsSpark.saveToEs(rdd, "people/man")
    })
    //启动程序入口
    ssc.start()
    ssc.awaitTermination()
    }
    }

原文地址:http://blog.51cto.com/13952953/2176661

时间: 2024-11-05 02:30:35

简述大数据实时处理框架的相关文章

大数据实时处理-基于Spark的大数据实时处理及应用技术培训

随着互联网.移动互联网和物联网的发展,我们已经切实地迎来了一个大数据 的时代.大数据是指无法在一定时间内用常规软件工具对其内容进行抓取.管理和处理的数据集合,对大数据的分析已经成为一个非常重要且紧迫的需求.目前对大数据的分析工具,首选的是Hadoop/Yarn平台,但目前对大数据的实时分析工具,业界公认最佳为Spark.Spark是基于内存计算的大数据并行计算框架,Spark目前是Apache软件基金会旗下,顶级的开源项目,Spark提出的DAG作为MapReduce的替代方案,兼容HDFS.H

Flume+Kafka+Storm+Redis构建大数据实时处理系统:实时统计网站PV、UV+展示

[TOC] 1 大数据处理的常用方法 前面在我的另一篇文章中<大数据采集.清洗.处理:使用MapReduce进行离线数据分析完整案例>中已经有提及到,这里依然给出下面的图示: 前面给出的那篇文章是基于MapReduce的离线数据分析案例,其通过对网站产生的用户访问日志进行处理并分析出该网站在某天的PV.UV等数据,对应上面的图示,其走的就是离线处理的数据处理方式,而这里即将要介绍的是另外一条路线的数据处理方式,即基于Storm的在线处理,在下面给出的完整案例中,我们将会完成下面的几项工作: 1

一文读懂大数据计算框架与平台

1.前言 计算机的基本工作就是处理数据,包括磁盘文件中的数据,通过网络传输的数据流或数据包,数据库中的结构化数据等.随着互联网.物联网等技术得到越来越广泛的应用,数据规模不断增加,TB.PB量级成为常态,对数据的处理已无法由单台计算机完成,而只能由多台机器共同承担计算任务.而在分布式环境中进行大数据处理,除了与存储系统打交道外,还涉及计算任务的分工,计算负荷的分配,计算机之间的数据迁移等工作,并且要考虑计算机或网络发生故障时的数据安全,情况要复杂得多. 举一个简单的例子,假设我们要从销售记录中统

零基础大数据学习框架

大数据开发最核心的课程就是Hadoop框架,几乎可以说Hadoop就是大数据开发.这个框架就类似于Java应用开发的SSH/SSM框架,都是Apache基金会或者其他Java开源社区团体的能人牛人开发的贡献给大家使用的一种开源Java框架.科多大数据大数据来带你看看. Java语言是王道就是这个道理,Java的核心代码是开源的,是经过全球能人牛人共同学习共同研发共同检验的,所以说Java是最经得住检验的语言,而且任何人都可以学习Java核心技术并且使用核心技术开发出像android一样的系统和H

高并发、大数据企业级框架整合maven_Springmvc_Mybatis_Shiro_REST_WebService_JMS_Lucene_Bootstrap

1. 使用阿里巴巴Druid连接池(高效.功能强大.可扩展性好的数据库连接池.监控数据库访问性能.支持Common-Logging.Log4j和JdkLog,监控数据库访问) 2. 提供高并发JMS消息处理机制 3. 所有功能模块化.所有模块服务化.所有服务原子化的方式,提供可拓展的服务模型,使程序稳定运行,永不宕机 4. 提供Wink Rest.Webservice服务,故可作为独立服务平台部署 框架整合: Springmvc + Mybatis + Shiro(权限) + REST(服务)

【高并发、大数据企业级框架分享、集成lucene】maven_Springmvc_Mybatis_Shiro_REST_WebService_JMS_Lucene_Bootstrap

1. 使用阿里巴巴Druid连接池(高效.功能强大.可扩展性好的数据库连接池.监控数据库访问性能.支持Common-Logging.Log4j和JdkLog,监控数据库访问) 2. 提供高并发JMS消息处理机制 3. 所有功能模块化.所有模块服务化.所有服务原子化的方式,提供可拓展的服务模型,使程序稳定运行,永不宕机 4. 提供Wink Rest.Webservice服务,故可作为独立服务平台部署 框架整合: Springmvc + Mybatis + Shiro(权限) + REST(服务)

学习大数据基础框架hadoop需要什么基础

什么是大数据?进入本世纪以来,尤其是2010年之后,随着互联网特别是移动互联网的发展,数据的增长呈爆炸趋势,已经很难估计全世界的电子设备中存储的数据到底有多少,描述数据系统的数据量的计量单位从MB(1MB大约等于一百万字节).GB(1024MB).TB(1024GB),一直向上攀升,目前,PB(等于1024TB)级的数据系统已经很常见,随着移动个人数据.社交网站.科学计算.证券交易.网站日志.传感器网络数据量的不断加大,国内拥有的总数据量早已超出 ZB(1ZB=1024EB,1EB=1024PB

学习hadoop大数据基础框架需要什么基础

什么是大数据?进入本世纪以来,尤其是2010年之后,随着互联网特别是移动互联网的发展,数据的增长呈爆炸趋势,已经很难估计全世界的电子设备中存储的数据到底有多少,描述数据系统的数据量的计量单位从MB(1MB大约等于一百万字节).GB(1024MB).TB(1024GB),一直向上攀升,目前,PB(等于1024TB)级的数据系统已经很常见,随着移动个人数据.社交网站.科学计算.证券交易.网站日志.传感器网络数据量的不断加大,国内拥有的总数据量早已超出 ZB(1ZB=1024EB,1EB=1024PB

大数据技术框架

大数据整体技术框架 --------------------------------- ------------------------------ 原文地址:https://www.cnblogs.com/coco2015/p/11146649.html