歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> RabbitMQ 學習初入門

RabbitMQ 學習初入門

日期:2017/2/28 13:47:29   编辑:Linux教程

RabbitMQ是一個消息隊列,我們可以使用RabbitMQ 做消息隊列,消息通知的業務功能,而且根據網上的不可靠消息得出,RabbitMQ 的性能水平甚至比 activeMQ 還要好,所以也是我選擇認真去學習RabbitMQ的原因,當然我也有做個關於 activeMQ 的一些簡單的 Demo 有機會的話可以分享出來~

Rabbit 使用 Erlang 來編寫的AMQP 服務器。 什麼是 AMQP 本人已經百度給大家了:

AMQP,即Advanced Message Queuing Protocol,一個提供統一消息服務的應用層標准高級消息隊列協議,是應用層協議的一個開放標准,為面向消息的中間件設計。基於此協議的客戶端與消息中間件可傳遞消息,並不受客戶端/中間件不同產品,不同的開發語言等條件的限制。Erlang中的實現有 RabbitMQ等。

哎喲,先從按照說起吧~ 我使用macbook 來進行學習和開發的 順便提一下 本人是位果粉,別噴~ 使用 MAC OS 安非常簡單,還我是建議使用 MAC OS 的同學使用 brew 進行開發,沒有別的 就是因為方便。

使用這個命令之前 如果沒有安裝過 Homebrew 就安裝一下吧 有機會我也分享一下安裝方式 順便介紹一下這個讓你懶癌晚期的Homebrew。

打開終端輸入:

 brew install rabbitmq

然後會告訴你 無法獲得 /usr/local/sbin 的寫入權限。 因為是英文的錯誤提示,本人會看不會寫 所以直接寫中文給各位看了~ 還是那句 別噴~ 謝謝
好,接下來讓他有權限,其實這個目錄是沒有的sbin。

 mkdir /usr/local/sbin       創建目錄
 chmod 777 /usr/local/sbin   開權限,全開別煩
 brew link rabbitmq          link 一下

然後就沒有然後了

啟動 rabbitMQ

MacBook-Pro:~ Tony$ /usr/local/sbin/rabbitmq-server 

              RabbitMQ 3.5.6. Copyright (C) 2007-2015 Pivotal Software, Inc.
  ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
  ##  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/[email protected]
  ######  ##        /usr/local/var/log/rabbitmq/[email protected]
  ##########
              Starting broker... completed with 10 plugins.

目前為止已經安裝完 rabbitMQ 並且啟動起來 了 簡單到飛

開始廢話 說說概念,別方 我也是抄書的~

消費者和生產者,由於書本的篇幅太長,就抄了!簡單明了告訴你們,消息的產生者就是生產者,獲得消息的稱為消息消費者。我QQ M了你一下 我就是消息的生產者了,你在QQ看到我的消息了,你就是消息的消費者了,這才是正常寫書方式嘛··· 舉個開發的業務背景的例子,我支付了一張訂單,這個業務產生了幾個次要的業務,例如積分業務,我購買了訂單,訂單核心功能處理訂單的業務,然後生產了一個消息並通過rabbitMQ 發送了一個消息,內容為產生了一個訂單,然後訂閱了消息的消息消費者就會消費消息,然後根據消息處理自己的業務,該干嘛干嘛去~ 積分業務就去加你的積分等等。當然這個只是舉例,別吐槽 可以在台服務器或者一個業務方法上實現。 上面的舉例對於一些分布式的微服務上是非常常見的方式了~

幾個重要的概念:

信道:在連接到rabbitMQ 之後,你將可以創建並獲得一個AMQP信道,信道是創建在“真實的”TCP連接內的虛擬鏈接。所有的AMQP命令都是通過 AMQP 信道發出去的,每條信道會被指派一個唯一的ID。所有發布消息還是訂閱都是通過AMQP 信道去處理的。這個概念非常像端口號【但是不是端口啊 ,別說我混淆你】,而且他的英文名字可以理解到他的意思channel 有使用NIO 的開發者已經比較了解~ 其實使用AMQP信道的原因非常簡單,建立TCP連接的性能代價是非常高的,當你在不同的線程當中去創建TCP連接的方式去和RabbitMQ進行通信的代價是十分高的,但是在多個不同的線程當中通過channel 我們就可以通過同一個TCP連接進行通信了。好像寫得好亂不知道他們理不理解我寫什麼,唉 算了 本人的水平有限,況且大多數情況下都是我自己看而已,放棄~ 抽象圖如下 丑,不過先將就~

交換器 與 路由鍵: 這個好難解釋啊~ 直接說工作流程吧~ 哎喲其實應該把 隊列寫到這裡來,算了吧 不改了。 先記住,交換器當中裡面有N個隊列,而路由鍵 是讓交換器知道消息應該放到哪個隊列當中去。別廢話舉例: 產生消息 要告訴 RabbitMQ 這個消息是放到那個交換器上,【注意:交換器是由開發者去創建的你可以搞N個交換器出來,沒有管你】
上代碼:
this.channel.basicPublish(“hello-exchange”,”hola”,properties, SerializationUtils.serialize(msg));
首先 hello-exchange 是交換器的名字, hola 是路由鍵 這個讓hello-exchange 去判斷應該分發到那個隊列當中,SerializationUtils.serialize(msg) 將一個消息序列化。 channel 就不用說啦上面說了,是一個AMQP信道。
交換器有很多不同的類型一會會一一介紹。交換器的類型不同也注定了消息會分發到哪個隊列當中,所有尤其重要。

工作流程:

1、消費者訂閱消息,先創建一個交換器,如果交換器已經存在即返回一個交換器。當然你可以passive 為 true 如果 passive 為 true , 交換器已經存在返回一個已經存在的交換器,否則報錯。我暫時不知道用在哪裡,所以 先記住一般情況下 創建一個交換器,如果存在就返回一個已經存在的交換器,如果不存在則創建並返回。

JAVA 代碼如下【直接給源碼的方法定義,是不是很貼心】:

備注是自己翻譯的別噴

/**
     * Declare an exchange, via an interface that allows the complete set of
     * arguments.
     * @see com.rabbitmq.client.AMQP.Exchange.Declare
     * @see com.rabbitmq.client.AMQP.Exchange.DeclareOk
     * @param exchange the name of the exchange  【交換器名稱】
     * @param type the exchange type 【交換器類型 一會說 別著急】
     * @param durable true if we are declaring a durable exchange (the exchange will survive a server restart) 【是否持久化交換器,這個後面說,還是簡單說一下,糾結帝~ 就是交換器中的消息會持久化,就是會存在硬盤當中,宕機或重啟服務時會自動恢復當時會犧牲性能,除非你的SSD硬盤好快,當我沒有說過~】
     * @param autoDelete true if the server should delete the exchange when it is no longer in use  【當這個交換器 已經沒有人使用的時候 會自動刪除,longer 長時期 當時我不知道長時期是什麼時候,所以我會記住是 沒人用就自己刪掉 自動消失】
     * @param internal true if the exchange is internal, i.e. can't be directly
     * published to by a client. 【不知道什麼鬼意思,如果是內部 不會直接發送到客戶端,我一般寫false ,真心不知道他什麼意思,後面學下去應該明白】
     * @param arguments other properties (construction arguments) for the exchange 【其他參數 暫時沒有用到】
     * @return a declaration-confirm method to indicate the exchange was successfully declared  
     * @throws java.io.IOException if an error is encountered 【會拋出IO異常】
     */
Exchange.DeclareOk exchangeDeclare(String exchange,String type, boolean durable, boolean autoDelete, boolean internal,Map<String, Object> arguments) throws IOException;

2、消費者創建隊列,並綁定隊列到交換器當中,上寫路由鍵,讓交換器知道這個路由鍵的消息是跑到這個隊列當中的。

JAVA 代碼如下【直接給源碼的方法定義】:

/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue  【隊列名稱】 
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)  【持久化,這裡特別說一下,如果你想消息是持久化的,必須消息是持久化的,交換器也是持久化的,隊列更是持久化的,其中一個不是也無法恢復消息】
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)  【私有的,獨有的。 這個隊列之後這個應用可以消費,上面的英文注釋是 說restricted to this connection  就是限制在這個連接可以消費,就是說不限制channel信道咯,具體沒有試過,但是應該是這樣,除非備注騙我,我讀得書少,你唔好呃我!!!】 
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use) 【沒有人使用自動刪除】 注意:如果exclusive為true 最好 autodelete都為true 至於為什麼 這麼簡單自己想~
     * @param arguments other properties (construction arguments) for the queue 【其他參數沒有玩過】
     * @return a declaration-confirm method to indicate the queue was successfully declared  
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;/**
     * Declare a queue
     * @see com.rabbitmq.client.AMQP.Queue.Declare
     * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
     * @param queue the name of the queue
     * @param durable true if we are declaring a durable queue (the queue will survive a server restart)
     * @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
     * @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
     * @param arguments other properties (construction arguments) for the queue
     * @return a declaration-confirm method to indicate the queue was successfully declared
     * @throws java.io.IOException if an error is encountered
     */
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
綁定隊列和交換器 並指定路由鍵
/**
     * Bind a queue to an exchange.
     * @see com.rabbitmq.client.AMQP.Queue.Bind
     * @see com.rabbitmq.client.AMQP.Queue.BindOk
     * @param queue the name of the queue  【隊列名稱】
     * @param exchange the name of the exchange 【交換器名稱】
     * @param routingKey the routine key to use for the binding 【路由鍵】
     * @param arguments other properties (binding parameters) 【其他參數 還是那句沒有玩過】
     * @return a binding-confirm method if the binding was successfully created
     * @throws java.io.IOException if an error is encountered
     */
    Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;

3、然後消費者就去訂閱消息咯,注意在JAVA裡面的訂閱方法會產生線程阻塞的。

JAVA 代碼:

訂閱消息並消費
 /**
     * Start a non-nolocal, non-exclusive consumer, with
     * a server-generated consumerTag.
     * @param queue the name of the queue 【所訂閱消費的隊列】
     * @param autoAck true if the server should consider messages 【是否為自動確定消息,好像TCP的ack syn 啊,可怕的7層模型!!!一般寫true就可以】
     * acknowledged once delivered; false if the server should expect
     * explicit acknowledgements
     * @param callback an interface to the consumer object  【回調callback 這個馬上上代碼看看】
     * @return the consumerTag generated by the server
     * @throws java.io.IOException if an error is encountered
     * @see com.rabbitmq.client.AMQP.Basic.Consume
     * @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
     * @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
     */
    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

接收到消息之後,馬上有回調,來看看回調接口:

/**
 * Interface for application callback objects to receive notifications and messages from
 * a queue by subscription.
 * Most implementations will subclass {@link DefaultConsumer}.
 * <p/>
 * The methods of this interface are invoked in a dispatch
 * thread which is separate from the {@link Connection}'s thread. This
 * allows {@link Consumer}s to call {@link Channel} or {@link
 * Connection} methods without causing a deadlock.
 * <p/>
 * The {@link Consumer}s on a particular {@link Channel} are invoked serially on one or more
 * dispatch threads. {@link Consumer}s should avoid executing long-running code
 * because this will delay dispatch of messages to other {@link Consumer}s on the same
 * {@link Channel}.
 *
 * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
 * @see Channel#basicCancel
 */
public interface Consumer {
    /**
     * Called when the consumer is registered by a call to any of the
     * {@link Channel#basicConsume} methods.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleConsumeOk(String consumerTag);

    /**
     * Called when the consumer is cancelled by a call to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleCancelOk(String consumerTag);

    /**
     * Called when the consumer is cancelled for reasons <i>other than</i> by a call to
     * {@link Channel#basicCancel}. For example, the queue has been deleted.
     * See {@link #handleCancelOk} for notification of consumer
     * cancellation due to {@link Channel#basicCancel}.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * Called when either the channel or the underlying connection has been shut down.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} indicating the reason for the shut down
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * Called when a <code><b>basic.recover-ok</b></code> is received
     * in reply to a <code><b>basic.recover</b></code>. All messages
     * received before this is invoked that haven't been <i>ack</i>'ed will be
     * re-delivered. All messages received afterwards won't be.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleRecoverOk(String consumerTag);

    /**
     * Called when a <code><b>basic.deliver</b></code> is received for this consumer.
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope  寶寶累了 看重點,這個方法就是消息到達的時候回調的方法其他你們喜歡研究 可以認真看看英文備注。
     */
    void handleDelivery(String consumerTag,
                        Envelope envelope,
                        AMQP.BasicProperties properties,
                        byte[] body)
        throws IOException;
}

注意一下:其實也可以使用其他訂閱方式去消費消息的,可以使用get的方式去獲得一條消息,然後會在隊列當中給你一條消息,但是你會想就不需要使用上面的訂閱方法啦,直接跑個循環就可以啦。其實get方法會開啟訂閱獲得一條消息後關閉訂閱,這樣會產生不必要的性能開銷的,除非老板讓你搞卡一點,讓客戶掏點優化費,不然就別這樣干了。get的方法這裡就先不介紹啦~

4、上面所說的都是消息消費者的工作,這個step開始說消息生產者要做的。開啟一個交換器,當然這個交換器應該是存在不用創建的因為 上面所說消費者已經創建過了,但是其實誰去先創建都是可以的,消費者也可以創建,生產者也可以,這個倒是沒有什麼關系,主要是看你的業務需求【這句話甩鍋用】。由於已經貼出過創建交換器的代碼所以就不貼了,看上面。

與上面的交換器創建一樣。不貼代碼了自己拖上去看吧~

5、好像沒有什麼別的事情了,就是產生一個消息然後發過去。上代碼

JAVA代碼:

/**
     * Publish a message
     * @see com.rabbitmq.client.AMQP.Basic.Publish
     * @param exchange the exchange to publish the message to  【發送到那個交換器上】
     * @param routingKey the routing key  【路由鍵 決定消息去哪個隊列裡面混】
     * @param props other properties for the message - routing headers etc  【其他消息參數, 哎~ 這個我玩過,一會DEMO時間,表演一下】
     * @param body the message body 【消息體,一般把對象序列化一下發過去】
     * @throws java.io.IOException if an error is encountered
     */
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;

然後就沒有然後了 上面 第三步的 訂閱方法中的回調 就會有這個消息了。 好又到說概念的時候了。 困的同學先休息。

消息的投遞方式,我也困了其實:

【1】 消息MESSAGE_A 到達 queue_A 隊列
【2】 RabbitMQ 把 MESSAGE_A 發送到 APPLICATION A(消費者)
【3】 APPLICATION A 確定收到了消息 發個ack 高興一下
【4】 RabbitMQ 把 消息從 queue_A 隊列刪除。

上面其實也有說過,autoack參數 有印象吧~ 所以autoack 設置成true 就沒錯了。這裡就可以開展其他話題了,如果多個訂閱者怎麼辦呢,是不是多個訂閱者都會收到消息呢,答案是否定的。RabbitMQ的隊列會對訂閱者做一個循環,例如目前有兩個訂閱者訂閱了同一個隊列,serverA 和 serverB 他會循環去發送消息,隊列有 ID 1-10 的消息,ID 1消息會由 serverA 處理 ID 2 消息會由 serverB 處理 ID 3 消息會由 serverA 去處理 如此類推。
如果serverA 在發送ACK應答給 RabbitMQ之前 斷開連接【就是服務掛了,宕機了】RabbitMQ 會認為這個消息沒有處理,即沒有消費到這個消息,會把這個消息發送到 serverB 去處理。
而且在消息沒有ACK之前,RabbitMQ不會發你第二條消息的,所以如果你想等待消息的任務處理完之後再給第二個消息的話,可以將autoDelete設置成false,這樣你就可以在消息處理完之後再去ack。���樣也是一個非常通用的場景,以防扎堆處理消息。
其實這樣的設計也是非常好的,如果你出現了業務錯誤,執行消息處理的時候,剛好出現問題了,你可以通過斷開連接的方式【這裡所指的斷開是斷開RabbitMQ的connection】讓這個消息交給下個訂閱者,【這裡說一下,如果隊列沒有消費者去訂閱消息的話,消息會存在RabbitMQ當中等待有消費者去訂閱再去發送】,當然這也是在你沒有ack的情況下。在高級版本的RabbitMQ中可以使用reject命令,讓這個消息直接傳遞到下個訂閱者。【在RabbitMQ2.0可以使用】

AMQP信道channel與訂閱

如果 一個channel 訂閱了一個隊列就不能訂閱別的隊列了,也就是說一個channel只能訂閱一個隊列。所以你需要取消原來的訂閱,並將信道設置為“傳輸”模式。後面有時間寫後面的文章可能會說到。

交換器的類型:

direct :這個簡單,只要路由鍵一模一樣就投遞到相應的隊列當中。


fanout :這個也簡單,消息會投遞到所有綁定到這個交換器的隊列當中。


topic :這個就復雜一點了,我很客觀的簡單就簡單 復雜就復雜 從不忽悠。但是這個也好常用不能不學,好累。在路由鍵當中可以使用通配符。怎麼說呢,好糾結啊 不知道怎麼解釋啊 怎麼辦 在線等。

舉個例子吧,綁定隊列:

//綁定隊列 路由鍵是 tony.teamA 有路由鍵為tony.teamA的消息就投遞到這裡
this.channel.queueBind("queueA","topic_exchangeA","tony.teamA",null); 

//綁定隊列 路由鍵是 yan.teamA 有路由鍵為tony.teamB的消息就投遞到這裡
this.channel.queueBind("queueB","topic_exchangeA","yan.teamA",null); 

//綁定隊列 路由鍵是 *.teamA  有路由鍵為.teamA結尾的消息就投遞到這裡
this.channel.queueBind("queueC","topic_exchangeA","*.teamA",null); 

//綁定隊列 路由鍵是 chao.teamB 有路由鍵為chao.teamB的消息就投遞到這裡
this.channel.queueBind("queueD","topic_exchangeA","chao.teamB",null); 

//綁定隊列 路由鍵是 *.teamB 有路由鍵為.teamB結尾的消息就投遞到這裡
this.channel.queueBind("queueE","topic_exchangeA","*.teamB",null); 

//綁定隊列 路由鍵是 # 所有在這個交換器的消息都投遞到這裡
this.channel.queueBind("queueF","topic_exchangeA","#",null); 

// queueA[路由鍵:tony.teamA]、queueC[路由鍵:*.teamA]、queueF[路由鍵:#]會收到消息
this.channel.basicPublish("topic_exchangeA","tony.teamA",properties,SerializationUtils.serialize(msg));

// queueD[路由鍵:chao.teamB]、queueE[路由鍵:*.teamB]、queueF[路由鍵:#]會收到消息
this.channel.basicPublish("topic_exchangeA","chao.teamB",properties,SerializationUtils.serialize(msg));

然後基本的rabbitMQ 的東西就搞好了。不過這只是開始~

簡單說一下虛擬主機和隔離吧,簡單來說想MySQL 你可以創建很多個庫,裡面有很多個表。然後呢 rabbitMQ可以創建很多個虛擬主機,虛擬主機裡面有很多個交換器,交換器裡面有很多個隊列,解釋得完美。默認會提供一個 默認虛擬主機 vhost : “/”。後面找時間再說這個 vhost 吧。

重頭大戲上DEMO,由於方便我閱讀回憶,所以我忽略的封裝性,一切以容易快速看懂和回憶為目標。別噴~

生產者:

package com.maxfunner;

import com.maxfunner.mq.EndPoint;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Producer
 */
public class Producer {


    private Connection connection;
    private Channel channel;
    private Map<Long, String> messageMap = new HashMap<Long, String>();
    private int maxID = 0;

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";


    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");  //服務器地址
        factory.setUsername("guest");  //默認用戶名
        factory.setPassword("guest"); //默認密碼
        factory.setPort(5672); //默認端口,對就是這麼屌全部默認的

        this.connection = factory.newConnection();  //創建鏈接

        this.channel = this.connection.createChannel();

    }


    public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //啟用消息確認已經投遞成功的回調


        /**
         * 創建了一個交換器,類型為 direct 非持久化 自動刪除  沒有額外參數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {


            /**
             * 成功的時候回調【這個是當消息到達交換器的時候回調】
             * @param deliveryTag   每一條消息都有一個唯一ID【只是同一個channel唯一】,每次發出消息遞增1 因為同一個channel所有也保證了消息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 發送成功");
                messageMap.remove(message);

                //最後一個消息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }

            /**
             * 失敗的時候回調
             * @param deliveryTag   每一條消息都有一個唯一ID【只是同一個channel唯一】,每次發出消息遞增1 因為同一個channel所有也保證了消息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 發送失敗");
                messageMap.remove(message); //發送失敗就不重發了,發脾氣

                //最後一個消息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }
        });

    }


    public void sendMessage(String message) throws IOException {


        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                .contentType("text/plain")  //指定是一個文本
                .build();


        // 發送一個消息 到 EXCHANGE_NAME 的交換器中  路由鍵為 KEY_A  發送 message 之前序列化一下 具體用什麼包上面import自己看
        this.channel.basicPublish(EXCHANGE_NAME, "KEY_A", properties, SerializationUtils.serialize(message));

    }

    public void closeAnything() throws IOException {
        this.channel.close();   //跪安吧 小channel
        this.connection.close(); //你也滾吧 connection
    }


    public static void main(String[] args) throws IOException {


        Producer producer = new Producer();
        producer.createConnectionAndChannel();
        producer.initChannelAndCreateExchange();


        List<String> messageList = new ArrayList<String>();
        messageList.add("message_A");
        messageList.add("message_B");
        messageList.add("message_C");
        messageList.add("message_D");
        messageList.add("message_E");
        messageList.add("message_F");


        producer.maxID = messageList.size();    //記錄最後一個ID 當最後一個消息發送成功後關閉連接

        //注意:因為channel產生的ID 是從1開始的
        for (int i = 1; i <= messageList.size(); i++) {

            producer.messageMap.put(new Long(i), messageList.get(i - 1));    //這裡看懂了嗎?沒看懂也沒有辦法了,這裡我真不知道怎麼解釋
            producer.sendMessage(messageList.get(i - 1));

        }


    }
}

消費者:

package com.maxfunner;

import com.maxfunner.mq.QueueConsumer;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Consumer
 */
public class Consumer {

    private Connection connection;
    private Channel channel;
    private Map<Integer,String> messageMap = new HashMap<Integer, String>();

    private static final String EXCHANGE_NAME = "MY_EXCHANGE";


    /**
     * 對,你猜得一點都沒有錯,我是復制的
     * @throws IOException
     */
    public void createConnectionAndChannel() throws IOException {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");  //服務器地址
        factory.setUsername("guest");  //默認用戶名
        factory.setPassword("guest"); //默認密碼
        factory.setPort(5672); //默認端口,對就是這麼屌全部默認的

        this.connection = factory.newConnection();  //創建鏈接

        this.channel = this.connection.createChannel();

    }


    public void createAndBindQueue() throws IOException {

        /**
         * 創建了一個交換器,類型為 direct 非持久化 自動刪除  沒有額外參數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,true,null); //最好也創建一下交換器,反正已經創建也沒有關系

        /**
         * 創建了一個隊列, 名稱為 QUEUE_A  非持久化 非獨有的 自動刪除的 沒有額外刪除的
         */
        this.channel.queueDeclare("QUEUE_A",false,false,true,null);


        this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");


    }



    public static void main(String args[]) throws IOException {


        final Consumer consumer = new Consumer();

        consumer.createConnectionAndChannel();

        consumer.createAndBindQueue();

        System.out.println("等待消息中。。。。");

        new Thread(new Runnable() {
            public void run() {

                try {

                    /**
                     * 訂閱消息,訂閱隊列QUEUE_A  獲得消息後自動確認
                     */
                    consumer.channel.basicConsume("QUEUE_A", true, new com.rabbitmq.client.Consumer() {

                        public void handleConsumeOk(String consumerTag) {

                        }

                        public void handleCancelOk(String consumerTag) {

                        }

                        public void handleCancel(String consumerTag) throws IOException {

                        }

                        public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {

                        }

                        public void handleRecoverOk(String consumerTag) {

                        }

                        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                            System.out.println("message : " + SerializationUtils.deserialize(body));

                        }
                    });
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();


    }

}

生產者運行結果:

message : message_A ! 發送成功
message : message_B ! 發送成功
message : message_C ! 發送成功
message : message_D ! 發送成功
message : message_E ! 發送成功
message : message_F ! 發送成功

消費者運行結果:

等待消息中。。。。
message : message_A
message : message_B
message : message_C
message : message_D
message : message_E
message : message_F

項目我是用maven創建的貼一下maven 的pom.xml文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.maxfunner</groupId>
    <artifactId>rabbitlearning</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitlearning</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>


        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
        </dependency>

        <dependency>
            <groupId>commons-lang</groupId>
            <artifactId>commons-lang</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.1</version>
        </dependency>

    </dependencies>
</project>

最後一個重點問題

消息“黑洞”,如果在沒有綁定隊列和交換器之前,所有發出的消息都無法匹配到相應的隊列當中,那些消息將永遠不會被消費。而且confirm的callback也是會返回成功的即使消息進入了消息“黑洞”。所以在發送消息之前 必須確定隊列已經綁定,確保消息能分配到相應的隊列當中。 測試很簡單,上面的DEMO 先運行 Producer 顯示所有消息發送成功,然後再運行 Consumer 發現沒有消息可以接收。 再嘗試先運行Consumer 再運行 Producer 就發現一切都正常了,這也是為什麼我把autoDelete設置為true的原因,有了autoDelete當隊列沒有人用的時候就會自動刪除。所以每次運行都可以測試出問題。要保證消息能夠到達指定的隊列最好也在Producer中建立隊列 而且進行相關的綁定 然後再發送消息 修改一下 Producer 的其中一方法即可。 不寫了 累了~

public void initChannelAndCreateExchange() throws IOException {

        this.channel.confirmSelect();   //啟用消息確認已經投遞成功的回調


        /**
         * 創建了一個交換器,類型為 direct 非持久化 自動刪除  沒有額外參數
         */
        this.channel.exchangeDeclare(EXCHANGE_NAME, "direct", false, true, null);

        this.channel.addConfirmListener(new ConfirmListener() {


            /**
             * 成功的時候回調【這個是當消息到達交換器的時候回調】
             * @param deliveryTag   每一條消息都有一個唯一ID【只是同一個channel唯一】,每次發出消息遞增1 因為同一個channel所有也保證了消息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleAck(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 發送成功");
                messageMap.remove(message);

                //最後一個消息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }

            /**
             * 失敗的時候回調
             * @param deliveryTag   每一條消息都有一個唯一ID【只是同一個channel唯一】,每次發出消息遞增1 因為同一個channel所有也保證了消息的流水性。
             * @param multiple
             * @throws IOException
             */
            public void handleNack(long deliveryTag, boolean multiple) throws IOException {

                String message = messageMap.get(deliveryTag);
                System.out.println("message : " + message + " ! 發送失敗");
                messageMap.remove(message); //發送失敗就不重發了,發脾氣

                //最後一個消息都搞掂之後 關閉所有東西
                if (deliveryTag >= maxID) {
                    closeAnything();
                }

            }
        });

        /**
         * 創建了一個隊列, 名稱為 QUEUE_A  非持久化 非獨有的 自動刪除的 沒有額外刪除的
         */
        this.channel.queueDeclare("QUEUE_A",false,false,true,null);


        this.channel.queueBind("QUEUE_A",EXCHANGE_NAME,"KEY_A");

CentOS7下安裝RabbitMQ http://www.linuxidc.com/Linux/2016-11/136812.htm

CentOS 7.2 下 RabbitMQ 集群搭建 http://www.linuxidc.com/Linux/2016-12/137812.htm

CentOS7環境安裝使用專業的消息隊列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

配置與管理RabbitMQ http://www.linuxidc.com/Linux/2016-11/136815.htm

RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm

RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡

Copyright © Linux教程網 All Rights Reserved