[grid@hadoop1 ~]$ hadoop fs -cat ./in/7/dept.txt
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
[grid@hadoop1 ~]$ hadoop fs -cat ./in/7/emp.txt
7369 SMITH CLERK 7902 17-12月-80 800 20
7499 ALLEN SALESMAN 7698 20-2月 -81 1600 300 30
7521 WARD SALESMAN 7698 22-2月 -81 1250 500 30
7566 JONES MANAGER 7839 02-4月 -81 2975 20
7654 MARTIN SALESMAN 7698 28-9月 -81 1250 1400 30
7698 BLAKE MANAGER 7839 01-5月 -81 2850 30
7782 CLARK MANAGER 7839 09-6月 -81 2450 10
7839 KING PRESIDENT 17-11月-81 5000 10
7844 TURNER SALESMAN 7698 08-9月 -81 1500 0 30
7900 JAMES CLERK 7698 03-12月-81 950 30
7902 FORD ANALYST 7566 03-12月-81 3000 20
7934 MILLER CLERK 7782 23-1月 -82 1300 10
dept 文件第一列是部门编号,第二列是部门名称,第三列是部门所在城市
emp 文件第六列是员工工资,第八列是员工所在部门编号 要求输出:部门名称 员工数 平均工资 多表连接与单表自连接很类似,都是将关联列做 key ,另一列做 value 保存,并在 value 中添加额外信息来区分左右表 程序:import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class JoinMapper extends Mapper{ @Override public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); if (line.length() > 80) { //判断是左表还是右表 String deptno = line.substring(87, 89); String sal = line.substring(62, 67).trim(); context.write(new Text(deptno), new Text("A" + sal)); } else { String deptno = line.substring(8, 10); String dname = line.substring(11, 25).trim(); context.write(new Text(deptno), new Text("B" + dname)); } }}import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class JoinReducer extends Reducer { @Override public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int total = 0, count = 0; String dname = null; for (Text value : values) { if (value.toString().startsWith("A")) { total += Integer.parseInt(value.toString().substring(1)); count++; } else { dname = value.toString().substring(1); } } String avg = (count == 0 ? "--" : ("" + total / count)); context.write(new Text(dname), new Text(count + " " + avg)); }}import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.Job;public class Join { public static void main(String[] args) throws Exception { if (args.length != 3) { System.err.println("Usage: One
查看输出:
[grid@hadoop1 ~]$ hadoop fs -cat ./out/7/part-r-00000
ACCOUNTING 3 2916
RESEARCH 3 2258
SALES 6 1566
OPERATIONS 0 --