Map Reduce和流处理

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

本文由@从流域到海域翻译,发表于腾讯云+社区

map()和reduce()是在集群式设备上用来做大规模数据处理的方法,用户定义一个特定的映射,函数将使用该映射对一系列键值对进行处理,直接产生出一系列键值对。

Map Reduce和流处理

Hadoop的Map / Reduce模型在并行处理大量数据方面非常出色。它提供了一个通用的分区机制(基于数据的关键)来分配不同机器上的聚合式工作负载。基本上, map / reduce的算法设计都是关于如何在处理过程中的不同阶段为记录值选择正确的key。

然而,“时间维度”与数据的其他维度属性相比具有非常不同的特征,特别是在涉及实时数据处理时。它对面向批处理的Map/Reduce模型提出了一系列不同的挑战。

  1. 实时处理需要非常低的响应延迟,这意味着没有太多的数据能够在“时间”维度上进行处理。
  2. 从多个数据源收集到的数据可能没有全部到达汇总点。
  3. 在Map/Reduce的标准模型中,reduce阶段在map阶段完成之前无法启动。而且在下载到reducer之前,所有处理过程的中间数据都保存在磁盘中。所有这些都显著增加了处理的延迟。

尽管Hadoop Map/Reduce是针对批处理的工作负载而设计的,但某些应用程序(如欺诈检测,广告显示,网络监控需要实时响应以处理大量数据),现在已开始考虑各种调整Hadoop的方法以使其适合更实时的处理环境。在本篇文章中,我尝试了一些基于Map/Reduce模型的执行低延迟并行处理的技术。

常用流处理模型

在这个模型中,数据是在各种各样的OLTP系统中生成的,这些系统更新了事务数据存储,并异步发送其他数据用于分析处理。分析处理过程将输出写入到决策模型,该决策模型会将信息反馈给OLTP系统来进行实时决策。

注意与OLTP系统分离的分析处理的“异步性质”,在该方式下OLTP系统不会放慢速度等待分析处理完成。无论如何,我们仍然需要尽快进行分析处理,否则决策模型将不能反映当前世界的真实场景,它将不会很有用处。什么程度的延迟可容忍的是应用程序指定的。

在Map/Reduce中进行微批处理

一种方法是根据时间窗(例如每小时)将数据分成小批量,并将每批中收集的数据提交给Map/Reduce作业。这需要分段机制,以便OLTP应用程序可以继续独立于分析处理。而作业调度程序用于规范生产者和消费者,基于此它们每个生产者或消费者都可以独立进行。(生产者和消费者是在操作系统理论中对产生数据和处理数据的程序的称呼,译者注)

连续性Map/Reduce

这里让我们想象一下有关Map/Reduce执行模型的一些可能的修改,以使其适应实时流处理。我并不担心Hadoop在线原型(HOP)所采用的方法的向后兼容性 。

长时间运行

第一种修改方法是使mapper和reducer长时间运行。因此,我们不能等待map阶段结束之后才开始reduce阶段,因为map阶段永远不会结束。这意味着mapper在完成处理后会将数据推送到reducer,并让reducer对数据进行排序。这种方法的缺点是它没有机会去运行地图侧的combine()函数以降低带宽使用率。它还将更多的工作量转移到正需要进行分类的reducer。

注意在延迟和优化之间需要有一个折衷。优化需要更多的数据在源头(即Mapper)就进行累积,如此即可以执行本地合并(即:结合在一起)。不幸的是,低延迟需要尽快发送数据,因此没有太多时间使大量累积操作可以完成。

HOP提出了一种自适应流控制机制,在该方式下数据会被尽快推送到Reducer,直到Reducer被重载并退回(使用某种流量控制协议)。然后mapper将缓冲处理后的消息并在发送给reducer之前执行combine()函数。这种方法将会自动地来回移动Reducer和Mapper之间的聚合工作负载。

时间窗口:切片和范围

这是一个“时间片(time slice)”概念和一个“时间范围(time range)”的概念。“切片(Slice)”定义了执行reduce处理之前所累计结果的时间窗口。这也是mapper在发送到reducer之前应积累的最小数据量。

“范围(Range)”定义了结果所汇总的时间窗口。它可以是一个具有明确起点定义的界标窗口或者是跳跃窗口的(考虑移动的界标场景)。它也可以是一个滑动窗口,其中从当前时间开始聚合的固定大小的窗口。

在从每个mapper接收到特定时间片后,reducer可以启动聚合处理并将结果与之前的聚合结果进行合并。切片(大小)可以根据mapper发送的数据量来进行动态调整。

增量处理

请注意,reducer需要在收到所有mapper中相同时间片的所有记录后计算聚合片值。之后,它会调用用户定义的merge()函数将切片值与范围值合并。如果范围需要刷新(例如达到跳转窗口边界),将调用init()函数来获取刷新的范围值。如果范围值需要更新(当某个切片值超出滑动范围时),则会调用unmerge()函数。

以下是我们如何在每小时更新(即:一小时大小切片)的情况下,在24小时滑动窗口内跟踪平均命中率(即:每小时总命中数)的示例。

# Call at each hit record
map(k1, hitRecord) {
   site = hitRecord.site
   # lookup the slice of the particular key
   slice = lookupSlice(site)
   if (slice.time - now > 60.minutes) {
       # Notify reducer whole slice of site is sent
       advance(site, slice)
        slice = lookupSlice(site)
    }
   emitIntermediate(site, slice, 1)
}

combine(site, slice, countList) {
   hitCount = 0
   for count in countList {
       hitCount += count
   }
   # Send the message to the downstream node
   emitIntermediate(site, slice, hitCount)
}
# Called when reducer receive full slice from all mappers
reduce(site, slice, countList) {
   hitCount = 0
   for count in countList {
       hitCount += count
   }
   sv = SliceValue.new
   sv.hitCount = hitCount
   return sv
}

# Called at each jumping window boundary
init(slice) {
   rangeValue = RangeValue.new
   rangeValue.hitCount = 0
   return rangeValue
}

# Called after each reduce()
merge(rangeValue, slice, sliceValue) {
   rangeValue.hitCount += sliceValue.hitCount
}

# Called when a slice fall out the sliding window
unmerge(rangeValue, slice, sliceValue) {
   rangeValue.hitCount -= sliceValue.hitCount
}

问答

比较好的MapReduce例子有哪些?

相关阅读

MapReduce极简教程

大数据运算模型 MapReduce 原理

如何为Hadoop选择最佳弹性MapReduce框架

此文已由作者授权腾讯云+社区发布,原文链接:https://cloud.tencent.com/developer/article/1122471?fromSource=waitui

原文地址:https://www.cnblogs.com/qcloud1001/p/9047039.html

时间: 2024-10-29 19:08:46

Map Reduce和流处理的相关文章

map reduce

作者:Coldwings链接:https://www.zhihu.com/question/29936822/answer/48586327来源:知乎著作权归作者所有,转载请联系作者获得授权. 简单的说就是问题可以划分成若干单元,每个单元的计算互不相关,单元计算结果可以在可以承受的时间内合成为总结果的计算.再说直白一点:所有分治模型都可交由hadoop解决.可以说spark是功能更全面的hadoop,支持一些诸如filter.group之类的操作,但是原本思想仍是map reduce,差别不太大

分布式基础学习(2)分布式计算系统(Map/Reduce)

二. 分布式计算(Map/Reduce) 分 布式式计算,同样是一个宽泛的概念,在这里,它狭义的指代,按Google Map/Reduce框架所设计的分布式框架.在Hadoop中,分布式文件 系统,很大程度上,是为各种分布式计算需求所服务的.我们说分布式文件系统就是加了分布式的文件系统,类似的定义推广到分布式计算上,我们可以将其视为增 加了分布式支持的计算函数.从计算的角度上看,Map/Reduce框架接受各种格式的键值对文件作为输入,读取计算后,最终生成自定义格式的输出文件. 而从分布式的角度

如何给Map/Reduce程序传递参数?

前言 以前我们启动一个Map/Reduce,经常是利用hadoop jar ./xxx.jar yyy.KK input output的方式在SHELL脚本或者命令行直接提交作业.但是最近涉及到的一个项目,需要根据配置动态的启动MR作业,也就是涉及到向MAP,REDUCE处理类传递参数的问题. 传递参数的方式 最常见的方式: Configuration conf = new Configuration(); conf.set("key","value"); 然后在M

Hadoop的HDFS和Map/Reduce

HDFS HDFS是一个具有高度容错性的分布式文件系统,适合部署在廉价的机器上,它具有以下几个特点: 1)适合存储非常大的文件 2)适合流式数据读取,即适合"只写一次,读多次"的数据处理模式 3)适合部署在廉价的机器上 但HDFS不适合以下场景(任何东西都要分两面看,只有适合自己业务的技术才是真正的好技术): 1)不适合存储大量的小文件,因为受Namenode内存大小限制 2)不适合实时数据读取,高吞吐量和实时性是相悖的,HDFS选择前者 3)不适合需要经常修改数据的场景 HDFS的架

Spark streaming storm map reduce区别与联系

1.1  基本概念 Storm是一个流式计算框架,Storm采用Java和Clojure编写,其优点是全内存计算,所以它的定位是分布式实时计算. Spark是一个基于内存计算的开源集群计算系统,目的是更快速的进行数据分析.Spark类似于Hadoop MapReduce的通用并行计算框架,Spark基于Map Reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点,但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spa

python中filter, map, reduce, lambda

python 中内置的几个函数filter, map, reduce, lambda简单的例子. #!/usr/bin/env python #_*_coding:utf-8_*_ #filter(function, sequence): #对sequence中的item依次执行function(item),将执行结果为True的item组成一个List/String/Tuple(取决于sequence的类型)返回. #可以看作是过滤函数. tasks = [ { 'id': 1, 'title

王亟亟的Python学习之路(八)-函数式编程,map(),reduce(),filter()

转载请注明出处:王亟亟的大牛之路 首先在这里祝愿大家,新年快乐,工作顺利,BUG少少!!! 本来说是在春节假期内继续维持着写文章的进度,但是还是偷懒了几天(打了4天SC2哈哈哈) 今天上的是关于Python的文章,毕竟在亲戚家拜年,懒得插各类手机调试什么的,况且确实好久没有弄Python了,就写了,废话不多,开始正题!! 函数式编程 函数是什么? 把复杂的操作化为简单的函数分解成简单的操作,这种操作就是面向过程,也就是C这类的实现的大体概念. 函数式是什么? 函数没有变量,任意一个函数,只要输入

记一次MongoDB Map&Reduce入门操作

需求说明 用Map&Reduce计算几个班级中,每个班级10岁和20岁之间学生的数量: 需求分析 学生表的字段: db.students.insert({classid:1, age:14, name:'Tom'}) 将classid随机1和2.age在8-25岁之间随机,name在3-7个字符之间随机. 数据写入 数据写入java脚本 往mrtask库中students写入1000万条数据: package org.test; import java.util.ArrayList; impor

lodash用法系列(4),使用Map/Reduce转换

Lodash用来操作对象和集合,比Underscore拥有更多的功能和更好的性能. 官网:https://lodash.com/引用:<script src="//cdnjs.cloudflare.com/ajax/libs/lodash.js/2.4.1/lodash.min.js"></script>安装:npm install lodash 首先通过npm安装lodash:npm i --save lodash 在js文件中引用lodash:var _ =