亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 數據庫 > Redis > 正文

關于redigo中PubSub的一點小坑分析

2020-03-17 12:19:21
字體:
來源:轉載
供稿:網友

前言

最近在用 golang 做一些 redis 相關的操作,選用了 redigo 這個第三方庫。然后在使用 Pub/Sub 的時候,卻發現了一個小……

Redis Client

首先,我們來初始化一個帶連接池的 Redis Client:

import (	"github.com/gomodule/redigo/redis")type RedisClient struct {	pool *redis.Pool}func NewRedisClient(addr string, db int, passwd string) *RedisClient {	pool := &redis.Pool{		MaxIdle:  10,		IdleTimeout: 300 * time.Second,		Dial: func() (redis.Conn, error) {			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))			if err != nil {				return nil, err			}			return c, nil		},		TestOnBorrow: func(c redis.Conn, t time.Time) error {			if time.Since(t) < time.Minute {				return nil			}			_, err := c.Do("PING")			return err		},	}	log.Printf("new redis pool at %s", addr)	client := &RedisClient{		pool: pool,	}	return client}

Publish

然后我們可以簡單的實現一個 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {	c := r.pool.Get()	defer c.Close()	n, err := redis.Int(c.Do("PUBLISH", channel, message))	if err != nil {		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)	}	return n, nil}

Subscribe

接下來就是一個稍微復雜點的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}

最后,我們寫一個簡單地 main 函數來調用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}

咋一看之下,好像并沒有什么異常?然而,如果我們這時候去看 redis 的 tcp 連接,就可以發現一些貓膩:

$sudo netstat -antp | grep redistcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個連接,而 connection pool 似乎沒有什么作用。

更進一步地調試,我們發現在 defer psc.Close() 的時候就卡住了,也就是上面的 10 個 goroutine 其實并沒有正常退出。

Concurrent

排查許久之后,終于定位到了問題!引用 redigo 的說明

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說,雖然一個連接可以在不同的 goroutine 并發調用 Receive() 和 Subscribe()(subscribe調用了send和flush) ,但是卻不能再有其他并發操作(比如 Close())。

其他相似的問題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問題:

	// start a new goroutine to receive message	go func() {		// IMPORTANT!		defer psc.Close()		for {			switch msg := psc.Receive().(type) {			case error:

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對VEVB武林網的支持。


注:相關教程知識閱讀請移步到Redis頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
日韩在线精品一区| 久久视频在线免费观看| 国产精品成久久久久三级| 一区二区三区黄色| 国产精品一区二区久久| 欧洲亚洲女同hd| 国产一区在线播放| 狠狠躁18三区二区一区| 68精品国产免费久久久久久婷婷| 欧美性猛交xxxx富婆| 欧美大片在线看| 国产精品久久久久久中文字| 亚洲男人第一av网站| 日韩av在线免费观看一区| 国产精品三级美女白浆呻吟| 91精品久久久久久久久久久久久| 精品美女永久免费视频| 日产精品久久久一区二区福利| 欧美激情伊人电影| 美日韩精品视频免费看| 国产精品xxxxx| 亚洲精品一区中文| 亚洲片国产一区一级在线观看| 国产精品久久久久久久久久免费| 亚洲精品在线不卡| 国产大片精品免费永久看nba| 隔壁老王国产在线精品| 成人美女免费网站视频| 日韩69视频在线观看| 日韩av中文在线| 欧美性猛交xxxx富婆弯腰| 精品福利视频导航| 国产精品专区第二| 国产aⅴ夜夜欢一区二区三区| 国产精品白丝av嫩草影院| 日韩av大片免费看| 热re99久久精品国产66热| 欧美电影免费观看| 奇米成人av国产一区二区三区| 日韩精品中文字幕在线播放| 91久久在线观看| 国产主播欧美精品| 亚洲一区第一页| 永久免费精品影视网站| 日韩网站免费观看高清| 一本一本久久a久久精品牛牛影视| 国产精品av免费在线观看| 中文字幕不卡在线视频极品| 97视频在线观看播放| 亚洲男人天堂网站| 日韩成人激情影院| 日韩av不卡电影| 国产精品日日摸夜夜添夜夜av| 在线国产精品播放| 不用播放器成人网| 日韩免费中文字幕| 日韩一区二区三区国产| 亚洲精品wwww| 欧美另类交人妖| 欧美日韩国产精品| 大量国产精品视频| 欧美日韩国产黄| 久久五月情影视| 日日骚av一区| 黄色一区二区三区| 欧美黄色成人网| 欧美做爰性生交视频| 亚洲人成电影在线观看天堂色| 国产精品揄拍500视频| 欧美一级bbbbb性bbbb喷潮片| 国产精品日韩欧美综合| 国产主播精品在线| 2018中文字幕一区二区三区| 尤物九九久久国产精品的特点| 91精品国产高清久久久久久91| 欧美福利视频在线观看| 国产在线观看91精品一区| 国模精品一区二区三区色天香| 亚洲精品一区在线观看香蕉| 国产精品www色诱视频| 欧美一级高清免费| 亚洲国产高清自拍| 国产欧美日韩中文字幕在线| 91av福利视频| www.xxxx欧美| 日韩中文字幕欧美| 欧美激情精品久久久久久| 国产精品成人免费电影| 欧美老女人性生活| 亚洲视频在线免费观看| 国产成人精品视| 青青草原一区二区| 欧美电影免费观看电视剧大全| 欧美劲爆第一页| 日韩免费av在线| 国产乱肥老妇国产一区二| 不卡av在线播放| 日韩精品在线私人| 日韩在线免费高清视频| 最近的2019中文字幕免费一页| 91在线色戒在线| 青草成人免费视频| 91久久国产婷婷一区二区| 久久视频在线播放| 日韩在线中文字| 日韩欧美一区二区三区| 国产97在线亚洲| 国产精品一区二区久久国产| 精品国产视频在线| 亚洲精品国产精品自产a区红杏吧| 亚洲一区二区久久久久久久| 97婷婷大伊香蕉精品视频| 精品国产91久久久久久老师| 国产精品久久久久久久久久久久久久| 亚洲国产婷婷香蕉久久久久久| 国产视频亚洲精品| 综合激情国产一区| 亚洲乱码国产乱码精品精| 久久久久久国产精品三级玉女聊斋| 亚洲性69xxxbbb| 久久久成人av| 久久好看免费视频| 国产在线不卡精品| 91精品久久久久久久久久入口| 欧美色视频日本高清在线观看| 亚洲天堂免费视频| 国产精品视频中文字幕91| 成人性教育视频在线观看| 亚洲福利视频专区| 一区二区三区四区在线观看视频| 懂色av一区二区三区| 中文亚洲视频在线| 日韩成人激情影院| 日韩中文字幕精品| 亚洲aⅴ男人的天堂在线观看| 日韩av影院在线观看| 亚洲精品自拍偷拍| 日韩av在线一区| 日本aⅴ大伊香蕉精品视频| 亚洲福利视频二区| 欧美成人黄色小视频| 亚洲欧美中文字幕| 国产精品99久久99久久久二8| 亚洲深夜福利网站| 日韩暖暖在线视频| 亚洲曰本av电影| 超碰97人人做人人爱少妇| 亚洲精品第一页| 欧美一级大片在线观看| 亚洲综合社区网| 亚洲最大成人在线| 国产99久久精品一区二区永久免费| 日韩精品欧美激情| 国产成人精品在线观看| 91麻豆国产精品| 在线视频国产日韩| 精品久久久久久久久久ntr影视| 亚洲综合小说区| 国产精品自产拍在线观看中文| 欧美视频二区36p| 亚洲黄色www网站| 精品成人69xx.xyz| 国产日韩在线视频| 91免费欧美精品|