歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Clojure:ZeroMQ的入門DEMO

Clojure:ZeroMQ的入門DEMO

日期:2017/3/1 9:34:19   编辑:Linux編程

假設你已經知道什麼是ZeroMQ(不知道的話可以看這個:http://zh.wikipedia.org/wiki/%C3%98MQ),以下就給出在Clojure中如何使用ZeroMQ(感謝此文作者:http://patternhatch.com/2013/06/12/messaging-using-clojure-and-zeromq/)。
1. 創建一個Clojure項目,這裡我們用leiin。lein new app zmq-test
2. 在project.clj文件中添加
[com.rmoquin.bundle/jeromq "0.2.0"]
cheshire "5.3.1"]]
其中jeromq就是我們需要使用的ZeroMQ類庫(純Java實現),cheshire用於雙向處理json。
3. 打開core.clj文件,輸入如下代碼:

(ns zmq-test.core
(:import [org.jeromq ZMQ])
(:require (cheshire [core :as c])))

(def ctx (ZMQ/context 1))

;; REQ/REP [Request-Reply] Pattern
;; In REPL, input
;; (future-call echo-server)
;; (echo "hi")
;; to run the demo function
(defn echo-server
[]
(let [s (.socket ctx ZMQ/REP)]
(.bind s "tcp:// 127.0.0.1:5555")
(loop [msg (.recv s)]
(.send s msg)
(recur (.recv s)))))

(defn echo
[msg]
(let [s (.socket ctx ZMQ/REQ)]
(.connect s "tcp:// 127.0.0.1:5555")
(.send s msg)
(println "Server replied:" (String. (.recv s)))
(.close s)))

;; PUB/SUB [Publish-Subscribe] Pattern
;; In REPL, input
;; (future-call market-data-publisher)
;; (get-market-data 100)
;; to run the demo function
(defn market-data-publisher
[]
(let [s (.socket ctx ZMQ/PUB)
market-data-event (fn []
{:symbol (rand-nth ["CAT" "UTX"])
:size (rand-int 1000)
:price (format "%.2f" (rand 50.0))})]
(.bind s "tcp:// 127.0.0.1:6666")
(while :true
(.send s (c/generate-string (market-data-event))))))

(defn get-market-data
[num-events]
(let [s (.socket ctx ZMQ/SUB)]
(.subscribe s "")
(.connect s "tcp://127.0.0.1:6666")
(dotimes [_ num-events]
(println (c/parse-string (String. (.recv s)))))
(.close s)))

;; PUSH/PULL [Pipeline] Pattern
;; In REPL, input
;; (future-call collector)
;; (future-call worker)
;; (future-call worker)
;; (future-call worker)
;; (dispatcher 100)
;; to run the demo function
(defn dispatcher
[jobs]
(let [s (.socket ctx ZMQ/PUSH)]
(.bind s "tcp://127.0.0.1:7777")
(Thread/sleep 1000)
(dotimes [n jobs]
(.send s (str n)))
(.close s)))

(defn worker
[]
(let [rcv (.socket ctx ZMQ/PULL)
snd (.socket ctx ZMQ/PUSH)
id (str (gensym "w"))]
(.connect rcv "tcp://127.0.0.1:7777")
(.connect snd "tcp://127.0.0.1:8888")
(while :true
(let [job-id (String. (.recv rcv))
proc-time (rand-int 100)]
(Thread/sleep proc-time)
(.send snd (c/generate-string {:worker-id id
:job-id job-id
:processing-time proc-time}))))))

(defn collector
[]
(let [s (.socket ctx ZMQ/PULL)]
(.bind s "tcp://127.0.0.1:8888")
(while :true
(->> (.recv s)
(String.)
(c/parse-string)
(println "Job completed:")))))

代碼中包括了ZeroMQ的三種模式,可以直接在REPL中進行測試。但是這只是很簡單的Hello World程序,如果要將ZeroMQ用於實際生產環境中的話,還有很多環節需要考慮和完善。

Copyright © Linux教程網 All Rights Reserved