歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
Linux教程網 >> Linux基礎 >> Linux教程 >> RabbitMQ入門教程

RabbitMQ入門教程

日期:2017/2/27 15:55:26   编辑:Linux教程
RabbitMQ 是一個消息代理。這主要的原理十分簡單,就是通過接受和轉發消息。你可以把它想象成郵局:當你將一個包裹送到郵局,你會相信郵遞員先生最終會將郵件送到接件人手上。RabbitMQ就好比一個郵箱,郵局或郵遞員。郵局和 RabbitMQ 兩種主要的不同之處在於,RabbitMQ 不處理文件,而是接受,並存儲和以二進制形式將消息轉發。

前面聲明本文都是RabbitMQ的官方指南翻譯過來的,由於本人水平有限難免有翻譯不當的地方,如發現不對的地方,請聯系下我,好及時改正。好了,正文開始:

RabbitMQ 是一個消息代理。這主要的原理十分簡單,就是通過接受和轉發消息。你可以把它想象成郵局:當你將一個包裹送到郵局,你會相信郵遞員先生最終會將郵件送到接件人手上。RabbitMQ就好比一個郵箱,郵局或郵遞員。

郵局和RabbitMQ兩種主要的不同之處在於,RabbitMQ不處理文件,而是接受,並存儲和以二進制形式將消息轉發。

RabbitMQ,在消息的傳送過程中,我們使用一些標准稱呼。

生產過程就像發送過程,發送消息的程序就是一個生產者,我們使用“P”來描述它。

producer

隊列是好比郵筒的稱呼,它位於RabbitMQ內部,雖然消息流通過RabbitMQ和你的應用程序,但是它們僅僅存儲在隊列中。一個隊列沒有范 圍限制,你可以想存儲多少就存儲多少,本質上來說它是無限大的緩存。多個生產者可以通過一個隊列發送消息,同樣多個消費者也可以通同一個消息隊列中接收消 息。隊列是畫成這樣,名字在它的上面:

queue

消費過程與接收相似,一個消費者通常是一個等著接受消息的程序,我們使用"C"來描述:

consumer

注意,那生產者,消費者和代理者不需要一定在一個機器上,事實上,大多數應用程序中,他們並不在一個機器上。

“Hello World”

(使用java客戶端)

在這部分指南中,我們將要使用java寫兩個程序;一個發送簡單消息的生產者和一個接收消息並輸出出來的消費者。我們會忽視掉一些Java API的細節,為了開始僅僅精選在這簡單的事情上,這是一個"Hello World"消息。

java-one

Java 客戶端庫
RabbitMQ 遵循AMQP協議,那是一個開放的,並且通用的消息協議。在不同語言中有數種AMQP客戶端,我們使用由RabbitMQ提供的Java客戶端。
下載客戶端庫包,檢驗簽名,將它解壓縮到你的工作路徑,從解壓到的路徑中提取JAR文件:

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(RabbitMQ Java客戶端也存在Maven中央庫中,groupIdcom.rabbitmq,artifactIdamqp-client.)

現在我們已經有了Java客戶端和依賴文件,我們可以寫一些代碼了。

發送

sending.png

我們將會讓我們的消息發送者發送消息,我們的接收者接收消息。發送者連接到RabbitMQ上,發送一個簡單的消息,然後退出。

Send.java,我們需要引入一些類:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

建立這個類,為隊列命名:

public class Send {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException {
      ...
  }
}

接著,我們創建一個服務器的連接:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

抽象的socket連接,注意協議版本的處理以及授權,諸如此類的事情。
這裡我們連接到本地機器上的代理,因此它是localhost。如果我們想連接到不同機器上的代理,只需要說明它的主機名和IP地址。

接下來我們創建一個通道,獲取操作的大多數API都位於這上。

對於發送,我們必須聲明一個發送隊列,然後我們把消息發送到這個隊列上:

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Declaring a queue is idempotent - it will only be created if it doesn't exist already. The message content is a byte array, so you can encode whatever you like there.

Lastly, we close the channel and the connection;
聲明一個隊列是冪等的,僅僅在要聲明的隊列不存在時才創建。消息內容是二進制數組,所以你可以隨你喜好編碼。

channel.close();

connection.close();

Here's the whole Send.java class.

發送沒有起作用

如果你是第一次使用RabbitMQ並且你沒有看到"Sent"消息,你可能抓耳撓腮的想到底是哪裡出的問題。可能是代理啟動時沒有足夠空間(默 認它需要至少1Gb 空間),因此拒絕接受消息。通過檢查代理的日志文件來確定這個問題,必要情況下可以降低限制大小。配置文件的文檔將會告訴你怎樣設置disk_free_limit

接收

上面代碼是構建我們的發送者。我們的接收者是從RabbitMQ中提取消息,所以不像發送者那樣發送一個簡單的消息,我們需要一直運行監聽消息並且輸出消息。
receiving

在Recv.java中的代碼有與Send中幾乎相同的引用:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;

這額外的QueueingConsumer類是用來緩存從服務器那裡發出來的信息。

跟創建發送者相同,我們打開一個連接和一個通道,聲明一個我們要消費的隊列。注意要與發送的隊列相匹配。

             java.lang.InterruptedException {

  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException {

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    }
}

注意我們在這裡同樣聲明了一個隊列。以為我們可能在發送者之前啟動接收者,在我們從中獲取消息之前我們想要確定這隊列是否真實存在。
我們通知服務器通過此隊列給我們發送消息。因此服務器會異步的給我們推送消息,在這裡我們提供一個回調對象用來緩存消息,直到我們准備好再使用它們。這就是QueueingConsumer所做的事。

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);

while (true) {
  QueueingConsumer.Delivery delivery = consumer.nextDelivery();
  String message = new String(delivery.getBody());
  System.out.println(" [x] Received '" + message + "'");
}

QueueingConsumer.nextDelivery()在另一個來自服務器的消息到來之前它會一直阻塞著。

這是整個Recv.java類。

把所有放在一起

你可以在RabbitMQ Java客戶端的類路徑上編譯這些文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

為了運行它們,你需要rabbitma-client.jar和它在類路徑上的的依賴文件。在一個終端上,運行發送者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然後,運行接收者:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

在windows環境中,我們使用分號代替冒號來分隔類路徑上的選項。

接收者將會輸出從RabbitMQ中獲取到來自發送者的消息。接收者會一直保持運行,等待消息(使用Ctrl-C停止),所以試著用另一個終端運行發送者。
如果你想檢驗隊列,試著使用rabbitmqctl list_queues0

Hello World!

時間移動到第二部分,構建一個簡單的工作隊列。

提示
為了保存輸入,你可以將類路徑設置到環境變量中

\$ export CP=.:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
\$ java -cp $CP Send

或者在 Windows環境中:

\> set CP=.;commons-io-1.2.jar;commons-cli-1.1.jar;rabbitmq-client.jar
\> java -cp %CP% Send
原文:http://www.oschina.net/news/59935/rabbitmq-manual
Copyright © Linux教程網 All Rights Reserved