hadoop编程(4)-MapReduce案例:求每一年的最高温度

云计算 waitig 428℃ 百度已收录 0评论

此例来自Hadoop权威指南。

下载数据

参考资料

从这里下载1901和1902年的数据

数据格式

数据说明:无需解压,数据的格式是一行一条记录

0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...

这些文件输入到map函数时,map得到的是键值对

(0, 0067011990999991950051507004…9999999N9+00001+99999999999…)
(106, 0043011990999991950051512004…9999999N9+00221+99999999999…)
(212, 0043011990999991950051518004…9999999N9-00111+99999999999…)
(318, 0043012650999991949032412004…0500001N9+01111+99999999999…)
(424, 0043012650999991949032418004…0500001N9+00781+99999999999…)

key是数据偏移量,value就是原文件一行的数据。

一行数据是一次温度采集,其内涵丰富,不同位上的数字含义不同,这个我们不用管,直接抄一段原书的代码下来就可以了。

public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  private static final int MISSING = 9999;
  @Override
  protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    String line = value.toString(); // 整行的数据
    String year = line.substring(15, 19); // 年份
    int airTemperature;  // 某次记录的温度
    if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
      airTemperature = Integer.parseInt(line.substring(88, 92));
    } else {
      airTemperature = Integer.parseInt(line.substring(87, 92));
    }
    String quality = line.substring(92, 93); // 质量代码
    //提取有效数据
    if (airTemperature != MISSING && quality.matches("[01459]")) {
      // context.write(new Text(year), new IntWritable(airTemperature));
      System.out.printf("年份%s,温度:%d\n",year,airTemperature);
    }
  }
}

打印效果如下:

年份1902,温度:-94
年份1902,温度:-100
年份1902,温度:-117
年份1902,温度:-161
年份1902,温度:-172
年份1902,温度:-178
……

也就是说,按这样去解析每一条记录,我们能准确得到有效的年份和当次记录的温度。

所有每年的最高温度

Mapper的数据来自切片,这个我们不必去管,现在把每个记录中的年份作为key,温度作为value输出即可。

取消上一段代码的这一行的注释:

context.write(new Text(year), new IntWritable(airTemperature));

Reducer

现在我们来设计Reducer。由于按key分区和归并,reduce函数得到的键值对是<年份,[温度集合]>。
我们只需求出温度集合中的最大值,然后把<年份,最高温度>作为输出即可。

public class MyReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    int max = Integer.MIN_VALUE;
    for(Iterator<IntWritable> iter=values.iterator();iter.hasNext();){
      int temp=iter.next().get();
      if (temp>max)
        max=temp;
    }
    context.write(key,new IntWritable(max));
  }
}

Driver

public class Driver extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    // 处理main的参数
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      // System.err.println("Usage: wordcount <in> [<in>...] <out>");
      ToolRunner.printGenericCommandUsage(System.err);
      System.exit(-1);
    }

    String jobName = "sort temperature";
    //job的各种设定
    Job job = Job.getInstance(conf, jobName);//new Job(conf, "word count");
    job.setJarByClass(getClass());

    job.setMapperClass(MyMapper.class);
    job.setCombinerClass(MyReducer.class);
    job.setReducerClass(MyReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));

    return job.waitForCompletion(true) ? 0 : 1;

  }

  public static void main(String[] args) throws Exception {
    final Driver driver = new Driver();
    Configuration conf = new Configuration();
    // 配置文件
    conf.addResource("hadoop-local.xml");
    driver.setConf(conf);

    //输入数据的位置,可替换成小数据集样本
    Path in = new Path("src/main/resources/weather");
    //输出数据的位置
    Path out = new Path("output");
    //删除输出目录,因为hadoop不会覆盖已有的目录,如果目录存在会报错
    FileSystem fs = FileSystem.get(conf);
    fs.delete(out, true);

    //运行任务
    int exitCode = ToolRunner.run(driver, new String[]{in.toString(), out.toString()});
    System.exit(exitCode);

  }
}

注意,我把下载的数据放到了"src/main/resources/weather"目录下。

最终生成的文件内的内容为:

1901    317
1902    244

结果所示,这份记录中1901年的最高气温为31.7°,1902年的最高气温为24.4°


本文由【waitig】发表在等英博客
本文固定链接:hadoop编程(4)-MapReduce案例:求每一年的最高温度
欢迎关注本站官方公众号,每日都有干货分享!
等英博客官方公众号
点赞 (0)分享 (0)