[关闭]
@chanvee 2014-06-10T16:40:01.000000Z 字数 7593 阅读 6197

ODPS MapReduce 入门

ODPS Bigdata

ODPS(Open Data Process Serviec)是阿里自己搭建的一个大规模分布式处理服务平台,MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算,本次阿里的大数据竞赛第二轮需要在ODPS上利用MapReduce进行处理运算。由于以前从没有接触过这方面的东西,在通过各种大牛博客的指导下,慢慢的终于摸索出了一点门道,因此略作总结如下。

MapReduce原理简介


MapReduce示例图
(本图引自Introduction to Hadoop
MapReduce被称为Google技术“三宝”[1]之一,以MapReduce中最经典的wordcount应用为例,来分析一下MapReduce的全过程。这里我们要统计文件中每个单词出现的次数。

  • Input 代表的是需要处理的原始数据,一共有3行。
  • Splitting 表示分配任务,这里把任务分给3台机器同时处理,每台机器只负责处理一行的数据。
  • Mapping 表示这3台机器具体要做的事情。在这里每台机器要做的就是统计一行文字里的单词频率。这里就涉及到比较重要的一个概念,就是key和value。这里key就是单词,value就是这个单词在这一行出现的次数。
  • Shuffling 表示对Mapping步骤产生的9行数据,按照key进行分组。这里分成了4组,每组交给一台电脑去处理。
  • Reducing 表示把相同key对应的value相加,每个key最终只输出一行,依然是key,value的形式输出。
  • Final result 表示把Reducing的输出合并。

为何要如此设计?简单来说,因为MapReduce为的是能实现分布式运算,涉及到多台机器同时运算的步骤有Mapping和Reducing,参与Mapping工作的机器可以完全独立工作而不需要知道其他机器上有什么数据;参与Reducing步骤的机器,由于数据之前已经按照key进行了分组,因此其他机器上有什么数据与他毫无关系。参与计算的机器都是互相独立,完全不依赖其他机器的数据,这样就可以很方便写代码,因为所有参与Mapping工作的机器使用一模一样的代码,所有参与Reducing工作的机器也使用一模一样的代码。

我们要在ODPS上要实现MapReduce,就需要写两类代码,一类称为Mapper,另一类称为Reducer。抛开前面所说的原理,我们只需要记住以下两点:

  • Mapper每次只处理一行数据。即Mapper的Input是数据库中的一条记录。
  • Reducer每次要处理的是相同key下的所有记录,通常会是多行的。

目标


通过MapReduce计算对每个user对不同品牌产生的4种行为(点击、购买、收藏、购物车)的次数进行统计,其中对于点击次数分别统计第一个月(first_click)、第二个月(second_click)、第三个月(third_click)和第四个月(fourth_click)的次数,对于购买统计四个月总的购买次数(buy),对于收藏和加入购物车统计两者总的次数(collect_basket),并把上述6个统计作为feature,最后在对这些features赋权值进行耦合:

Input table(示例):

user_id brand_id type visit_datetime
101 20001 0 05-01
101 20001 0 06-22
101 20002 0 07-02
101 20002 1 04-17
101 20003 0 08-05
101 20001 0 06-23
101 20003 2 05-13
101 20003 3 07-03
101 20003 1 06-24

Output table(示例):

user_id brand_id first_click second_click third_click fourth_click buy collect_basket score
1 20001 0 0 1 1 0 1 0.45
10 20001 0 0 0 4 1 0 1.2
10 20002 0 0 1 1 0 1 0.45
101 20002 2 0 1 1 0 1 0.75
101 20003 1 0 1 5 2 1 2.1

比如在output table的第三行就表示用户10对品牌20002前四个月分别的点击次数,以及总的购买次数和总的收藏购物车次数,以及最后的得分。

由于需要实现累计求和,因此我们可以在Mapping步骤中,使用(user_id,brand_id)作为key,而(type,visit_datetime)作为value。

在ODPS中需要达到上述目标,需要动手实现3个类,这里我把他们命名为TestMapper,TestReducer,TestDriver,具体创建方法这里就不在赘述。

TestMapper


在开始之前你需要在eclipse中新建一个ODPS项目。然后在项目的src上右键->new->other,在Aliyun Open Data Processing Service下选择Mapper,接着eclipse会帮我们生成一个Driver的模板。

之前说过,Mapper的任务就是对读入的一行数据,接着输出key和value。key和value都属于Record类,并且key和value都可以由单个或者多个字段构成,在我们这个任务中,key由(user_id,brand_id)两个字段构成,value由(type,visit_datetime)构成。

与创建TestDriver的步骤类似,使用官方的模板创建一个名为TestMapper的java代码,同样官方模板把大多数代码都生成好了。

  1. public void setup(TaskContext context) throws IOException {
  2. key = context.createMapOutputKeyRecord();
  3. value = context.createMapOutputValueRecord();
  4. }

setup当中是对key和value进行初始化。其中key使用createMapOutputKeyRecord()进行初始化,value使用createMapOutputValueRecord()进行初始化。

在map函数中,record代表读入的一行数据,比如101, 20001, 0, 06-01,我们可以通过record.get(n)方法获取该行记录第n列的数据。并且方便的是,这里可以直接对读入的数据进行一个类型转换。例如record.getString()会把读入的数据转为字串,record.getBigInt()则会把读入的数据转为Long型整数。

在前面的设定当中,我们已经把Mapper输出的key定为(user_id,brand_id),value定为(type,visit_datetime),在map函数中,我们可以使用key.set()value.set()来分别赋予这4个值。

最后context.write(key, value)的意思是输出这条key-value,如果不写这行,Mapper就什么都不输出。一个Mapper可以有0个或多个key-value的输出,每调用一次context.write(key,value)就会输出一行。

TestMapper完整代码如下(省略import部分)

  1. public class TestMapper extends MapperBase{
  2. Record key;
  3. Record value;
  4. @Override
  5. public void setup(TaskContext context) throws IOException {
  6. key = context.createMapOutputKeyRecord();
  7. value = context.createMapOutputValueRecord();
  8. }
  9. @Override
  10. public void map(long recordNum, Record record, TaskContext context)
  11. throws IOException {
  12. key.set("user_id", record.getString(0));
  13. key.set("brand_id", record.getString(1));
  14. value.set("type", record.getString(2));
  15. value.set("visit_datetime", record.getString(3));
  16. context.write(key, value);
  17. }
  18. }

TestReducer


通常Driver和Mapper方面都很简单,大多情况下,计算工作都在Reducing步骤完成,因此Reducer的代码会略多一些。同样按照前面的方法生成名为TestReducer的Reducer类。

这里又再啰嗦一遍,Mapper每次只处理一行数据,而Reducer通常处理的不止一行,而是会处理属于相同key的所有数据。翻到文章开头的那张图片,图中第二个Reducer,所有key为Car的记录,全部交给该一个Reducer处理。

因此reduce函数当中的values参数是一个Iterator,通过调用values.next()来读取所有属于该key的记录。每读取一行记录,进行相应的累加操作。

Reducer的output是一个Record类,可以通过output.set(n)来设定该output第n列的数值,同样使用context.write(output)输出一行数据。由于我们要按着月份对click进行统计,因此我们需要使用.compareTo来比较两个字符串的大小,它的用法是:对于字符串a和b,如果a.compareTo(b)<0,则aa.compareTo(b)>0,则a>b; a.compareTo(b)=0,则a=b。字符串的“大小”是按着字符在ASCII码的顺序来比较的。

TestReducer的完整代码(省略模板中的import部分)

  1. public class TestReducer extends ReducerMap{
  2. Record output;
  3. @Override
  4. public void setup(TaskContext context) throws IOException {
  5. output = context.createOutputRecord();
  6. }
  7. @Override
  8. public void reduce(Record key, Iterator<Record> values, TaskContext context)
  9. throws IOException {
  10. //各个计数器的初始化
  11. Long first_click = 0L;
  12. Long second_click = 0L;
  13. Long third_click = 0L;
  14. Long fourth_click = 0L;
  15. Long buy = 0L;
  16. Long collect_basket = 0L;
  17. double score = 0;
  18. while (values.hasNext()) {
  19. Record val = values.next();
  20. String date = val.getString("visit_datetime");
  21. int type = Integer.parseInt(val.getString("type"));
  22. if(type == 0){
  23. if(date.compareTo("05-15")<0){
  24. first_click += 1;
  25. }
  26. else if(date.compareTo("05-15")>=0 && date.compareTo("06-15")<0){
  27. second_click += 1;
  28. }
  29. else if(date.compareTo("06-15")>=0 && date.compareTo("07-15")<0){
  30. third_click +=1;
  31. }
  32. else{
  33. fourth_click += 1;
  34. }
  35. else if(type == 1){
  36. buy += 1;
  37. }
  38. else{
  39. collect_basket += 1;
  40. }
  41. }
  42. output.set(0, key.getString("user_id"));
  43. output.set(1, key.getString("brand_id"));
  44. output.set(2, first_click);
  45. output.set(3, second_click);
  46. output.set(4, third_click);
  47. output.set(5, fourth_click);
  48. output.set(6, buy);
  49. output.set(7, collect_basket);
  50. score = 0.05*first_click + 0.1*second_click + 0.15*third_click + 0.2*fourth_click + 0.4*buy + 0.1*collect_basket;
  51. output.set(8, score);
  52. context.write(output);
  53. }
  54. }
  55. }

TestDriver


Driver主要用来进行一些格式设定。官方写的很清楚了,所有的TODO部分是需要我们进行修改的。先来看第一个TODO:

  1. // TODO: specify map output types
  2. job.setMapOutputKeySchema(SchemaUtils.fromString("user_id:string,brand_id:string"));
  3. job.setMapOutputValueSchema(SchemaUtils.fromString("type:string,visit_datetime:string"));

这是用来设定Mapper输出的时候,key与value的格式。按照之前说的,以(user_id,brand_id)为key,(type,visit_datetime)为value。

第二个TODO:

  1. InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(),job);
  2. OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(),job);

设定input table与output table。这里把待会儿命令行调用中,第一个参数(args[0])设为input table,第二个参数(args[1])设为output table。

第三个TODO:

  1. // TODO: specify a mapper
  2. job.setMapperClass(TestMapper.class)
  3. job.setReducerClass(TestReducer.class)

告诉系统这次任务要用的Mapper和Reducer是谁,按照上面的设定之后,系统就会通知所有负责Mapping工作的电脑待会儿使用TestMapper中的代码进行运算,通知所有负责Reducing工作的电脑待会儿使用TestReducer中的代码进行运算。

TestDriver完整代码如下(省略开头import的部分)

  1. public class TestDriver {
  2. public static void main (String[] args) throws OdpsException {
  3. JobConf job = new JobConf();
  4. // TODO: specify map output types
  5. job.setMapOutputKeySchema(SchemaUtils.fromString("user_id:string,brand_id:string"));
  6. job.setMapOutputValueSchema(SchemaUtils.fromString("type:string,visit_datetime:string"));
  7. // TODO: specify input and output tables
  8. InputUtils.addTable(TableInfo.builder().tableName(args[0]).build(),job);
  9. OutputUtils.addTable(TableInfo.builder().tableName(args[1]).build(),job);
  10. // TODO: specify a mapper
  11. job.setMapperClass(TestMapper.class)
  12. job.setReducerClass(TestReducer.class)
  13. RunningJob rj = JobClient.runJob(job);
  14. rj.waitForCompletion();
  15. }
  16. }

打包、上传、建表、运行


1.在Package Explorer中你之前建立的ODPS项目下的src上右键,选择Export,然后选择Java底下的JAR file。接着设定下JAR包存放的位置与文件名。这里假设我们放在C:\TOOLS\test.jar,然后点Finish。

2.打开odps console,新建一个resource

  1. odps:tianchi_123> create resource jar C:/tools/test.jar -f

3.在实际运行之前,需要先建立一个表,作为结果输出的位置。这里我们就叫它tb_output好了。进入sql,建立表格

  1. odps:tianchi_123> sql
  2. odps:sql:tianchi_123> create table tb_output (user_id string, brand_id string, first_click bigint,second_click bigint,third_click bigint,fourth_click bigint,buy bigint, collect_basket bigint);

4.在odps console下,执行MapReduce任务

  1. odps:tianchi_1234> jar -resources test.jar -classpath c:/tools/test.jar TestDriver t_alibaba_bigdata_user_brand_total_1 tb_output;

结尾


这是第一次接触MapReduce,也是第一次写Java的代码,刚开始时真的不知道如何下手,看了两三天文档也很茫然,所以非常感谢那些无私的大神将自己的经验和知识在自己的博客上分享出来,现在感觉自己会的东西真的太少[2],以后要多向这些大神们学习。


[1] google 技术“三宝”: GFS、MapReduce和大表(BigTable)。
[2] 原稿
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注