亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 編程 > Golang > 正文

Go如何實現HTTP請求限流示例

2020-04-01 18:57:41
字體:
來源:轉載
供稿:網友

在開發高并發系統時有三把利器用來保護系統:緩存、降級和限流!為了保證在業務高峰期,線上系統也能保證一定的彈性和穩定性,最有效的方案就是進行服務降級了,而限流就是降級系統最常采用的方案之一。

這里為大家推薦一個開源庫 https://github.com/didip/tollbooth 但是,如果您想要一些簡單的、輕量級的或者只是想要學習的東西,實現自己的中間件來處理速率限制并不困難。今天我們就來聊聊如何實現自己的一個限流中間件

首先我們需要安裝一個提供了 Token bucket (令牌桶算法)的依賴包,上面提到的toolbooth 的實現也是基于它實現的

$ go get golang.org/x/time/rate

好了我們先看Demo代碼的實現:

limit.go

package mainimport (  "net/http"  "golang.org/x/time/rate")var limiter = rate.NewLimiter(2, 5)func limit(next http.Handler) http.Handler {  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {    if limiter.Allow() == false {      http.Error(w, http.StatusText(429), http.StatusTooManyRequests)      return    }    next.ServeHTTP(w, r)  })}

main.go

package mainimport (  "net/http")func main() {  mux := http.NewServeMux()  mux.HandleFunc("/", okHandler)  // Wrap the servemux with the limit middleware.  http.ListenAndServe(":4000", limit(mux))}func okHandler(w http.ResponseWriter, r *http.Request) {  w.Write([]byte("OK"))}

我們看看 rate.NewLimiter的源碼:

// Copyright 2015 The Go Authors. All rights reserved.// Use of this source code is governed by a BSD-style// license that can be found in the LICENSE file.// Package rate provides a rate limiter.package rateimport ( "fmt" "math" "sync" "time" "golang.org/x/net/context")// Limit defines the maximum frequency of some events.// Limit is represented as number of events per second.// A zero Limit allows no events.type Limit float64// Inf is the infinite rate limit; it allows all events (even if burst is zero).const Inf = Limit(math.MaxFloat64)// Every converts a minimum time interval between events to a Limit.func Every(interval time.Duration) Limit { if interval <= 0 {  return Inf } return 1 / Limit(interval.Seconds())}// A Limiter controls how frequently events are allowed to happen.// It implements a "token bucket" of size b, initially full and refilled// at rate r tokens per second.// Informally, in any large enough time interval, the Limiter limits the// rate to r tokens per second, with a maximum burst size of b events.// As a special case, if r == Inf (the infinite rate), b is ignored.// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.//// The zero value is a valid Limiter, but it will reject all events.// Use NewLimiter to create non-zero Limiters.//// Limiter has three main methods, Allow, Reserve, and Wait.// Most callers should use Wait.//// Each of the three methods consumes a single token.// They differ in their behavior when no token is available.// If no token is available, Allow returns false.// If no token is available, Reserve returns a reservation for a future token// and the amount of time the caller must wait before using it.// If no token is available, Wait blocks until one can be obtained// or its associated context.Context is canceled.//// The methods AllowN, ReserveN, and WaitN consume n tokens.type Limiter struct { limit Limit burst int mu   sync.Mutex tokens float64 // last is the last time the limiter's tokens field was updated last time.Time // lastEvent is the latest time of a rate-limited event (past or future) lastEvent time.Time}// Limit returns the maximum overall event rate.func (lim *Limiter) Limit() Limit { lim.mu.Lock() defer lim.mu.Unlock() return lim.limit}// Burst returns the maximum burst size. Burst is the maximum number of tokens// that can be consumed in a single call to Allow, Reserve, or Wait, so higher// Burst values allow more events to happen at once.// A zero Burst allows no events, unless limit == Inf.func (lim *Limiter) Burst() int { return lim.burst}// NewLimiter returns a new Limiter that allows events up to rate r and permits// bursts of at most b tokens.func NewLimiter(r Limit, b int) *Limiter { return &Limiter{  limit: r,  burst: b, }}// Allow is shorthand for AllowN(time.Now(), 1).func (lim *Limiter) Allow() bool { return lim.AllowN(time.Now(), 1)}// AllowN reports whether n events may happen at time now.// Use this method if you intend to drop / skip events that exceed the rate limit.// Otherwise use Reserve or Wait.func (lim *Limiter) AllowN(now time.Time, n int) bool { return lim.reserveN(now, n, 0).ok}// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.// A Reservation may be canceled, which may enable the Limiter to permit additional events.type Reservation struct { ok    bool lim    *Limiter tokens  int timeToAct time.Time // This is the Limit at reservation time, it can change later. limit Limit}// OK returns whether the limiter can provide the requested number of tokens// within the maximum wait time. If OK is false, Delay returns InfDuration, and// Cancel does nothing.func (r *Reservation) OK() bool { return r.ok}// Delay is shorthand for DelayFrom(time.Now()).func (r *Reservation) Delay() time.Duration { return r.DelayFrom(time.Now())}// InfDuration is the duration returned by Delay when a Reservation is not OK.const InfDuration = time.Duration(1<<63 - 1)// DelayFrom returns the duration for which the reservation holder must wait// before taking the reserved action. Zero duration means act immediately.// InfDuration means the limiter cannot grant the tokens requested in this// Reservation within the maximum wait time.func (r *Reservation) DelayFrom(now time.Time) time.Duration { if !r.ok {  return InfDuration } delay := r.timeToAct.Sub(now) if delay < 0 {  return 0 } return delay}// Cancel is shorthand for CancelAt(time.Now()).func (r *Reservation) Cancel() { r.CancelAt(time.Now()) return}// CancelAt indicates that the reservation holder will not perform the reserved action// and reverses the effects of this Reservation on the rate limit as much as possible,// considering that other reservations may have already been made.func (r *Reservation) CancelAt(now time.Time) { if !r.ok {  return } r.lim.mu.Lock() defer r.lim.mu.Unlock() if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) {  return } // calculate tokens to restore // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved // after r was obtained. These tokens should not be restored. restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) if restoreTokens <= 0 {  return } // advance time to now now, _, tokens := r.lim.advance(now) // calculate new number of tokens tokens += restoreTokens if burst := float64(r.lim.burst); tokens > burst {  tokens = burst } // update state r.lim.last = now r.lim.tokens = tokens if r.timeToAct == r.lim.lastEvent {  prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens)))  if !prevEvent.Before(now) {   r.lim.lastEvent = prevEvent  } } return}// Reserve is shorthand for ReserveN(time.Now(), 1).func (lim *Limiter) Reserve() *Reservation { return lim.ReserveN(time.Now(), 1)}// ReserveN returns a Reservation that indicates how long the caller must wait before n events happen.// The Limiter takes this Reservation into account when allowing future events.// ReserveN returns false if n exceeds the Limiter's burst size.// Usage example://  r, ok := lim.ReserveN(time.Now(), 1)//  if !ok {//   // Not allowed to act! Did you remember to set lim.burst to be > 0 ?//  }//  time.Sleep(r.Delay())//  Act()// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.// If you need to respect a deadline or cancel the delay, use Wait instead.// To drop or skip events exceeding rate limit, use Allow instead.func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { r := lim.reserveN(now, n, InfDuration) return &r}// Wait is shorthand for WaitN(ctx, 1).func (lim *Limiter) Wait(ctx context.Context) (err error) { return lim.WaitN(ctx, 1)}// WaitN blocks until lim permits n events to happen.// It returns an error if n exceeds the Limiter's burst size, the Context is// canceled, or the expected wait time exceeds the Context's Deadline.func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { if n > lim.burst {  return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) } // Check if ctx is already cancelled select { case <-ctx.Done():  return ctx.Err() default: } // Determine wait limit now := time.Now() waitLimit := InfDuration if deadline, ok := ctx.Deadline(); ok {  waitLimit = deadline.Sub(now) } // Reserve r := lim.reserveN(now, n, waitLimit) if !r.ok {  return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) } // Wait t := time.NewTimer(r.DelayFrom(now)) defer t.Stop() select { case <-t.C:  // We can proceed.  return nil case <-ctx.Done():  // Context was canceled before we could proceed. Cancel the  // reservation, which may permit other events to proceed sooner.  r.Cancel()  return ctx.Err() }}// SetLimit is shorthand for SetLimitAt(time.Now(), newLimit).func (lim *Limiter) SetLimit(newLimit Limit) { lim.SetLimitAt(time.Now(), newLimit)}// SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated// or underutilized by those which reserved (using Reserve or Wait) but did not yet act// before SetLimitAt was called.func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { lim.mu.Lock() defer lim.mu.Unlock() now, _, tokens := lim.advance(now) lim.last = now lim.tokens = tokens lim.limit = newLimit}// reserveN is a helper method for AllowN, ReserveN, and WaitN.// maxFutureReserve specifies the maximum reservation wait duration allowed.// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { lim.mu.Lock() defer lim.mu.Unlock() if lim.limit == Inf {  return Reservation{   ok:    true,   lim:    lim,   tokens:  n,   timeToAct: now,  } } now, last, tokens := lim.advance(now) // Calculate the remaining number of tokens resulting from the request. tokens -= float64(n) // Calculate the wait duration var waitDuration time.Duration if tokens < 0 {  waitDuration = lim.limit.durationFromTokens(-tokens) } // Decide result ok := n <= lim.burst && waitDuration <= maxFutureReserve // Prepare reservation r := Reservation{  ok:  ok,  lim:  lim,  limit: lim.limit, } if ok {  r.tokens = n  r.timeToAct = now.Add(waitDuration) } // Update state if ok {  lim.last = now  lim.tokens = tokens  lim.lastEvent = r.timeToAct } else {  lim.last = last } return r}// advance calculates and returns an updated state for lim resulting from the passage of time.// lim is not changed.func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { last := lim.last if now.Before(last) {  last = now } // Avoid making delta overflow below when last is very old. maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) elapsed := now.Sub(last) if elapsed > maxElapsed {  elapsed = maxElapsed } // Calculate the new number of tokens, due to time that passed. delta := lim.limit.tokensFromDuration(elapsed) tokens := lim.tokens + delta if burst := float64(lim.burst); tokens > burst {  tokens = burst } return now, last, tokens}// durationFromTokens is a unit conversion function from the number of tokens to the duration// of time it takes to accumulate them at a rate of limit tokens per second.func (limit Limit) durationFromTokens(tokens float64) time.Duration { seconds := tokens / float64(limit) return time.Nanosecond * time.Duration(1e9*seconds)}// tokensFromDuration is a unit conversion function from a time duration to the number of tokens// which could be accumulated during that duration at a rate of limit tokens per second.func (limit Limit) tokensFromDuration(d time.Duration) float64 { return d.Seconds() * float64(limit)}

算法描述:

用戶配置的平均發送速率為r,則每隔1/r秒一個令牌被加入到桶中(每秒會有r個令牌放入桶中),桶中最多可以存放b個令牌。如果令牌到達時令牌桶已經滿了,那么這個令牌會被丟棄;

實現用戶粒度的限流

雖然在某些情況下使用單個全局速率限制器非常有用,但另一種常見情況是基于IP地址或API密鑰等標識符為每個用戶實施速率限制器。我們將使用IP地址作為標識符。簡單實現代碼如下:

package mainimport (  "net/http"  "sync"  "time"  "golang.org/x/time/rate")// Create a custom visitor struct which holds the rate limiter for each// visitor and the last time that the visitor was seen.type visitor struct {  limiter *rate.Limiter  lastSeen time.Time}// Change the the map to hold values of the type visitor.var visitors = make(map[string]*visitor)var mtx sync.Mutex// Run a background goroutine to remove old entries from the visitors map.func init() {  go cleanupVisitors()}func addVisitor(ip string) *rate.Limiter {  limiter := rate.NewLimiter(2, 5)  mtx.Lock()  // Include the current time when creating a new visitor.  visitors[ip] = &visitor{limiter, time.Now()}  mtx.Unlock()  return limiter}func getVisitor(ip string) *rate.Limiter {  mtx.Lock()  v, exists := visitors[ip]  if !exists {    mtx.Unlock()    return addVisitor(ip)  }  // Update the last seen time for the visitor.  v.lastSeen = time.Now()  mtx.Unlock()  return v.limiter}// Every minute check the map for visitors that haven't been seen for// more than 3 minutes and delete the entries.func cleanupVisitors() {  for {    time.Sleep(time.Minute)    mtx.Lock()    for ip, v := range visitors {      if time.Now().Sub(v.lastSeen) > 3*time.Minute {        delete(visitors, ip)      }    }    mtx.Unlock()  }}func limit(next http.Handler) http.Handler {  return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {    limiter := getVisitor(r.RemoteAddr)    if limiter.Allow() == false {      http.Error(w, http.StatusText(429), http.StatusTooManyRequests)      return    }    next.ServeHTTP(w, r)  })}

當然這只是一個簡單的實現方案,如果我們要在微服務的API-GateWay中去實現限流還是要考慮很多東西的。建議大家可以看看 https://github.com/didip/tollbooth 的源碼。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VEVB武林網。


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲精品国产精品国自产观看浪潮| 伊人久久久久久久久久久| 亚洲综合中文字幕68页| 欧美亚洲国产日韩2020| 久久精品免费电影| 91欧美精品午夜性色福利在线| 国内精品在线一区| 国产精品小说在线| 亚洲香蕉av在线一区二区三区| 欧美激情a∨在线视频播放| 国产精品欧美日韩| 亚洲欧美在线x视频| 欧美劲爆第一页| 国产精品av在线| 操人视频在线观看欧美| 国产精品嫩草影院一区二区| 国语自产精品视频在线看抢先版图片| 久久久久中文字幕| 亚洲丝袜av一区| 欧美午夜性色大片在线观看| 91精品国产免费久久久久久| 欧美视频一区二区三区…| 久久在线精品视频| 欧美日本黄视频| 亚洲欧洲日产国产网站| 日本成人精品在线| 麻豆一区二区在线观看| 久久精品91久久久久久再现| 日韩在线免费av| 亚洲性日韩精品一区二区| 欧美性视频网站| 国产成人自拍视频在线观看| 国产三级精品网站| 动漫精品一区二区| 国产精品嫩草影院一区二区| 国产精品美女午夜av| 国产成人精品一区二区| 91在线网站视频| 国模极品一区二区三区| 2019最新中文字幕| 日韩在线国产精品| 亚洲天堂免费视频| 国产精品自拍视频| 国产99久久精品一区二区 夜夜躁日日躁| 97涩涩爰在线观看亚洲| 中文字幕亚洲情99在线| 欧美日韩色婷婷| 精品国产一区久久久| 亚洲专区在线视频| 国产精品美腿一区在线看| 国产精品成熟老女人| 国产专区欧美专区| 亚洲国内精品视频| 色哟哟亚洲精品一区二区| 91精品国产网站| 亚洲男人天天操| 日韩av免费在线观看| 在线亚洲国产精品网| 亚洲精品www久久久久久广东| 亚洲黄色www| 2020欧美日韩在线视频| 亚洲一区二区精品| 成人国产精品色哟哟| 久久久999精品| 亚洲成人久久网| 91网站在线免费观看| 日韩欧美视频一区二区三区| 国产精品毛片a∨一区二区三区|国| 色偷偷亚洲男人天堂| 国产丝袜精品第一页| 97av视频在线| www高清在线视频日韩欧美| 91在线免费视频| 国产日韩av在线播放| 欧美激情欧美激情在线五月| 粗暴蹂躏中文一区二区三区| 欧美一级大片视频| 久久久久免费精品国产| 国产自摸综合网| 亚洲最新av在线网站| 色琪琪综合男人的天堂aⅴ视频| 川上优av一区二区线观看| 成人黄色av免费在线观看| 成人黄色在线播放| 精品福利免费观看| 在线观看免费高清视频97| 日韩高清免费观看| 欧美大尺度电影在线观看| 国产亚洲日本欧美韩国| 欧美激情2020午夜免费观看| 亚洲国产精品女人久久久| 97视频在线观看免费高清完整版在线观看| 精品电影在线观看| 欧美麻豆久久久久久中文| 国产精品丝袜久久久久久不卡| 91精品国产91久久| 国语自产精品视频在线看| 国产精品女主播| www.亚洲男人天堂| 日韩av在线电影网| 精品国产欧美一区二区三区成人| 亚洲午夜未满十八勿入免费观看全集| 成人在线播放av| 久久久亚洲影院你懂的| 精品国产成人av| 91免费精品国偷自产在线| 精品久久久久久久中文字幕| 欧美日韩亚洲一区二区三区| 国产精品啪视频| 国产精品自拍网| 亚洲美女中文字幕| 在线看国产精品| 日本成人在线视频网址| 精品国产31久久久久久| 日本不卡高字幕在线2019| 久久精品99久久香蕉国产色戒| 国产日韩欧美夫妻视频在线观看| 日韩国产欧美精品一区二区三区| 奇米成人av国产一区二区三区| 庆余年2免费日韩剧观看大牛| 亚洲最大av在线| 欧美超级免费视 在线| 久久久精品一区二区| 高清欧美电影在线| 成人综合网网址| 欧美激情视频网址| 精品无人区乱码1区2区3区在线| 精品自在线视频| 主播福利视频一区| 日韩国产欧美区| 另类美女黄大片| 精品久久久久国产| 国产精品福利久久久| 国产精品亚发布| 日韩av在线影视| 成人欧美一区二区三区在线| 亚洲高清一区二| 欧美日韩一区二区三区在线免费观看| 日韩三级影视基地| 亚洲肉体裸体xxxx137| 亚洲欧美激情四射在线日| 在线精品高清中文字幕| 亚洲一区美女视频在线观看免费| 久久久国产精品视频| 日韩欧美高清在线视频| 日韩欧美成人网| 91理论片午午论夜理片久久| 欧美日韩美女在线| 亚洲图片在线综合| 亚洲影院色在线观看免费| 国产精品自产拍高潮在线观看| 亚洲美女在线观看| 亚洲欧美精品suv| 亚洲精品网站在线播放gif| 精品欧美aⅴ在线网站| 亚洲欧洲国产伦综合| 中文字幕欧美日韩va免费视频| 992tv成人免费影院| 国产精品久久久精品| 国产精品流白浆视频| 亚洲精品久久久久中文字幕欢迎你| 亚洲美女精品久久| 国产精品福利在线观看| 亚洲欧美日韩直播|