Sample Join Analysis

2023-05-07,,

Sample data: student.txt

1,yaoshuya,25
2,yaoxiaohua,29
3,yaoyuanyie,15
4,yaoshupei,26

Sample data:score.txt

1,yuwen,100
1,shuxue,99
2,yuwen,99
2,shuxue,88
3,yuwen,99
3,shuxue,56
4,yuwen,33
4,shuxue,99

输出文件内容:

1    [yaoshuya,25,yuwen,100]
1    [yaoshuya,25,shuxue,99]
2    [yaoxiaohua,29,yuwen,99]
2    [yaoxiaohua,29,shuxue,88]
3    [yaoyuanyie,15,yuwen,99]
3    [yaoyuanyie,15,shuxue,56]
4    [yaoshupei,26,yuwen,33]
4    [yaoshupei,26,shuxue,99]

参数:

args= "-Dio.sort.mb=10

-r 1

-inFormat org.apache.hadoop.mapred.KeyValueTextInputFormat

-outFormat org.apache.hadoop.mapred.TextOutputFormat

-outKey org.apache.hadoop.io.Text

-outValue org.apache.hadoop.mapred.join.TupleWritable

hdfs://namenode:9000/user/hadoop/student/student.txt

hdfs://namenode:9000/user/hadoop/student/score2.txt

hdfs://namenode:9000/user/hadoop/joinout".split(" ");

需要注意的是我使用的输出格式是TextOutputFormat(完全是为了方便观察输出后的数据)

输出的valuetype是org.apache.hadoop.mapred.join.TupleWritable ,这个 类型非常方便,类似于数组类型,可以接受多值。

在源码中添加的一句代码,是用来配置我的数据源文件的keyvalue分隔符是,(comma).

jobConf.set("key.value.separator.in.input.line", ",");

关键代码简析:

job.setInputFormatClass(CompositeInputFormat.class);
job.getConfiguration().set(CompositeInputFormat.JOIN_EXPR,
      CompositeInputFormat.compose(op, inputFormatClass,
      plist.toArray(new Path[0])));

使用CompositeInputFormat来进行join操作。此类的说明:

/**
* An InputFormat capable of performing joins over a set of data sources sorted
* and partitioned the same way.
*
* A user may define new join types by setting the property
* <tt>mapreduce.join.define.&lt;ident&gt;</tt> to a classname.
* In the expression <tt>mapreduce.join.expr</tt>, the identifier will be
* assumed to be a ComposableRecordReader.
* <tt>mapreduce.join.keycomparator</tt> can be a classname used to compare
* keys in the join.
* @see #setFormat
* @see JoinRecordReader
* @see MultiFilterRecordReader
*/

通过op来指定连接类型:inner,outer,tbl等,有其他需要也可以实现。

具体是怎么连接的呢?根据两个source进入mapper的key进行归并连接。所以要求数据源是根据key值有序的。此连接是在map端实现的。

测试中我使用KeyValueTextInputFormat来处理,其默认格式是key\tValue,所以我使用了上面的代码来进行重置这个格式。但如果你的文件不是key放在第一个位置,你就需要自己写FileInputFormat啦。

但明显需要你要处理的数据源都是使用同样的FileInputFormat去读取。

还有一点,这里支持多文件连接,示例中我只使用了两个示例文件,可以添加更多的文件,路径添加到outputdir之前即可。

Sample Join Analysis的相关教程结束。

《Sample Join Analysis.doc》

下载本文的Word格式文档,以方便收藏与打印。