ControlledJob ctlJob1 = new ControlledJob(job.getConfiguration()); ControlledJob ctlJob2 = new ControlledJob(job.getConfiguration());
ctlJob1.setJob(job); ctlJob2.setJob(job1);
ctlJob2.addDependingJob(ctlJob1);
JobControl jobControl = new JobControl("SameFriends"); jobControl.addJob(ctlJob1); jobControl.addJob(ctlJob2);
Thread jobThread = new Thread(jobControl); jobThread.start();
// 每隔一段时间来判断一下该jc线程的任务是否执行完成 while (!jobControl.allFinished()){ Thread.sleep(500); }
jobControl.stop();
}
public static class SFMerge1Mapper1 extends Mapper<LongWritable,Text,Text,Text>{ private Text outKey = new Text(); private Text outValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] split = value.toString().split(":"); outValue.set(split[0]); String[] friends = split[1].split(","); for (String str : friends) { outKey.set(str); context.write(outKey,outValue); } } }
/** * 第一次reducer输出结果: A F,I,O,K,G,D,C,H,B B E,J,F,A C B,E,K,A,H,G,F D H,C,G,F,E,A,K,L E A,B,L,G,M,F,D,H F C,M,L,A,D,G G M H O I O,C J O K O,B L D,E M E,F O A,H,I,J,F */ public static class SFMerge1Reducer1 extends Reducer<Text,Text,Text,Text>{ private Text outValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder sb = new StringBuilder(); for (Text str : values) { if( sb.length()!= 0 ){ sb.append(","); } sb.append(str); } outValue.set(sb.toString()); context.write(key,outValue); } }
public static class SFMerge1Mapper2 extends Mapper<LongWritable,Text,Text,Text>{ private Text outValue = new Text(); private Text outKey = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] splits = value.toString().split("\t"); String[] strings = splits[1].split(","); outValue.set(splits[0]); Arrays.sort(strings); for (int i = 0; i < strings.length - 1; i++) { for (int j = i+1; j < strings.length; j++) { outKey.set(strings[i]+"-"+strings[j]); context.write(outKey,outValue); } } } }
/**
 * Second-pass reducer: for each pair key, joins all of its values into a single
 * comma-separated string and writes (pair, joined values).
 *
 * Output of the second reducer (pair followed by its value list):
 *
 *   A-B  E,C
 *   A-C  D,F
 *   A-D  E,F
 *   A-E  B,C,D
 *   A-F  C,E,O,D,B
 *   A-G  E,F,C,D
 *   A-H  C,D,E,O
 *   A-I  O
 *   A-J  O,B
 *   A-K  C,D
 *   A-L  F,D,E
 *   A-M  F,E
 *   B-C  A
 *   B-D  A,E
 *   B-E  C
 *   B-F  C,A,E
 *   B-G  E,C,A
 *   B-H  E,C,A
 *   B-I  A
 *   B-K  A,C
 *   B-L  E
 *   B-M  E
 *   B-O  A,K
 *   C-D  A,F
 *   C-E  D
 *   C-F  A,D
 *   C-G  A,D,F
 *   C-H  D,A
 *   C-I  A
 *   C-K  A,D
 *   C-L  D,F
 *   C-M  F
 *   C-O  I,A
 *   D-E  L
 *   D-F  A,E
 *   D-G  E,A,F
 *   D-H  A,E
 *   D-I  A
 *   D-K  A
 *   D-L  E,F
 *   D-M  F,E
 *   D-O  A
 *   E-F  D,M,C,B
 *   E-G  C,D
 *   E-H  C,D
 *   E-J  B
 *   E-K  C,D
 *   E-L  D
 *   F-G  D,C,A,E
 *   F-H  A,D,O,E,C
 *   F-I  O,A
 *   F-J  B,O
 *   F-K  D,C,A
 *   F-L  E,D
 *   F-M  E
 *   F-O  A
 *   G-H  D,C,E,A
 *   G-I  A
 *   G-K  D,A,C
 *   G-L  D,F,E
 *   G-M  E,F
 *   G-O  A
 *   H-I  O,A
 *   H-J  O
 *   H-K  A,C,D
 *   H-L  D,E
 *   H-M  E
 *   H-O  A
 *   I-J  O
 *   I-K  A
 *   I-O  A
 *   K-L  D
 *   K-O  A
 *   L-M  E,F
 */
public static class SFMerge1Reducer2 extends Reducer<Text, Text, Text, Text> {

    // Reused output holder; overwritten on every reduce() call.
    private final Text outValue = new Text();

    /**
     * Concatenates the values of one pair key with "," and emits the result.
     *
     * @param key     the pair key, e.g. {@code A-B}
     * @param values  all values emitted for {@code key}
     * @param context sink for the (pair, joined values) record
     * @throws IOException          if writing to the context fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        StringBuilder sb = new StringBuilder();
        for (Text str : values) {
            if (sb.length() != 0) {
                sb.append(",");
            }
            sb.append(str);
        }
        // Set the output once, after the loop: only the final string is ever written,
        // so converting the builder on every iteration just re-copied the growing text.
        outValue.set(sb.toString());
        context.write(key, outValue);
    }
}
} // closes the enclosing outer class