Hadoop学习笔记之MapReduce浅析

最近电商行情不是太乐观,整个市场流量增长缓慢,正好项目闲了下来,老板说下一步重点要搞大数据分析。

大姨吗现在接近一亿注册用户,日PV和UV已经达到很高的一个数量级,美月这边实际消费转化率目前也就在1%左右,整个电商市场的寒冬,老板表示急于需要分析现有的用户数据来进行针对性的推送推荐备货以及下一步战略目标。这样就开始我的大数据之旅。


MapReduce是什么

问题

在介绍什么是MapReduce之前呢,首先我们先来看这样一副扑克牌:
扑克牌中有多少张A?

问题:请问上面扑克牌中又多少张A?

解决

传统的方案:
就是一张一张的检查,然后输出来。

MapReduce:

1.给在座的人分配这些牌
2.让每个人数自己手里有多少A
3.把所有人报的数字加起来,得到结果

概念

那么我们来引入MapReduce的概念:

Map(映射):

  • 对集合中每一个元素进行制定的操作

Reduce(化简):

  • 遍历集合中的元素进行适当的合并,得到结果

分析

重新审视我们原来那个分散纸牌的例子,我们有MapReduce数据分析的基本方法:

  1. 通过把牌分给多个玩家并且让他们各自数数,你就在并行执行运算,因为每个玩家都在同时计数。这同时把这项工作变成了分布式的,因为多个不同的人在解决同一个问题的过程中并不需要知道他们的邻居在干什么。
  2. 通过告诉每个人去数数,你对一项检查每张牌的任务进行了映射。 你不会让他们把A的牌递给你,而是让他们把你想要的东西化简为一个数字,然后你把结果进行合并

Tips:例子仅供参考

MapReduce做什么

初析

Mapreduce提供一个统一 的计算框架,既然是做计算的框架,那么表现形式就是有个输入(input),mapreduce操作这个输入(input),通过本身定义好的计算模型,得到一个输出(output),这个输出就是我们所需要的结果。

  • 通过抽象模型和计算框架把需要做什么(what need to do)与具体怎么做(how to do)分开了,为程序员提供一个抽象和高层的编程接口和框架

  • 程序员仅需要关心其应用层的具体计算问题,仅需编写少量的处理应用本身计算问题的程序代码

  • 如何具体完成这个并行计算任务所相关的诸多系统层细节被隐藏起来,交给计算框架去处理:从分布代码的执行,到大到数千小到单个节点集群的自动调度使用

主要功能

  1. 数据划分和计算任务调度:
    系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并 负责Map节点执行的同步控制。

  2. 数据/代码互定位:
    为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。

  3. 系统优化:
    为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个 Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。

  4. 出错检测和恢复:
    以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。

主要特征

MapReduce设计上具有以下主要的技术特征:

  1. 向“外”横向扩展,而非向“上”纵向扩展
    即MapReduce集群的构建完全选用价格便宜、易于扩展的低端商用服务器,而非价格昂贵、不易扩展的高端服务器。
    对于大规模数据处理,由于有大 量数据存储需要,显而易见,基于低端服务器的集群远比基于高端服务器的集群优越,这就是为什么MapReduce并行计算集群会基于低端服务器实现的原因。

  2. 失效被认为是常态
    MapReduce集群中使用大量的低端服务器,因此,节点硬件失效和软件出错是常态,因而一个良好设计、具有高容错性的并行计算系统不能因为节点 失效而影响计算服务的质量,任何节点失效都不应当导致结果的不一致或不确定性;任何一个节点失效时,其他节点要能够无缝接管失效节点的计算任务;当失效节 点恢复后应能自动无缝加入集群,而不需要管理员人工进行系统配置。
    MapReduce并行计算软件框架使用了多种有效的错误检测和恢复机制,如节点自动重 启技术,使集群和计算框架具有对付节点失效的健壮性,能有效处理失效节点的检测和恢复。

  3. 把处理向数据迁移
    传统高性能计算系统通常有很多处理器节点与一些外存储器节点相连,如用存储区域网络(Storage Area,SAN Network)连接的磁盘阵列,因此,大规模数据处理时外存文件数据I/O访问会成为一个制约系统性能的瓶颈。
    为了减少大规模数据并行计算系统中的数据 通信开销,代之以把数据传送到处理节点(数据向处理器或代码迁移),应当考虑将处理向数据靠拢和迁移。MapReduce采用了数据/代码互定位的技术方法,计算节点将首先尽量负责计算其本地存储的数据,以发挥数据本地化特点,仅当节点无法处理本地数据时,再采用就近原则寻找其他可用计算节点,并把数据传送到该可用计算节点。

  4. 顺序处理数据、避免随机访问数据
    大规模数据处理的特点决定了大量的数据记录难以全部存放在内存,而通常只能放在外存中进行处理。由于磁盘的顺序访问要远比随机访问快得多,因此 MapReduce主要设计为面向顺序式大规模数据的磁盘访问处理。
    为了实现面向大数据集批处理的高吞吐量的并行处理,MapReduce可以利用集群中 的大量数据存储节点同时访问数据,以此利用分布集群中大量节点上的磁盘集合提供高带宽的数据访问和传输。

  5. 为应用开发者隐藏系统层细节
    软件工程实践指南中,专业程序员认为之所以写程序困难,是因为程序员需要记住太多的编程细节(从变量名到复杂算法的边界情况处理),这对大脑记忆是 一个巨大的认知负担,需要高度集中注意力;而并行程序编写有更多困难,如需要考虑多线程中诸如同步等复杂繁琐的细节。由于并发执行中的不可预测性,程序的 调试查错也十分困难;而且,大规模数据处理时程序员需要考虑诸如数据分布存储管理、数据分发、数据通信和同步、计算结果收集等诸多细节问题。
    MapReduce提供了一种抽象机制将程序员与系统层细节隔离开来,程序员仅需描述需要计算什么(What to compute),而具体怎么去计算(How to compute)就交由系统的执行框架处理,这样程序员可从系统层细节中解放出来,而致力于其应用本身计算问题的算法设计。

  6. 平滑无缝的可扩展性
    这里指出的可扩展性主要包括两层意义上的扩展性:数据扩展和系统规模扩展性。
    理想的软件算法应当能随着数据规模的扩大而表现出持续的有效性,性能上的下降程度应与数据规模扩大的倍数相当;在集群规模上,要求算法的计算性能应能随着节点数的增加保持接近线性程度的增长。绝大多数现有的单机算法都达不到 以上理想的要求;把中间结果数据维护在内存中的单机算法在大规模数据处理时很快失效;从单机到基于大规模集群的并行计算从根本上需要完全不同的算法设计。奇妙的是,MapReduce在很多情形下能实现以上理想的扩展性特征。
    多项研究发现,对于很多计算问题,基于MapReduce的计算性能可随节点数目增长保持近似于线性的增长。

MapReduce怎么做

抽象模型

Map: 对一组数据元素进行某种重复式的处理
Reduce: 对Map的中间结果进行某种进一步的结果整
MapReduce抽象模型

MapReduce借鉴了函数式程序设计语言Lisp中的思想,定义了如下的Map和Reduce两个抽象的编程接口,由用户去编程实现:

map: (k1; v1) → [(k2; v2)]:

  • 输入:键值对(k1; v1)表示的数据
  • 处理:文档数据记录(如文本文件中的行,或数据表格中的行)将以“键值对”形式传入map函数;map函数将处理这些键值对,并以另一种键值对形式输出处理的一组键值对中间结果[(k2; v2)]
  • 输出:键值对[(k2; v2)]表示的一组中间数据

reduce: (k2; [v2]) → [(k3; v3)]:

  • 输入: 由map输出的一组键值对[(k2; v2)] 将被进行合并处理(combiner)将同样主键下的不同数值合并到一个列表[v2]中,故reduce的输入为(k2; [v2])
  • 处理:对传入的中间结果列表数据进行某种整理或进一步的处理,并产生最终的某种形式的结果输出[(k3; v3)]
  • 输出:最终输出结果[(k3; v3)]

我们可以摘引官方文档这段的描述:

Input and Output types of a MapReduce job:
(input) < k1, v1 > -> map -> < k2, v2 > -> combine -> < k2, v2 > -> reduce -> < k3, v3 > (output)

范例-词频统计

有以下四组原始文本数据:

  • Text1: the weather is good
  • Text2: today is good
  • Text3: good weather is good
  • Text4: today has good weather

统计每个单词出现的次数

传统的串行处理方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
String[] text = new String[] { "the weather is good",
"today is good",
"good weather is good",
"today has good weather"};
Hashtable ht = new Hashtable();
for(int i = 0; i < text.length; ++i) {
StringTokenizer st = new StringTokenizer(text[i]);
while (st.hasMoreTokens()) {
String word = st.nextToken();
if(!ht.containsKey(word)) {
ht.put(word, new Integer(1));
} else {
// 计数加1
int wc = ((Integer)ht.get(word)).intValue() +1;
ht.put(word, new Integer(wc));
}
}
}
for (Iterator itr=ht.keySet().iterator(); itr.hasNext();) {
String word = (String)itr.next();
System.out.print(word+ ":"+ (Integer)ht.get(word)+";");
}
}

MapReduce的处理方式:
使用4个map节点:

  • map节点1:
    输入: (text1, “the weather is good”)
    输出: (the, 1), (weather, 1), (is, 1), (good, 1)

  • map节点2:
    输入: (text2, “today is good”)
    输出: (today, 1), (is, 1), (good, 1)

  • map节点3:
    输入: (text3, “good weather is good”)
    输出: (good, 1), (weather, 1), (is, 1), (good, 1)

  • map节点4:
    输入: (text4, “today has good weather”)
    输出: (today, 1), (has, 1), (good, 1), (weather, 1)

Reduce节点