AMQP ,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用于組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。網站在: http://www.rabbitmq.com/ 上面有各種語言教程和實例代碼
AMPQ協議為了能夠滿足各種消息隊列需求,在概念上比較復雜,了解了這些概念,是使用好RabbitMQ的基礎。
vhosts : html' target='_blank'>虛擬主機虛擬主機( virtual host ):一個虛擬主機持有一組交換機、隊列和綁定。為什么需要多個虛擬主機呢? RabbitMQ 當中,用戶只能在虛擬主機的粒度進行權限控制。因此,如果需要禁止 A 組訪問 B 組的交換機 / 隊列 / 綁定,必須為 A 和 B 分別創建一個虛擬主機。每一個 RabbitMQ 服務器都有一個默認的虛擬主機 “/” 。
一個RabbitMQ的Server上可以有多個vhosts,用戶與權限設置就是依附于vhosts。對一般PHP應用,不需要用戶權限設定,直接使用默認就存在的”/”就可以了,用戶可以使用默認就存在的”guest”。一個簡單的配置示例:
$conn_args = array( 'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost'=>'/');connection 與 channel : 連接與信道
connection是指物理的連接,一個client與一個server之間有一個連接;一個連接上可以建立多個channel,可以理解為邏輯上的連接。一般應用的情況下,有一個channel就夠用了,不需要創建更多的channel。示例代碼:
//創建連接和channel$conn = new AMQPConnection($conn_args);if (!$conn->connect()) { die("Cannot connect to the broker!/n");}$channel = new AMQPChannel($conn);Exchange 與 routingkey : 交換機 與 路由鍵
為了將不同類型的message進行區分,設置了Exchange交換機與Route路由兩個概念。比如,將A類型的message發送到名為‘C1’的交換機,將類型為B的發送到’C2′的交換機。當客戶端連接C1處理隊列消息時,取到的就只是A類型message。進一步的,如果A類型message也非常多,需要進一步細化區分,比如某個客戶端只處理A類型message中針對K用戶的message,routingkey就是來做這個用途的。
$e_name = 'e_linvo'; //交換機名$k_route = array(0=> 'key_1', 1=> 'key_2'); //路由key//創建交換機$ex = new AMQPExchange($channel);$ex->setName($e_name);$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型$ex->setFlags(AMQP_DURABLE); //持久化echo "Exchange Status:".$ex->declare()."/n";for($i=0; $ipublish($message . date('H:i:s'), $k_route[i%2])."/n";}
由以上代碼可以看到,發送消息時,只要有“交換機”就夠了。至于交換機后面有沒有對應的處理隊列,發送方是不用管的。routingkey可以是空的字符串。在示例中,我使用了兩個key交替發送消息,是為了下面更便于理解routingkey的作用。
對于交換機,有兩個重要的概念:
交換機( Exchange ):可以理解成具有路由表的路由程序。每個消息都有一個路由鍵( routing key ),就是一個簡單的字符串。交換機中有一系列的綁定( binding ),即路由規則( routes )。交換機可以有多個。多個隊列可以和同一個交換機綁定,同時多個交換機也可以和同一個隊列綁定。(多對多的關系)
A,類型。有三種類型:
1. Fanout Exchange (不處理路由鍵):一個發送到交換機上的消息都會被轉發到與該交換機綁定的所有隊列上。 Fanout 交換機發消息是最快的。
2. Direct Exchange (處理路由鍵):如果一個隊列綁定到該交換機上,并且當前要求路由鍵為 X ,只有路由鍵是 X 的消息才會被這個隊列轉發。
3. Topic Exchange (將路由鍵和某模式進行匹配,可以理解成模糊處理):路由鍵的詞由 “.” 隔開,符號 “#” 表示匹配 0 個或多個詞,符號 “*” 表示匹配不多不少一個詞。
類型總結:Fanout類型最簡單,這種模型忽略routingkey;Direct類型是使用最多的,使用確定的routingkey。這種模型下,接收消息時綁定’key_1′則只接收key_1的消息;最后一種是Topic,這種模式與Direct類似,但是支持通配符進行匹配,比如: ‘key_*’,就會接受key_1和key_2。Topic貌似美好,但是有可能導致不嚴謹,所以還是推薦使用Direct。
B,持久化。指定了持久化的交換機,在重新啟動時才能重建,否則需要客戶端重新聲明生成才行。
需要特別明確的概念:交換機的持久化,并不等于消息的持久化。只有在持久化隊列中的消息,才能持久化;如果沒有隊列,消息是沒有地方存儲的;消息本身在投遞時也有一個持久化標志的,PHP中默認投遞到持久化交換機就是持久的消息,不用特別指定。
4,queue: 隊列講了這么多,才講到隊列呀。事實上,隊列僅是針對接收方(consumer)的,由接收方根據需求創建的。只有隊列創建了,交換機才會將新接受到的消息送到隊列中,交換機是不會在隊列創建之前的消息放進來的。換句話說,在建立隊列之前,發出的所有消息都被丟棄了。下面這個圖比RabbitMQ官方的圖更清楚——Queue是屬于ReceiveMessage的一部分。
接下來看一下創建隊列及接收消息的示例:
$e_name = 'e_linvo'; //交換機名$q_name = 'q_linvo'; //隊列名$k_route = ''; //路由key //創建連接和channel$conn = new AMQPConnection($conn_args);if (!$conn->connect()) { die("Cannot connect to the broker!/n"); } $channel = new AMQPChannel($conn); //創建交換機 $ex = new AMQPExchange($channel);$ex->setName($e_name);$ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型$ex->setFlags(AMQP_DURABLE); //持久化echo "Exchange Status:".$ex->declare()."/n"; //創建隊列$q = new AMQPQueue($channel);$q->setName($q_name);$q->setFlags(AMQP_DURABLE); //持久化 //綁定交換機與隊列,并指定路由鍵echo 'Queue Bind: '.$q->bind($e_name, $k_route)."/n"; //阻塞模式接收消息echo "Message:/n";$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 $conn->disconnect();/** * 消費回調函數 * 處理消息 */function processMessage($envelope, $queue) { var_dump($envelope->getRoutingKey); $msg = $envelope->getBody(); echo $msg."/n"; //處理消息}
從上述示例中可以看到,交換機既可以由消息發送端創建,也可以由消息消費者創建。
創建一個隊列(line:20)后,需要將隊列綁定到交換機上(line:25)隊列才能工作,routingkey也是在這里指定的。有的資料上寫成bindingkey,其實一回事兒,弄兩個名詞反倒容易混淆。
消息的處理,是有兩種方式:
A,一次性。用 $q->get([...]),不管取到取不到消息都會立即返回,一般情況下使用輪詢處理消息隊列就要用這種方式;
B,阻塞。用 $q->consum( callback, [...] ) 程序會進入持續偵聽狀態,每收到一個消息就會調用callback指定的函數一次,直到某個callback函數返回FALSE才結束。
關于callback,這里多說幾句: PHP的call_back是支持使用數組的,比如: $c = new MyClass(); $c->counter = 100; $q->consume( array($c,’myfunc’) ) 這樣就可以調用自己寫的處理類。MyClass中myfunc的參數定義,與上例中processMessage一樣就行。
在上述示例中,使用的$routingkey = ”, 意味著接收全部的消息。我們可以將其改為 $routingkey = ‘key_1′,可以看到結果中僅有設置routingkey為key_1的內容了。
注意: routingkey = ‘key_1′ 與 routingkey = ‘key_2′ 是兩個不同的隊列。假設: client1 與 client2 都連接到 key_1 的隊列上,一個消息被client1處理之后,就不會被client2處理。而 routingkey = ” 是另類,client_all綁定到 ” 上,將消息全都處理后,client1和client2上也就沒有消息了。
在程序設計上,需要規劃好exchange的名稱,以及如何使用key區分開不同類型的標記,在消息產生的地方插入發送消息代碼。后端處理,可以針對每一個key啟動一個或多個client,以提高消息處理的實時性。如何使用PHP進行多線程的消息處理,將在下一節中講述。
安裝erlang依賴的基本環境#操作系統:CentOS release 6.2yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel java-devel unixODBC-devel;Erlang安裝方式一:源碼編譯
訪問 網址下載頁
wget http://www.erlang.org/download/otp_src_R16B03.tar.gz;tar -zxvf otp_src_R16B03.tar.gz;cd otp_src_R16B03;./configure --prefix=/usr/local/erlang --with-ssl -enable-threads -enable-smmp-support -enable-kernel-poll --enable-hipe --without-javac;#不用java編譯,故去掉java避免錯誤make && make install;配置erlang環境
#vi /etc/profile在文件最后加入:PATH=$PATH:/usr/local/erlang/binexport PATH#source /etc/profileErlang安裝方式二:YUM安裝安裝erlang的YUM源
訪問 網址YUM安裝教程
#自動安裝erlang的YUM源wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpmrpm -Uvh erlang-solutions-1.0-1.noarch.rpm#或手動安裝YUM源rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.ascAdd the following lines to some file in /etc/yum.repos.d/:[erlang-solutions]name=Centos $releasever - $basearch - Erlang Solutionsbaseurl=http://packages.erlang-solutions.com/rpm/centos/$releasever/$basearchgpgcheck=1gpgkey=http://packages.erlang-solutions.com/rpm/erlang_solutions.ascenabled=1yum erlang
yum -y install erlang;安裝成功檢測
安裝完后輸入“erl”以下提示即為安裝成功:
[root@localhost ~]# erlErlang/OTP 18 [erts-7.2] [source-e6dd627] [64-bit] [async-threads:10] [hipe] [kernel-poll:false]Eshell V7.2 (abort with ^G)
鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯系我們修改或刪除,多謝。
新聞熱點
疑難解答