System|分布式|MapReduce

2021-11-22 10:30:38 浏览数 (1)

MapReduce被称为谷歌的三驾马车之一,主要面向谷歌的分布式计算,主要思想来自函数式编程。

计算模型

Map和Reduce是Lisp的两个原语。

  • map:某个集合按照一定的映射关系映射到另一个集合,
  • reduce:将某个集合按照一定的计算规则逐个与之前的结果运算生成一个值
代码语言:javascript复制
map(['add','bacon','to','me'], a->a[-1] a[0:-1])
echo >  ['dad','nbaco','ot','em']
reduce(['add','bacon','to','me'], a,b->a b)
echo > 'addbacontome'

但是在这里map和reduce和传统的函数式编程差别还是很大的。

map: 输入键值对生成一个中间键值对集合

所有key相同的值组合成集合

reduce: 将键和键对应的值集合生成较小的集合,通常0/1个输出。集合通过迭代器遍历。

map和reduce都是没有副作用的纯函数。

代码语言:javascript复制
map (in_key, in_value) -> list(out_key, intermediate_value)
reduce (out_key, list(intermediate_value)) -> list(out_value)

实现

谷歌表示不同情形下应该选取不同的实现,但是在他们分布式的环境下给出了实现。

  1. User通过参数将输入分为M份,同时fork出很多进程到不同机器
  2. 某个特殊的进程被称为master,其他都是worker,master将M个map任务,R个reduce任务分配给idle的worker,简便点就叫mapper和reducer好了
  3. mapper在完成读入的split的计算后将结果分成R个文件(如哈希),并通知master我这准备好了并告诉自己的地址,进入idle状态
  4. master得到通知,告诉reducer有份数据准备好了,快去读。等worker读完所有的中间键值对,进行排序使得来自不同mapper的同键键值对分到同组。
  5. reducer遍历每一组并调用reduce,结果append到远程的输出文件,进入idle状态
  6. 所有reducer均执行完毕后,通知user计算完成

输出文件通常是其他MapReduce的输入,通过map和reduce组合生成复杂的计算。

master持有metadata,例如worker的id、状态,中间键值对文件的size、地址(说明map完成)用于通知正在进行的reducer。

worker 故障

master定期ping worker听心跳,如果worker挂了,task重新调度给idle worker。

需要注意的是,mapper的结果存在本地,reducer的结果存在gfs里面,因此已经完成的map依然需要被重新调度,而已经完成的reduce则不需要。同时master通知还没有来得及读的reducer,应该更换读的地址。

master 故障

打log,做备份,从checkpoint开始启动;如果是单点故障那就莫得了,计算失败。

如何handle复活的worker

对于mapper,采取幂等操作,master已经知道有任务完成的情况下忽略completion请求。

对于reducer,采取原子性操作,将临时文件rename为最终文件,利用文件系统rename原子性,保证输出结果唯一。(如果是那种确定性输出的函数,那输出都是一致的)

Locality

mapper优先访问gfs里最近的copy

任务粒度

对master而言,时间复杂度 O(M R) ,空间复杂度 O(M ∗ R)因为要记录每个中间结果

所以M一般能够让每个worker分到64mb即可,R一般是worker的几倍。

多备份

选择多备份任务同时执行,而不需要等到任务失败后再调度,无论备份里哪个完成,都可以先抢占master(见上文如何handle复活的worker)。备份越多效率越高,因为是取执行时间的最小者。通常执行时间长的任务更应该备份。

改进

  1. 切分算法: 用hash(Hostname(urlkey))mod R使得输出文件一致
  2. Order:中间键值对有序处理,这样结果处理方便
  3. Combiner: 自定义整合reducer的结果给user
  4. type: 自行实现reader接口,从非文件里读
  5. 副作用作为额外输出: 原子性rename,但是多文件不支持。
  6. 跳过坏的记录:保存序列号,失败传给master,master要是看到几个worker在同一序列号上失败分配的时候就跳过这个记录
  7. 本地执行: 增加flag允许debug
  8. status: master监控状态便于debug
  9. counter:用户可以对于采用计数器,便于监控事件发生
代码语言:javascript复制
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");

总结

现在spark比较流行,而且性能也碾压了hadoop。

但是mr的思想还在,也就是函数式编程从原本的函数算子,变成现在分布式的算子。

spark的优化主要是:

  • 中间结果内存化
  • 算子增加
  • 进程 -> 线程级

但思路和MapReduce其实是一脉相承的,原理万变不离其宗。这几架马车的实现虽然过时了,但是后继者无一例外都继承了前辈的遗产。


新时代的MapReduce

  • Pregel [SIGMOD’10] for graphic computing(now in Spark)
  • Spanner [OSDI’12] Global distributed database using TrueTime API
  • DataFlow Map-Reduce is retired in Google

Problem: 提供泛用的分布式计算模型,面向异构数据

Related work: conceptually straightforward

Observation: MapReduce计算模型

Solution: Map和Reduce按Task粒度分配给并发worker

Evaluation: 性能存在问题

Comments: map和reduce非图灵完备,表达能力有限

0 人点赞