java。io。IOE org。apache。hadoop。conf。C org。apache。hadoop。conf。C org。apache。hadoop。fs。FileS org。apache。hadoop。fs。P org。apache。hadoop。io。LongW org。apache。hadoop。io。T org。apache。hadoop。mapreduce。J org。apache。hadoop。mapreduce。M org。apache。hadoop。mapreduce。R org。apache。hadoop。mapreduce。lib。input。FileInputF org。apache。hadoop。mapreduce。lib。output。FileOutputF org。apache。hadoop。util。T org。apache。hadoop。util。ToolR WordCCTool{ 对文本文件进行Wordcount,文本文件的输入类型是TextInputF,它实现了createRecordReader, 返回创建的LineRecordR实现类,这个类里就有对应的key和value的类型 文本文件 KEYIN:行字节偏移量 VALUEIN:一行数据 mapper的输入类型是由业务需求来自行确定类型,跟框架没关系,因为我们的需求是按照单词统计数量 key:单词,String类型的封装类Text value:数值,Long类型的封装类LongWritable WordCountMMapper{ ZZ map(),一行调用一次 Override map(LongWkey,Tvalue,Ccontext) IOException,InterruptedE{ Svalue。toString(); System。out。println(map():keyIn:key。get();valueIn:value。toString()); String〔〕line。split(); for(S:splits){ keyOut。set(word); map()输出数据,用context。write() context。write(keyOut,valueOut); System。out。println(map():keyOut:keyOut。toString();valueOut:valueOut。get()); } } } KEYIN,VALUEIN:根据map输出的类型来确定 KEYOUT,VALUEOUT:根据业务需求确定 KEYOUT是单词,S类型的封装类Text VALUEOUT数值,Long类型的封装类LongWritable WordCountRReducer{ LongWvalueOLongWritable(); 一个key调用一次 Override reduce(Tkey,Ivalues,Ccontext)IOException,InterruptedE{ StringBStringBuilder(); sb。append(reduce():keyIn:key。toString();vlaueIn:〔); 0; for(LongW:values){ 通过get(),获取LongW对象的实际值 w。get(); sb。append(num)。append(,); } sb。deleteCharAt(sb。length()1); sb。append(〕); System。out。println(sb。toString()); valueOut。set(sum); context。write(key,valueOut); } } Override run(String〔〕args)E{ 创建及配置,提交任务 CgetConf(); 创建job对象 JJob。getInstance(conf,); 任务运行类 job。setJarByClass(WordCount。class); 任务map运行类 job。setMapperClass(WordCountMapper。class); 任务运行类 job。setReducerClass(WordCountReducer。class); 任务map阶段输出的key的类型 job。setMapOutputKeyClass(Text。class); 任务map阶段输出的value类型 job。setMapOutputValueClass(LongWritable。class); 任务reduce阶段(最后阶段)输出的key的类型 job。setOutputKeyClass(Text。class); 任务reduce阶段(最后阶段)输出的value的类型 job。setOutputValueClass(LongWritable。class); 设置reduce个数 job。setNumReduceTasks(2); 任务的输入目录 FileInputFormat。addInputPath(job,Path(args〔0〕)); PoutputPPath(args〔1〕); 任务的输出目录 FileOutputFormat。setOutputPath(job,outputPath); 解决自动删除输出目录 FileSFileSystem。get(conf); 判断文件系统下存不存在该目录,如果存在删除 if(fs。exists(outputPath)){ 递归删除 fs。delete(outputPath,true); System。out。println(dir:outputPath。toString()SUCCESS!); } 提交任务 waitForCompletion(false);false:代表不输出counter job。waitForCompletion(false); ?0:1; } main(String〔〕args)E{ 运行时将输入输出目录放到执行参数里,用main()的接收到 tmpoutput System。exit(ToolRunner。run(WordCount(),args)); } }