经历了一个学期的洗礼之后,最终我还是选择了走大数据处理这条道路,个人觉得自己不是一个愿意扎实看论文潜心研究的人,所以机器学习->深度学习这条路不是特别适合我,还是更加愿意去写一些工程代码锻炼自己的能力。

1、MapReduce的理论基础

​ 那么什么是 MapReduce 呢?MapReduce 就是一种“分治”的思想,把一个大规模的数据操作,分解成一个个小数据集操作,同时分发到一个主节点管理的集群中进行任务工作。然后再把各个节点完成的工作合并在一起,这就得到了最终的结果。总得来说,MapReduce 就是 Map + Reduce,即任务的“分解”与结果的“汇总”。

​ Hadoop 中共有两种任务机器角色,一种是 JobTracker,一种是 TaskTracker,顾名思义,JobTracker 是用于工作的调度,而 TaskTracker 是用于执行工作的。其中,一个集群中只有一台 JobTracker。

​ MapReduce 框架负责了并行计算中的诸多问题,比如分布式存储、工作调度、负载均衡、容错均衡、容错处理以及网络通信等问题,抽象成了以上提到的 Map + Reduce 两个方法函数。

2、MapReduce 处理过程

mage-20180327105617

​ 整个过程可以简单做成以上的流程,『 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出 』,具体的内容,我们通过一个 WordCount 例子来说明。

3、WordCount 解析

​ WordCount是一个非常简单,但是又很能够体现 MapReduce 思想的程序,这个程序被 Hadoop 内置作为了一个测试程序,功能很简单,就是统计一个输入文件内每个单词的个数。

​ 我们暂时利用 maven 导入的 Hadoop 环境作为测试平台,pom 文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>2.6.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.4</version>
</dependency>
</dependencies>
1
2
3
4
5
6
7
/* 这里我们使用新版 API 来编写代码,原因如下:
* 新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。
* 例如,你可以添加一个方法(用默认的实现)到一个抽象类而不需修改类之前的实现方法。在新的API中,Mapper和Reducer是抽象类。
*
* 新的API广泛使用context object(上下文对象),并允许用户代码与MapReduce系统进行通信。
* 例如,MapContext基本上充当着JobConf的OutputCollector和Reporter的角色。
*/
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
public class WordCount {

/*
* Hadoop 提供了如下内容数据类型,均实现了 WritableComparable 接口,以便用这些类型定义的数据可以被序列化,用以网络传输和文件存储
* BooleanWritable: 标准布尔
* ByteWritable: 单字节
* DoubleWritable: 双字节
* FloatWritable: 浮点数
* IntWritable: 整型数
* LongWritable: 长整型数
* Text: UTF8 格式存储的文本
* NullWritable: <key,value> 中 key 或 value 为空的时候使用
*/

// KEYIN, VALUEIN, KEYOUT, VALUEOUT
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();

@Override
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// map 方法中,value 值存储的是文本文件中的一行(回车符为行结束标记)
// key 值为该行首字母相对于文本文件首字母偏移量

/*
* 分割过程,例如:
* Hello World -> <0, "Hello World">
* Bye World -> <12, "Bye World">
* 其中偏移量包括了回车所占的字符数(注意 Windows 和 Linux/macOS/*nix 环境下换行符的差别)
* Windows: \r\n
* Linux/macOS/*nix: \n
* 老版本的 Mac OS, OS X: \r
*/

StringTokenizer itr = new StringTokenizer(value.toString()); // StringTokenizer 将每一行拆分成一个个单词

/*
* Map 过程,例如:
* <0, "Hello World> -> <Hello, 1> + <World, 1>
* <12, "Bye World> -> <Bye, 1> + <World, 1>
*/

while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
// 以 <word, 1> 作为方法结果输出,等于是做了一个词频的统计
context.write(word, one);
// 剩余工作交给 MapReduce 框架处理
}

/*
* 得到 map 方法输出的 <key,value> 后,Mapper 对这一些键值对按照 key 值进行排序(字典序)
* 再执行 Combine 过程,将 key 值相同的 value 累加,得到 Mapper 最终输出结果
* <Bye, 1> -> <Bye, 1>
* <Hello, 1> -> <Hello, 1>
* <World, 1> -> <World, 2>
* <World, 1> ↗
*/
}
}

// KEYIN, VALUEIN, KEYOUT, VALUEOUT
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();

@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// key 为单个单词,values 则是对应单词计数值组成的列表
// reduce 的过程就是遍历 values 求和的操作,得到某个单词的总词数

/*
* 首先 Reducer 对从 Mapper 接收的数据进行排序操作,再交给用户重写的 reduce 方法进行处理
* 得到新的 <key,value> 对,作为 WordCount 的输出结果
* <Bye, list(1,1)> -> <Bye, 2>
* <Hadoop, list(2)> -> <Hadoop, 1>
* <Hello, list(1,1)> -> <Hello, 2>
* <Word, list(2)> -> <Word, 2>
*/

int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}

public static void main(String[] args) throws Exception {
// 用 Configuration 类对 MapReduce Job 进行一个初始化
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (2 != otherArgs.length) {
System.err.println("Usage: wordcount <in> <out>");
System.exit(2);
}

// 由 Job 对象负责管理和运行一个计算任务,通过 Job 的一些方法来对任务参数进行相关设置
// 对 Job 进行命名操作,使得在 JobTracker 和 TaskTracker 页面进行监视
Job job = Job.getInstance(conf, "word count");

// 设置主类
job.setJarByClass(WordCount.class);

// 设置 Job 的 Map(拆分操作)、Combiner(中间结果合并)、Reduce(合并操作)三个相关处理类
// 原始数据 -> 分割 -> map -> map 端排序 -> Combine -> Reduce 端排序 -> Reduce 输出
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);

// 设置 Job 输出结果,为 <key,value>
// 本例子中是 <单词,个数>,故 key 为 Text 型,Value 为 IntWritable 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

// 设置输入输出的路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 等待任务结束后退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}