hadoop(二MapReduce)

2022-10-17,

hadoop(二mapreduce)


介绍

mapreduce:其实就是把数据分开处理后再将数据合在一起.

  • map负责“分”,即把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算,彼此间几乎没有依赖关系。
  • reduce负责“合”,即对map阶段的结果进行全局汇总。
  • mapreduce运行在yarn集群

mapreduce中定义了如下的mapreduce两个抽象的编程接口,由用户去编程实现.map和reduce,

mapreduce处理的数据类型是键值对


代码处理

mapreduce 的开发一共有八个步骤, 其中 map 阶段分为 2 个步骤,shuwle 阶段 4 个步 
骤,reduce 阶段分为 2 个步骤

​ map 阶段 2 个步骤

  1. 设置 inputformat 类, 将数据切分为 key-value(k1和v1) 对, 输入到第二步
  2. 自定义 map 逻辑, 将第一步的结果转换成另外的 key-value(k2和v2) 对, 输出结果 
    shuwle 阶段 4 个步骤
  3. 对输出的 key-value 对进行分区
  4. 对不同分区的数据按照相同的 key 排序
  5. (可选) 对分组过的数据初步规约, 降低数据的网络拷贝
  6. 对数据进行分组, 相同 key 的 value 放入一个集合中 
    reduce 阶段 2 个步骤
  7. 对多个 map 任务的结果进行排序以及合并, 编写 reduce 函数实现自己的逻辑, 对输入的 
    key-value 进行处理, 转为新的 key-value(k3和v3)输出
  8. 设置 outputformat 处理并保存 reduce 输出的 key-value 数据

常用maven依赖

<packaging>jar</packaging> <dependencies> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-common</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-client</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-hdfs</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>org.apache.hadoop</groupid> <artifactid>hadoop-mapreduce-client-core</artifactid> <version>2.7.5</version> </dependency> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>release</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-compiler-plugin</artifactid> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> <!--    <verbal>true</verbal>--> </configuration> </plugin> <plugin> <groupid>org.apache.maven.plugins</groupid> <artifactid>maven-shade-plugin</artifactid> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <minimizejar>true</minimizejar> </configuration> </execution> </executions> </plugin> </plugins> </build>

入门---统计

结构

/*  四个泛型解释:    keyin :k1的类型    valuein: v1的类型    keyout: k2的类型    valueout: v2的类型*/public class wordcountmapper extends mapper<longwritable,text, text , longwritable> { //map方法就是将k1和v1 转为 k2和v2 /*      参数:         key    : k1   行偏移量(默认几乎一直固定为longwritable)         value  : v1   每一行的文本数据         context :表示上下文对象     */ /*      如何将k1和v1 转为 k2和v2        k1         v1        0   hello,world,hadoop        15  hdfs,hive,hello       ---------------------------        k2            v2        hello         1        world         1        hdfs          1        hadoop        1        hello         1     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { text text = new text(); longwritable longwritable = new longwritable(); //1:将一行的文本数据进行拆分 string[] split = value.tostring().split(","); //2:遍历数组,组装 k2 和 v2 for (string word : split) { //3:将k2和v2写入上下文            text.set(word);            longwritable.set(1);            context.write(text, longwritable); } }}


/*  四个泛型解释:    keyin:  k2类型    valulein: v2类型    keyout: k3类型    valueout:v3类型*/public class wordcountreducer extends reducer<text,longwritable,text,longwritable> { //reduce方法作用: 将新的k2和v2转为 k3和v3 ,将k3和v3写入上下文中 /*      参数:        key : 新k2        values: 集合 新 v2        context :表示上下文对象        ----------------------        如何将新的k2和v2转为 k3和v3        新  k2         v2            hello      <1,1,1>            world      <1,1>            hadoop     <1>        ------------------------           k3        v3           hello     3           world     2           hadoop    1     */ @override protected void reduce(text key, iterable<longwritable> values, context context) throws ioexception, interruptedexception { long count = 0; //1:遍历集合,将集合中的数字相加,得到 v3 for (longwritable value : values) {             count += value.get(); } //2:将k3和v3写入上下文中        context.write(key, new longwritable(count)); }}

public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "wordcount"); //如果打包运行出错,则需要加该配置        job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径        job.setinputformatclass(textinputformat.class); textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); //textinputformat.addinputpath(job, new path("file:///d:\\mapreduce\\input")); //第二步:指定map阶段的处理方式和数据类型         job.setmapperclass(wordcountmapper.class); //设置map阶段k2的类型          job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型          job.setmapoutputvalueclass(longwritable.class); //第三,四,五,六 采用默认的方式 //第七步:指定reduce阶段的处理方式和数据类型          job.setreducerclass(wordcountreducer.class); //设置k3的类型           job.setoutputkeyclass(text.class); //设置v3的类型           job.setoutputvalueclass(longwritable.class); //第八步: 设置输出类型           job.setoutputformatclass(textoutputformat.class); //设置输出的路径 path path = new path("hdfs://node01:8020/wordcount_out"); textoutputformat.setoutputpath(job, path); //textoutputformat.setoutputpath(job, new path("file:///d:\\mapreduce\\output")); //获取filesystem filesystem filesystem = filesystem.get(new uri("hdfs://node01:8020"), new configuration()); //判断目录是否存在 boolean bl2 = filesystem.exists(path); if(bl2){ //删除目标目录                 filesystem.delete(path, true); } //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}

shuwle阶段

分区

分区实则目的是按照我们的需求,将不同类型的数据分开处理,最终分开获取

代码实现

结构

public class mypartitioner extends partitioner<text,nullwritable> { /*      1:定义分区规则      2:返回对应的分区编号     */ @override public int getpartition(text text, nullwritable nullwritable, int i) { //1:拆分行文本数据(k2),获取中奖字段的值 string[] split = text.tostring().split("\t"); string numstr = split[5]; //2:判断中奖字段的值和15的关系,然后返回对应的分区编号 if(integer.parseint(numstr) > 15){ return 1; }else{ return 0; } }}

 //第三步,指定分区类            job.setpartitionerclass(mypartitioner.class); //第四, 五,六步 //设置reducetask的个数            job.setnumreducetasks(2);

mapreduce 中的计数器

计数器是收集作业统计信息的有效手段之一,用于质量控制或应用级统计

可辅助诊断系统故障

看能否用一个计数器值来记录某一特定事件的发生 ,比分析一堆日志文件容易

通过enum枚举类型来定义计数器 统计reduce端数据的输入的key有多少个

public class partitionerreducer extends reducer<text,nullwritable,text,nullwritable> { public static enum counter{            my_input_recoreds,my_input_bytes } @override protected void reduce(text key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception { //方式2:使用枚枚举来定义计数器        context.getcounter(counter.my_input_recoreds).increment(1l);       context.write(key, nullwritable.get()); }}

排序(包含序列化)

  • 序列化 (serialization) 是指把结构化对象转化为字节流
  • 反序列化 (deserialization) 是序列化的逆过程. 把字节流转为结构化对象. 当要在进程间传 
    递对象或持久化对象的时候, 就需要序列化对象成字节流, 反之当要将接收到或从磁盘读取 
    的字节流转换为对象, 就要进行反序列化
  • java 的序列化 (serializable) 是一个重量级序列化框架, 一个对象被序列化后, 会附带很多额 
    外的信息 (各种校验信息, header, 继承体系等), 不便于在网络中高效传输. 所以, hadoop 
    自己开发了一套序列化机制(writable), 精简高效. 不用像 java 对象类一样传输多层的父子 
    关系, 需要哪个属性就传输哪个属性值, 大大的减少网络传输的开销
  • writable 是 hadoop 的序列化格式, hadoop 定义了这样一个 writable 接口. 一个类要支持可 
    序列化只需实现这个接口即可
  • 另外 writable 有一个子接口是 writablecomparable, writablecomparable 是既可实现序列 
    化, 也可以对key进行比较, 我们这里可以通过自定义 key 实现 writablecomparable 来实现 
    我们的排序功能

public class sortbean implements writablecomparable<sortbean>{ private string word; private int  num; public string getword() { return word; } public void setword(string word) { this.word = word; } public int getnum() { return num; } public void setnum(int num) { this.num = num; } @override public string tostring() { return   word + "\t"+ num ; } //实现比较器,指定排序的规则 /*      规则:        第一列(word)按照字典顺序进行排列    //  aac   aad        第一列相同的时候, 第二列(num)按照升序进行排列     */ @override public int compareto(sortbean sortbean) { //先对第一列排序: word排序 int result = this.word.compareto(sortbean.word); //如果第一列相同,则按照第二列进行排序 if(result == 0){ return this.num - sortbean.num; } return result; } //实现序列化 @override public void write(dataoutput out) throws ioexception {        out.writeutf(word);        out.writeint(num); } //实现反序列 @override public void readfields(datainput in) throws ioexception { this.word = in.readutf(); this.num = in.readint(); }}
public class sortmapper extends mapper<longwritable,text,sortbean,nullwritable> { /*      map方法将k1和v1转为k2和v2:      k1            v1      0            a  3      5            b  7      ----------------------      k2                         v2      sortbean(a  3)         nullwritable      sortbean(b  7)         nullwritable     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:将行文本数据(v1)拆分,并将数据封装到sortbean对象,就可以得到k2 string[] split = value.tostring().split("\t"); sortbean sortbean = new sortbean();        sortbean.setword(split[0]);        sortbean.setnum(integer.parseint(split[1])); //2:将k2和v2写入上下文中        context.write(sortbean, nullwritable.get()); }}
public class sortreducer extends reducer<sortbean,nullwritable,sortbean,nullwritable> { //reduce方法将新的k2和v2转为k3和v3 @override protected void reduce(sortbean key, iterable<nullwritable> values, context context) throws ioexception, interruptedexception {       context.write(key, nullwritable.get()); }}

job略


规约combiner

在三大阶段的第一阶段map处理完后,可能数据过多,利用分布式思想,抢在reduce前先做一次合并,后再由reduce合并,目的是:提高网络io 性能

实现步骤

 //第三(分区),四 (排序) //第五步: 规约(combiner)      job.setcombinerclass(mycombiner.class); //第六步 分布


案例:流量统计(key相同则++++++++)

public class flowbean implements writable { private integer upflow; //上行数据包数 private integer downflow; //下行数据包数 private integer upcountflow; //上行流量总和 private integer downcountflow;//下行流量总和 //下略get   set   序列化  反序列化
public class flowcountmapper extends mapper<longwritable,text,text,flowbean> { /*      将k1和v1转为k2和v2:      k1              v1      0               1363157985059     13600217502    00-1f-64-e2-e8-b1:cmcc    120.196.100.55    www.baidu.com    综合门户    19    128    1177    16852    200     ------------------------------      k2              v2      13600217502     flowbean(19    128    1177    16852)     */ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { //1:拆分行文本数据,得到手机号--->k2 string[] split = value.tostring().split("\t"); string phonenum = split[1]; //2:创建flowbean对象,并从行文本数据拆分出流量的四个四段,并将四个流量字段的值赋给flowbean对象 flowbean flowbean = new flowbean();        flowbean.setupflow(integer.parseint(split[6]));        flowbean.setdownflow(integer.parseint(split[7]));        flowbean.setupcountflow(integer.parseint(split[8]));        flowbean.setdowncountflow(integer.parseint(split[9])); //3:将k2和v2写入上下文中        context.write(new text(phonenum), flowbean); }}
public class flowcountreducer extends reducer<text,flowbean,text,flowbean> { @override protected void reduce(text key, iterable<flowbean> values, context context) throws ioexception, interruptedexception { //1:遍历集合,并将集合中的对应的四个字段累计 integer upflow = 0; //上行数据包数 integer downflow = 0; //下行数据包数 integer upcountflow = 0; //上行流量总和 integer downcountflow = 0;//下行流量总和 for (flowbean value : values) {            upflow += value.getupflow();            downflow += value.getdownflow();            upcountflow += value.getupcountflow();            downcountflow += value.getdowncountflow(); } //2:创建flowbean对象,并给对象赋值  v3 flowbean flowbean = new flowbean();        flowbean.setupflow(upflow);        flowbean.setdownflow(downflow);        flowbean.setupcountflow(upcountflow);        flowbean.setdowncountflow(downcountflow); //3:将k3和v3下入上下文中        context.write(key, flowbean); }}
public class jobmain extends configured implements tool { //该方法用于指定一个job任务 @override public int run(string[] args) throws exception { //1:创建一个job任务对象 job job = job.getinstance(super.getconf(), "mapreduce_flowcount"); //如果打包运行出错,则需要加该配置        job.setjarbyclass(jobmain.class); //2:配置job任务对象(八个步骤) //第一步:指定文件的读取方式和读取路径        job.setinputformatclass(textinputformat.class); //textinputformat.addinputpath(job, new path("hdfs://node01:8020/wordcount")); textinputformat.addinputpath(job, new path("file:///d:\\input\\flowcount_input")); //第二步:指定map阶段的处理方式和数据类型         job.setmapperclass(flowcountmapper.class); //设置map阶段k2的类型          job.setmapoutputkeyclass(text.class); //设置map阶段v2的类型          job.setmapoutputvalueclass(flowbean.class); //第三(分区),四 (排序) //第五步: 规约(combiner) //第六步 分组 //第七步:指定reduce阶段的处理方式和数据类型          job.setreducerclass(flowcountreducer.class); //设置k3的类型           job.setoutputkeyclass(text.class); //设置v3的类型           job.setoutputvalueclass(flowbean.class); //第八步: 设置输出类型           job.setoutputformatclass(textoutputformat.class); //设置输出的路径 textoutputformat.setoutputpath(job, new path("file:///d:\\out\\flowcount_out")); //等待任务结束 boolean bl = job.waitforcompletion(true); return bl ? 0:1; } public static void main(string[] args) throws exception { configuration configuration = new configuration(); //启动job任务 int run = toolrunner.run(configuration, new jobmain(), args); system.exit(run); }}

如增加需求:

上行流量倒序排序

public class flowbean implements writablecomparable<flowbean> { //指定排序的规则 @override public int compareto(flowbean flowbean) { // return this.upflow.compareto(flowbean.getupflow()) * -1; return flowbean.upflow - this.upflow ; }}

需求:手机号码分区

public class flowcountpartition extends partitioner<text,flowbean> { /*      该方法用来指定分区的规则:        135 开头数据到一个分区文件        136 开头数据到一个分区文件        137 开头数据到一个分区文件        其他分区       参数:         text : k2   手机号         flowbean: v2         i   : reducetask的个数     */ @override public int getpartition(text text, flowbean flowbean, int i) { //1:获取手机号 string phonenum = text.tostring(); //2:判断手机号以什么开头,返回对应的分区编号(0-3) if(phonenum.startswith("135")){ return 0; }else if(phonenum.startswith("136")){ return 1; }else if(phonenum.startswith("137")){ return 2; }else{ return 3; } }}
 //第三(分区),四 (排序)            job.setpartitionerclass(flowcountpartition.class); //第五步: 规约(combiner) //第六步 分组 //设置reduce个数            job.setnumreducetasks(4);

《hadoop(二MapReduce).doc》

下载本文的Word格式文档,以方便收藏与打印。