歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> 在Hadoop中重寫FileInputFormat類以處理二進制格式存儲的整數

在Hadoop中重寫FileInputFormat類以處理二進制格式存儲的整數

日期:2017/3/1 9:42:01   编辑:Linux編程

最近開始使用MapReduce,發現網上大部分例子都是對文本數據進行處理的,也就是說在讀取輸入數據時直接使用默認的TextInputFormat進行處理即可。對於文本數據處理,這個類還是能滿足一部分應用場景。但是如果要處理以二進制形式結構化記錄存儲的文件時,這些類就不再適合了。

本文以一個簡單的應用場景為例:對按照二進制格式存儲的整數做頻數統計。當然,也可以在此基礎上實現排序之類的其他應用。實現該應用的主要難點就是如何處理輸入數據。參考《權威指南·第三版》得知需要繼承FileInputFormat這個類,並實現以下三個方法:

class MyInputFormat extends FileInputFormat<Type1, Type2> {
/*
* 查詢判斷當前文件是否可以分塊?"true"為可以分塊,"false"表示不進行分塊
*/
protected boolean isSplitable(Configuration conf, Path path) {

}

/*
* MapReduce的客戶端調用此方法得到所有的分塊,然後將分塊發送給MapReduce服務端。
* 注意,分塊中不包含實際的信息,而只是對實際信息的分塊信息。具體的說,每個分塊中
* 包含當前分塊對應的文件路徑,當前分塊在該文件中起始位置,當前分塊的長度以及對應的
* 實際數據所在的機器列表。在實現這個函數時,將這些信息填上即可。
* */
public List<InputSplit> getSplits(Configuration conf) throws IOException {
}

/*
* 類RecordReader是用來創建傳給map函數的Key-Value序列,傳給此類的參數有兩個:一個分塊(split)和作業的配置信息(context).
* 在Mapper的run函數中可以看到MapReduce框架執行Map的邏輯:
* public void run(Context context) throws IOException, InterruptedException {
* setup(context);
* 調用RecordReader方法的nextKeyValue,生成新的鍵值對。如果當前分塊(Split)中已經處理完畢了,則nextKeyValue會返回false.退出run函數
* while (context.nextKeyValue()) {
* map(context.getCurrentKey(), context.getCurrentValue(), context);
* }
* cleanup(context);
* }
**/
public RecordReader<LongWritable, IntWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {
}
}

--------------------------------------分割線 --------------------------------------

Ubuntu 13.04上搭建Hadoop環境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu上搭建Hadoop環境(單機模式+偽分布模式) http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu下Hadoop環境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

單機版搭建Hadoop環境圖文教程詳解 http://www.linuxidc.com/Linux/2012-02/53927.htm

--------------------------------------分割線 --------------------------------------

在RecordReader函數中實現以下幾個接口

public class BinRecordReader extends RecordReader<LongWritable, IntWritable> {
/*關閉文件流
* */
public void close() {}

/*
* 獲取處理進度
**/
public float getProgress() {}

/*
* 獲取當前的Key
* */
public LongWritable getCurrentKey() throws IOException,
InterruptedException {}

/* 獲取當前的Value
* */
public IntWritable getCurrentValue() throws IOException,InterruptedException {}

/*
* 進行初始化工作,打開文件流,根據分塊信息設置起始位置和長度等等
* */
public void initialize(InputSplit inputSplit, TaskAttemptContext context)
throws IOException, InterruptedException {}

/*生成下一個鍵值對
**/
public boolean nextKeyValue() throws IOException, InterruptedException {
}
}

更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-07/104417p2.htm

Copyright © Linux教程網 All Rights Reserved