合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
1. 分析

适用于关联表中有小表的情形。

可以将小表分发到所有的 map 节点,这样,map 节点就可以在本地对自己所读到的大表数据进行合并并输出最终结果,可以大大提高合并操作的并发度,加快处理速度。

2. 实现代码:

(1)先在 driver 中添加缓存文件

```java
package com.kgc.mapreduce.driver;

import com.kgc.mapreduce.entry.CustomerOrders;
import com.kgc.mapreduce.mapper.MapJoinMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

/**
 * Driver for a map-only job that joins order records with a small customer
 * table shipped to every map task as a cache file.
 */
public class MapJoinDriver {

    /**
     * Configures and submits the job, then exits with 0 on success and 1 on failure.
     *
     * @param args command-line arguments (unused; all paths are hard-coded below)
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        // 1. Create the job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2. Locate the jar through the driver class.
        job.setJarByClass(MapJoinDriver.class);

        // 3. Register the mapper.
        job.setMapperClass(MapJoinMapper.class);

        // 4. Final output types (with no reduce phase the mapper output is the job output).
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);

        // 5. Input and output locations. Both URIs need three slashes: with only
        //    two ("file://d:...") the "d:" part is parsed as the URI authority
        //    instead of being part of the path.
        FileInputFormat.setInputPaths(job, new Path("file:///d:\\input"));
        FileOutputFormat.setOutputPath(job, new Path("file:///d:\\output"));

        // 6. Ship the small table to every map task.
        job.addCacheFile(new URI("file:///d:/customerinput/customers.csv"));

        // 7. The join is done entirely on the map side, so no reducers are needed.
        job.setNumReduceTasks(0);

        // 8. Submit and wait.
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
```

(2)mapper 中读取缓存的文件数据

```java
package com.kgc.mapreduce.mapper;

import com.kgc.mapreduce.entry.CustomerOrders;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.*;
import java.net.URI;
import java.util.HashMap;

/**
 * Map-side join: loads the cached customer file into memory once per task and
 * attaches the customer name to every order record read from the input.
 */
public class MapJoinMapper extends Mapper<LongWritable, Text, CustomerOrders, NullWritable> {

    /** Customer id (column 0 of the cache file) mapped to column 1 of the same line. */
    final HashMap<String, String> customerMap = new HashMap<>();

    /** Output key object, reused for every record. */
    final CustomerOrders customerOrders = new CustomerOrders();

    /**
     * Reads the first cache file line by line and fills {@link #customerMap}.
     * Does nothing when no cache file was registered.
     *
     * @param context task context that exposes the cache file URIs
     * @throws IOException if the cache file cannot be opened or read
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        URI[] cacheFiles = context.getCacheFiles();
        if (cacheFiles == null || cacheFiles.length == 0) {
            return;
        }

        String filename = cacheFiles[0].getPath();
        // try-with-resources closes the reader even when reading or parsing fails.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new FileInputStream(filename), "UTF-8"))) {
            String line;
            // Read to end of file; a blank line must not cut the load short.
            while ((line = reader.readLine()) != null) {
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                String[] split = line.split(",");
                if (split.length < 2) {
                    continue; // not enough columns to form an id/name pair
                }
                customerMap.put(split[0], split[1]);
            }
        }
    }

    /**
     * Splits one comma-separated order line and emits it with the customer name attached.
     * Column 0 is used as the order id, column 2 as the customer id (the join key)
     * and column 3 as the order status. Lines with fewer than four columns are
     * counted and skipped.
     *
     * @param key     input key supplied by the framework (unused)
     * @param value   one line of order data
     * @param context task context used to emit the joined record
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // 1. Split the line into columns.
        String[] fields = value.toString().split(",");
        if (fields.length < 4) {
            context.getCounter("MapJoin", "MalformedOrderLines").increment(1);
            return;
        }

        // 2. Copy the order columns. The customer id is the same column that is
        //    used for the lookup below, so the emitted id and name always agree.
        String customerId = fields[2];
        customerOrders.setOrderId(fields[0]);
        customerOrders.setCustomerId(customerId);
        customerOrders.setOrderStatus(fields[3]);

        // 3. Look up the customer name; this is null when the id is not in the cache file.
        customerOrders.setCustomerName(customerMap.get(customerId));

        // 4. Emit.
        context.write(customerOrders, NullWritable.get());
    }
}
```