歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Storm如何保證at least once語義?

Storm如何保證at least once語義?

日期:2017/3/1 9:27:07   编辑:Linux編程

背景

前期收到的問題:

1、在Topology中我們可以指定spout、bolt的並行度,在提交Topology時Storm如何將spout、bolt自動發布到每個服務器並且控制服務的CPU、磁盤等資源的?

2、Storm處理消息時會根據Topology生成一棵消息樹,Storm如何跟蹤每個消息、如何保證消息不丟失以及如何實現重發消息機制?

本篇看看storm是通過什麼機制來保證消息至少處理一次的語義的,並回答第2個問題。

storm中的一些原語


要說明上面的問題,得先了解storm中的一些原語,比如:

  1. tuple和message
    在storm中,消息是通過tuple來抽象表示的,每個tuple知道它從哪裡來,應往哪裡去,包含了其在tuple-tree(如果是anchored的話)或者DAG中的位置,等等信息。

  2. spout
    spout充當了tuple的發送源,spout通過和其它消息源,比如kafka交互,將消息封裝為tuple,發送到流的下游。

  3. bolt
    bolt是tuple的實際處理單元,通過從spout或者另一個bolt接收tuple,進行業務處理,將自己加入tuple-tree(通過在emit方法中設置anchors)或DAG,然後繼續將tuple發送到流的下游。
  4. acker
    acker是一種特殊的bolt,其接收來自spout和bolt的消息,主要功能是追蹤tuple的處理情況,如果處理完成,會向tuple的源頭spout發送確認消息,否則,會發送失敗消息,spout收到失敗的消息,根據配置和自定義的情況會進行消息的丟棄、重放處理。

spout、bolt、acker的關系

  1. spout將tuple發送給流的下游的bolts.
  2. bolt收到tuple,處理後發送給下游的bolts.
  3. spout向acker發送請求ack的消息.
  4. bolt向acker發送請求ack的消息.
  5. acker向bolt和spout發送確認ack的消息.

簡單的關系如下所示:

上圖展示了spout、bolts等形成了一個DAG,如何追蹤這個DAG的執行過程,就是storm保證僅處理一次消息的語義的機制所在。

storm如何追蹤消息(tuple)的處理

spout在調用emit/emitDirect方法發送tuple時,會以單播或者廣播的方式,將消息發送給流的下游的component/task/bolt,如果配置了acker,那麼會在每次emit調用之後,向acker發送請求ack的消息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向acker發送請求ack消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;

;; rooted?表示是否設置了acker
(if (and rooted?
        (not (.isEmpty out-ids)))
 (do
   (.put pending root-id [task-id
                          message-id
                          {:stream out-stream-id :values values}
                          (if (sampler) (System/currentTimeMillis))])
   (task/send-unanchored task-data
                         ;;表示這是一個流初始化的消息
                         ACKER-INIT-STREAM-ID 
                         ;;將下游組件的out-id和0組成一個異或鏈,發送給acker用於追蹤
                         [root-id (bit-xor-vals out-ids) task-id] 
                         overflow-buffer))

 ;; 如果沒有配置acker,則調用自身的ack方法
 (when message-id
   (ack-spout-msg executor-data task-data message-id
                  {:stream out-stream-id :values values}
                  (if (sampler) 0) "0:")))

從上面的代碼可以看出,每次emit tuple後,spout會向acker發送一個流ID為ACKER-INIT-STREAM-ID的消息,用於將DAG或者tuple-tree中的節點信息交給acker,acker會利用這個信息來追蹤tuple-tree或DAG的完成。

而spout調用emit/emitDirect方法,將tuple發到下游的bolts,也同時會發送用於追蹤DAG完成情況的信息:

;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; spout向流的下游emit消息
;;;;;;;;;;;;;;;;;;;;;;;;;;;;

(let [tuple-id (if rooted?
                ;; 如果有acker,tuple的MessageId會包含一個<root-id,id>的哈希表
                ;; root-id和id都是long型64位整數
                (MessageId/makeRootId root-id id)
                (MessageId/makeUnanchored))
     ;;實例化tuple
     out-tuple (TupleImpl. worker-context
                           values
                           task-id
                           out-stream-id
                           tuple-id)]

 ;; 發送至隊列,最終發送給流的下游的task/bolt
 (transfer-fn out-task
              out-tuple
              overflow-buffer)
 ))

這個追蹤信息是什麼呢?

如果是spout -> bolt或者bolt -> bolt,這個信息就是tuple的MessageId,其內部維護一個哈希表:

// map anchor to id
private Map<Long, Long> _anchorsToIds;

鍵為root-id,表示spout,值表示tuple在tuple-tree或者DAG的根(spout)或者經過的邊(bolt),但這裡沒有利用任何常規意義上的“樹”的算法,而是采用異或的方式來存儲這個值:

  1. spout -> bolt,值被初始化為一個long型64位整數.
  2. bolt -> bolt,值被初始化為一個long型64位整數,並和_anchorsToIds中的舊值進行按位異或,將結果更新到_anchorsToIds中.

如果是spout -> acker,或者bolt -> acker,那麼用於追蹤的是tuple的values:

  1. spout -> acker : [root-id (bit-xor-vals out-ids) task-id]
  2. bolt -> acker : [root (bit-xor id ack-val) ..]

下面給出上面調用的bit-xor-vals和bit-xor方法的代碼:

(defn bit-xor-vals
  [vals]
  (reduce bit-xor 0 vals))

(defn bit-xor
  "Bitwise exclusive or"
  {:inline (nary-inline 'xor)
   :inline-arities >1?
   :added "1.0"}
  ([x y] (. clojure.lang.Numbers xor x y))
  ([x y & more]
    (reduce1 bit-xor (bit-xor x y) more)))

示例

說起來有點抽象,看個例子。

假設我們有1個spout,n個bolt,1個acker:

1.spout

spout發送tuple到下游的bolts:

;; id_1是發送到bolt_1的tuple-id,依此類推
spout : 
  ->bolt_1 : id_1
  ->bolt_2 : id_2
  ..
  ->bolt_n : id_n

2.bolt

bolt收到tuple,在execute方法中進行必要的處理,然後調用emit方法,最後調用ack方法:

;; bolt_1調用emit方法,追蹤消息的這樣一個值:讓id_1和bid_1按位進行異或.
;; bid_1和id_1類似,是個long型的64位隨機整數,在emit這一步生成
  bolt_1 emit : id_1 ^ bid_1

;; bolt_1調用ack方法,並將值表達為如下方式的異或鏈的結果
  bolt_1 ack : 0 ^ bid_1 ^ id_1 ^ bid_1 = 0 ^ id_1

以上,可以看出bolt進行了emit-ack組合後,其自身在異或鏈中的作用消失了,也就是說tuple在此bolt得到了處理。

(當然,此時的ack還沒有得到acker的確認,假設acker確認了,那麼上面所說的tuple在bolt得到了處理就成立了。)

來看看acker的確認。

3.acker

acker收到來自spout的tuple:

;; spout發消息給acker,tuple的MessageId包含下面的異或鏈的結果
spout -> acker : 0 ^ id_1 ^ id_2 ^ .. ^ id_n

;; acker收到來spout的消息,對tuple的ackVal進行處理,如下所示:
acker : 0 ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_1 ^ id_2 ^ .. ^ id_n

acker收到來自bolt的tuple:

;; bolt_1發消息給acker:
bolt_1 -> acker : 0 ^ id_1

;; acker維護的對應此tuple的源spout的ackVal : 
ackVal : 0 ^ id_1 ^ id_2 ^ .. ^ id_n

;; acker進行確認,也就是拿上面的兩個值進行異或:
acker : (0 ^ id_1) ^ (0 ^ id_1 ^ id_2 ^ .. ^ id_n) = 0 ^ id_2 ^ .. ^ id_n
 

可以看出,bolt_1向acker請求ack,acker收到請求ack,異或之後,id_1的作用消失。也就是說,bolt_1已處理完畢這個tuple。

所以,在acker看來,如果某個bolt的處理完成,則此bolt在異或鏈中的作用就消失了。

如果所有的bolt 都得到處理,那麼acker將會觀察到ackVal值變成了0:

ackVal = 0
= (0 ^ id_1) ^ (0 ^ id_1 ^ .. ^ id_n) ^ .. ^ (0 ^ id_n) 
= (0 ^ 0) ^ (id_1 ^ id_1) ^ (id_2 ^ id_2) ^ .. ^ (id_n ^ id_n)

如果出現了ackVal = 0,說明兩個可能:

  1. spout發送的tuple都處理完成,tuple-tree或者DAG已完成。
  2. 概率性出錯,也就是說在極小的概率下,即使不按上面的確認流程來走,異或鏈的結果也可能出現0.但這個概率極小,小到什麼程度呢?
    用官方的話說就是,如果每秒發送1萬個ack消息,50,000,000年時才可能發生這種情況。

如果ackVal不為0,說明tuple-tree或DAG沒有完成。如果長時間不為0,通過超時,可以觸發一個超時回調,在這個回調中調用spout的fail方法,來進行重放。

如此,就保證了消息處理不會漏掉,但可能會重復。

結語

以上,就是storm保證消息至少處理一次的語義的機制 。

Storm進程通信機制分析 http://www.linuxidc.com/Linux/2014-12/110158.htm

Apache Storm 的歷史及經驗教訓 http://www.linuxidc.com/Linux/2014-10/108544.htm

Apache Storm 的詳細介紹:請點這裡
Apache Storm 的下載地址:請點這裡

Copyright © Linux教程網 All Rights Reserved