MapReduce 初体验

October 1, 2008 – 11:55 pm

最近一直都比较忙,blog 都好久没有更新了,不过正好今天十一开始放假,趁机了解了一下 MapReduce 相关的东西,想着还是把自己的经历记下来吧。

MapReduce 最先由 Google 实现并在论文“MapReduce: Simplified Data Processing on Large Clusters”中描述。差不多就是对计算模型做一些限制,让情况得以简化,在有 MapReduce 库支持的情况下,可以非常方便地写出一个应用,并且能够自如地在大型分布式系统里 scale 自如。

map 和 reduce 两个词来源于 Lisp 里的两个基本的函数,其中 map 是将一组元素映射到另一组元素的操作,而 reduce 则是讲一组元素组合起来得到最终结果的过程。举一个简单的例子,对一个数组 arr 里的数求平方和,就是先将 arr 通过“求平方”的 map 操作,然后再通过“加”操作 reduce 为最终结果,在 Ruby 里可以这样写(Ruby 里 reduce 被叫做 inject):

arr.map { |e| e*e }.inject(0) { |sum, e| sum + e }

但是 MapReduce 里的 map 其实做的事情比 Lisp 里的 map 要稍微多一点:在分布式的世界里,数据量经常都非常大,一个最常用的处理办法就是分组处理,而分组的规则可以是各种各样,原始数据的分组方式也许并不适合问题的处理,需要换一种方式进行分组,而这个分组变换的工作也在 map 里完成。

reduce 拿到的数据就是已经按照适合问题处理的分组方式分好的数据,因此就可以很简单地把它们组合到一起,得到最终结果了。

再举一个很简单的例子:统计文档中各个单词出现的次数。文档的数量会很大,因此可能被分组进行存储,分组的方式是(比如)按照文档的来源进行分类,从同一个 host 下载的文档被放到一起。完成这个任务的 map 函数要做两件事情:

map(doc, words):
    for word in words:
        emit(word, 1)

一是将单词映射为“1”,亦即表示单词出现了一次;二是将原来的分组方式从“按照 doc 分组”变为“按照 word 分组”。按照 word 被分到同一组的数据会最终传输给 reduce ,它则负责进行化归得到结果:

reduce(key, values):
    result = 0
    for v in values:
        result += v
    emit result

由于最初的数据是分组的,因此可以将各个组的数据在不同的节点上并行地处理;又由于 map 的结果也是分组的,所以各个组的 reduce 工作也可以在不同的节点上并行执行。这便是 MapReduce 的威力所在。当然这只是问题的一个大致描述,真正实现一个可以在实际系统中应用的 MapReduce 系统实际上还是会碰到很多棘手的问题,Google 的那篇论文可以作为一个参考,另外,也有若干开源的 MapReduce 实现可以参考。

不过大部分人都不会想要自己去实现这样一个系统,如果只是想要用这样的系统的话,有不少开源的实现可以用,例如:

  • disco:核心由 erlang 写成,外部接口是 Python 的。
  • skynet:一个 Ruby 的 MapReduce 实现。
  • Hadoop:一个非常成熟的 Java 的 MapReduce 实现。

我首先尝试了一下 disco ,至少它看起来是很 promising 的:核心是用 erlang 写的,erlang 目前也是越来越红火,许多分布式系统都采用 erlang 来实现,也正是证明了它自己的价值。而接口采用 Python 的方式,也是处于易用性的考虑,用起来会非常方便。而且这个项目是由 Nokia Research Center 发起的。

不过 disco 这个项目似乎还是处于相当早期的阶段,现在可以下载到的发布版本是 0.1 ,虽然对版本号的解释方式各有喜好,也有像 Google 这样的千年 beta 存在。 ^_^bb

不过其实 disco 的目的就是要“轻量级”,引用其官方主页上的原话是:

…a lightweight framework for rapid scripting of distributed data processing tasks.

于是我便去下载下来尝试了一下。我没有两千个节点的 cluster 可以玩,不过我还是能方便地弄到两三台 Linux 机器(其实也可以在单机上玩,或者用数个虚拟机组成一个虚拟网络)。按照 disco 主页上的 setup instructions 来安装。

一开始安装的时候就出了一点问题,erlang 在源码的某一行报了一个很专业的错误,搜索了一下似乎没有什么结果,最后在 disco 的邮件列表里看到似乎是由于 erlang 的版本太老了。那台 Linux 机器似乎是一个比较老版本的 Ubuntu 了,源里的 erlang 版本相当老。去下载了最新版本的 erlang 源码包编译安装上之后,disco 也能正常安装了。第二台机器是 Arch ,源里的 erlang 已经是非常新的版本了,所以安装很容易。

之后按照指南上进行配置,建立用户、创建相应的目录、修改权限、修改 hosts 文件、配置 ssh 访问等等,实在是相当麻烦,就不在这里一一详述了。最后,一切准备就绪,便开始尝试其指南里给的那个 count word 的例子:

import sys
from disco import Disco, result_iterator
 
def fun_map(e, params):
    return [(w, 1) for w in e.split()]
 
def fun_reduce(iter, out, params):
    s = {}
    for w, f in iter:
        s[w] = s.get(w, 0) + int(f)
    for w, f in s.iteritems():
        out.add(w, f)
 
master = sys.argv[1]
print "Starting Disco job.."
print "Go to %s to see status of the job." % master
results = Disco(master).new_job(
                name = "wordcount",
                input = ["http://discoproject.org/chekhov.txt"],
                map = fun_map,
                reduce = fun_reduce).wait()
 
print "Job done. Results:"
for word, frequency in result_iterator(results):
        print word, frequency

可是运行不起来,似乎是 disco 在安装的时候并没有把 Python 相关的库安装到 Python 的 site-packages 下面。手工指定 PYTHONPATH 之后,脚本可以跑起来了,但是还是报错了,从一大堆错误输出中找到了“Internal Server Error”的关键词。通过 disco server 的 web 页面登录上去修改节点的配置之类的也是只是给一个简单的提示说有错误,也不说具体是什么错误。弄了半天也没弄清楚是怎么回事,只好先放弃了。

然后我又尝试了 Hadoop ,相比起来,Apache 社区下的 Hadoop 应该算是非常成熟的项目了,雅虎做了很多贡献,并将其成功应用到了自己的大规模集群上。不过仔细一看,最新的 Hadoop 其实版本号也只有 0.18 ^_^bb 而且还有一个比较寒的地方:下载下来的 hadoop-0.18.1.tar.gz 其实是被用 gzip 压缩了两次,应该是 hadoop-0.18.1.tar.gz.gz 才对,要用 gunzip 解压两次才行。-.-bb

不过,按照 Quickstart 中的步骤,很容易就能把 Hadoop 跑起来,它有三种模式:

  • Standalone Operation:只有一个 Java 虚拟机在跑,完全没有分布式的成分。
  • Pseudo-Distributed Operation:在同一台机器上启动独立数个 JVM 进程,进行“分布式”操作。
  • Fully-Distributed Operation:真正的可以运行于多台机器上的分布式模式。

前面两种模式都很容易测试,要进行完全分布式的测试则相对麻烦一些。由于我手上的 Linux 机器都是不同的系统(Debian、Ubuntu、Arch),配置各个节点不能统一地进行,一一配置太麻烦了,所以最后我只用了两台机器做测试。Hadoop 需要一个 dfs 的中央服务器节点,一个 job tracker 的中央服务器节点,剩下的则是从节点,用于执行 map 和 reduce 的任务。我把 10.13.122.229 同时用作 dfs 和和 job tracker 的中央服务器节点(同时也作一个从节点),而 10.214.17.222 作为一个从节点。其中 Hadoop 的 site 配置文件 conf/hadoop-site.xml 如下:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 
<!-- Put site-specific property overrides in this file. -->
 
<configuration>
        <property>
                <name>fs.default.name</name>
                <value>hdfs://10.13.122.229:9000/</value>
        </property>
        <property>
                <name>mapred.job.tracker</name>
                <value>10.13.122.229:9001</value>
        </property>
</configuration>

再在 conf/slaves 中列出从节点的 ip 地址或者是主机名。这样大致的一个测试系统就配置好了,把 hadoop 复制到从节点的相同路径下面,并配置好相互的无密码的 SSH 访问。接下来首先将 hdfs 文件系统格式化:

bin/hadoop namenode -format

HDFS 是 Hadoop 项目里实现的一个分布式文件系统,一个 MapReduce 实现有一个稳定高效的分布式文件系统支持是最好的了,Google 的实现有 GFS 支持,而其他的大部分实现也都可以建立在 Amazon Elastic Compute Cloud (Amazon EC2) 之上。不过,让文件系统节点和 MapReduce 的任务节点是同样的集群当然是首选的,这样 MapReduce 的调度可以根据数据的分布来调度任务,尽量让数据可以从本地取到,避免大量数据阻塞网络造成瓶颈的情况。

不过 HDFS 有一个缺点就是并不是像普通的 Linux 文件系统那样直接 mount 上使用的,而是提供了命令行接口,需要显式地将数据存入文件系统或者从中取出来,例如:

bin/hadoop fs -put foo bar

就是把本地文件系统中的 foo 以“bar”这个名字存入 HDFS 中。在默认配置下,HDFS 会将数据文件存放在 /tmp 下,实际使用中应该将这个配置到合适的地方。

不过我总是在 -put 的时候出错,得到“could only be replicated to 0 nodes, instead of 1”的错误,按照这里的办法,重新格式化文件系统,之后要先访问一下 DFS 的 web 页面查看一下健康状况以及统计信息,便能正常地使用了。 -.-bb

之后运行 Hadoop 提供的 grep 的例子,一切正常。 :) 今天 MSTC 去爬山,玩得很开心,不过也还挺累的,没有精力再写下去了。至于如何在 Hadoop 的基础上做自己的 MapReduce 应用,就留到下次再说吧! :D

  1. 2 Responses to “MapReduce 初体验”

  2. 太赞了!

    By zhouyuan on Oct 2, 2008

  3. 邪恶到足够有趣了,,好也!

    By Zoom.Quiet on Oct 28, 2008

Post a Comment