亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > Golang > 正文

golang如何使用sarama訪問kafka

2020-04-01 18:50:41
字體:
來源:轉載
供稿:網友

下面一個客戶端代碼例子訪問kafka服務器,來發送和接受消息。

使用方式

1、命令行參數

$ ./kafkaclient -hUsage of ./client: -ca string  CA Certificate (default "ca.pem") -cert string  Client Certificate (default "cert.pem") -command string  consumer|producer (default "consumer") -host string  Common separated kafka hosts (default "localhost:9093") -key string  Client Key (default "key.pem") -partition int  Kafka topic partition -tls  TLS enable -topic string  Kafka topic (default "test--topic")

2、作為producer啟動

$ ./kafkaclient -command producer / -host kafka1:9092,kafka2:9092## TLS-enabled$ ./kafkaclient -command producer / -tls -cert client.pem -key client.key -ca ca.pem / -host kafka1:9093,kafka2:9093

producer發送消息給kafka:

> aaa2018/12/15 07:11:21 Produced message: [aaa]> bbb2018/12/15 07:11:30 Produced message: [bbb]> quit

3、作為consumer啟動

$ ./kafkaclient -command consumer / -host kafka1:9092,kafka2:9092## TLS-enabled$ ./kafkaclient -command consumer / -tls -cert client.pem -key client.key -ca ca.pem / -host kafka1:9093,kafka2:9093

consumer從kafka接受消息:

2018/12/15 07:11:21 Consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 Consumed message: [bbb], offset: [5]

完整源代碼如下

這個代碼使用到了Shopify/sarama庫,請自行下載使用。

$ cat kafkaclient.gopackage mainimport ( "flag" "fmt" "log" "os" "io/ioutil" "bufio" "strings" "crypto/tls" "crypto/x509" "github.com/Shopify/sarama")var ( command  string tlsEnable bool hosts  string topic  string partition int clientcert string clientkey string cacert  string)func main() { flag.StringVar(&command, "command",  "consumer",   "consumer|producer") flag.BoolVar(&tlsEnable, "tls",   false,    "TLS enable") flag.StringVar(&hosts,  "host",   "localhost:9093", "Common separated kafka hosts") flag.StringVar(&topic,  "topic",  "test--topic",  "Kafka topic") flag.IntVar(&partition,  "partition", 0,     "Kafka topic partition") flag.StringVar(&clientcert, "cert",   "cert.pem",   "Client Certificate") flag.StringVar(&clientkey, "key",   "key.pem",   "Client Key") flag.StringVar(&cacert,  "ca",   "ca.pem",   "CA Certificate") flag.Parse() config := sarama.NewConfig() if tlsEnable {  //sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)  tlsConfig, err := genTLSConfig(clientcert, clientkey, cacert)  if err != nil {   log.Fatal(err)  }  config.Net.TLS.Enable = true  config.Net.TLS.Config = tlsConfig } client, err := sarama.NewClient(strings.Split(hosts, ","), config) if err != nil {  log.Fatalf("unable to create kafka client: %q", err) } if command == "consumer" {  consumer, err := sarama.NewConsumerFromClient(client)  if err != nil {   log.Fatal(err)  }  defer consumer.Close()  loopConsumer(consumer, topic, partition) } else {  producer, err := sarama.NewAsyncProducerFromClient(client)  if err != nil {   log.Fatal(err)  }  defer producer.Close()  loopProducer(producer, topic, partition) }}func genTLSConfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) { // load client cert clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile) if err != nil {  return nil, err } // load ca cert pool cacert, err := ioutil.ReadFile(cacertfile) if err != nil {  return nil, err } cacertpool := x509.NewCertPool() cacertpool.AppendCertsFromPEM(cacert) // generate tlcconfig tlsConfig := tls.Config{} tlsConfig.RootCAs = cacertpool tlsConfig.Certificates = []tls.Certificate{clientcert} tlsConfig.BuildNameToCertificate() // tlsConfig.InsecureSkipVerify = true // This can be used on test server if domain does not match cert: return &tlsConfig, err}func loopProducer(producer sarama.AsyncProducer, topic string, partition int) { scanner := bufio.NewScanner(os.Stdin) fmt.Print("> ") for scanner.Scan() {  text := scanner.Text()  if text == "" {  } else if text == "exit" || text == "quit" {   break  } else {   producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}   log.Printf("Produced message: [%s]/n",text)  }  fmt.Print("> ") }}func loopConsumer(consumer sarama.Consumer, topic string, partition int) { partitionConsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest) if err != nil {  log.Println(err)  return } defer partitionConsumer.Close() for {  msg := <-partitionConsumer.Messages()  log.Printf("Consumed message: [%s], offset: [%d]/n", msg.Value, msg.Offset) }}

編譯:

$ go build kafkaclient.go

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
色先锋资源久久综合5566| 亚洲欧美日本另类| 亚洲欧美精品一区二区| 91精品国产电影| 欧美国产视频一区二区| 国内外成人免费激情在线视频| 亚洲国产精品推荐| 精品久久在线播放| 国产日韩欧美在线看| 国产一区私人高清影院| 国产自摸综合网| 久久久久女教师免费一区| 国产精品美女在线观看| 国产精品久在线观看| 性欧美暴力猛交69hd| 色青青草原桃花久久综合| 精品偷拍各种wc美女嘘嘘| 欧美在线视频a| 77777少妇光屁股久久一区| 色偷偷88888欧美精品久久久| 亚洲免费精彩视频| 国产午夜精品一区二区三区| 中文字幕在线视频日韩| 国产精品久久久久久久久久免费| 91亚洲国产成人久久精品网站| 97超碰国产精品女人人人爽| 国产美女高潮久久白浆| 国产精品欧美亚洲777777| 国产精品久久久av久久久| 欧美第一黄网免费网站| 久久久久久18| 91大神福利视频在线| 欧美日韩亚洲一区二区三区| 成人两性免费视频| 欧美美最猛性xxxxxx| 国产日产久久高清欧美一区| 美女性感视频久久久| 亚洲欧洲一区二区三区在线观看| www.99久久热国产日韩欧美.com| 欧美激情亚洲另类| 久久久久国产精品免费网站| 亚洲午夜精品久久久久久久久久久久| 精品国偷自产在线| 尤物精品国产第一福利三区| 91大神福利视频在线| 久久久久久97| 精品久久久久久中文字幕| 成人观看高清在线观看免费| 精品久久久久久中文字幕一区奶水| 国产91色在线播放| 精品激情国产视频| 亚洲韩国青草视频| 亚洲国产成人久久| 69影院欧美专区视频| 精品少妇一区二区30p| 精品动漫一区二区| 97人人模人人爽人人喊中文字| 亚洲丁香久久久| 91精品国产色综合久久不卡98| 91久久嫩草影院一区二区| 国产精品国产福利国产秒拍| 久久免费国产精品1| 亚洲第一福利在线观看| 在线性视频日韩欧美| 成人黄色片网站| 日韩中文字幕在线精品| 91免费看视频.| 91欧美视频网站| 91在线视频免费| 91亚洲精品在线观看| 亚洲午夜色婷婷在线| 欧美成人午夜激情视频| 国产精品久久久久久av福利软件| 伊人久久久久久久久久久久久| 福利视频一区二区| 欧美激情精品久久久久久久变态| 欧美日韩亚洲精品内裤| 日本一欧美一欧美一亚洲视频| 91av在线看| 亚洲国产天堂久久综合网| 在线播放国产精品| 亚洲国产精品热久久| 色狠狠av一区二区三区香蕉蜜桃| 成人福利视频在线观看| 中文精品99久久国产香蕉| 亚洲精品www久久久久久广东| 97国产成人精品视频| 亚洲综合av影视| 欧美日韩国产一区二区三区| 日本一区二区在线免费播放| 亚洲在线免费看| 91精品91久久久久久| 日韩欧美国产网站| 欧美性xxxxxxxxx| 国产午夜精品全部视频在线播放| 日韩有码片在线观看| 亚洲男人的天堂在线| 日韩电影中文字幕av| 91av在线不卡| 亚洲精品视频免费| 热久久这里只有精品| 久久欧美在线电影| 黑人精品xxx一区| 欧美猛交免费看| 久久久久国产精品一区| 国产精品久久久久久久久| 久久精品男人天堂| 日韩中文娱乐网| 中文字幕在线日韩| 国产亚洲免费的视频看| 欧美日本亚洲视频| 国产成人久久久精品一区| 7m精品福利视频导航| 欧美亚州一区二区三区| 国产精品白嫩美女在线观看| 国产一区二区三区精品久久久| 精品久久久久久中文字幕| 日韩av在线网址| 欧美日韩激情小视频| 亚洲理论在线a中文字幕| 97在线视频国产| 中日韩午夜理伦电影免费| 热99久久精品| 精品久久久在线观看| 国产在线观看不卡| 亚洲美女av在线播放| 伊人成人开心激情综合网| 黄色成人av网| 日韩欧美大尺度| 国产在线视频欧美| 国产不卡av在线免费观看| 丝袜情趣国产精品| 国产成人福利夜色影视| 色偷偷888欧美精品久久久| 亚洲影院高清在线| 狠狠躁天天躁日日躁欧美| 成人免费看黄网站| 成人女保姆的销魂服务| 国产成人精品av| 亚洲人线精品午夜| 亚洲激情第一页| 日韩在线观看av| 亚洲国产另类 国产精品国产免费| 91亚洲精品久久久| 久久99国产精品自在自在app| 免费不卡在线观看av| 欧美另类暴力丝袜| 日韩视频免费在线观看| 日韩精品高清在线| 欧美在线视频网站| 国模精品一区二区三区色天香| 国产精品久久久久久久久久久久| 97在线视频免费播放| 欧美xxxwww| 欧美一区二区三区图| 欧美日韩一区免费| 动漫精品一区二区| 丝袜一区二区三区| 久久久久久av| 久久手机精品视频| 国产精品扒开腿爽爽爽视频| 亚洲va码欧洲m码| 性金发美女69hd大尺寸| 国产精品入口免费视频一|