歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux編程 >> Linux編程 >> Python之RabbitMQ

Python之RabbitMQ

日期:2017/3/1 9:13:36   编辑:Linux編程
RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。 MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。 安裝RabbitMQ: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 安裝配置epel源:(詳見http://www.cnblogs.com/ernest-zhang/p/5714434.html 安裝erlang: yum -y install erlang 注:安裝erlang的時候碰到 Error: Package: erlang-erts-R14B-04.3.el6.i686 (epel) Requires: libz.so.1(ZLIB_1.2.2) [root@localhost ~]# yum whatprovides libz.so.1 Loaded plugins: rhnplugin This system is not registered with RHN. RHN support will be disabled. zlib-1.2.3-25.el6.i686 : The zlib compression and decompression library #提供壓縮與解壓縮庫 Repo : local Matched from: Other : libz.so.1 檢查發現應該是zlib的版本太老了,從網上下載最新的zlib-1.2.8-10.fc24.i686,然後使用RPM安裝後解決。 下載地址:http://www.zlib.net/ #zlib官網 http://rpmfind.net/linux/rpm2html/search.php?query=zlib #zlib下載網站 安裝rabbitMQ: yum -y install rabbitmq-server service rabbitmq-server start/stop 啟動和停止rabbitmq 安裝API,然後可以基於API操作rabbitmq 1 2 3 4 5 6 7 pip install pika or easy_install pika or 源碼 https://pypi.python.org/pypi/pika Python 操作RabbitMQ 發布端: 1 2 3 4 5 6 7 import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) #服務器地址 channel=connection.channel() channel.queue_declare(queue='Hi') #如果有隊列,略過;如果沒有,創建隊列 channel.basic_publish(exchange='',routing_key='cc',body='hello!world!!!') print("[x] sent 'hello,world!'") connection.close()

接收端:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import pika #創建一個連接對象,綁定rabbitmq的IP connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) #創建一個頻道對象 channel=connection.channel() #頻道中聲明指定queue,如果MQ中沒有指定queue就創建,如果有,則略過 channel.queue_declare(queue='Hi') #定義回調函數 def callback(ch,method,properties,body): print('[x] Recieved %r'%body) # channel.close() #no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq,callback:回調函數,queue:指定隊列 channel.basic_consume(callback,queue='Hi',no_ack=True) # channel.basic_consume(callback,queue='cc') print('[*] Waiting for msg') channel.start_consuming()

1、acknowledgment 消息不丟失

no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會重新將該任務添加到隊列中。

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag)
  • basic_comsume中的no_ack=False
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=False)  
print('[*] Waiting for msg')
channel.start_consuming()

durable 消息不丟失

消息生產者端發送消息時掛掉了,消費者接消息時掛掉了,以下方法會讓RabbitMQ重新將該消息添加到隊列中:

  • 回調函數中的ch.basic_ack(delivery_tag=method.delivery_tag),消費端需要做的
  • basic_comsume中的no_ack=False,消費端需要做的
  • 發布消息端的basic_publish添加參數properties=pika.BasicProperties(delivery_mode=2),生產者端需要做的
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')  # 如果有,略過;如果沒有,創建隊列
channel.basic_publish(exchange='',
                      routing_key='Hi',
                      body='hello!world!!!',
                      properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print("[x] sent 'hello,world!'")
connection.close()
生產者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者

消息獲取順序

默認消息隊列裡的數據是按照順序被消費者拿走,例如:消費者1去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。但有大部分情況下,消息隊列後端的消費者服務器的處理能力是不相同的,這就會出現有的服務器閒置時間較長,資源浪費的情況,那麼,我們就需要改變默認的消息隊列獲取順序!

channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列,這是消費者端需要做的

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
    print('[x] Recieved %r' % body)
    # channel.close()
    ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)     #改變默認獲取順序,誰來誰取
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
                      no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者

發布和訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

RabbitMQ中,所有生產者提交的消息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行存儲 。RabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少,只對前三種模式進行比較。

exchange type = fanout

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。

1.可以理解為路由表的模式

2.這種模式不需要RouteKey

3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。

4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。

import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
msg='456'
channel.basic_publish(exchange='logs_fanout',routing_key='',body=msg)
print('開始發送:%s'%msg)
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#綁定相關隊列名稱
channel.queue_bind(exchange='logs_fanout',queue=queue_name)
def callback(ch,method,properties,body):
    print('[x] %r'%body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者

關鍵字

任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。 1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字符串,下文稱其為default Exchange)。 2.這種模式下不需要將Exchange進行任何綁定(binding)操作 3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。 4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
serverity='error'
msg='123'
channel.basic_publish(exchange='logs_direct_test1',routing_key=serverity,body=msg)
print('開始發送:%r:%r'%(serverity,msg))
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error','info','warning',]
for serverity in serverities:
    channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
    print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者1
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error',]
for serverity in serverities:
    channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
    print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者2

模糊訂閱

任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上 1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的隊列。 2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。 3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。 4.“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。 5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
                         type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)
for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm

RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm

用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm

RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm

Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm

在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm

RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm

RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡

Copyright © Linux教程網 All Rights Reserved