博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
mapredcue-Redcuejoin
阅读量:3966 次
发布时间:2019-05-24

本文共 6964 字,大约阅读时间需要 23 分钟。

Reducejoin与Mapjoin相同,不是一种具体的类接口等需要实现的东西,而是一种思想,根据mapreduce的工作流程与原理,来间接完成自己的工作,看下面的案例实操。

join基本思路

​ Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。

​ Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。

reducejoin的缺点在于将工作量全部分给了reduce端,容易造成资源浪费,即数据倾斜,可以通过在map端进行数据合并来缓解这种情况。

reducejoin案例思路与实操

将订单数据表与产品表进行数据合并。

订单数据表

id pid amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6

产品表

pid pname
01 小米
02 华为
03 格力

预期,根据pid合并两个表

id pname amount
1001 小米 1
1004 小米 4
1002 华为 2
1005 华为 5
1003 格力 3
1006 格力 6

通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nQUwAfm5-1605151583237)(https://s3.ax1x.com/2020/11/12/BxSbRI.png)]

源码

Mapper

package com.Rejoin.mr;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class RejoinMapper extends Mapper
{
String name; @Override protected void setup(Context context) throws IOException, InterruptedException {
FileSplit inputSplit = (FileSplit) context.getInputSplit(); name = inputSplit.getPath().getName(); } Text k = new Text(); TableBean v = new TableBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//pid pname //01 小米 //id pid amount //1001 01 1 String line = value.toString(); if (name.startsWith("order")) {
String[] fields = line.split("\t"); v.setId(fields[0]); v.setPid(fields[1]); v.setCnt(Integer.parseInt(fields[2])); v.setPname(""); v.setTar("order"); k.set(fields[1]); } else {
String[] fields = line.split("\t"); v.setPid(fields[0]); v.setPname(fields[1]); v.setTar("pd"); v.setId(""); v.setCnt(0); k.set(fields[0]); } context.write(k,v); }}

Reducer

package com.Rejoin.mr;import org.apache.commons.beanutils.BeanUtils;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.lang.reflect.InvocationTargetException;import java.util.ArrayList;public class RejoinReducer extends Reducer
{
@Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException {
ArrayList
lists = new ArrayList<>(); //保存所有order TableBean tmpbean = new TableBean(); //主要存储标记位 for (TableBean value: values) {
if ("order".equals(value.getTar())){
TableBean tmp = new TableBean(); try {
BeanUtils.copyProperties(tmp,value); lists.add(tmp); } catch (IllegalAccessException e) {
e.printStackTrace(); } catch (InvocationTargetException e) {
e.printStackTrace(); } } else {
//TableBean tmp = new TableBean(); try {
BeanUtils.copyProperties(tmpbean,value); } catch (IllegalAccessException e) {
e.printStackTrace(); } catch (InvocationTargetException e) {
e.printStackTrace(); } } } for (TableBean tmp: lists) {
tmp.setPname(tmpbean.getPname()); context.write(tmp,NullWritable.get()); } }}

TableBean

package com.Rejoin.mr;import org.apache.hadoop.io.Writable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class TableBean implements Writable {
//id pid cnt//id pname private String id; private String pid; private int cnt; private String tar; private String pname; public TableBean() {
super(); } public TableBean(String id, String pid, int cnt, String tar, String pname) {
this.id = id; this.pid = pid; this.cnt = cnt; this.tar = tar; this.pname = pname; } @Override public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(id); dataOutput.writeUTF(pid); dataOutput.writeInt(cnt); dataOutput.writeUTF(tar); dataOutput.writeUTF(pname); } @Override public void readFields(DataInput dataInput) throws IOException {
id = dataInput.readUTF(); pid = dataInput.readUTF(); cnt = dataInput.readInt(); tar = dataInput.readUTF(); pname = dataInput.readUTF(); } public String getId() {
return id; } public String getPid() {
return pid; } public int getCnt() {
return cnt; } public String getTar() {
return tar; } public String getPname() {
return pname; } public void setId(String id) {
this.id = id; } public void setPid(String pid) {
this.pid = pid; } public void setCnt(int cnt) {
this.cnt = cnt; } public void setTar(String tar) {
this.tar = tar; } public void setPname(String pname) {
this.pname = pname; } @Override public String toString() {
return "id='" + id + '\t' + "pname='" + pname + '\t' + "cnt=" + cnt; }}

Driver

package com.Rejoin.mr;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class RejoinDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{
"D:\\mapreduceinput\\input6","D:\\mapreduceoutput\\outputorder"}; Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setMapOutputKeyClass(Text.class); //job.setOutputValueClass(TableBean.class); job.setMapOutputValueClass(TableBean.class); job.setOutputKeyClass(TableBean.class); job.setOutputValueClass(NullWritable.class); job.setJarByClass(RejoinDriver.class); job.setMapperClass(RejoinMapper.class); job.setReducerClass(RejoinReducer.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); boolean res = job.waitForCompletion(true); System.exit(res?0:1); }}

转载地址:http://licki.baihongyu.com/

你可能感兴趣的文章
杂项设备实现原理
查看>>
stat.h头文件,轻松获取文件属性(2…
查看>>
stat.h头文件,轻松获取文件属性
查看>>
stat.h头文件,轻松获取文件属性
查看>>
fcntl.h和unistd.h
查看>>
fcntl.h和unistd.h
查看>>
Printk在终端显示
查看>>
Printk在终端显示
查看>>
嵌入式Linux之我行——S3C2440上触摸…
查看>>
嵌入式Linux之我行——S3C2440上触摸…
查看>>
Linux环境进程间通信(二):&nbsp;信号…
查看>>
Linux环境进程间通信(二):&nbsp;信号…
查看>>
Linux环境进程间通信(二):&nbsp;信号…
查看>>
Linux环境进程间通信(二):&nbsp;信号…
查看>>
wait和waitpid函数
查看>>
wait和waitpid函数
查看>>
fcntl&nbsp;函数
查看>>
fcntl&nbsp;函数
查看>>
Linux&nbsp;系统内核的调试
查看>>
Linux&nbsp;系统内核的调试
查看>>