歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> MapReduce編程實戰

MapReduce編程實戰

日期:2017/3/1 9:45:20   编辑:Linux編程

MapReduce是什麼

MapReduce是Hadoop(這種大數據處理生態環境)的編程模型。既然稱為模型,則意味著它有固定的形式。

MapReduce編程模型,就是Hadoop生態環境進行數據分析處理的固定的編程形式。

這種固定的編程形式描述如下:

MapReduce任務過程被分為兩個階段:map階段和reduce階段。每個階段都以鍵/值對作為輸入和輸出,並由程序員選擇他們的類型。

也就是說,程序員只需要定義兩個函數:map函數和reduce函數就好了,其他的計算過程交給hadoop就好了。

通過以上描述,我們可以看出:

MapReduce所能處理的場景實際是非常具體的,非常有限的,只是“數據的統計分析”場景。

輸入數據准備

天氣預報官方網址:ftp://ftp.ncdc.noaa.gov/pub/data/gsod/

但是,發現這個官方網址的文件格式和《Hadoop權威指南》( http://www.linuxidc.com/Linux/2012-07/65972.htm )所用的格式不一致,不知道是時間久了,官網的格式變了,還是作者對原始格式進行過處理,亦或這個網址根本不對,所以繼而又到《Hadoop權威指南》指定的地址下載了一個,地址如下:

https://github.com/tomwhite/hadoop-book/tree/master/input/ncdc/all

如果簡單測試,也可以把下面這幾行粘貼到一個文本文件也行,這就是正確的天氣文件:

0035029070999991902010113004+64333+023450FM-12+000599999V0201401N011819999999N0000001N9-01001+99999100311ADDGF104991999999999999999999MW1381
0035029070999991902010120004+64333+023450FM-12+000599999V0201401N013919999999N0000001N9-01171+99999100121ADDGF108991999999999999999999MW1381
0035029070999991902010206004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01611+99999100121ADDGF108991999999999999999999MW1381
0029029070999991902010213004+64333+023450FM-12+000599999V0200901N011819999999N0000001N9-01721+99999100121ADDGF108991999999999999999999
0029029070999991902010220004+64333+023450FM-12+000599999V0200901N009819999999N0000001N9-01781+99999100421ADDGF108991999999999999999999

本文中,我們把存儲天氣格式的文本文件命名為:temperature.txt

MapReduce Java編程

有兩套JavaAPI,舊的是org.apache.hadoop.mapred包,MapReduce編程是使用實現接口的方式;新的是org.apache.hadoop.marreduce包,MapReduce編程是使用繼承抽象基類的方式;其實都差不多,下面都會有顯示。

Maven

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.4</version>
</dependency>

也可以不用官方的,用別人修改重新編譯過的,可以直接在Eclipse裡面像運行普通Java程序一樣運行MapReduce。

編譯過的hadoop-core-1.0.4.jar,可以在本地模擬MapReduce

如果Eclipse workspace在d:,則我們可以把d:的某個目錄,比如d:\input作為輸入目錄;d:\output作為輸出目錄。

MapReduce編程模型裡面這樣寫就可以了:

FileInputFormat.setInputPaths(job, new Path("/input"));

FileOutputFormat.setOutputPath(job, new Path("/output"));

下載地址:

免費下載地址在 http://linux.linuxidc.com/

用戶名與密碼都是www.linuxidc.com

具體下載目錄在 /2014年資料/4月/16日/MapReduce編程實戰

下載方法見 http://www.linuxidc.com/Linux/2013-07/87684.htm

----------------------------------------------------------------------------

或者:

------------------------------------------分割線------------------------------------------

FTP地址:ftp://ftp1.linuxidc.com

用戶名:ftp1.linuxidc.com

密碼:www.linuxidc.com

在 2014年LinuxIDC.com\4月\MapReduce編程實戰

下載方法見 http://www.linuxidc.com/Linux/2013-10/91140.htm

------------------------------------------分割線------------------------------------------

下載後,直接覆蓋maven資源庫位置的文件即可。

接口方式

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperature {

public static void main(String[] args) throws Exception {
JobConf conf = new JobConf(MaxTemperature.class);
conf.setJobName("Max Temperature");

// FileInputFormat.addInputPaths(conf, new Path(args[0]));
// FileOutputFormat.setOutputPath(conf, new Path(args[1]));

FileInputFormat.setInputPaths(conf, new Path("/hadooptemp/input/2"));
FileOutputFormat.setOutputPath(conf, new Path("/hadooptemp/output"));

conf.setMapperClass(MaxTemperatureMapper.class);
conf.setReducerClass(MaxTemperatureReduce.class);

conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);
}
}

class MaxTemperatureMapper extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;

public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
output.collect(new Text(year), new IntWritable(airTemperature));
}
}
}

class MaxTemperatureReduce extends MapReduceBase implements
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
output.collect(key, new IntWritable(maxValue));

}
}

抽象類方式

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class NewMaxTemperature {

public static void main(String[] args) throws Exception {

Job job = new Job();
job.setJarByClass(NewMaxTemperature.class);

// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));

FileInputFormat.setInputPaths(job, new Path("/hadooptemp/input/2"));
FileOutputFormat.setOutputPath(job, new Path("/hadooptemp/output"));

job.setMapperClass(NewMaxTemperatureMapper.class);
job.setReducerClass(NewMaxTemperatureReduce.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

class NewMaxTemperatureMapper extends
Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;

public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') {
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}

class NewMaxTemperatureReduce extends
Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterator<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
while (values.hasNext()) {
maxValue = Math.max(maxValue, values.next().get());
}
context.write(key, new IntWritable(maxValue));

}
}

更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-04/100241p2.htm

Copyright © Linux教程網 All Rights Reserved