本文共 6964 字,大约阅读时间需要 23 分钟。
Reducejoin与Mapjoin相同,不是一种具体的类接口等需要实现的东西,而是一种思想,根据mapreduce的工作流程与原理,来间接完成自己的工作,看下面的案例实操。
Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在Map阶段已经打标志)分开,最后进行合并就ok了。
reducejoin的缺点在于将连接工作全部压在了reduce端:reduce端处理压力大,而且当某个连接key对应的记录远多于其他key时,容易产生数据倾斜;可以通过在map端进行数据合并(即MapJoin)来缓解这种情况。
将订单数据表与产品表进行数据合并。
订单数据表
id | pid | amount |
---|---|---|
1001 | 01 | 1 |
1002 | 02 | 2 |
1003 | 03 | 3 |
1004 | 01 | 4 |
1005 | 02 | 5 |
1006 | 03 | 6 |
产品表
pid | pname |
---|---|
01 | 小米 |
02 | 华为 |
03 | 格力 |
预期,根据pid合并两个表
id | pname | amount |
---|---|---|
1001 | 小米 | 1 |
1004 | 小米 | 4 |
1002 | 华为 | 2 |
1005 | 华为 | 5 |
1003 | 格力 | 3 |
1006 | 格力 | 6 |
通过将关联条件作为Map输出的key,将两表满足Join条件的数据并携带数据所来源的文件信息,发往同一个ReduceTask,在Reduce中进行数据的串联
![ReduceJoin 工作流程示意图](https://s3.ax1x.com/2020/11/12/BxSbRI.png)
Mapper
package com.Rejoin.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Reduce-side join mapper.
 *
 * <p>Tags every input record with its source table ("order" or "pd") and emits
 * the join column {@code pid} as the map output key, so that all records
 * sharing a pid meet in the same reduce call.
 *
 * <p>Input formats (tab-separated):
 * <pre>
 *   order file: id \t pid \t amount   e.g. "1001\t01\t1"
 *   pd file:    pid \t pname          e.g. "01\t小米"
 * </pre>
 */
public class RejoinMapper extends Mapper<LongWritable, Text, Text, TableBean> {

    // Name of the file backing the current split; decides how each line is parsed.
    private String name;

    // Reused output objects — Hadoop serializes them on write, so reuse is safe.
    private final Text k = new Text();
    private final TableBean v = new TableBean();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        name = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        if (name.startsWith("order")) {
            // Order record: keep all columns, blank out pname (filled in at reduce time).
            v.setId(fields[0]);
            v.setPid(fields[1]);
            v.setCnt(Integer.parseInt(fields[2]));
            v.setPname("");
            v.setTar("order");
            k.set(fields[1]);
        } else {
            // Product record: only pid + pname are meaningful; zero the rest so
            // the Writable fields are never null on serialization.
            v.setPid(fields[0]);
            v.setPname(fields[1]);
            v.setTar("pd");
            v.setId("");
            v.setCnt(0);
            k.set(fields[0]);
        }

        context.write(k, v);
    }
}
Reducer
package com.Rejoin.mr;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;

/**
 * Reduce-side join reducer.
 *
 * <p>For each join key (pid), collects all "order" records and the single "pd"
 * record, then stitches the product name onto every order and emits the joined
 * rows.
 */
public class RejoinReducer extends Reducer<Text, TableBean, TableBean, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<TableBean> values, Context context)
            throws IOException, InterruptedException {
        List<TableBean> orders = new ArrayList<>(); // all order-side records for this pid
        TableBean pdBean = new TableBean();         // the product-side record (pname source)

        for (TableBean value : values) {
            try {
                if ("order".equals(value.getTar())) {
                    // Hadoop reuses the value object across iterations — deep-copy
                    // each order record before stashing it.
                    TableBean copy = new TableBean();
                    BeanUtils.copyProperties(copy, value);
                    orders.add(copy);
                } else {
                    // Product table: copy into the lookup bean; at most one per key.
                    BeanUtils.copyProperties(pdBean, value);
                }
            } catch (IllegalAccessException | InvocationTargetException e) {
                // Propagate with cause instead of swallowing — a failed copy would
                // otherwise silently drop join rows.
                throw new IOException("Failed to copy TableBean properties", e);
            }
        }

        // Join: attach the product name to every order record and emit.
        for (TableBean order : orders) {
            order.setPname(pdBean.getPname());
            context.write(order, NullWritable.get());
        }
    }
}
TableBean
package com.Rejoin.mr;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Join bean carrying the union of both tables' columns plus a source tag.
 *
 * <p>Order table contributes {@code id}, {@code pid}, {@code cnt}; product
 * table contributes {@code pid}, {@code pname}. {@code tar} marks the source
 * ("order" or "pd") so the reducer can tell the records apart.
 *
 * <p>Serialized via {@link Writable}; field order in {@link #write} and
 * {@link #readFields} must match exactly.
 */
public class TableBean implements Writable {

    private String id;     // order id
    private String pid;    // product id — the join key
    private int cnt;       // order amount
    private String tar;    // source tag: "order" or "pd"
    private String pname;  // product name

    public TableBean() {
        super();
    }

    public TableBean(String id, String pid, int cnt, String tar, String pname) {
        this.id = id;
        this.pid = pid;
        this.cnt = cnt;
        this.tar = tar;
        this.pname = pname;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(pid);
        dataOutput.writeInt(cnt);
        dataOutput.writeUTF(tar);
        dataOutput.writeUTF(pname);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        pid = dataInput.readUTF();
        cnt = dataInput.readInt();
        tar = dataInput.readUTF();
        pname = dataInput.readUTF();
    }

    public String getId() {
        return id;
    }

    public String getPid() {
        return pid;
    }

    public int getCnt() {
        return cnt;
    }

    public String getTar() {
        return tar;
    }

    public String getPname() {
        return pname;
    }

    public void setId(String id) {
        this.id = id;
    }

    public void setPid(String pid) {
        this.pid = pid;
    }

    public void setCnt(int cnt) {
        this.cnt = cnt;
    }

    public void setTar(String tar) {
        this.tar = tar;
    }

    public void setPname(String pname) {
        this.pname = pname;
    }

    /**
     * Tab-separated {@code id<TAB>pname<TAB>cnt}, matching the expected output
     * table (e.g. "1001\t小米\t1"). The original version emitted stray
     * unbalanced "id='" / "pname='" labels.
     */
    @Override
    public String toString() {
        return id + '\t' + pname + '\t' + cnt;
    }
}
Driver
package com.Rejoin.mr;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Driver for the reduce-side join job: wires up the mapper, reducer, key/value
 * types, and input/output paths, then submits the job.
 */
public class RejoinDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // Fall back to local debug paths only when no paths are supplied;
        // previously args were unconditionally overwritten, making the
        // command-line arguments unusable.
        if (args.length < 2) {
            args = new String[]{
                    "D:\\mapreduceinput\\input6",
                    "D:\\mapreduceoutput\\outputorder"
            };
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(RejoinDriver.class);
        job.setMapperClass(RejoinMapper.class);
        job.setReducerClass(RejoinReducer.class);

        // Map output: pid -> tagged TableBean; final output: joined bean, no value.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TableBean.class);
        job.setOutputKeyClass(TableBean.class);
        job.setOutputValueClass(NullWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
转载地址:http://licki.baihongyu.com/