Implementing Join Operations with MapReduce
Published: 2019-05-26



1. Overview

Anyone familiar with SQL knows that a join there takes a single statement. Under the MapReduce programming model, however, implementing a join is considerably more work. In production we can lean on frameworks such as Hive or Spark SQL, but it is still worth knowing how a join is actually carried out, since that understanding illuminates what those frameworks do underneath. This article shows how to implement a join with the raw MapReduce API.

2. Requirements

Implement the equivalent of the following SQL: select c.customer_id,c.customer_name,o.orderId,o.order_status from customer c join order o on c.customer_id=o.customer_id

Data files: https://pan.baidu.com/s/1GziR0W7pNwk26lHf-ZZ8NA (extraction code: 2piw)
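As a point of reference for the field indices used in the code below, both files are plain CSV. The rows here are a hypothetical sketch (the values are invented; only the column positions are taken from the code):

customers.csv — 9 columns, with customer_id in column 0 and customer_name in column 1:

    11599,Mary,Smith,XXXXXXXXX,XXXXXXXXX,8946 Park Lane,Caguas,PR,00725

orders.csv — 4 columns, with orderId in column 0, a timestamp in column 1 (unused by the code), customer_id in column 2, and order_status in column 3:

    1,2013-07-25 00:00:00.0,11599,CLOSED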

3. Implementing the join with map + reduce (reduce-side join)

map

  • Decide which table a record comes from by its field count: 4 fields means the order table, 9 fields the customer table.
  • Emit ( customer_id, (customer_id, customer_name, orderId, order_status, flag) ).

reduce

Process all values that share the same customer_id key and stitch them together into joined records.
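To make the data flow concrete, here is a hypothetical trace for a single key, reusing the invented sample rows above plus one extra invented order:

map output (key, value):
    "11599" -> CustomerOrders(11599, "", 1, CLOSED, flag="1")        from orders.csv
    "11599" -> CustomerOrders(11599, Mary, "", "", flag="0")         from customers.csv
    "11599" -> CustomerOrders(11599, "", 257, COMPLETE, flag="1")    from orders.csv

The shuffle groups all three values under the key "11599"; reduce copies the name out of the flag="0" bean into the two order beans and emits:

    11599,Mary,1,CLOSED
    11599,Mary,257,COMPLETE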
Code implementation
  • 1) First, write an entity class holding the required fields:

package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrders implements Writable {
    // ( customer_id, (customer_id, customer_name, orderId, order_status, flag) )
    private String customer_id;   // customer id
    private String customer_name; // customer name
    private String orderId;      // order id
    private String order_status; // order status
    private String flag;         // marker telling which file a record came from

    public CustomerOrders() {
    }

    public CustomerOrders(String customer_id, String customer_name, String orderId, String order_status, String flag) {
        this.customer_id = customer_id;
        this.customer_name = customer_name;
        this.orderId = orderId;
        this.order_status = order_status;
        this.flag = flag;
    }

    // serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(customer_id);
        dataOutput.writeUTF(customer_name);
        dataOutput.writeUTF(orderId);
        dataOutput.writeUTF(order_status);
        dataOutput.writeUTF(flag);
    }

    // deserialization (field order must match write())
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.customer_id = dataInput.readUTF();
        this.customer_name = dataInput.readUTF();
        this.orderId = dataInput.readUTF();
        this.order_status = dataInput.readUTF();
        this.flag = dataInput.readUTF();
    }

    public String getCustomer_id() { return customer_id; }
    public void setCustomer_id(String customer_id) { this.customer_id = customer_id; }
    public String getCustomer_name() { return customer_name; }
    public void setCustomer_name(String customer_name) { this.customer_name = customer_name; }
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getOrder_status() { return order_status; }
    public void setOrder_status(String order_status) { this.order_status = order_status; }
    public String getFlag() { return flag; }
    public void setFlag(String flag) { this.flag = flag; }

    @Override
    public String toString() {
        return customer_id + ',' + customer_name + ',' + orderId + ',' + order_status;
    }
}
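Two details of the Writable contract matter here: Hadoop instantiates the class reflectively, so the no-argument constructor must exist, and readFields() has to read the fields in exactly the order write() wrote them, because the byte stream carries no field names. The toString() method also defines the final output line format, since TextOutputFormat writes keys by calling toString().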
  • 2) Write the Mapper class:
package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrderMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {

    CustomerOrders v = new CustomerOrders();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the line into fields
        String[] fields = value.toString().split(",");
        // 4 fields means the order table, otherwise the customer table
        if (fields.length == 4) {
            // fill in the fields available in the order table
            v.setCustomer_id(fields[2]);
            v.setCustomer_name("");
            v.setOrderId(fields[0]);
            v.setOrder_status(fields[3]);
            v.setFlag("1");
        } else {
            // fill in the fields available in the customer table
            v.setCustomer_id(fields[0]);
            v.setOrderId("");
            v.setOrder_status("");
            v.setCustomer_name(fields[1]);
            v.setFlag("0");
        }
        // emit from the map side, keyed by customer id
        context.write(new Text(v.getCustomer_id()), v);
    }
}
  • 3) Write the Reducer class:
package hadoop.mapreduce.reducejoin;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class CustomerOrderReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Context context) throws IOException, InterruptedException {
        // 1. a list for the order records (one customer id can map to many orders)
        ArrayList<CustomerOrders> orderBeans = new ArrayList<>();
        // a single bean for the customer (one customer id maps to exactly one customer)
        CustomerOrders cusBean = new CustomerOrders();
        // 2. iterate over the map output, copying each record into place ready for merging
        for (CustomerOrders bean : values) {
            if (bean.getFlag().equals("1")) {
                // order record
                CustomerOrders orderBean = new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
                orderBeans.add(orderBean);
            } else {
                // customer record
                try {
                    BeanUtils.copyProperties(cusBean, bean);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        // 3. iterate over the order list, filling in the blank name field
        for (CustomerOrders bean : orderBeans) {
            // take the name from the customer bean and put it into each order bean
            bean.setCustomer_name(cusBean.getCustomer_name());
            // 4. write out the joined record
            context.write(bean, NullWritable.get());
        }
    }
}
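The detour through BeanUtils.copyProperties is deliberate: Hadoop reuses a single object for every element of the values iterable, so adding bean itself to the list would leave the list holding many references to one object containing whatever the last value was. Each order record therefore has to be copied into a fresh CustomerOrders before it is stored.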
  • 4) Write the Driver class and run it:
package hadoop.mapreduce.reducejoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author sunyong
 * @date 2020/07/01
 * @description
 */
public class CustomerOrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. create the configuration and the Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "sqlJoin");
        // 2. set the jar
        job.setJarByClass(CustomerOrderDriver.class);
        // 3. set the mapper and reducer
        job.setMapperClass(CustomerOrderMapper.class);
        job.setReducerClass(CustomerOrderReducer.class);
        // 4. set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);
        // 5. set the reduce output key/value types
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);
        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("F:\\sunyong\\Java\\codes\\javaToHdfs\\join"));
        FileOutputFormat.setOutputPath(job, new Path("joinOut"));
        // 7. submit and run
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
  • 5) Run the job and inspect the output file:
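Each line of joinOut/part-r-00000 is the toString() of one CustomerOrders bean, i.e. customer_id,customer_name,orderId,order_status; for the hypothetical sample rows above that would be lines like 11599,Mary,1,CLOSED. Records come out sorted by customer_id, because that was the shuffle key.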

4. Map-side join with MapReduce (no reduce phase)

The idea is to cache one table, the small one, entirely in memory, for example in a HashMap, and then stream over the other table, looking each record's key up in the HashMap.

Customer table: one record per customer → the small table
Order table: one customer can have many orders → the large table

  • 1. Write the entity class, the same as above.
  • 2. Write the Mapper class; this is where the approach differs:
package hadoop.mapreduce.join;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

/**
 * @author sunyong
 * @date 2020/07/02
 * @description
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrders, NullWritable> {

    // stores customer id -> customer name
    HashMap<String, String> customerMap = new HashMap<>();
    // reusable output object
    CustomerOrders customerOrders = new CustomerOrders();

    // load the customer table into memory
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // get the URIs of the cached files; here there is only one
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles != null && cacheFiles.length > 0) {
            // get the file path and name
            String fileName = cacheFiles[0].getPath().toString();
            // buffered reader with UTF-8 encoding
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(fileName), "UTF-8"));
            String line;
            // read the file, using the first column as the map key and the second as the value
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                String[] split = line.split(",");
                customerMap.put(split[0], split[1]);
            }
            // close the reader
            br.close();
        }
    }

    // process the order table
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // split the current line into fields
        String[] fields = value.toString().split(",");
        // assign the order fields
        customerOrders.setCustomer_id(fields[2]);
        customerOrders.setOrderId(fields[0]);
        customerOrders.setOrder_status(fields[3]);
        // look the name up in the HashMap
        customerOrders.setCustomer_name(customerMap.get(fields[2]));
        // write out one object per input record (map() runs once per line)
        context.write(customerOrders, NullWritable.get());
    }
}
  • 3. Write the Driver class and run it:
package hadoop.mapreduce.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

/**
 * @author sunyong
 * @date 2020/07/01
 * @description
 */
public class CustomerOrderDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
        // 1. create the configuration and the Job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "mapJoin");
        // 2. set the jar
        job.setJarByClass(CustomerOrderDriver.class);
        // 3. set the mapper (no reducer is needed here)
        job.setMapperClass(MapJoinMapper.class);
        // set the number of reduce tasks to 0
        job.setNumReduceTasks(0);
        // 4. set the job output key/value types (they must match what the mapper emits)
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);
        // 5. cache the small table; note that the URI must use forward slashes,
        //    a backslash makes the path unparseable and the job fails
        job.addCacheFile(new URI("file:///F:/sunyong/Java/codes/javaToHdfs/join/customers.csv"));
        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("F:\\sunyong\\Java\\codes\\javaToHdfs\\join\\orders.csv"));
        FileOutputFormat.setOutputPath(job, new Path("mapOut"));
        // 7. submit and run
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
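The file:// URI is fine for this local run. On a real cluster the small table would live on HDFS and be registered the same way, e.g. job.addCacheFile(new URI("hdfs:///data/join/customers.csv")) (path hypothetical); the framework then ships a copy to every node that runs a map task.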
  • 4) Run the job and inspect the output file (the rows are in no particular order):
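The lack of ordering is expected: a map-only job has no shuffle or sort phase, so the output simply follows the order of orders.csv. The columns match the reduce-side result, and both approaches compute the same join; they differ only in where the customer lookup happens.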

