亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 數據庫 > Redis > 正文

關于redigo中PubSub的一點小坑分析

2020-10-28 21:31:20
字體:
來源:轉載
供稿:網友

前言

最近在用 golang 做一些 redis 相關的操作,選用了 redigo 這個第三方庫。然后在使用 Pub/Sub 的時候,卻發現了一個小坑……

Redis Client

首先,我們來初始化一個帶連接池的 Redis Client:

import (	"github.com/gomodule/redigo/redis")type RedisClient struct {	pool *redis.Pool}func NewRedisClient(addr string, db int, passwd string) *RedisClient {	pool := &redis.Pool{		MaxIdle:  10,		IdleTimeout: 300 * time.Second,		Dial: func() (redis.Conn, error) {			c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))			if err != nil {				return nil, err			}			return c, nil		},		TestOnBorrow: func(c redis.Conn, t time.Time) error {			if time.Since(t) < time.Minute {				return nil			}			_, err := c.Do("PING")			return err		},	}	log.Printf("new redis pool at %s", addr)	client := &RedisClient{		pool: pool,	}	return client}

Publish

然后我們可以簡單的實現一個 publish 方法:

func (r *RedisClient) Publish(channel, message string) (int, error) {	c := r.pool.Get()	defer c.Close()	n, err := redis.Int(c.Do("PUBLISH", channel, message))	if err != nil {		return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)	}	return n, nil}

Subscribe

接下來就是一個稍微復雜點的帶有心跳的 subscribe 方法:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}

最后,我們寫一個簡單地 main 函數來調用 publish & subscribe:

func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {	psc := redis.PubSubConn{Conn: r.pool.Get()}	defer psc.Close()	log.Printf("redis pubsub subscribe channel: %v", channel)	if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {		return err	}	done := make(chan error, 1)	// start a new goroutine to receive message	go func() {		for {			switch msg := psc.Receive().(type) {			case error:				done <- fmt.Errorf("redis pubsub receive err: %v", msg)				return			case redis.Message:				if err := consume(msg); err != nil {					done <- err					return				}			case redis.Subscription:				if msg.Count == 0 {					// all channels are unsubscribed					done <- nil					return				}			}		}	}()	// health check	tick := time.NewTicker(time.Minute)	defer tick.Stop()	for {		select {		case <-ctx.Done():			if err := psc.Unsubscribe(); err != nil {				return fmt.Errorf("redis pubsub unsubscribe err: %v", err)			}			return nil		case err := <-done:			return err		case <-tick.C:			if err := psc.Ping(""); err != nil {				return err			}		}	}	return nil}


咋一看之下,好像并沒有什么異常?然而,如果我們這時候去看 redis 的 tcp 連接,就可以發現一些貓膩:

$sudo netstat -antp | grep redistcp  0  0 0.0.0.0:6379   0.0.0.0:*    LISTEN  940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55010  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55015  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55009  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55005  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55012  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55011  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55013  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55007  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55006  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:55014  ESTABLISHED 940/redis-server 0. tcp  0  0 172.16.8.128:6379  172.16.8.1:54972  ESTABLISHED 940/redis-server 0. 

竟然是每一次 subscribe 就新建了一個連接,而 connection pool 似乎沒有什么作用。

更進一步地調試,我們發現在 defer psc.Close() 的時候就卡住了,也就是上面的 10 個 goroutine 其實并沒有正常退出。

Concurrent

排查許久之后,終于定位到了問題!引用 redigo 的說明:

Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.

For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph.

也就是說,雖然一個連接可以在不同的 goroutine 并發調用 Receive() 和 Subscribe()(subscribe調用了send和flush) ,但是卻不能再有其他并發操作(比如 Close())。

其他相似的問題還可以參考 issue

Fix

知道了上面的原因之后,我們稍微修改一下 defer psc.Close() 的位置即可解決問題:

	// start a new goroutine to receive message	go func() {		// IMPORTANT!		defer psc.Close()		for {			switch msg := psc.Receive().(type) {			case error:

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,如果有疑問大家可以留言交流,謝謝大家對武林網的支持。

發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
国产成人+综合亚洲+天堂| 亚洲国产一区二区三区四区| 久久国产精品亚洲| 成人免费网站在线| 性欧美亚洲xxxx乳在线观看| 国产精品日韩一区| 成人a在线观看| 亲爱的老师9免费观看全集电视剧| 国产一区二区av| 亚洲欧美制服第一页| 91精品视频免费| 91精品国产91久久久久久不卡| 欧美华人在线视频| 国产精品久久久久久亚洲影视| 国产精品91久久| 精品国产欧美一区二区五十路| 亚洲一区二区在线| 国产91色在线|| 一区二区三区国产在线观看| 人人澡人人澡人人看欧美| 国产精品三级网站| 亚洲最大福利网站| 久久久久久久久91| 久久国产精品电影| 国产色综合天天综合网| 亚洲欧美中文在线视频| 日韩av电影在线免费播放| 欧美视频国产精品| 中文字幕av一区二区| 亚洲一区999| 欧美精品性视频| 国产精品久久久久久av下载红粉| 国产欧美日韩亚洲精品| 亚洲欧美国产精品久久久久久久| 欧美日韩美女在线| 国产精品视频色| 日韩欧美在线一区| 日韩在线小视频| 91精品国产91久久久久福利| 国产精品国产三级国产aⅴ9色| 亚洲美女精品成人在线视频| 久久久精品视频成人| 欧美日韩一区二区免费视频| 亚洲一区二区三区乱码aⅴ| 97国产suv精品一区二区62| 日韩精品在线观看视频| 日韩视频亚洲视频| 91亚洲精品久久久| 中文字幕亚洲欧美日韩高清| 欧美大片在线看| 九九热这里只有精品6| 日韩电影中文字幕一区| 精品国产乱码久久久久久天美| 国产亚洲欧洲高清一区| 日本成人在线视频网址| 欧洲美女7788成人免费视频| 日本免费久久高清视频| 亚洲欧美日韩一区二区在线| 欧美黑人xxxⅹ高潮交| 国产精品女主播| 国产成人小视频在线观看| 国产精品日本精品| 久久高清视频免费| 久久免费高清视频| 国产精品久久久久久久天堂| 国语自产在线不卡| 欧美激情一级精品国产| 午夜精品视频网站| 91精品啪aⅴ在线观看国产| 日韩在线欧美在线国产在线| 国产精品美女免费看| 欧美亚洲第一页| 亚洲v日韩v综合v精品v| 日本中文字幕不卡免费| 欧美做受高潮1| 欧美激情视频一区二区三区不卡| 欧美精品精品精品精品免费| 国语自产精品视频在线看| 欧美一乱一性一交一视频| 久久69精品久久久久久久电影好| 日韩欧美有码在线| 日本精品在线视频| 欧美亚洲激情在线| 不卡av在线网站| 日韩av高清不卡| 日韩最新av在线| 亚洲性生活视频| 日韩在线观看免费全集电视剧网站| 1769国内精品视频在线播放| 色777狠狠综合秋免鲁丝| 92看片淫黄大片欧美看国产片| 日韩中文字幕网址| 亚洲色图15p| 亚洲2020天天堂在线观看| 国产成人精品在线观看| 92看片淫黄大片欧美看国产片| 日本午夜精品理论片a级appf发布| 国产日韩欧美中文| 国产精品扒开腿做| 清纯唯美日韩制服另类| 欧美日韩一区二区精品| 一区二区av在线| 欧美xxxx综合视频| 97超级碰碰碰| 国产精品久久久久久久一区探花| 成人精品一区二区三区电影免费| 97视频人免费观看| 国产精品久久久久久久av电影| 91情侣偷在线精品国产| 亚洲精品国精品久久99热| 亚洲一级黄色av| 欧美激情综合亚洲一二区| 美女黄色丝袜一区| 中文字幕精品一区二区精品| 午夜剧场成人观在线视频免费观看| 亚洲精品美女免费| 成人高清视频观看www| 成人精品福利视频| 中文字幕亚洲欧美一区二区三区| 日韩成人激情视频| 欧美日韩成人免费| 色多多国产成人永久免费网站| 8x拔播拔播x8国产精品| 欧美日韩另类字幕中文| 亚洲综合中文字幕在线| 亚洲第一精品久久忘忧草社区| 亚洲国产天堂久久国产91| 成人黄色短视频在线观看| 国产美女91呻吟求| 欧美中文字幕在线| 国产精品成人久久久久| 91精品国产91久久| 精品香蕉一区二区三区| 国产成人午夜视频网址| 欧美二区乱c黑人| 97精品一区二区三区| 精品国产乱码久久久久久天美| 亚洲成avwww人| 久久久亚洲国产天美传媒修理工| 一区三区二区视频| 久久精品人人做人人爽| 亚洲国产精品久久久| 久久这里有精品| 亚洲色图国产精品| 国产视频久久久| 欧美精品xxx| 97香蕉久久超级碰碰高清版| 国产在线精品成人一区二区三区| 欧美日韩国产一区二区三区| 国产精品视频一区二区高潮| 日韩美女视频免费看| 正在播放欧美视频| 热久久这里只有| 97精品国产91久久久久久| 日韩在线一区二区三区免费视频| 日韩中文字幕在线视频| 在线观看欧美成人| 亚洲欧美国产制服动漫| 成人黄色午夜影院| 91色精品视频在线| 日韩精品视频免费在线观看| 国产精品久久久久久久一区探花| 在线午夜精品自拍| 欧美国产日本在线|