亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 語言 > PHP > 正文

PHP基于rabbitmq操作類的生產者和消費者功能示例

2024-05-05 00:04:16
字體:
來源:轉載
供稿:網友

本文實例講述了PHP基于rabbitmq操作類的生產者和消費者功能。分享給大家供大家參考,具體如下:

注意事項:

1、accept.php消費者代碼需要在命令行執行

2、'username'=>'asdf','password'=>'123456' 改成自己的帳號和密碼

RabbitMQCommand.php操作類代碼

<?php/* * amqp協議操作類,可以訪問rabbitMQ * 需先安裝php_amqp擴展 */class RabbitMQCommand{  public $configs = array();  //交換機名稱  public $exchange_name = '';  //隊列名稱  public $queue_name = '';  //路由名稱  public $route_key = '';  /*   * 持久化,默認True   */  public $durable = True;  /*   * 自動刪除   * exchange is deleted when all queues have finished using it   * queue is deleted when last consumer unsubscribes   *   */  public $autodelete = False;  /*   * 鏡像   * 鏡像隊列,打開后消息會在節點之間復制,有master和slave的概念   */  public $mirror = False;  private $_conn = Null;  private $_exchange = Null;  private $_channel = Null;  private $_queue = Null;  /*   * @configs array('host'=>$host,'port'=>5672,'username'=>$username,'password'=>$password,'vhost'=>'/')   */  public function __construct($configs = array(), $exchange_name = '', $queue_name = '', $route_key = '') {    $this->setConfigs($configs);    $this->exchange_name = $exchange_name;    $this->queue_name = $queue_name;    $this->route_key = $route_key;  }  private function setConfigs($configs) {    if (!is_array($configs)) {      throw new Exception('configs is not array');    }    if (!($configs['host'] && $configs['port'] && $configs['username'] && $configs['password'])) {      throw new Exception('configs is empty');    }    if (empty($configs['vhost'])) {      $configs['vhost'] = '/';    }    $configs['login'] = $configs['username'];    unset($configs['username']);    $this->configs = $configs;  }  /*   * 設置是否持久化,默認為True   */  public function setDurable($durable) {    $this->durable = $durable;  }  /*   * 設置是否自動刪除   */  public function setAutoDelete($autodelete) {    $this->autodelete = $autodelete;  }  /*   * 設置是否鏡像   */  public function setMirror($mirror) {    $this->mirror = $mirror;  }  /*   * 打開amqp連接   */  private function open() {    if (!$this->_conn) {      try {        $this->_conn = new AMQPConnection($this->configs);        $this->_conn->connect();        $this->initConnection();      } catch (AMQPConnectionException $ex) {        throw new Exception('cannot connection rabbitmq',500);      }    }  }  /*   * rabbitmq連接不變   * 重置交換機,隊列,路由等配置   */  public function reset($exchange_name, $queue_name, $route_key) {    $this->exchange_name = $exchange_name;    $this->queue_name = $queue_name;    $this->route_key = $route_key;    $this->initConnection();  }  /*   * 初始化rabbit連接的相關配置   */  private function initConnection() {    if (empty($this->exchange_name) || empty($this->queue_name) || empty($this->route_key)) {      throw new Exception('rabbitmq exchange_name or queue_name or route_key is empty',500);    }    $this->_channel = new AMQPChannel($this->_conn);    $this->_exchange = new AMQPExchange($this->_channel);    $this->_exchange->setName($this->exchange_name);    $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);    if ($this->durable)      $this->_exchange->setFlags(AMQP_DURABLE);    if ($this->autodelete)      $this->_exchange->setFlags(AMQP_AUTODELETE);    $this->_exchange->declare();    $this->_queue = new AMQPQueue($this->_channel);    $this->_queue->setName($this->queue_name);    if ($this->durable)      $this->_queue->setFlags(AMQP_DURABLE);    if ($this->autodelete)      $this->_queue->setFlags(AMQP_AUTODELETE);    if ($this->mirror)      $this->_queue->setArgument('x-ha-policy', 'all');    $this->_queue->declare();    $this->_queue->bind($this->exchange_name, $this->route_key);  }  public function close() {    if ($this->_conn) {      $this->_conn->disconnect();    }  }  public function __sleep() {    $this->close();    return array_keys(get_object_vars($this));  }  public function __destruct() {    $this->close();  }  /*   * 生產者發送消息   */  public function send($msg) {    $this->open();    if(is_array($msg)){      $msg = json_encode($msg);    }else{      $msg = trim(strval($msg));    }    return $this->_exchange->publish($msg, $this->route_key);  }  /*   * 消費者   * $fun_name = array($classobj,$function) or function name string   * $autoack 是否自動應答   *   * function processMessage($envelope, $queue) {      $msg = $envelope->getBody();      echo $msg."/n"; //處理消息      $queue->ack($envelope->getDeliveryTag());//手動應答    }   */  public function run($fun_name, $autoack = True){    $this->open();    if (!$fun_name || !$this->_queue) return False;    while(True){      if ($autoack) $this->_queue->consume($fun_name, AMQP_AUTOACK);      else $this->_queue->consume($fun_name);    }  }}

send.php生產者代碼

<?phpset_time_limit(0);include_once('RabbitMQCommand.php');$configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');$exchange_name = 'class-e-1';$queue_name = 'class-q-1';$route_key = 'class-r-1';$ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);for($i=0;$i<=100;$i++){  $ra->send(date('Y-m-d H:i:s',time()));}exit();

accept.php消費者代碼

<?phperror_reporting(0);include_once('RabbitMQCommand.php');$configs = array('host'=>'127.0.0.1','port'=>5672,'username'=>'asdf','password'=>'123456','vhost'=>'/');$exchange_name = 'class-e-1';$queue_name = 'class-q-1';$route_key = 'class-r-1';$ra = new RabbitMQCommand($configs,$exchange_name,$queue_name,$route_key);class A{  function processMessage($envelope, $queue) {    $msg = $envelope->getBody();    $envelopeID = $envelope->getDeliveryTag();    $pid = posix_getpid();    file_put_contents("log{$pid}.log", $msg.'|'.$envelopeID.''."/r/n",FILE_APPEND);    $queue->ack($envelopeID);  }}$a = new A();$s = $ra->run(array($a,'processMessage'),false);

希望本文所述對大家PHP程序設計有所幫助。


注:相關教程知識閱讀請移步到PHP教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表

圖片精選

亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲深夜福利网站| 另类美女黄大片| 97在线视频免费| 国产性猛交xxxx免费看久久| 狠狠躁18三区二区一区| 久久精品国产精品| 97视频国产在线| 人人爽久久涩噜噜噜网站| 九九九久久久久久| 欧美亚洲伦理www| 操人视频在线观看欧美| 日韩av在线免费| 久久婷婷国产麻豆91天堂| 正在播放国产一区| 日韩久久精品电影| 久久国产精品久久国产精品| 色哟哟亚洲精品一区二区| 久久韩剧网电视剧| 久久好看免费视频| 欧美在线影院在线视频| 欧美性猛交xxxx乱大交蜜桃| 日韩成人在线电影网| 国产精品高潮呻吟久久av黑人| 久久人人爽人人爽爽久久| 91精品久久久久久久久久另类| 亚洲最大福利网| 亚洲欧美激情精品一区二区| 国产精品大陆在线观看| 日韩男女性生活视频| 欧美日韩国产黄| 亚洲国产另类 国产精品国产免费| 国产精品综合久久久| 国产视频久久久| 国产丝袜一区视频在线观看| 日本成人免费在线| 久久久91精品国产一区不卡| 欧美大片免费观看在线观看网站推荐| 性色av一区二区三区在线观看| 精品久久久在线观看| 成人有码在线视频| 亚洲欧美综合区自拍另类| 国产免费一区二区三区在线观看| 久久久久在线观看| 日韩欧美一区视频| 国产精品视频专区| 国产精品久久不能| 欧美日韩激情小视频| 欧美日韩加勒比精品一区| 最新69国产成人精品视频免费| 久久精品人人做人人爽| 91在线免费网站| 欧美精品免费在线观看| 欧美在线视频在线播放完整版免费观看| www日韩中文字幕在线看| 日韩视频在线免费观看| 久久夜色撩人精品| 欧美成人精品xxx| 亚洲老头老太hd| 成人激情黄色网| 欧美电影免费观看大全| 亚洲小视频在线观看| 久久婷婷国产麻豆91天堂| 欧美日韩另类字幕中文| 日韩中文字幕在线观看| 国产成人精品一区二区三区| 亚洲品质视频自拍网| 精品福利在线观看| 中文在线资源观看视频网站免费不卡| 隔壁老王国产在线精品| 国产香蕉精品视频一区二区三区| 九九热这里只有精品6| 国内精久久久久久久久久人| 精品国产一区av| 中文字幕亚洲二区| 97欧美精品一区二区三区| 亚洲精品电影在线| 精品国偷自产在线视频| 欧美在线不卡区| 国产成人精品久久| 国产自摸综合网| 久久视频在线播放| 久久精品美女视频网站| 俺也去精品视频在线观看| 亚洲风情亚aⅴ在线发布| 欧美黄色片视频| www.xxxx欧美| 久久五月情影视| 精品精品国产国产自在线| 成人免费视频网址| 日韩免费观看在线观看| 午夜精品美女自拍福到在线| 亚洲国产精品电影在线观看| 91精品免费视频| 欧美亚洲国产另类| 国产主播喷水一区二区| 国产91精品黑色丝袜高跟鞋| 久久久精品免费| 国产午夜精品麻豆| 国产精品第二页| 久久精品国产亚洲| 清纯唯美日韩制服另类| 亚洲国产精品久久久久| 亚洲片国产一区一级在线观看| 色先锋资源久久综合5566| 国产欧美日韩免费看aⅴ视频| 亚洲色无码播放| 日韩欧美在线免费| 自拍视频国产精品| 91欧美精品成人综合在线观看| 国产精品高潮呻吟久久av无限| 久久韩剧网电视剧| 日韩精品极品在线观看| 国产精品中文字幕久久久| 色综合男人天堂| 久久国产精品久久久久久久久久| 中文字幕亚洲欧美日韩在线不卡| 久热精品视频在线免费观看| 亚洲午夜av久久乱码| 中文字幕国产日韩| 中文字幕不卡av| 日韩美女在线观看一区| 国产日韩精品一区二区| 欧美激情视频免费观看| 日韩中文在线中文网三级| 日韩精品视频在线播放| 日韩欧美成人精品| 久久国产精品免费视频| 国产成人精品免高潮费视频| 亚洲最大成人免费视频| 亚洲欧美一区二区激情| 国产97在线亚洲| 一夜七次郎国产精品亚洲| 在线性视频日韩欧美| 国产精品久久久久久久久久久不卡| 亚洲精品www| 久久久免费观看| 97碰碰碰免费色视频| 红桃视频成人在线观看| 黄色精品在线看| 91网在线免费观看| 国产91精品久久久久| 日韩av免费在线| 精品欧美国产一区二区三区| 国模gogo一区二区大胆私拍| 2019中文字幕全在线观看| 青草青草久热精品视频在线观看| 国产精品极品美女在线观看免费| 国产精品久久久91| 日韩精品久久久久久久玫瑰园| 97久久精品人搡人人玩| 欧美激情日韩图片| 日韩在线免费高清视频| 国产精品电影网| 欧美三级欧美成人高清www| 亚洲人成绝费网站色www| 欧美国产日韩一区二区三区| 欧美性极品xxxx做受| 久久免费视频网站| 久久人91精品久久久久久不卡| 69久久夜色精品国产7777| 97视频免费看| 一区二区三欧美| 久久频这里精品99香蕉| 日韩激情视频在线|