歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> 結合使用Hadoop和Couchbase

結合使用Hadoop和Couchbase

日期:2017/2/27 16:01:12   编辑:Linux教程
Hadoop 非常適合處理大量數據並將該信息解析為您可查詢的較小的信息集。但是,通過與 Couchbase Server 集成,您可以對信息執行實時查詢和報告,同時繼續使用 Hadoop 處理大型數據集和數據集的繁重處理工作。Couchbase Server 還使用了一個 MapReduce 查詢系統,這使您能夠輕松地遷移和集成索引和查詢系統,從而有效地提取和操作信息。

Hadoop 和數據處理

Hadoop 將許多重要特性結合在一起,這使 Hadoop 對於將大量數據分解為更小、實用的數據塊非常有用。

Hadoop 的主要組件是 HDFS 文件系統,它支持將信息分布到整個集群中。對於使用這種分布格式存儲的信息,可以通過一個名為 MapReduce 的系統在每個集群節點上進行單獨處理。MapReduce 進程將存儲在 HDFS 文件系統中的信息轉換為更小的、經過處理的、更容易管理的數據塊。

因為 Hadoop 可在多個節點上運行,所以可以使用它來處理大量輸入數據,並將這些數據簡化為更實用的信息塊。此過程可使用一個簡單的 MapReduce 系統來處理。

MapReduce 轉換傳入信息(不一定為結構化格式),將該信息轉換為一種可更輕松地使用、查詢和處理的結構。

例如,一種典型的用途是處理來自數百個不同應用程序的日志信息,以便可以識別特定的問題、計數或其他事件。通過使用 MapReduce 格式,您可以開始度量並查找趨勢,將平常非常多的信息轉換為更小的數據塊。舉例而言,在查看某個 Web 服務器的日志時,您可能希望查看特定頁面上的特定范圍中發生的錯誤。您可以編寫一個 MapReduce 函數來識別特定頁面上的特定錯誤,並在輸出中生成該信息。使用此方法,您可從日志文件中精減多行信息,得到一個僅包含錯誤信息的小得多的記錄集合。

理解 MapReduce

MapReduce 的工作方式分兩個階段。映射 (map) 過程獲取傳入信息,並將這些信息映射到某種標准化的格式。對於某些信息類型,此映射可以是直接和顯式的。例如,如果要處理 Web 日志等輸入數據,那麼僅從 Web 日志的文本中提取一列數據即可。對於其他數據,映射可能更復雜。在處理文本信息時,比如研究論文,您可能需要提取短語或更復雜的數據塊。

精減 (reduce) 階段用於收集和匯總數據。精減實際上能夠以多種不同方式發生,但典型的過程是處理一個基本計數、總和或其他基於來自映射階段的個別數據的統計數據。

想象一個簡單的示例,比如 Hadoop 中用作示例 MapReduce 的字數,映射階段將對原始文本進行分解,以識別各個單詞,並為每個單詞生成一個輸出數據塊。reduce 函數獲取這些映射的信息塊,對它們進行精減,以便在所看到的每個惟一單詞上進行遞增。給定一個包含 100 個單詞的文本文件,映射過程將生成 100 個數據塊,但精減階段可對此進行匯總,提供惟一單詞的數量(比如 56 個)和每個單詞出現的次數。

借助 Web 日志,映射將獲取輸入數據,為日志文件中的每個錯誤創建一條記錄,然後為每個錯誤生成一個數據塊,其中包含日期、時間和導致該問題的頁面。

在 Hadoop 內,MapReduce 階段會出現在存儲各個源信息塊的各個節點上。這使 Hadoop 能夠處理以下大型信息集:通過允許多個節點同時處理數據。例如,對於 100 個節點,可以同時處理 100 個日志文件,比通過單個節點快得多地簡化許多 GB(或 TB)的信息。

Hadoop 信息

核心 Hadoop 產品的一個主要限制是,無法在數據庫中存儲和查詢信息。數據添加到 HDFS 系統中,但您無法要求 Hadoop 返回與某個特定數據集匹配的所有數據的列表。主要原因是 Hadoop 不會存儲、結構化或理解存儲在 HDFS 中的數據的結構。這正是 MapReduce 系統需要將信息分析並處理為更加結構化的格式的原因。

但是,我們可以將 Hadoop 的處理能力與更加傳統的數據庫相結合,使我們可以查詢 Hadoop 通過自己的 MapReduce 系統生成的數據。可能的解決方案有許多,其中包括一些傳統 SQL 數據庫,但我們可以通過使用 Couchbase Server 來保持 MapReduce 風格(它對大型數據集非常有效)。

系統之間的數據共享的基本結構如 圖 1 所示。


圖 1. 系統之間的數據共享的基本結構
系統之間的數據共享的基本結構

安裝 Hadoop

如果您尚未安裝 Hadoop,最簡單的方法是使用一個 Cloudera 安裝。為了保持 Hadoop、Sqoop 和 Couchbase 之間的兼容性,最好的解決方案是使用 CDH3 安裝(參閱 參考資料)。為此,您需要使用 Ubuntu 10.10 到 11.10 版。更高的 Ubuntu 版本會引入不兼容問題,因為它們不再支持 Cloudera Hadoop 安裝所需的一個包。

在安裝之前,請確保已經安裝了一個 Java™ 虛擬機,確保在 JAVA_HOME 變量中為 JDK 配置了正確的主目錄。請注意,您必須擁有完整的 Java 開發工具包,而不只是擁有 Java 運行時環境 (JRE),因為 Sqoop 將代碼編譯為 Couchbase Server 與 Hadoop 之間的導出和導入數據。

要在 Ubuntu 和類似的系統上使用 CDH3 安裝,您需要執行以下步驟:

  1. 下載 CDH3 配置包。這會將 CDH3 源文件的配置添加到 apt 存儲庫中。
  2. 更新您的存儲庫緩存:$ apt-get update
  3. 安裝主要 Hadoop 包:$ apt-get install hadoop-0.20
  4. 安裝 Hadoop 組件(參見 清單 1)。


    清單 1. 安裝 Hadoop 組件
    						
    $ for comp in namenode datanode secondarynamenode jobtracker tasktracker
    do
    apt-get install hadoop-0.20-$comp
    done
    
  5. 編輯配置文件,以確保您設置了核心組件。
  6. 編輯 /etc/hadoop/conf/core-site.xml,使其如 清單 2 所示。


    清單 2. 編輯後的 /etc/hadoop/conf/core-site.xml 文件
    						
    <configuration>
      <property>
        <name>fs.default.name</name>
        <value>hdfs://localhost:9000</value>
      </property>
    </configuration>
    


    這將配置存儲數據的默認 hdfs 位置。
    編輯 /etc/hadoop/conf/hdfs-site.xml(參見 清單 3)。


    清單 3. 編輯後的 /etc/hadoop/conf/hdfs-site.xml 文件
    						
    <configuration>
      <property>
        <name>dfs.replication</name>
        <value>1</value>
      </property>
    </configuration>
    


    這支持復制存儲的數據。
    編輯 /etc/hadoop/conf/mapred-site.xml(參見 清單 4)。


    清單 4. 編輯後的 /etc/hadoop/conf/mapred-site.xml 文件
    						
    <configuration>
      <property>
        <name>mapred.job.tracker</name>
        <value>localhost:9001</value>
      </property>
    </configuration>
    


    這實現了 MapReduce 的作業跟蹤器。
  7. 最後,編輯 Hadoop 環境,使其正確地指向 /usr/lib/hadoop/conf/hadoop-env.sh 中您的 JDK 安裝目錄。其中會有一個注釋掉的 JAVA_HOME 變量行。您應該取消注釋它,並將它設置為您的 JDK 位置。例如:export JAVA_HOME=/usr/lib/jvm/java-1.6.0-openjdk
  8. 現在,在您的系統上啟動 Hadoop。最簡單的方法是使用 start-all.sh 腳本:$ /usr/lib/hadoop/bin/start-all.sh

假設所有設置均已正確配置,您現在應有一個正在運行的 Hadoop 系統。

Couchbase Server 概述

Couchbase Server 是一個集群化的、基於文檔的數據庫系統,它使用一個緩存層來提供非常快的數據訪問,將大部分數據都存儲在 RAM 中。該系統使用多個節點和一個自動分散在整個集群上的緩存層。這實現了一種彈性,您可擴大和緊縮集群,以便利用更多 RAM 或磁盤 I/O 來幫助提升性能。

Couchbase Server 中的所有數據最終會持久存儲在磁盤中,但最初會通過緩存層執行寫入和更新操作,這正是提供高性能的源泉,是我們通過處理 Hadoop 數據來獲得實時信息和查詢內容時可利用的優勢。

Couchbase Server 的基本形式是一個基本文檔和基於鍵/值的存儲。只有在您知道文檔 ID 時,才能檢索集群提供的信息。在 Couchbase Server 2.0 中,您可以將文檔存儲為 JSON 格式,然後使用視圖系統在存儲的 JSON 文檔上創建一個視圖。視圖是在存儲在數據庫中的文檔上執行的一個 MapReduce 組合。來自視圖的輸出是一個索引,它通過 MapReduce 函數來匹配您定義的結構。索引的存在為您提供了查詢底層的文檔數據的能力。

我們可以使用此功能從 Hadoop 獲取已處理的信息,將該信息存儲在 Couchbase Server 中,然後使用它作為查詢該數據的基礎。Couchbase Server 可以方便地使用一個 MapReduce 系統來處理文檔和創建索引。這在用於處理數據的方法之間提供了一定的兼容性和一致性水平。

安裝 Couchbase Server

安裝 Couchbase Server 很容易。從 Couchbase 網站下載適合您平台的 Couchbase Server 2.0 版本(參見 參考資料),使用 dpkg 或 RPM(具體依賴於您的平台)安裝該包。

安裝之後,Couchbase Server 會自動啟動。要配置它,請打開一個 Web 浏覽器,並將它指向您的機器的 localhost:8091(或使用該機器的 IP 地址遠程訪問它)。

按照屏幕上的配置說明進行操作。您可使用在安裝期間提供的大部分默認設置,但最重要的設置是寫入數據庫中的數據的數據文件的位置,以及您分配給 Couchbase Server 的 RAM 量。

使 Couchbase Server 能夠與 Hadoop 連接器通信

Couchbase Server 使用 Sqoop 連接器與您的 Hadoop 集群通信。Sqoop 提供了一個連接在 Hadoop 與 Couchbase Server 之間批量傳輸數據。

從技術上講,Sqoop 是一個設計用於在結構化數據庫與 Hadoop 之間轉換信息的應用程序。Sqoop 這個名稱實際上來源於 SQL 和 Hadoop。

安裝 Sqoop

如果使用 CDH3 安裝,您可使用報管理器來安裝 Sqoop:$ sudo apt-get install sqoop

這將把 Sqoop 安裝在 /usr/lib/sqoop 中。

注意:Sqoop 中一個最新的 bug 表明它有時會嘗試傳輸uowu的數據集。修補程序包含在 Sqoop 1.4.2 版中。如果遇到問題,請嘗試使用 V1.4.2 或更高的版本。

安裝 Couchbase Hadoop Connector

Couchbase Hadoop Connector 是一個支持 Sqoop 與 Couchbase 之間的連接的 Java jar 文件集合。從 Couchbase 網站下載 Hadoop 連接器(參閱 參考資料)。該文件封裝為一個 zip 文件。解壓它,然後運行其中的 install.sh 腳本,提供 Sqoop 系統的位置。例如:$ sudo bash install.sh /usr/lib/sqoop

這將安裝所有必要的庫和配置文件。現在我們可以開始在兩個系統之間交換信息了。

將數據從 Couchbase Server 導入 Hadoop

盡管該場景不是我們這裡將直接處理的場景,但需要注意我們可從 Couchbase Server 將數據導入 Hadoop。如果您在 Couchbase Server 中加載了大量數據,並希望利用 Hadoop 來處理和簡化它,這可能很有用。為此,您可以使用以下命令,從 Couchbase Server 將整個數據集加載到 HDFS 中的一個 Hadoop 文件中:$ sqoop import --connect http://192.168.0.71:8091/pools --table cbdata

這裡提供的 URL 是 Couchbase Server 桶池 (bucket pool) 的位置。這裡指定的表實際上是 HDFS 中將存儲數據的目錄的名稱。

數據本身被存儲為來自 Couchbase Server 的信息的一種鍵/值轉儲形式。在 Couchbase Server 2.0 中,這意味著數據是使用惟一文檔 ID 寫出的,包含記錄的 JSON 值。

將 JSON 數據寫入 Hadoop MapReduce

要在 Hadoop 與 Couchbase Server 之間交換信息,需要使用一種通用語言來表達這些信息,在本例中使用的是 JSON(參見 清單 5)。


清單 5. 在 Hadoop MapReduce 中輸出 JSON
				
package org.mcslp;

import java.io.IOException;
import java.util.*;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;
import com.google.gson.*;

public class WordCount {

    public static class Map extends MapReduceBase implements Mapper<LongWritable, 
Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(LongWritable key, Text value, OutputCollector<Text, 
IntWritable> output, Reporter reporter) throws IOException {
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line);
            while (tokenizer.hasMoreTokens()) {
                word.set(tokenizer.nextToken());
                output.collect(word, one);
            }
        }
    }

    public static class Reduce extends MapReduceBase implements Reducer<Text, 
IntWritable, Text, Text> {

        class wordRecord {
            private String word;
            private int count;
            wordRecord() {
            }
        }

        public void reduce(Text key,
                           Iterator<IntWritable> values,
                           OutputCollector<Text, Text> output,
                           Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }

            wordRecord word = new wordRecord();
            word.word = key.toString();;
            word.count = sum;

            Gson json = new Gson();
            System.out.println(json.toJson(word));
            output.collect(key, new Text(json.toJson(word)));
        }
    }

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);
        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);
    }
}

該代碼是 Hadoop 發行版所提供的字數示例的修改版。

此版本使用 Google Gson 庫從處理過程的精減階段寫入 JSON 信息。為了方便起見,我們使用了一個新類 (wordRecord),它由 Gson 轉換為一條 JSON 記錄,這種記錄是 Couchbase Server 逐個文檔地處理和解析內容所需的格式。

請注意,我們沒有為 Hadoop 定義一個 Combiner 類。這將阻止 Hadoop 嘗試重新精減該信息,該操作在當前的代碼中會失敗,因為我們的精減階段僅接收該單詞和一位數,並輸出一個 JSON 值。對於輔助的精減/組合階段,我們需要解析 JSON 輸入或定義一個新 Combiner 類,以便輸出信息的 JSON 版本。這稍微簡化了定義。

要在 Hadoop 中使用此代碼,首先需要將 Google Gson 庫復制到 Hadoop 目錄中 (/usr/lib/hadoop/lib)。然後重新啟動 Hadoop,以確保 Hadoop 已經正確識別出該庫。

接下來,將您的代碼編譯到一個目錄中: $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar:./google-gson-2.2.1/gson-2.2.1.jar -d wordcount_classes WordCount.java

現在為您的庫創建一個 jar 文件: $ jar -cvf wordcount.jar -C wordcount_classes/

完成此過程後,您可以將一些文本文件復制到某個目錄中,然後使用此 jar 文件將這些文本文件處理為許多獨立的單詞,創建一條 JSON 記錄來包含每個單詞和計數。例如,要在一些 Project Gutenberg 文本上處理此數據: $ hadoop jar wordcount.jar org.mcslp.WordCount /user/mc/gutenberg /user/mc/gutenberg-output

這將在我們的目錄中生成已由 Hadoop 內的 MapReduce 函數統計的單詞列表。

將數據從 Hadoop 導出到 Couchbase Server

要從 Hadoop 取回數據並導入 Couchbase Server 中,則需要使用 Sqoop 導出該數據: $ sqoop export --connect http://10.2.1.55:8091/pools --table ignored --export-dir gutenberg-output

此示例中忽略了 --table 參數,但 --export-dir 是要導出的信息所在的目錄的名稱。

在 Couchbase Server 中編寫 MapReduce

在 Hadoop 中,MapReduce 函數是使用 Java 編寫的。在 Couchbase Server 中,MapReduce 函數是使用 Javascript 編寫的。作為一種已解釋的語言,這意味著您不需要編譯視圖,它會支持您編輯和細化 MapReduce 結構。

要在 Couchbase Server 中創建一個視圖,請打開管理控制台(在 http://localhost:8091 上),然後單擊 View 按鈕。視圖收集在一個設計文檔中。您可以在單個設計文檔中創建多個視圖,也可以創建多個設計文檔。要提升服務器的總體性能,系統還支持一種可編輯的開發視圖以及一個無法編輯的生產視圖。生產視圖無法編輯是因為這麼做會使視圖索引無效,並會導致需要重新構建索引。

單擊 Create Development View 按鈕並命名您的設計文檔和視圖。

在 Couchbase Server 內,有兩個相同的函數:mapreducemap 函數用於將輸入數據(JSON 文檔)映射到某個表。然後使用 reduce 函數匯總和精減該表。reduce 函數是可選的,不是索引功能所必需的,所以,出於本文的目的,我們將忽略 reduce 函數。

對於 map 函數,函數的格式如 清單 6 所示。


清單 6. map 函數的格式
				
map(doc) { 

}

參數 doc 是每個存儲的 JSON 文檔。Couchbase Server 的存儲格式是一種 JSON 文檔,視圖是使用 Javascript 語言編寫的,所以我們可使用以下語句訪問 JSON 中一個名為 count 的字段:doc.count

要從 map 函數發出信息,可以調用 emit() 函數。emit() 函數接受兩個參數,第一個是鍵(用於選擇和查詢信息),第二個參數是相應的值。因此,我們可以創建一個 map 函數來使用來輸出單詞和計數,如 清單 7 中的代碼所示。


清單 7. 輸出單詞和計數的 map 函數
				
function (doc) {
  if (doc.word) {
  	emit(doc.word,doc.count);
  }
}

這將為每個輸出文檔輸出一行數據,其中包含文檔 ID(實際上是我們的單詞)、用作鍵的單詞和該單詞在源文本中出現的次數。可在 清單 8 中看到原始的 JSON 輸出。


清單 8. 原始的 JSON 輸出
				
{"total_rows":113,"rows":[
{"id":"acceptance","key":"acceptance","value":2},
{"id":"accompagner","key":"accompagner","value":1},
{"id":"achieve","key":"achieve","value":1},
{"id":"adulteration","key":"adulteration","value":1},
{"id":"arsenic","key":"arsenic","value":2},
{"id":"attainder","key":"attainder","value":1},
{"id":"beerpull","key":"beerpull","value":2},
{"id":"beware","key":"beware","value":5},
{"id":"breeze","key":"breeze","value":2},
{"id":"brighteyed","key":"brighteyed","value":1}
]
}

在輸出中,id 是文檔 ID,key 是您在 emit 語句中指定的鍵,value 是在 emit 語句中指定的值。

獲取實時數據

現在我們已在 Hadoop 中處理了信息,請將它導入 Couchbase Server 中,然後在 Couchbase Server 中為該數據創建了一個視圖,我們可以開始查詢已處理和存儲的信息了。視圖可使用一個 REST 樣式的 API 來訪問,或者在使用一個 Couchbase Server SDK 時,通過相應的視圖查詢函數來訪問它。

查詢可通過 3 種主要選擇來執行:

  • 單獨的鍵。例如,顯示與某個特定鍵(比如 'unkind')匹配的信息。
  • 鍵列表。您可提供一個鍵值數組,這將返回其鍵值與一個提供的值匹配的所有記錄。例如,['unkind','kind'] 將返回與其中一個單詞匹配的記錄。
  • 鍵范圍。您可指定一個開始和結束鍵。

例如,要找到一個指定的單詞的數量,可使用 key 參數進行查詢:

http://192.168.0.71:8092/words/_design/dev_words/_view/byword?connection_timeout=
            60000&limit=10&skip=0&key=%22breeze%22

Couchbase Server 會很自然地采用 UTF-8 排序方式輸出一個 MapReduce 的按指定的鍵排序的結果。這意味著您可以通過指定開始值和結束值來獲取一個值范圍。例如,要獲取 'breeze' 與 'kind' 之間的所有單詞,可使用:

http://192.168.0.71:8092/words/_design/dev_words/_view/byword?connection_timeout=
            60000&limit=10&skip=0&startkey=%22breeze%22&endkey=%22kind%22

該查詢很簡單,但非常強大,尤其是在您認識到可以將它與靈活的視圖系統結合使用,生成具有您想要的格式的數據的時候。

結束語

Hadoop 本身提供了一個強大的處理平台,但沒有提供從已處理的數據中實際提取有用信息的方法。通過將 Hadoop 連接到另一個系統,可使用該系統來查詢和提取信息。因為 Hadoop 使用 MapReduce 進行相關處理,所以您可以通過 Couchbase Server 中的 MapReduce 系統,利用 MapReduce 的知識來提供查詢平台。使用此方法,您可以在 Hadoop 中處理數據,以 JSON 文檔格式將數據從 Hadoop 導出到 Couchbase Server 中,然後在 Couchbase Server 中使用 MapReduce 查詢已處理的信息。

Copyright © Linux教程網 All Rights Reserved