亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > PHP > 正文

Kafka的介紹以及基于PHP的kafka的安裝和測試

2020-03-22 18:55:41
字體:
來源:轉載
供稿:網友
本篇文章給大家分享的內容是關于Kafka的介紹以及基于PHP的kafka的安裝和測試,內容很詳細,有需要的朋友可以參考一下,希望可以幫助到你們。

簡介

Kafka 是一種高吞吐量的分布式發布訂閱消息系統

kafka角色必知

producer:生產者。
consumer:消費者。
topic: 消息以topic為類別記錄,Kafka將消息種子(Feed)分類, 每一類的消息稱之為一個主題(Topic)。
broker:以集群的方式運行,可以由一個或多個服務組成,每個服務叫做一個broker;消費者可以訂閱一個或多個主題(topic), 并從Broker拉數據,從而消費這些已發布的消息。

經典模型

1. 一個主題下的分區不能小于消費者數量,即一個主題下消費者數量不能大于分區屬,大了就浪費了空閑了
2. 一個主題下的一個分區可以同時被不同消費組其中某一個消費者消費
3. 一個主題下的一個分區只能被同一個消費組的一個消費者消費

3315151442-5b5832bb37224_articlex.png

常用參數說明request.required.acks

Kafka producer的ack有3中機制,初始化producer時的producerconfig可以通過配置request.required.acks不同的值來實現。

0:這意味著生產者producer不等待來自broker同步完成的確認繼續發送下一條(批)消息。此選項提供最低的延遲但最弱的耐久性保證(當服務器發生故障時某些數據會丟失,如leader已死,但producer并不知情,發出去的信息broker就收不到)。

1:這意味著producer在leader已成功收到的數據并得到確認后發送下一條message。此選項提供了更好的耐久性為客戶等待服務器確認請求成功(被寫入死亡leader但尚未復制將失去了唯一的消息)。

-1:這意味著producer在follower副本確認接收到數據后才算一次發送完成。
此選項提供最好的耐久性,我們保證沒有信息將丟失,只要至少一個同步副本保持存活。

三種機制,性能依次遞減 (producer吞吐量降低),數據健壯性則依次遞增。

auto.offset.reset

1. earliest:自動將偏移重置為最早的偏移量
2. latest:自動將偏移量重置為最新的偏移量(默認)
3. none:如果consumer group沒有發現先前的偏移量,則向consumer拋出異常。
4. 其他的參數:向consumer拋出異常(無效參數)

kafka安裝和簡單測試安裝kafka(不需要安裝,解包即可)
# 官方下載地址:http://kafka.apache.org/downloads# wget https://www.apache.org/dyn/closer.cgi?path=/kafka/1.1.1/kafka_2.12-1.1.1.tgztar -xzf kafka_2.12-1.1.1.tgzcd kafka_2.12-1.1.0
啟動kafka server
# 需先啟動zookeeperbin/zookeeper-server-start.sh config/zookeeper.propertiesbin/kafka-server-start.sh config/server.properties
啟動kafka客戶端測試
# 創建一個話題,test話題2個分區bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic testCreated topic "test".# 顯示所有話題bin/kafka-topics.sh --list --zookeeper localhost:2181test# 顯示話題信息bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic:test    PartitionCount:2    ReplicationFactor:1    Configs:    Topic: test    Partition: 0    Leader: 0    Replicas: 0    Isr: 0    Topic: test    Partition: 1    Leader: 0    Replicas: 0    Isr: 0# 啟動一個生產者(輸入消息)bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test[等待輸入自己的內容 出現>輸入即可]>i am a new msg !>i am a good msg ?# 啟動一個生產者(等待消息) # 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning[等待消息]i am a new msg !i am a good msg ?
安裝kafka的php擴展
git clone https://github.com/arnaud-lb/php-rdkafka.gitcd php-rdkafkaphpize./configuremake all -j 5sudo make installvim [php]/php.iniextension=rdkafka.so
php代碼實踐生產者
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./dr_cb.log", var_export($message, true).PHP_EOL, FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);});$rk = new RdKafka/Producer($conf);$rk->setLogLevel(LOG_DEBUG);$rk->addBrokers("127.0.0.1");$cf = new RdKafka/TopicConf();$cf->set('request.required.acks', 0);$topic = $rk->newTopic("test", $cf);$option = 'qkl';for ($i = 0; $i < 20; $i++) {    //RD_KAFKA_PARTITION_UA自動選擇分區    //$option可選    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "qkl . $i", $option);}$len = $rk->getOutQLen();while ($len > 0) {    $len = $rk->getOutQLen();    var_dump($len);    $rk->poll(50);}
運行生產者
php producer.php# outputint(20)int(20)int(20)int(20)int(0)# 你可以查看你剛才上面啟動的消費者shell應該會輸出消息qkl . 0qkl . 1qkl . 2qkl . 3qkl . 4qkl . 5qkl . 6qkl . 7qkl . 8qkl . 9qkl . 10qkl . 11qkl . 12qkl . 13qkl . 14qkl . 15qkl . 16qkl . 17qkl . 18qkl . 19
消費者
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./c_dr_cb.log", var_export($message, true), FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    file_put_contents("./err_cb.log", sprintf("Kafka error: %s (reason: %s)", rd_kafka_err2str($err), $reason).PHP_EOL, FILE_APPEND);});//設置消費組$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka/Consumer($conf);$rk->addBrokers("127.0.0.1");$topicConf = new RdKafka/TopicConf();$topicConf->set('request.required.acks', 1);//在interval.ms的時間內自動提交確認、建議不要啟動//$topicConf->set('auto.commit.enable', 1);$topicConf->set('auto.commit.enable', 0);$topicConf->set('auto.commit.interval.ms', 100);// 設置offset的存儲為file//$topicConf->set('offset.store.method', 'file');// 設置offset的存儲為broker $topicConf->set('offset.store.method', 'broker');//$topicConf->set('offset.store.path', __DIR__);//smallest:簡單理解為從頭開始消費,其實等價于上面的 earliest//largest:簡單理解為從最新的開始消費,其實等價于上面的 latest//$topicConf->set('auto.offset.reset', 'smallest');$topic = $rk->newTopic("test", $topicConf);// 參數1消費分區0// RD_KAFKA_OFFSET_BEGINNING 重頭開始消費// RD_KAFKA_OFFSET_STORED 最后一條消費的offset記錄開始消費// RD_KAFKA_OFFSET_END 最后一條消費$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);//$topic->consumeStart(0, RD_KAFKA_OFFSET_END); ////$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);while (true) {    //參數1表示消費分區,這里是分區0    //參數2表示同步阻塞多久    $message = $topic->consume(0, 12 * 1000);    switch ($message->err) {        case RD_KAFKA_RESP_ERR_NO_ERROR:            var_dump($message);            break;        case RD_KAFKA_RESP_ERR__PARTITION_EOF:            echo "No more messages; will wait for more/n";            break;        case RD_KAFKA_RESP_ERR__TIMED_OUT:            echo "Timed out/n";            break;        default:            throw new /Exception($message->errstr(), $message->err);            break;    }}
查看服務器元數據(topic/partition/broker)
<?php$conf = new RdKafka/Conf();$conf->setDrMsgCb(function ($kafka, $message) {    file_put_contents("./xx.log", var_export($message, true), FILE_APPEND);});$conf->setErrorCb(function ($kafka, $err, $reason) {    printf("Kafka error: %s (reason: %s)/n", rd_kafka_err2str($err), $reason);});$conf->set('group.id', 'myConsumerGroup');$rk = new RdKafka/Consumer($conf);$rk->addBrokers("127.0.0.1");$allInfo = $rk->metadata(true, NULL, 60e3);$topics = $allInfo->getTopics();echo rd_kafka_offset_tail(100);echo "--";echo count($topics);echo "--";foreach ($topics as $topic) {    $topicName = $topic->getTopic();    if ($topicName == "__consumer_offsets") {        continue ;    }    $partitions = $topic->getPartitions();    foreach ($partitions as $partition) {//        $rf = new ReflectionClass(get_html' target='_blank'>class($partition));//        foreach ($rf->getMethods() as $f) {//            var_dump($f);//        }//        die();        $topPartition = new RdKafka/TopicPartition($topicName, $partition->getId());        echo  "當前的話題:" . ($topPartition->getTopic()) . " - " . $partition->getId() . " - ";        echo  "offset:" . ($topPartition->getOffset()) . PHP_EOL;    }}

相關推薦:

kafka安裝及Kafka-PHP擴展的使用,kafkakafka-php擴展

kafka裝配及Kafka-PHP擴展的使用

以上就是Kafka的介紹以及基于PHP的kafka的安裝和測試的詳細內容,更多請關注 其它相關文章!

鄭重聲明:本文版權歸原作者所有,轉載文章僅為傳播更多信息之目的,如作者信息標記有誤,請第一時間聯系我們修改或刪除,多謝。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
久久精品这里热有精品| 国产日韩欧美视频在线| 久久综合网hezyo| 视频直播国产精品| 国产精品久久久久久久av大片| 欧美成人精品激情在线观看| 成人免费视频网址| 国内精品模特av私拍在线观看| 国产极品精品在线观看| 国产中文日韩欧美| 97精品一区二区视频在线观看| 国产精品久久久91| 中文字幕视频一区二区在线有码| 国产欧美精品xxxx另类| 91精品啪在线观看麻豆免费| 国产欧美婷婷中文| 亚洲精品suv精品一区二区| 欧美日韩亚洲天堂| 国产精品青青在线观看爽香蕉| 亚洲天堂影视av| 亚洲免费视频在线观看| 成人免费视频网址| 蜜臀久久99精品久久久久久宅男| 国产精品mp4| 日韩欧美成人区| 精品国产1区2区| 亚洲第一精品夜夜躁人人爽| 91麻豆国产语对白在线观看| 色青青草原桃花久久综合| 91久久久久久久| 亚洲欧美综合另类中字| 久久久久久久久久久网站| 日韩成人中文电影| 日韩精品视频观看| 亚洲精品国偷自产在线99热| 日韩欧美在线免费| 中文字幕日韩av电影| 国产精品久久久久久一区二区| 亚洲天堂av综合网| 久久亚洲精品网站| 日韩电影在线观看免费| 26uuu亚洲国产精品| 久青草国产97香蕉在线视频| 在线国产精品视频| 黑人与娇小精品av专区| 精品美女久久久久久免费| 欧美大片免费观看| 亚洲人成电影网站色…| 亚洲人精品午夜在线观看| 亚洲综合中文字幕68页| 性色av一区二区三区在线观看| 九色精品免费永久在线| 欧美又大又粗又长| 日韩av高清不卡| 国产精品福利在线| 欧美午夜影院在线视频| 欧美最顶级丰满的aⅴ艳星| 亚洲sss综合天堂久久| 成人写真福利网| 中文字幕精品—区二区| 国产成人精彩在线视频九色| 欧美日韩一区二区三区| 欧美亚洲另类制服自拍| 久久人91精品久久久久久不卡| 亚洲欧美色图片| 中文字幕亚洲字幕| 成人欧美一区二区三区黑人孕妇| 亚洲精品久久久久久久久久久久久| 亚洲aaa激情| 在线观看亚洲视频| 91伊人影院在线播放| 国产精品久久久久久亚洲影视| 国产91精品不卡视频| 欧美成人中文字幕在线| 国产盗摄xxxx视频xxx69| 国产精品h在线观看| 亚洲精品免费网站| 国产美女精彩久久| 日韩欧美大尺度| 中文字幕av一区二区| 最近中文字幕日韩精品| 久久久国产影院| 欧美一级高清免费| 揄拍成人国产精品视频| 综合av色偷偷网| 原创国产精品91| 91视频-88av| 国产成人精品视| 亚洲成人黄色在线观看| 亚洲大胆人体av| 美女福利视频一区| 在线观看日韩视频| 国产精品一区电影| 欧美裸身视频免费观看| 国产免费观看久久黄| 国产日韩中文字幕在线| 日韩高清中文字幕| 精品人伦一区二区三区蜜桃网站| 中文字幕欧美日韩精品| 国产精品视频一| 欧美孕妇孕交黑巨大网站| 成人伊人精品色xxxx视频| 精品国产一区二区三区久久久狼| 在线看国产精品| 欧美韩国理论所午夜片917电影| 亚洲成人国产精品| 91久久精品在线| 精品福利在线视频| 亚洲男人第一网站| 热久久视久久精品18亚洲精品| 91亚洲精品一区二区| 国产精品电影网| 国产欧亚日韩视频| 亚洲a成v人在线观看| 韩日欧美一区二区| 亚洲视频自拍偷拍| 久久艹在线视频| 欧美一级黑人aaaaaaa做受| 97在线视频精品| 久久视频国产精品免费视频在线| 国产精品自拍小视频| 亚洲黄在线观看| 三级精品视频久久久久| www日韩欧美| 国产精品一区久久久| 国产精品久久久久免费a∨大胸| 日韩在线观看高清| 中文字幕亚洲第一| 国产午夜精品一区理论片飘花| 少妇av一区二区三区| 精品日韩视频在线观看| 2018中文字幕一区二区三区| 中文字幕在线国产精品| 欧美壮男野外gaytube| 91福利视频在线观看| 久久夜精品va视频免费观看| 日韩电影网在线| 国产精品视频导航| 欧美日本高清视频| 国产精品久久久久久网站| 久久久久久久久国产精品| 情事1991在线| 久久91超碰青草是什么| 国产网站欧美日韩免费精品在线观看| 97视频在线观看播放| 亚洲精品久久久久久久久久久久| 亚洲精品日韩久久久| 欧美性猛交xxxx富婆| 国产精品久久久久久中文字| 91在线视频一区| 亚洲乱码一区二区| 欧美激情在线有限公司| 久久久久这里只有精品| 国产999精品视频| 久久国产精品免费视频| 97在线看福利| 成人精品一区二区三区电影免费| 青青在线视频一区二区三区| 日韩av免费网站| 久久伊人免费视频| 欧美一级电影免费在线观看| 久久久中精品2020中文| 久久久精品国产亚洲| 清纯唯美亚洲综合|