亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

NET環境中使用RabbitMQ

2019-11-14 13:29:40
字體:
來源:轉載
供稿:網友

 

一 環境搭建

首先,由于RabbitMQ使用Erlang編寫的,需要運行在Erlang運行時環境上,所以在安裝RabbitMQ Server之前需要安裝Erlang 運行時環境,可以到Erlang官網下載對應平臺的安裝文件。如果沒有安裝運行時環境,安裝RabbitMQ Server的時候,會提示需要先安裝Erlang環境。 安裝完成之后,確保已經將Erlang的安裝路徑注冊到系統的環境變量中。安裝完Erlang之后,這個環境會自動設置,如果沒有:按照下圖進行設置。

 

然后,去RabbitMQ官網下載RabbitMQ Server服務端程序,選擇合適的平臺版本下載。安裝完成之后,就可以開始使用了。

現在就可以對RabbitMQ Server進行配置了。

首先,切換到RabbitMQ Server的安裝目錄:

在sbin下面有很多batch文件,用來控制RabbitMQ Server。

最簡單的方式是使RabbitMQ以Windows Service的方式在后臺運行,所以我們需要以管理員權限打開cmd,然后切換到sbin目錄下,執行這三條命令即可:

rabbitmq-service installrabbitmq-service enablerabbitmq-service start

現在RabbitMQ的服務端已經啟動起來了(如果啟動失敗,請檢查安裝后是否服務已經啟動,如果沒有,則可能是因為安裝版本的原因)。

下面可以使用sbin目錄下面的rabbitmqctl.bat這個腳本來查看和控制服務端狀態的,在cmd中直接運行rabbitmqctl status。如果不是看到以下結果:,需要到C:/Windows目錄下,將.erlang.cookie文件,拷貝到用戶目錄下 C:/Users/{用戶名},這是Erlang的Cookie文件,允許與Erlang進行交互:

RabbitMQ Server上面也有用戶概念,安裝好之后,使用rabbitmqctl list_users命令,可以看到上面目前的用戶:

可以使用下面的命令來添加用戶并設置權限:

rabbitmqctl  add_user  test  123456rabbitmqctl  set_permissions  test  ".*"  ".*"  ".*"rabbitmqctl  set_user_tags test administrator

上面的一條命令添加了一個名為test的用戶,并設置了密碼123456,下面的命令為用戶test分別授予對所有消息隊列的配置、讀和寫的權限。

現在我們可以將默認的guest用戶刪掉,使用下面的命令即可:

rabbitmqctl delete_user guest

如果要修改密碼,可以使用下面的命令:

rabbitmqctl change_passWord {username}  {newpassowrd}

二 開始使用

在.NET中使用RabbitMQ需要下載RabbitMQ的客戶端程序集,可以到官網下載,下載解壓后就可以得到RabbitMQ.Client.dll,這就是RabbitMQ的客戶端。

在使用RabitMQ之前,需要對下面的幾個基本概念說明一下:

RabbitMQ是一個消息代理。他從消息生產者(

producer

queue

consumer

通常,消息生產者,消息消費者和消息代理不在同一臺機器上。

2.1 Hello World

為了展示RabbitMQ的基本使用,我們發送一個HelloWorld消息,然后接收并處理。

rabbitmq hello world

首先創建一個控制臺程序,用來將消息發送到RabbitMQ的消息隊列中,代碼如下:

首先,需要創建一個ConnectionFactory,設置目標,由于是在本機,所以設置為localhost,如果RabbitMQ不在本機,只需要設置目標機器的ip地址或者機器名稱即可,然后設置前面創建的用戶名test和密碼123456。

緊接著要創建一個Channel,如果要發送消息,需要創建一個隊列,然后將消息發布到這個隊列中。在創建隊列的時候,只有RabbitMQ上該隊列不存在,才會去創建。消息是以二進制數組的形式傳輸的,所以如果消息是實體對象的話,需要序列化和然后轉化為二進制數組。

現在客戶端發送代碼已經寫好了,運行之后,消息會發布到RabbitMQ的消息隊列中,現在需要編寫服務端的代碼連接到RabbitMQ上去獲取這些消息。

同樣,創建一個名為Receive的服務端控制臺應用程序,服務端代碼如下:

和發送一樣,首先需要定義連接,然后聲明消息隊列。要接收消息,需要定義一個Consume,然后從消息隊列中不斷Dequeue消息,然后處理。

現在發送端和接收端的代碼都寫好了,運行發送端,發送消息:

 

現在,名為hello的消息隊列中,發送了一條消息。這條消息存儲到了RabbitMQ的服務器上了。使用rabbitmqctl 的list_queues可以查看所有的消息隊列,以及里面的消息個數,可以看到,目前Rabbitmq上只有一個消息隊列,里面只有一條消息:

rabbitmqctl list_queuesListing queues ...hello   1

現在運行接收端程序:

可以看到,已經接受到了客戶端發送的Hello World,現在再來看RabitMQ上的消息隊列信息:

rabbitmqctl list_queuesListing queues ...hello   0

可以看到,hello這個隊列中的消息隊列個數為0,這表示,當接收端,接收到消息之后,RabbitMQ上就把這個消息刪掉了。

2.2 工作隊列

前面的例子展示了如何往一個指定的消息隊列中發送和收取消息?,F在我們創建一個工作隊列(work queue)來將一些耗時的任務分發給多個工作者(workers):

rabbitmq-work queue

工作隊列(work queues, 又稱任務隊列Task Queues)的主要思想是為了避免立即執行并等待一些占用大量資源、時間的操作完成。而是把任務(Task)當作消息發送到隊列中,稍后處理。一個運行在后臺的工作者(worker)進程就會取出任務然后處理。當運行多個工作者(workers)時,任務會在它們之間共享。

這個在網絡應用中非常有用,它可以在短暫的HTTP請求中處理一些復雜的任務。在一些實時性要求不太高的地方,我們可以處理完主要操作之后,以消息的方式來處理其他的不緊要的操作,比如寫日志等等。

準備

在第一部分,發送了一個包含“Hello World!”的字符串消息。現在發送一些字符串,把這些字符串當作復雜的任務。這里使用time.sleep()函數來模擬耗時的任務。在字符串中加上點號(.)來表示任務的復雜程度,一個點(.)將會耗時1秒鐘。比如"Hello..."就會耗時3秒鐘。

對之前示例的send.cs做些簡單的調整,以便可以發送隨意的消息。這個程序會按照計劃發送任務到我們的工作隊列中。

static void Main(string[] args){    var factory = new ConnectionFactory();    factory.HostName = "localhost";    factory.UserName = "test";    factory.Password = "123456";    using (var connection = factory.CreateConnection())    {        using (var channel = connection.CreateModel())        {            channel.QueueDeclare("hello", false, false, false, null);            string message = GetMessage(args);            var properties = channel.CreateBasicProperties();            properties.DeliveryMode = 2;            var body = Encoding.UTF8.GetBytes(message);            channel.BasicPublish("", "hello", properties, body);            Console.WriteLine(" set {0}", message);        }    }    Console.ReadKey();}private static string GetMessage(string[] args){    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}

加粗部分是經過修改過了的。

接著我們修改接收端,讓他根據消息中的逗點的個數來Sleep對應的秒數:

static void Main(string[] args){    var factory = new ConnectionFactory();    factory.HostName = "localhost";    factory.UserName = "test";    factory.Password = "123456";    using (var connection = factory.CreateConnection())    {        using (var channel = connection.CreateModel())        {            channel.QueueDeclare("hello", false, false, false, null);            var consumer = new QueueingBasicConsumer(channel);            channel.BasicConsume("hello", true, consumer);            while (true)            {                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                var body = ea.Body;                var message = Encoding.UTF8.GetString(body);                int dots = message.Split('.').Length - 1;                Thread.Sleep(dots * 1000);                                        Console.WriteLine("Received {0}", message);                Console.WriteLine("Done");            }        }    }}

輪詢分發

使用工作隊列的一個好處就是它能夠并行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了,擴展很簡單。

現在,我們先啟動兩個接收端,等待接受消息,然后啟動一個發送端開始發送消息。

Send message queue 

在cmd條件下,發送了5條消息,每條消息后面的逗點表示該消息需要執行的時長,來模擬耗時的操作。

然后可以看到,兩個接收端依次接收到了發出的消息:

receive message queue 

默認,RabbitMQ會將每個消息按照順序依次分發給下一個消費者。所以每個消費者接收到的消息個數大致是平均的。 這種消息分發的方式稱之為輪詢(round-robin)。

2.3 消息響應

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運行到一半就掛掉。在當前的代碼中,當RabbitMQ將消息發送給消費者(consumers)之后,馬上就會將該消息從隊列中移除。此時,如果把處理這個消息的工作者(worker)停掉,正在處理的這條消息就會丟失。同時,所有發送到這個工作者的還沒有處理的消息都會丟失。

我們不想丟失任何任務消息。如果一個工作者(worker)掛掉了,我們希望該消息會重新發送給其他的工作者(worker)。

為了防止消息丟失,RabbitMQ提供了消息響應(acknowledgments)機制。消費者會通過一個ack(響應),告訴RabbitMQ已經收到并處理了某條消息,然后RabbitMQ才會釋放并刪除這條消息。

如果消費者(consumer)掛掉了,沒有發送響應,RabbitMQ就會認為消息沒有被完全處理,然后重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失消息。

消息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送消息。這樣在處理一個耗時非常長的消息任務的時候就不會出問題了。

消息響應默認是開啟的。在之前的例子中使用了no_ack=True標識把它關閉。是時候移除這個標識了,當工作者(worker)完成了任務,就發送一個響應。

channel.BasicConsume("hello", false, consumer);while (true){    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();    var body = ea.Body;    var message = Encoding.UTF8.GetString(body);    int dots = message.Split('.').Length - 1;    Thread.Sleep(dots * 1000);    Console.WriteLine("Received {0}", message);    Console.WriteLine("Done");    channel.BasicAck(ea.DeliveryTag, false);}

現在,可以保證,即使正在處理消息的工作者被停掉,這些消息也不會丟失,所有沒有被應答的消息會被重新發送給其他工作者.

一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,但是后果很嚴重. 當客戶端退出時,待處理的消息就會被重新分發,但是RabitMQ會消耗越來越多的內存,因為這些沒有被應答的消息不能夠被釋放。調試這種case,可以使用rabbitmqct打印messages_unacknoledged字段。

rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello    0       0...done.

2.4 消息持久化

前面已經搞定了即使消費者down掉,任務也不會丟失,但是,如果RabbitMQ Server停掉了,那么這些消息還是會丟失。

當RabbitMQ Server 關閉或者崩潰,那么里面存儲的隊列和消息默認是不會保存下來的。如果要讓RabbitMQ保存住消息,需要在兩個地方同時設置:需要保證隊列和消息都是持久化的。

首先,要保證RabbitMQ不會丟失隊列,所以要做如下設置:

bool durable = true;channel.QueueDeclare("hello", durable, false, false, null);

雖然在語法上是正確的,但是在目前階段是不正確的,因為我們之前已經定義了一個非持久化的hello隊列。RabbitMQ不允許我們使用不同的參數重新定義一個已經存在的同名隊列,如果這樣做就會報錯?,F在,定義另外一個不同名稱的隊列:

bool durable = true;channel.queueDeclare("task_queue", durable, false, false, null);

queueDeclare 這個改動需要在發送端和接收端同時設置。

現在保證了task_queue這個消息隊列即使在RabbitMQ Server重啟之后,隊列也不會丟失。 然后需要保證消息也是持久化的, 這可以通過設置IBasicProperties.SetPersistent 為true來實現:

var properties = channel.CreateBasicProperties();properties.SetPersistent(true);

需要注意的是,將消息設置為持久化并不能完全保證消息不丟失。雖然他告訴RabbitMQ將消息保存到磁盤上,但是在RabbitMQ接收到消息和將其保存到磁盤上這之間仍然有一個小的時間窗口。 RabbitMQ 可能只是將消息保存到了緩存中,并沒有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對于一個簡單任務隊列來說已經足夠。如果需要消息隊列持久化的強保證,可以使用publisher confirms

2.5 公平分發

你可能會注意到,消息的分發可能并沒有如我們想要的那樣公平分配。比如,對于兩個工作者。當奇數個消息的任務比較重,但是偶數個消息任務比較輕時,奇數個工作者始終處理忙碌狀態,而偶數個工作者始終處理空閑狀態。但是RabbitMQ并不知道這些,他仍然會平均依次的分發消息。

為了改變這一狀態,我們可以使用basicQos方法,設置perfetchCount=1 。這樣就告訴RabbitMQ 不要在同一時間給一個工作者發送多于1個的消息,或者換句話說。在一個工作者還在處理消息,并且沒有響應消息之前,不要給他分發新的消息。相反,將這條新的消息發送給下一個不那么忙碌的工作者。

channel.BasicQos(0, 1, false); 

2.6 完整實例

現在將所有這些放在一起:

發送端代碼如下:

static void Main(string[] args){    var factory = new ConnectionFactory();    factory.HostName = "localhost";    factory.UserName = "test";    factory.Password = "123456";    using (var connection = factory.CreateConnection())    {        using (var channel = connection.CreateModel())        {                               bool durable = true;            channel.QueueDeclare("task_queue", durable, false, false, null);                                string message = GetMessage(args);            var properties = channel.CreateBasicProperties();            properties.SetPersistent(true);                              var body = Encoding.UTF8.GetBytes(message);            channel.BasicPublish("", "task_queue", properties, body);            Console.WriteLine(" set {0}", message);        }    }    Console.ReadKey();}private static string GetMessage(string[] args){    return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!");}

接收端代碼如下:

static void Main(string[] args){    var factory = new ConnectionFactory();    factory.HostName = "localhost";    factory.UserName = "test";    factory.Password = "123456";    using (var connection = factory.CreateConnection())    {        using (var channel = connection.CreateModel())        {            bool durable = true;            channel.QueueDeclare("task_queue", durable, false, false, null);            channel.BasicQos(0, 1, false);            var consumer = new QueueingBasicConsumer(channel);            channel.BasicConsume("task_queue", false, consumer);            while (true)            {                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();                var body = ea.Body;                var message = Encoding.UTF8.GetString(body);                int dots = message.Split('.').Length - 1;                Thread.Sleep(dots * 1000);                Console.WriteLine("Received {0}", message);                Console.WriteLine("Done");                channel.BasicAck(ea.DeliveryTag, false);            }        }    }}

三 管理界面

RabbitMQ還有一個管理界面,通過該界面可以查看RabbitMQ Server 當前的狀態,該界面是以插件形式提供的,并且在安裝RabbitMQ的時候已經自帶了該插件。需要做的是在RabbitMQ控制臺界面中啟用該插件,命令如下:

rabbitmq-plugins enable rabbitmq_management

rabbitmq management

現在,在瀏覽器中輸入 http://server-name:15672/ server-name換成機器地址或者域名,如果是本地的,直接用localhost在輸入之后,彈出登錄界面,使用我們之前創建的用戶登錄。

 .

在該界面上可以看到當前RabbitMQServer的所有狀態。

四 總結

本文簡單介紹了消息隊列的相關概念,并介紹了RabbitMQ消息代理的基本原理以及在Windows 上如何安裝RabbitMQ和在.NET中如何使用RabbitMQ。消息隊列在構建分布式系統和提高系統的可擴展性和響應性方面有著很重要的作用,希望本文對您了解消息隊列以及如何使用RabbitMQ有所幫助。

五 參考文獻

  1. http://www.infoq.com/cn/articles/message-based-distributed-architecture
  2. http://www.rabbitmq.com/getstarted.html
  3. http://www.codethinked.com/using-rabbitmq-with-c-and-net
  4. http://www.infoq.com/cn/articles/AMQP-RabbitMQ
  5. http://www.infoq.com/cn/articles/ebay-scalability-best-practices

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美理论电影网| 色综合老司机第九色激情| 亚洲人成电影网站色…| 最新国产成人av网站网址麻豆| 九九视频直播综合网| 亚洲大胆人体视频| 91社影院在线观看| 欧美性猛交xxxx偷拍洗澡| 国产在线精品自拍| 亚洲成人教育av| 日韩在线免费高清视频| 日韩欧美在线观看| 欧美成人免费全部观看天天性色| 综合网中文字幕| 欧美不卡视频一区发布| 亚洲网在线观看| 久久人人爽人人爽人人片av高请| 一本色道久久88综合亚洲精品ⅰ| 亚洲a级在线播放观看| 国产精品久久久久久久久久东京| 亚洲国产毛片完整版| 国产视频久久网| 国产精品无码专区在线观看| 国产亚洲欧美另类中文| 国产成人精品久久二区二区| 日韩av免费在线| 亚洲福利在线视频| 在线播放精品一区二区三区| 久久久电影免费观看完整版| 欧美午夜性色大片在线观看| 亚洲高清一区二| 97国产真实伦对白精彩视频8| 欧美性开放视频| 国产综合在线视频| 精品无人区乱码1区2区3区在线| 亚洲综合国产精品| 色偷偷91综合久久噜噜| 在线观看国产精品淫| 国产中文字幕91| 亚洲福利在线播放| 久久这里有精品| 久久天天躁狠狠躁老女人| 国产精品直播网红| 日韩欧美在线一区| 91精品国产91久久久久久吃药| 一本大道亚洲视频| 丝袜美腿精品国产二区| 久久99精品久久久久久琪琪| 精品中文字幕久久久久久| 中文字幕视频在线免费欧美日韩综合在线看| 中文字幕视频在线免费欧美日韩综合在线看| 欧美乱妇40p| 精品国产福利在线| 久久色免费在线视频| 久久久精品久久久久| 欧美黄色成人网| 中文字幕国产精品| 日韩亚洲精品视频| 亚洲精品女av网站| 深夜福利亚洲导航| 欧美www视频在线观看| 日本国产欧美一区二区三区| 日本欧美精品在线| 亚洲999一在线观看www| 国外成人在线视频| 欧美激情视频在线观看| 午夜精品美女自拍福到在线| 欧美亚洲国产视频| 亚洲激情小视频| 亚洲欧美一区二区三区情侣bbw| 国产精品激情av电影在线观看| 亚洲第一视频网| 77777亚洲午夜久久多人| 欧美性猛交xxxx黑人| 亚洲色图综合久久| 国产成+人+综合+亚洲欧美丁香花| 国产亚洲欧洲在线| 91精品国产综合久久久久久久久| 久久九九全国免费精品观看| 中文字幕欧美精品日韩中文字幕| 久久乐国产精品| 亚洲va欧美va国产综合久久| 国产成人精品免高潮在线观看| 91高清视频在线免费观看| 91精品啪在线观看麻豆免费| 欧美成人剧情片在线观看| 日韩电影中文字幕av| 欧美激情精品久久久久久| 日韩免费中文字幕| 国产成人avxxxxx在线看| 色妞久久福利网| 久久久国产精彩视频美女艺术照福利| 国产精品中文字幕久久久| 国产97在线|日韩| 欧美精品日韩www.p站| 国产精品自产拍在线观| 精品中文视频在线| 国产日韩中文字幕| 亚洲欧美日韩第一区| 欧美贵妇videos办公室| 国产精品91久久久| 久久久久久成人| 57pao成人国产永久免费| 欧美黑人性生活视频| 国产精品视频自在线| 成人精品一区二区三区电影黑人| 亚洲欧美日韩久久久久久| 亚洲网站视频福利| 69**夜色精品国产69乱| 91欧美精品午夜性色福利在线| 91高清视频在线免费观看| 大量国产精品视频| 久久6精品影院| 欧美大片在线免费观看| 午夜欧美大片免费观看| 亚洲日本中文字幕| 午夜精品久久久久久99热| 亚洲人成电影网| 国产精品免费久久久久影院| 欧美日韩国产一中文字不卡| 国产亚洲精品久久久久久777| 日韩亚洲精品视频| 亚洲最大的免费| 日韩精品免费观看| 久久人人爽人人爽人人片亚洲| 91精品视频免费| 国产欧美精品一区二区三区-老狼| 亚洲四色影视在线观看| 国产精品久久久一区| 成人免费直播live| 91日本在线观看| 欧美激情一区二区三区高清视频| 国产精品久久久久久久久借妻| 亚洲国产又黄又爽女人高潮的| 91日本在线视频| 中日韩美女免费视频网站在线观看| 亚洲精品日韩丝袜精品| 久久伊人免费视频| 精品福利在线观看| 45www国产精品网站| 欧美精品一区二区免费| 日韩中文字幕在线| 91在线|亚洲| 国产在线视频不卡| 亚洲欧美日韩精品| 国产一区二区三区在线| 国产精品91在线| 亚洲福利在线观看| 国产精品亚洲片夜色在线| 中文字幕日韩在线视频| 国产亚洲精品一区二区| 国产精品视频地址| 亚洲国产高清福利视频| 91九色视频导航| 最新亚洲国产精品| 国产精品视频公开费视频| 一个人www欧美| 欧美日韩亚洲一区二区三区| 亚洲综合最新在线| 自拍偷拍亚洲欧美| 国产成人小视频在线观看| 亚洲欧美综合图区| 97人人模人人爽人人喊中文字| 久久久久久久久久国产|