亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > Python > 正文

利用Python學習RabbitMQ消息隊列

2020-01-04 17:55:23
字體:
來源:轉載
供稿:網友
RabbitMQ和郵局的主要區別就是RabbitMQ接收、存儲和發送的是二進制數據----消息,本篇文章給大家介紹利用Python學習RabbitMQ消息隊列,對python消息隊列相關知識感興趣的朋友參考下
 

RabbitMQ可以當做一個消息代理,它的核心原理非常簡單:即接收和發送消息,可以把它想象成一個郵局:我們把信件放入郵箱,郵遞員就會把信件投遞到你的收件人處,RabbitMQ就是一個郵箱、郵局、投遞員功能綜合體,整個過程就是:郵箱接收信件,郵局轉發信件,投遞員投遞信件到達收件人處。

RabbitMQ和郵局的主要區別就是RabbitMQ接收、存儲和發送的是二進制數據----消息。

rabbitmq基本管理命令:

一步啟動Erlang node和Rabbit應用:sudo rabbitmq-server

在后臺啟動Rabbit node:sudo rabbitmq-server -detached

關閉整個節點(包括應用):sudo rabbitmqctl stop

add_user <UserName> <Password>delete_user <UserName>change_password <UserName> <NewPassword>list_usersadd_vhost <VHostPath>delete_vhost <VHostPath>list_vhostsset_permissions [-p <VHostPath>] <UserName> <Regexp> <Regexp> <Regexp>clear_permissions [-p <VHostPath>] <UserName>list_permissions [-p <VHostPath>]list_user_permissions <UserName>list_queues [-p <VHostPath>] [<QueueInfoItem> ...]list_exchanges [-p <VHostPath>] [<ExchangeInfoItem> ...]list_bindings [-p <VHostPath>]list_connections [<ConnectionInfoItem> ...]

Demo:

producer.py

 #!/usr/bin/env python # -*- coding: utf_ -*- # Date: 年月日 # Author:蔚藍行 # 博客 http://www.cnblogs.com/duanv/ import pika import sys #創建連接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #創建虛擬連接channel cha = con.channel() #創建隊列anheng,durable參數為真時,隊列將持久化;exclusive為真時,建立臨時隊列 result=cha.queue_declare(queue='anheng',durable=True,exclusive=False) #創建名為yanfa,類型為fanout的exchange,其他類型還有direct和topic,如果指定durable為真,exchange將持久化 cha.exchange_declare(durable=False,           exchange='yanfa',           type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa',         queue=result.method.queue,        routing_key='',)  #公平分發,使每個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) #發送信息到隊列‘anheng' message = ' '.join(sys.argv[:]) #消息持久化指定delivery_mode=; cha.basic_publish(exchange='',          routing_key='anheng',          body=message,          properties=pika.BasicProperties(           delivery_mode = ,         )) print '[x] Sent %r' % (message,) #關閉連接 con.close()

consumer.py

 #!/usr/bin/env python # -*- coding: utf_ -*- # Date: 年月日 # Author:蔚藍行 # 博客 http://www.cnblogs.com/duanv/ import pika #建立連接connection到localhost con = pika.BlockingConnection(pika.ConnectionParameters('localhost')) #創建虛擬連接channel cha = con.channel() #創建隊列anheng result=cha.queue_declare(queue='anheng',durable=True) #創建名為yanfa,類型為fanout的交換機,其他類型還有direct和topic cha.exchange_declare(durable=False,           exchange='yanfa',            type='direct',) #綁定exchange和queue,result.method.queue獲取的是隊列名稱 cha.queue_bind(exchange='yanfa',        queue=result.method.queue,        routing_key='',) #公平分發,使每個consumer在同一時間最多處理一個message,收到ack前,不會分配新的message cha.basic_qos(prefetch_count=) print ' [*] Waiting for messages. To exit press CTRL+C' #定義回調函數 def callback(ch, method, properties, body):   print " [x] Received %r" % (body,)   ch.basic_ack(delivery_tag = method.delivery_tag) cha.basic_consume(callback,          queue='anheng',          no_ack=False,) cha.start_consuming()

一、概念:

Connection: 一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。程序的起始處就是建立這個TCP連接。

Channels: 虛擬連接。建立在上述的TCP連接中。數據流動都是在Channel中進行的。一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。

二、隊列:

首先建立一個Connection,然后建立Channels,在channel上建立隊列

建立時指定durable參數為真,隊列將持久化;指定exclusive為真,隊列為臨時隊列,關閉consumer后該隊列將不再存在,一般情況下建立臨時隊列并不指定隊列名稱,rabbitmq將隨機起名,通過result.method.queue來獲取隊列名:

result = channel.queue_declare(exclusive=True)

result.method.queue

區別:durable是隊列持久化與否,如果為真,隊列將在rabbitmq服務重啟后仍存在,如果為假,rabbitmq服務重啟前不會消失,與consumer關閉與否無關;

而exclusive是建立臨時隊列,當consumer關閉后,該隊列就會被刪除

三、exchange和bind

Exchange中durable參數指定exchange是否持久化,exchange參數指定exchange名稱,type指定exchange類型。Exchange類型有direct,fanout和topic。

Bind是將exchange與queue進行關聯,exchange參數和queue參數分別指定要進行bind的exchange和queue,routing_key為可選參數。

Exchange的三種模式:

Direct:

任何發送到Direct Exchange的消息都會被轉發到routing_key中指定的Queue

1.一般情況可以使用rabbitMQ自帶的Exchange:””(該Exchange的名字為空字符串);

2.這種模式下不需要將Exchange進行任何綁定(bind)操作;

3.消息傳遞時需要一個“routing_key”,可以簡單的理解為要發送到的隊列名字;

4.如果vhost中不存在routing_key中指定的隊列名,則該消息會被拋棄。

Demo中雖然聲明了一個exchange='yanfa'和queue='anheng'的bind,但是在后面發送消息時并沒有使用該exchange和bind,而是采用了direct的模式,沒有指定exchange,而是指定了routing_key的名稱為隊列名,消息將發送到指定隊列。

如果一個exchange 聲明為direct,并且bind中指定了routing_key,那么發送消息時需要同時指明該exchange和routing_key.

Fanout:

任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上

1.可以理解為路由表的模式

2.這種模式不需要routing_key

3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。

4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。

Demo中創建了一個將一個exchange和一個queue進行fanout類型的bind.但是發送信息時沒有用到它,如果要用到它,只要在發送消息時指定該exchange的名稱即可,該exchange就會將消息發送到所有和它bind的隊列中。在fanout模式下,指定的routing_key是無效的 。

Topic:

任何發送到Topic Exchange的消息都會被轉發到所有關心routing_key中指定話題的Queue上

1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(routing_key),Exchange會將消息轉發到所有關注主題能與routing_key模糊匹配的隊列。

2.這種模式需要routing_key,也許要提前綁定Exchange與Queue。

3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個routing_key為”MQ.log.error”的消息會被轉發到該隊列)。

4.“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。

5.同樣,如果Exchange沒有發現能夠與routing_key匹配的Queue,則會拋棄此消息。

四、任務分發

1.Rabbitmq的任務是循環分發的,如果開啟兩個consumer,producer發送的信息是輪流發送到兩個consume的。

2.在producer端使用cha.basic_publish()來發送消息,其中body參數就是要發送的消息,properties=pika.BasicProperties(delivery_mode = 2,)啟用消息持久化,可以防止RabbitMQ Server 重啟或者crash引起的數據丟失。

3.在接收端使用cha.basic_consume()無限循環監聽,如果設置no-ack參數為真,每次Consumer接到數據后,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然后從queue中刪除了。為了保證數據不被丟失,RabbitMQ支持消息確認機制,即acknowledgments。為了保證數據能被正確處理而不僅僅是被Consumer收到,那么我們不能采用no-ack。而應該是在處理完數據后發送ack。

在處理數據后發送的ack,就是告訴RabbitMQ數據已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。如果Consumer退出了但是沒有發送ack,那么RabbitMQ就會把這個Message發送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下數據也不會丟失。

這里并沒有用到超時機制。RabbitMQ僅僅通過Consumer的連接中斷來確認該Message并沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做數據處理。

Demo的callback方法中ch.basic_ack(delivery_tag = method.delivery_tag)告訴rabbitmq消息已經正確處理。如果沒有這條代碼,Consumer退出時,Message會重新分發。然后RabbitMQ會占用越來越多的內存,由于RabbitMQ會長時間運行,因此這個“內存泄漏”是致命的。去調試這種錯誤,可以通過一下命令打印un-acked Messages:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

4.公平分發:設置cha.basic_qos(prefetch_count=1),這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。

五、注意:

生產者和消費者都應該聲明建立隊列,網上教程上說第二次創建如果參數和第一次不一樣,那么該操作雖然成功,但是queue的屬性并不會被修改。

可能因為版本問題,在我的測試中如果第二次聲明建立的隊列屬性和第一次不完全相同,將報類似這種錯406, "PRECONDITION_FAILED - parameters for queue 'anheng' in vhost '/' not equivalent"

如果是exchange第二次創建屬性不同,將報這種錯406, "PRECONDITION_FAILED - cannot redeclare exchange 'yanfa' in vhost '/' with different type, durable, internal or autodelete value"

如果第一次聲明建立隊列也出現這個錯誤,說明之前存在名字相同的隊列且本次聲明的某些屬性和之前聲明不同,可通過命令sudo rabbitmqctl list_queues查看當前有哪些隊列。解決方法是聲明建立另一名稱的隊列或刪除原有隊列,如果原有隊列是非持久化的,可通過重啟rabbitmq服務刪除原有隊列,如果原有隊列是持久化的,只能刪除它所在的vhost,然后再重建vhost,再設置vhost的權限(先確認該vhost中沒有其他有用隊列)。

sudo rabbitmqctl delete_vhost /sudo rabbitmqctl add_vhost /sudo rabbitmqctl set_permissions -p / username '.*' '.*' '.*'

以上內容是小編給大家介紹的利用Python學習RabbitMQ消息隊列,希望大家喜歡。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美激情精品久久久久久免费印度| 成人春色激情网| 成人性生交大片免费观看嘿嘿视频| 最新国产精品亚洲| 亚洲a级在线观看| 欧美日韩免费观看中文| 亚洲第一精品久久忘忧草社区| 亚洲一区二区三区香蕉| 91精品国产综合久久香蕉的用户体验| 久久久免费观看| 91精品啪在线观看麻豆免费| 亚洲视频在线免费观看| 亚洲精品国产精品乱码不99按摩| 国产一区二区三区四区福利| 亚洲娇小xxxx欧美娇小| 久久久免费精品视频| 国产日韩欧美电影在线观看| 国产aⅴ夜夜欢一区二区三区| 欧美视频在线视频| 91久久精品美女高潮| 欧美性生交xxxxx久久久| 69久久夜色精品国产69| 欧洲亚洲免费在线| 欧美中文字幕视频| 国产一区二区在线免费| 亚洲男子天堂网| 成人激情视频在线播放| 国产精品成人免费视频| 欧美国产日韩中文字幕在线| 狠狠躁夜夜躁久久躁别揉| 亚洲欧美日本另类| 久久久久久国产| 日韩在线观看免费全| 国产偷国产偷亚洲清高网站| 色婷婷综合成人| 亚洲男人的天堂网站| 亚洲最大福利视频网| 国产精品久久久久久久久久东京| 亚洲无av在线中文字幕| 91在线视频成人| 中文字幕在线看视频国产欧美| 欧美日韩aaaa| 亚洲精品白浆高清久久久久久| 日韩黄色高清视频| 69久久夜色精品国产69乱青草| 在线视频一区二区| 欧美区在线播放| 久久久精品999| 国产精品久久久久久婷婷天堂| 久久久久久久久91| 国产亚洲激情视频在线| 欧美成人精品一区二区| 国产精品久久久久91| 久久久av一区| 亚洲图片欧洲图片av| 精品视频久久久久久久| 国产精品视频久久久| 欧美专区第一页| 久久黄色av网站| 欧美激情手机在线视频| 91精品国产乱码久久久久久蜜臀| 热久久99这里有精品| 欧美日韩人人澡狠狠躁视频| 日本一区二三区好的精华液| 日韩高清a**址| 亚洲国产欧美一区二区丝袜黑人| 亚洲国产古装精品网站| 日韩中文字幕免费| 午夜精品福利在线观看| 欧美乱大交xxxxx另类电影| 成人亚洲欧美一区二区三区| 亚洲精品国产品国语在线| 88国产精品欧美一区二区三区| 精品在线小视频| 国产精品96久久久久久| 欧美高清无遮挡| 在线观看国产精品淫| 国产精品男人爽免费视频1| 亚洲在线观看视频| 欧美精品久久久久久久久久| 国产视频丨精品|在线观看| 久久精品电影网| 欧美极品少妇全裸体| 欧美视频在线视频| 欧美一区二区三区图| 欧美刺激性大交免费视频| 国产欧美在线视频| 亚洲欧洲国产一区| 精品爽片免费看久久| 色综合91久久精品中文字幕| 欧美日韩精品中文字幕| 国产精品久久久久久亚洲影视| 国产精品狼人色视频一区| 久久不射热爱视频精品| 亚洲精品电影网站| 国产这里只有精品| 日韩大胆人体377p| 亚洲伊人第一页| 久久久久久av| 久久国产视频网站| 国产成人精品国内自产拍免费看| 国产成人在线播放| 国产精品吊钟奶在线| 国产成人精品av在线| 日韩精品久久久久久久玫瑰园| 亚洲丁香婷深爱综合| 欧美高清视频在线| 久久精品电影网站| 日韩电影中文字幕在线观看| 欧美理论电影网| 国产精品99久久久久久人| 欧美大全免费观看电视剧大泉洋| 欧美专区福利在线| 欧美电影在线免费观看网站| 精品日韩美女的视频高清| 亚洲国产精品久久久| 亚洲精品一区久久久久久| 国产精品爽黄69| 国产伦精品一区二区三区精品视频| www.日韩系列| 国产精品嫩草影院久久久| 一本一道久久a久久精品逆3p| 国产在线精品一区免费香蕉| 久久精品青青大伊人av| 日本一区二三区好的精华液| 国产精品jizz在线观看麻豆| 亚洲a成v人在线观看| 精品视频在线播放| 精品久久中文字幕| 国语自产偷拍精品视频偷| 91精品视频网站| 性夜试看影院91社区| 亚洲国产中文字幕久久网| 亚洲精品免费av| 在线视频日韩精品| 国产免费一区二区三区在线能观看| 中文欧美在线视频| 91精品久久久久久久久久久久久| 欧美激情一区二区三区在线视频观看| 久久久国产在线视频| 欧美亚洲另类视频| 亚洲永久在线观看| 欧美日韩一区二区精品| 亚洲性日韩精品一区二区| 欧美极品美女电影一区| 少妇av一区二区三区| 亚洲精品自拍偷拍| 久久精品国产亚洲精品| 亚洲成人1234| 亚洲福利视频免费观看| 国产精品亚洲第一区| 欧美黄色片在线观看| 成人免费视频网| 亚洲国产福利在线| 欧美日韩另类字幕中文| 亚洲视频专区在线| 日韩精品在线电影| 精品视频久久久| 国产成人av网| 丝袜亚洲另类欧美重口| 精品一区二区三区电影| 国产亚洲欧洲高清| 亚洲精选中文字幕| 亚洲大胆人体视频|