Hadoop 实战:谁是最倒霉的人?

October 4, 2008 – 2:04 pm

hadoop-logo.gif

上一次介绍了 MapReduce 的工作方式以及 Hadoop 这个开源的 MapReduce 实现,这次尝试用 Hadoop 来写一个简单的应用。要解决的问题是这样的:现在我手里有大量的邮件数据,并且我知道每封邮件是正常邮件还是垃圾邮件,现在我想要找出收到的邮件中垃圾邮件最多的人,亦即找出“谁是最倒霉的人”。

首先是 Map 的过程,输入数据是一封一封的邮件,彼此之间没有任何关联,因此可以很自然地分组处理。Map 将邮件转化到以邮件的收件人进行分组,如果邮件是垃圾邮件,则映射到收件人的垃圾邮件数“+1”。Reduce 的过程就是将各个收件人的邮件数统计结果加起来。

在 Hadoop 中实现一个 map 过程只需要实现 Mapper 接口就行了,一般同时继承自 MapReduceBase ,可以省下不少力气。map 接受 key, value 的 pair ,这是按照初始输入数据进行分组的,通常 map 方法从 value 中解析感兴趣的内容,并进行重新分组。map 的另一个 OutputCollector 类型的参数就是专门用于收集 map 之后的新分组的。map 方法看起来是这个样子:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public void map(LongWritable key, Text value,
		OutputCollector<text, LongWritable> output, Reporter reporter)
		throws IOException {
	String header = value.toString();
 
	if (isSpam(header)) {
		String addr = getToAddr(header);
		if (addr != null) {
			output.collect(new Text(addr), new LongWritable(1));
		}
	}
}

由于数据需要在网络上传输,Hadoop 要求 Key 和 Value 的类型都必须是可以序列化的,不过这不是 Java 自带的那个序列化接口 Serializable ,而是 Hadoop 自己定义的一个更加简易的 Writable 接口。另外,由于 Key 是需要用于进行排序和分组的,所以需要实现的是更加具体的一个叫做 WriteComparable 的接口。不过,常用的数据类型 Hadoop 都提供了现成的支持,比如 Text 可以用于存放文本,LongWritable 可以用于存放长整型数据。

在这个任务中,我 map 的输入 key 是长整型,在这里是分组时在文件中的 offset ,这里不需要用到它,直接忽略。而 value 是一封邮件的邮件头的内容。首先我要判断邮件是否是垃圾邮件。原本我可以用作好的分类器进行在线分类,或者使用一个已经做好的 index 来进行判断,不过为了让示例简单一些,我对邮件内容进行了预处理,直接将垃圾邮件标记作为一个邮件头域插入了邮件首部。因此判断是否是垃圾邮件的代码是直接从邮件头里搜索相应标签:

1
2
3
4
5
6
7
private boolean isSpam(String header) {
	Matcher matcher = labelPattern.matcher(header);
	if (matcher.find() && matcher.group(1).equalsIgnoreCase("spam"))
		return true;
	else
		return false;
}

判断出是垃圾邮件之后,就解析出邮件的收件人地址,这里暂时不考虑多个收件人或者有抄送之类的情况。得到收件人之后(比如,是 foo@bar.com),就算得到了一组结果。这将作为后面 reduce 任务的输入:key 的类型是 Text ,亦即收件人地址;value 的类型是 LongWritable ,即收到的垃圾邮件数目。当发现一封垃圾邮件时,就将这个中间结果 (foo@bar.com, 1) 收集起来。

这就完成了 map 的过程,之后应该是 reduce ,这个过程很简单,系统会按照 map 的结果将各个分组的结果传递到 reduce 函数,这里 reduce 只要把各个 LongWritable 加起来得到总和就可以了。在 Hadoop 自带的 Word Count 的示例中有一个类似的例子(不过这里加的是 IntWritable):

1
2
3
4
5
6
7
8
9
10
11
12
public class Reduce extends MapReduceBase
                    implements Reducer<text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<intWritable> values,
                    OutputCollector<text, IntWritable> output,
                    Reporter reporter) throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));
    }
}

不过因为 Hadoop 自带了一个 LongSumReducer 可以完成我们需要做的事情,就不用自己再费力写一个了。 :p 另外,Hadoop 在 map 和 reduce 之间还有一个叫做 combine 的步骤,可以看作是“本地的 reduce ”。除了某些特殊情况,一般 combine 和 reduce 做的事情是一样的(因此这两个任务通常也是通过同一段代码来实现的),只是 combine 只在本地运行,将当前节点得到的局部结果进行一下局部的 reduce ,这样通常可以减少需要进行网络传输的数据量。例如,如果当前节点发现了 5 封发给 foo@bar.com 的垃圾邮件,需要对 5 个 (foo@bar.com, 1) 进行 reduce 调度,而经过本地 combine 之后,只需要处理一个 (foo@bar.com, 5) 就可以了。

map 和 reduce 做好之后,新建一个任务,并告知用于完成 map 和 reduce 任务的类:

countJob.setMapperClass(CountMapper.class);
countJob.setCombinerClass(LongSumReducer.class);
countJob.setReducerClass(LongSumReducer.class);

再设置输入输出的 key 和 value 的类型:

countJob.setInputKeyClass(LongWritable.class);
countJob.setInputValueClass(Text.class);
 
countJob.setOutputKeyClass(Text.class);
countJob.setOutputValueClass(LongWritable.class);

配置好之后调用 JobClient.runJob(countJob) 就可以开始 MapReduce 了。大致的流程就是这样,不过中间还有一些细节需要处理。一个问题就是结果如何输出?我们最终得到的是一些 (Text, LongWritable) 的 pair ,一个办法是让它以文本方式按行输出到文本文件中,这样只需要使用内置的 SequenceFileOutputFormat 即可:

countJob.setOutputFormat(SequenceFileOutputFormat.class);

另一个问题则是输入的格式,Hadoop 需要理解了输入文件的格式才能将其解析并作为参数传递给 map 函数,更重要的是:进行合适的分割,将任务分配到各个节点上去。Hadoop 使用 InputFormat 来控制输入格式,默认情况下,将输入目录中每个文件当作以行为单位的文本进行处理和分割,除此之外 Hadoop 也内置了二进制记录等文件类型的支持。不过我这里的情况比较特殊:一个文件中只有一封邮件,需要当作一个不可分割的原子来处理。因此我实现了一个 AtomFileInputFormat ,并在 isSplitable 方法中总是返回 false

public class AtomFileInputFormat extends FileInputFormat<longWritable, Text> {
 
	@Override
	public RecordReader<longWritable, Text> getRecordReader(InputSplit genericSplit,
			JobConf job, Reporter reporter) throws IOException {
		reporter.setStatus(genericSplit.toString());
		return new FileRecordReader(job, (FileSplit)genericSplit);
	}
 
	@Override
	protected boolean isSplitable(FileSystem fs, Path path) {
		return false;
	}
}

另外,我定制了一个 FileRecordReader (需要实现 RecordReader 接口),从每个文件中读取邮件头,丢掉邮件的内容部分。

@Override
public boolean next(LongWritable key, Text value) throws IOException {
	if (hasRead) {
		return false;
	} else {
		Path file = split.getPath();
		FileSystem fs = file.getFileSystem(job);
 
		key.set(0);
 
		FSDataInputStream in = fs.open(file);
		BufferedReader reader = new BufferedReader(new InputStreamReader(in));
 
		StringBuilder sb = new StringBuilder();
		while (true) {
			String line = reader.readLine();
			// empty string, email headers and body are separated by an
                        // empty line
			if (line == null || line.length() == 0)
				break;
			sb.append(line);
			sb.append('\n');
		}
		value.set(sb.toString());
 
		reader.close();
 
		pos = split.getLength();
		hasRead = true;
		return true;
	}
}

这样差不多任务就完成了,不过还有一点就是最后的结果是按照 key 进行排序的,亦即按照收件人地址排序,而我期望找到“最倒霉”的人,甚至需要按照“倒霉程度”进行排序,所以我再启动另一个 MapReduce 任务,将 key 和 value 颠倒过来,并进行排序。由于这是一个非常常见的任务,Hadoop 内置了相应的支持,因此只需要把内置的 InverseMapperIdentityReducer 组装起来即可:

sortJob.setInputFormat(SequenceFileInputFormat.class);
sortJob.setMapperClass(InverseMapper.class);
sortJob.setNumReduceTasks(1); // write a single file
FileOutputFormat.setOutputPath(sortJob, new Path(args[1]));
sortJob.setOutputKeyComparatorClass(LongWritable.DecreasingComparator.class);

这样就能得到最终结果了!完整的代码可以从这里下载。

最后,总结一下:这样一个简单的 AtomFileInputFormat 似乎没有达到预期的目的,Hadoop 默认情况下似乎把“一个文件”当作一个“很大”的单位了,通常都是考虑将文件进行分割,再分派到各个节点,因此每个文件最少启动一个 map 任务,而现在我的情况是一个文件中只有一条数据,结果似乎是每一个 map 操作都启动了一个新的 map 任务。如果要改进的话,一个是更加深入地定制 InputFormat 的风割方法;另一个办法是对输入数据进行预处理,后者要方便一些,但是在实际应用中如果遇到数据量大到进行预处理本身就需要 MapReduce 的支持的话,就有些不现实了。 ;) 另外,这种分布式的应用出了问题之后不知道要怎样调试才好?且不说调试器的方法肯定不管用了,就算最原始的 printf 大法,也不知道最终的输出会 print 到苍穹的哪一个角落里去了 :p 必须要有一个强大的 log 系统才行,而且如果是在单机环境下也能成功重现的那种 bug 的话,应该也会好姐姐一些。

总之,在上一次介绍的时候我就说了:MapReduce 是对分布式任务进行了一定的限制而得到的结果,考虑到 80%/20% 原则,它可以相当棒地完成一部分类型的任务(也许还不到 80% 吧),但是如果妄图任何事情都要用它来搞,大概也就是削足而适履了。还有就是,没有像样的硬件环境,玩 MapReduce 真的一点都不好玩! :D

  1. 14 Responses to “Hadoop 实战:谁是最倒霉的人?”

  2. 呵呵,尽管我们的项目里也写了些hadoop map/reduce的程序,但目前还都是一直单机运行呢:)

    真需要严肃的应用的话,Amazon ec2就是perfect match了:可以根据需要方便的launch成千上万的instances.然后用完关掉就好了–按使用时间收费.对ec2的small instance. 1000台机器一小时$1000×0.10=$100.

    By yawl on Oct 5, 2008

  3. @yawl,
    恩,感觉 Amazon 在这方面做得很好,集群这种东西自己如果并不是一直都要用的话,购买费用太高了,而且维护起来成本也非常高。他提供这种便利的“即插即用”的服务应当是云计算(以及类似的概念)的趋势吧。

    By pluskid on Oct 5, 2008

  4. 我一边看一边想这哥们居然把hadoop用到垃圾邮件上

    仔细一看原来是你的blog

    By pieerepeng on Oct 6, 2008

  5. 特征抽取的时候,比如PPM,整个就可以mapreduce

    logistic回归的特征抽取也可以分为两个mapreduce,第一个mapreduce提取词频,第二个mapredce抽取特征

    By pieerepeng on Oct 6, 2008

  6. @pieerepeng,
    恩,PPM 不是用 raw bytes 的吗?还需要做特征抽取啊?我正想找你问问呢,你以前为 PPM 做 model 的时候都是限制 5000 封邮件的?我今天用 60000 封邮件训练出来两个 150+ MB 的 model ,结果光是把这两个 model load 进来就花了 90+ 分钟然后 out of memory error 了。改成 binary IO 还是没有什么改观,要好好研究一下 Java IO 才行,PPM 的 model 如果能做得更 compact 一点才行。似乎每次过滤掉一部分权值低的节点并不能有效阻止 PPM model 很快变大啊。我倒是写过一个 C++ 的 PPM codec ,但是我也没有实际测试过性能,而且做成 JNI 怪麻烦的,跨平台都要重新编译……

    By pluskid on Oct 6, 2008

  7. 今天恰好看到的, Yahoo有4000 Hadoop nodes:

    http://developer.yahoo.com/blogs/hadoop/2008/09/scaling_hadoop_to_4000_nodes_a.html

    每个node的配置也很不错.

    By yawl on Oct 8, 2008

  8. @pluskid:
    结果光是把这两个 model load 进来就花了 90+ 分钟然后 out of memory error 了.

    请调整JVM的参数

    @yawl:
    It is really cool.

    By Jack on Oct 8, 2008

  9. 4000个node怎么做升级,呵呵?

    By Jack on Oct 8, 2008

  10. @Jack,
    调整过了。改成 binary IO ,然后加上 buffer ,性能得到了大幅提升。

    By pluskid on Oct 8, 2008

  11. pluskid精通这么多语言,是不是也发起个口水战,谈谈python ruby c++ java ..

    By trstn1 on Oct 9, 2008

  12. 这类东西,还是在实际应用中,才有更好的理解

    By Mick on Oct 28, 2008

  13. 代码链接下不了,能不能发我一份,谢谢!
    beast5117@yahoo.com.cn

    By Sue on Jan 30, 2010

  14. @Sue,
    你好,链接已经 fix 了,现在应该可以下载了。

    By pluskid on Jan 30, 2010

  15. 上面的代码链接下载不了,能不能给我传一份,谢谢
    beast5117@yahoo.com.cn

    By Sue on Jan 30, 2010

Post a Comment