亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

源碼閱讀之storm操作zookeeper-cluster.clj

2024-07-13 10:12:56
字體:
來源:轉載
供稿:網友

storm操作zookeeper的主要函數都定義在命名空間backtype.storm.cluster中(即cluster.clj文件中)。 backtype.storm.cluster定義了兩個重要protocol:ClusterState和StormClusterState。

clojure中的protocol可以看成java中的接口,封裝了一組方法。ClusterState協議中封裝了一組與zookeeper進行交互的基礎函數,如獲取子節點函數,獲取子節點數據函數等,ClusterState協議定義如下:

ClusterState協議

(defprotocol ClusterState  (set-ephemeral-node [this path data])  (delete-node [this path])  (create-sequential [this path data])  ;; if node does not exist, create persistent with this data  (set-data [this path data])  (get-data [this path watch?])  (get-version [this path watch?])  (get-data-with-version [this path watch?])  (get-children [this path watch?])  (mkdirs [this path])  (close [this])  (register [this callback])  (unregister [this id]))

 

StormClusterState協議封裝了一組storm與zookeeper進行交互的函數,可以將StormClusterState協議中的函數看成ClusterState協議中函數的"組合"。StormClusterState協議定義如下:

StormClusterState協議

(defprotocol StormClusterState (assignments [this callback]) (assignment-info [this storm-id callback]) (assignment-info-with-version [this storm-id callback]) (assignment-version [this storm-id callback]) (active-storms [this]) (storm-base [this storm-id callback]) (get-worker-heartbeat [this storm-id node port]) (executor-beats [this storm-id executor->node+port]) (supervisors [this callback]) (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist (setup-heartbeats! [this storm-id]) (teardown-heartbeats! [this storm-id]) (teardown-topology-errors! [this storm-id]) (heartbeat-storms [this]) (error-topologies [this]) (worker-heartbeat! [this storm-id node port info]) (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (activate-storm! [this storm-id storm-base]) (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) (remove-storm! [this storm-id]) (report-error [this storm-id task-id node port error]) (errors [this storm-id task-id]) (disconnect [this]))

命名空間backtype.storm.cluster除了定義ClusterState和StormClusterState這兩個重要協議外,還定義了兩個重要函數:mk-distributed-cluster-state和mk-storm-cluster-state。

mk-distributed-cluster-state函數如下:

該函數返回一個實現了ClusterState協議的對象,通過這個對象就可以與zookeeper進行交互了。

mk-distributed-cluster-state函數

(defn mk-distributed-cluster-state;; conf綁定了storm.yaml中的配置信息,是一個map對象[conf];; zk綁定一個zk client,Storm使用CuratorFramework與Zookeeper進行交互 (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT)            :auth-conf conf)]  ;; 創建storm集群在zookeeper上的根目錄,默認值為/storm  (zk/mkdirs zk (conf STORM-ZOOKEEPER-ROOT))  (.close zk));; callbacks綁定回調函數集合,是一個map對象(let [callbacks (atom {})  ;; active標示zookeeper集群狀態   active (atom true)  ;; zk重新綁定新的zk client,該zk client設置了watcher,這樣當zookeeper集群的狀態發生變化時,zk server會給zk client發送相應的event,zk client設置的watcher會調用callbacks中相應回調函數來處理event  ;; 啟動nimbus時,callbacks是一個空集合,所以nimbus端收到event后不會調用任何回調函數;但是啟動supervisor時,callbacks中注冊了回調函數,所以當supervisor收到zk server發送的event后,會調用相應的回調函數  ;; mk-client函數定義在zookeeper.clj文件中,請參見其定義部分  zk (zk/mk-client conf           (conf STORM-ZOOKEEPER-SERVERS)           (conf STORM-ZOOKEEPER-PORT)           :auth-conf conf           :root (conf STORM-ZOOKEEPER-ROOT)           ;; :watcher綁定一個函數,指定zk client的默認watcher函數,state標示當前zk client的狀態;type標示事件類型;path標示zookeeper上產生該事件的znode           ;; 該watcher函數主要功能就是執行callbacks集合中的函數,callbacks集合中的函數是在mk-storm-cluster-state函數中通過調用ClusterState的register函數添加的           :watcher (fn [state type path]                (when @active                 (when-not (= :connected state)                  (log-warn "Received event " state ":" type ":" path " with disconnected Zookeeper."))                 (when-not (= :none type)                  (doseq [callback (vals @callbacks)]                   (callback type path))))))];; reify相當于java中的implements,這里表示實現一個協議(reify ClusterState ;; register函數用于將回調函數加入callbacks中,key是一個32位的標識 (register  [this callback]  (let [id (uuid)]   (swap! callbacks assoc id callback)   id)) ;; unregister函數用于將指定key的回調函數從callbacks中刪除 (unregister  [this id]  (swap! callbacks dissoc id)) ;; 在zookeeper上添加一個臨時節點 (set-ephemeral-node  [this path data]  (zk/mkdirs zk (parent-path path))  (if (zk/exists zk path false)   (try-cause    (zk/set-data zk path data) ; should verify that it's ephemeral    (catch KeeperException$NoNodeException e     (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data")     (zk/create-node zk path data :ephemeral)     ))   (zk/create-node zk path data :ephemeral))) ;; 在zookeeper上添加一個順序節點 (create-sequential  [this path data]  (zk/create-node zk path data :sequential)) ;; 修改某個節點數據 (set-data  [this path data]  ;; note: this does not turn off any existing watches  (if (zk/exists zk path false)   (zk/set-data zk path data)   (do    (zk/mkdirs zk (parent-path path))    (zk/create-node zk path data :persistent)))) ;; 刪除指定節點 (delete-node  [this path]  (zk/delete-recursive zk path)) ;; 獲取指定節點數據。path標示節點路徑;watch?是一個布爾類型值,表示是否需要對該節點進行"觀察",如果watch?=true,當調用set-data函數修改該節點數據后, ;; 會給zk client發送一個事件,zk client接收事件后,會調用創建zk client時指定的默認watcher函數(即:watcher綁定的函數) (get-data  [this path watch?]  (zk/get-data zk path watch?)) ;; 與get-data函數的區別就是獲取指定節點數據的同時,獲取節點數據的version,version表示節點數據修改的次數 (get-data-with-version  [this path watch?]  (zk/get-data-with-version zk path watch?)) ;; 獲取指定節點的version,watch?的含義與get-data函數中的watch?相同 (get-version   [this path watch?]  (zk/get-version zk path watch?)) ;; 獲取指定節點的子節點列表,watch?的含義與get-data函數中的watch?相同 (get-children  [this path watch?]  (zk/get-children zk path watch?)) ;; 在zookeeper上創建一個節點 (mkdirs  [this path]  (zk/mkdirs zk path)) ;; 關閉zk client (close  [this]  (reset! active false)  (.close zk)))))

mk-storm-cluster-state函數定義如下:

mk-storm-cluster-state函數非常重要,該函數返回一個實現了StormClusterState協議的實例,通過該實例storm就可以更加方便與zookeeper進行交互。

在啟動nimbus和supervisor的函數中均調用了mk-storm-cluster-state函數。關于nimbus和supervisor的啟動將在之后的文章中介紹。

mk-storm-cluster-state函數

(defn mk-storm-cluster-state [cluster-state-spec] ;; satisfies?謂詞相當于java中的instanceof,判斷cluster-state-spec是不是ClusterState實例 (let [[solo? cluster-state] (if (satisfies? ClusterState cluster-state-spec)              [false cluster-state-spec]              [true (mk-distributed-cluster-state cluster-state-spec)])  ;; 綁定topology id->回調函數的map,當/assignments/{topology id}數據發生變化時,zk client執行assignment-info-callback中topology id所對應的回調函數  assignment-info-callback (atom {})  ;; assignment-info-with-version-callback與assignment-info-callback類似  assignment-info-with-version-callback (atom {})  ;; assignment-version-callback與assignments-callback類似  assignment-version-callback (atom {})  ;; 當/supervisors標示的znode的子節點發生變化時,zk client執行supervisors-callback指向的函數  supervisors-callback (atom nil)  ;; 當/assignments標示的znode的子節點發生變化時,zk client執行assignments-callback指向的函數  assignments-callback (atom nil)  ;; 當/storms/{topology id}標示的znode的數據發生變化時,zk client執行storm-base-callback中topology id所對應的回調函數  storm-base-callback (atom {})  ;; register函數將"回調函數(fn ...)"添加到cluster-state的callbacks集合中,并返回標示該回調函數的uuid  state-id (register        cluster-state        ;; 定義"回調函數",type標示事件類型,path標示znode        (fn [type path]         ;; subtree綁定路徑前綴如"assignments"、"storms"、"supervisors"等,args存放topology id         (let [[subtree & args] (tokenize-path path)]          ;; condp相當于java中的switch          (condp = subtree           ;; 當subtree="assignments"時,如果args為空,說明是/assignments的子節點發生變化,執行assignments-callback指向的回調函數,否則     ;; 說明/assignments/{topology id}標示的節點數據發生變化,執行assignment-info-callback指向的回調函數           ASSIGNMENTS-ROOT (if (empty? args)                    (issue-callback! assignments-callback)                    (issue-map-callback! assignment-info-callback (first args)))           ;; 當subtree="supervisors"時,說明是/supervisors的子節點發生變化,執行supervisors-callback指向的回調函數           SUPERVISORS-ROOT (issue-callback! supervisors-callback)           ;; 當subtree="storms"時,說明是/storms/{topology id}標示的節點數據發生變化,執行storm-base-callback指向的回調函數           STORMS-ROOT (issue-map-callback! storm-base-callback (first args))           ;; this should never happen           (exit-process! 30 "Unknown callback for subtree " subtree args)))))];; 在zookeeper上創建storm運行topology所必需的znode(doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE]] (mkdirs cluster-state p));; 返回一個實現StormClusterState協議的實例(reify StormClusterState ;; 獲取/assignments的子節點列表,如果callback不為空,將其賦值給assignments-callback,并對/assignments添加"節點觀察" (assignments  [this callback]  (when callback   (reset! assignments-callback callback))  (get-children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) ;; 獲取/assignments/{storm-id}節點數據,即storm-id的分配信息,如果callback不為空,將其添加到assignment-info-callback中,并對/assignments/{storm-id}添加"數據觀察" (assignment-info  [this storm-id callback]  (when callback   (swap! assignment-info-callback assoc storm-id callback))  (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)))) ;; 獲取/assignments/{storm-id}節點數據包括version信息,如果callback不為空,將其添加到assignment-info-with-version-callback中,并對/assignments/{storm-id}添加"數據觀察" (assignment-info-with-version   [this storm-id callback]  (when callback   (swap! assignment-info-with-version-callback assoc storm-id callback))  (let [{data :data version :version}      (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]  {:data (maybe-deserialize data)   :version version})) ;; 獲取/assignments/{storm-id}節點數據的version信息,如果callback不為空,將其添加到assignment-version-callback中,并對/assignments/{storm-id}添加"數據觀察" (assignment-version   [this storm-id callback]  (when callback   (swap! assignment-version-callback assoc storm-id callback))  (get-version cluster-state (assignment-path storm-id) (not-nil? callback))) ;; 獲取storm集群中正在運行的topology id即/storms的子節點列表 (active-storms  [this]  (get-children cluster-state STORMS-SUBTREE false)) ;; 獲取storm集群中所有有心跳的topology id即/workerbeats的子節點列表 (heartbeat-storms  [this]  (get-children cluster-state WORKERBEATS-SUBTREE false)) ;; 獲取所有有錯誤的topology id即/errors的子節點列表 (error-topologies  [this]  (get-children cluster-state ERRORS-SUBTREE false)) ;; 獲取指定storm-id進程的心跳信息,即/workerbeats/{storm-id}/{node-port}節點數據 (get-worker-heartbeat  [this storm-id node port]  (-> cluster-state    (get-data (workerbeat-path storm-id node port) false)    maybe-deserialize)) ;; 獲取指定進程中所有線程的心跳信息 (executor-beats  [this storm-id executor->node+port]  ;; need to take executor->node+port in explicitly so that we don't run into a situation where a  ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats  ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,  ;; we avoid situations like that  (let [node+port->executors (reverse-map executor->node+port)     all-heartbeats (for [[[node port] executors] node+port->executors]              (->> (get-worker-heartbeat this storm-id node port)                (convert-executor-beats executors)                ))]   (apply merge all-heartbeats))) ;; 獲取/supervisors的子節點列表,如果callback不為空,將其賦值給supervisors-callback,并對/supervisors添加"節點觀察"  (supervisors  [this callback]  (when callback   (reset! supervisors-callback callback))  (get-children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))) ;; 獲取/supervisors/{supervisor-id}節點數據,即supervisor的心跳信息 (supervisor-info  [this supervisor-id]  (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false))) ;; 設置進程心跳信息 (worker-heartbeat!  [this storm-id node port info]  (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info))) ;; 刪除進程心跳信息 (remove-worker-heartbeat!  [this storm-id node port]  (delete-node cluster-state (workerbeat-path storm-id node port))) ;; 創建指定storm-id的topology的用于存放心跳信息的節點 (setup-heartbeats!  [this storm-id]  (mkdirs cluster-state (workerbeat-storm-root storm-id))) ;; 刪除指定storm-id的topology的心跳信息節點 (teardown-heartbeats!  [this storm-id]  (try-cause   (delete-node cluster-state (workerbeat-storm-root storm-id))   (catch KeeperException e    (log-warn-error e "Could not teardown heartbeats for " storm-id)))) ;; 刪除指定storm-id的topology的錯誤信息節點 (teardown-topology-errors!  [this storm-id]  (try-cause   (delete-node cluster-state (error-storm-root storm-id))   (catch KeeperException e    (log-warn-error e "Could not teardown errors for " storm-id)))) ;; 創建臨時節點存放supervisor的心跳信息 (supervisor-heartbeat!  [this supervisor-id info]  (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))) ;; 創建/storms/{storm-id}節點 (activate-storm!  [this storm-id storm-base]  (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base))) ;; 更新topology對應的StormBase對象,即更新/storm/{storm-id}節點 (update-storm!  [this storm-id new-elems]  ;; base綁定storm-id在zookeeper上的StormBase對象  (let [base (storm-base this storm-id nil)     ;; executors綁定component名稱->組件并行度的map     executors (:component->executors base)     ;; new-elems綁定合并后的組件并行度map,update函數將組件新并行度map合并到舊map中     new-elems (update new-elems :component->executors (partial merge executors))]   ;; 更新StormBase對象中的組件并行度map,并寫入zookeeper的/storms/{storm-id}節點   (set-data cluster-state (storm-path storm-id)        (-> base          (merge new-elems)          Utils/serialize)))) ;; 獲取storm-id的StormBase對象,即讀取/storms/{storm-id}節點數據,如果callback不為空,將其賦值給storm-base-callback,并為/storms/{storm-id}節點添加"數據觀察" (storm-base  [this storm-id callback]  (when callback   (swap! storm-base-callback assoc storm-id callback))  (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)))) ;; 刪除storm-id的StormBase對象,即刪除/storms/{storm-id}節點 (remove-storm-base!  [this storm-id]  (delete-node cluster-state (storm-path storm-id))) ;; 更新storm-id的分配信息,即更新/assignments/{storm-id}節點數據 (set-assignment!  [this storm-id info]  (set-data cluster-state (assignment-path storm-id) (Utils/serialize info))) ;; 刪除storm-id的分配信息,同時刪除其StormBase信息,即刪除/assignments/{storm-id}節點和/storms/{storm-id}節點 (remove-storm!  [this storm-id]  (delete-node cluster-state (assignment-path storm-id))  (remove-storm-base! this storm-id)) ;; 將組件異常信息寫入zookeeper (report-error  [this storm-id component-id node port error]  ;; path綁定"/errors/{storm-id}/{component-id}"  (let [path (error-path storm-id component-id)     ;; data綁定異常信息,包括異常時間、異常堆棧信息、主機和端口     data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}     ;; 創建/errors/{storm-id}/{component-id}節點     _ (mkdirs cluster-state path)     ;; 創建/errors/{storm-id}/{component-id}的子順序節點,并寫入異常信息     _ (create-sequential cluster-state (str path "/e") (Utils/serialize data))     ;; to-kill綁定除去順序節點編號最大的前10個節點的剩余節點的集合     to-kill (->> (get-children cluster-state path false)            (sort-by parse-error-path)            reverse            (drop 10))]   ;; 刪除to-kill中包含的節點   (doseq [k to-kill]    (delete-node cluster-state (str path "/" k))))) ;; 得到給定的storm-id component-id下的異常信息 (errors  [this storm-id component-id]  (let [path (error-path storm-id component-id)     _ (mkdirs cluster-state path)     children (get-children cluster-state path false)     errors (dofor [c children]            (let [data (-> (get-data cluster-state (str path "/" c) false)                    maybe-deserialize)]             (when data              (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))              )))     ]   (->> (filter not-nil? errors)      (sort-by (comp - :time-secs))))) ;; 關閉連接,在關閉連接前,將回調函數從cluster-state的callbacks中刪除 (disconnect  [this]  (unregister cluster-state state-id)  (when solo?   (close cluster-state))))))

zookeeper.clj中mk-client函數

mk-client函數創建一個CuratorFramework實例,為該實例注冊了CuratorListener,當一個后臺操作完成或者指定的watch被觸發時將會執行CuratorListener中的eventReceived()。eventReceived中調用的wacher函數就是mk-distributed-cluster-state中:watcher綁定的函數。

(defnk mk-client [conf servers port  :root ""  :watcher default-watcher  :auth-conf nil] (let [fk (Utils/newCurator conf servers port root (when auth-conf (ZookeeperAuthInfo. auth-conf)))]  (.. fk    (getCuratorListenable)    (addListener     (reify CuratorListener      (^void eventReceived [this ^CuratorFramework _fk ^CuratorEvent e]        (when (= (.getType e) CuratorEventType/WATCHED)         (let [^WatchedEvent event (.getWatchedEvent e)]          (watcher (zk-keeper-states (.getState event))              (zk-event-types (.getType event))              (.getPath event))))))))  (.start fk)  fk)) 

以上就是storm與zookeeper進行交互的源碼分析,我覺得最重要的部分就是如何給zk client添加"wacher",storm的很多功能都是通過zookeeper的wacher機制實現的,如"分配信息領取"。添加"wacher"大概分為以下幾個步驟:

mk-distributed-cluster-state函數創建了一個zk client,并通過:watcher給該zk client指定了"wacher"函數,這個"wacher"函數只是簡單調用ClusterState的callbacks集合中的函數,這樣這個"wacher"函數執行 哪些函數將由ClusterState實例決定
ClusterState實例提供register函數來更新callbacks集合,ClusterState實例被傳遞給了mk-storm-cluster-state函數,在mk-storm-cluster-state中調用register添加了一個函數(fn [type path] ... ),這個函數實現了"watcher"函數的全部邏輯
mk-storm-cluster-state中注冊的函數執行的具體內容由StormClusterState實例決定,對zookeeper節點添加"觀察"也是通過StormClusterState實例實現的,這樣我們就可以通過StormClusterState實例對我們感興趣的節點添加"觀察"和"回調函數",當節點或節點數據發生變化后,zk server就會給zk client發送"通知",zk client中的"wather"函數將被調用,進而我們注冊的"回到函數"將被執行。

總結

這部分源碼與zookeeper聯系十分緊密,涉及了很多zookeeper中的概念和特性,如"數據觀察"和"節點觀察"等.以上就是本文關于源碼閱讀之storm操作zookeeper-cluster.clj的全部內容了,希望對大家有所幫助。感謝各位的閱讀!


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
91免费看片在线| 91欧美精品午夜性色福利在线| 日韩欧美中文在线| 精品色蜜蜜精品视频在线观看| 琪琪亚洲精品午夜在线| 亚洲最大福利视频网站| 日韩美女在线播放| 91高清视频在线免费观看| 亚洲欧洲一区二区三区久久| 精品国产91久久久| 国产美女精品视频免费观看| 国产成人一区二区| 一区二区三区动漫| 亚洲精品永久免费精品| 久99九色视频在线观看| 亚洲精品中文字幕av| 日韩网站免费观看| 成人午夜一级二级三级| 国产91在线视频| 另类少妇人与禽zozz0性伦| 日韩欧美视频一区二区三区| 亚洲影院色无极综合| 久久夜精品va视频免费观看| 97视频在线观看网址| 国产精品日韩在线观看| 国产成人在线播放| 91精品国产乱码久久久久久久久| 久久精品国产一区二区电影| 97久久伊人激情网| 一本色道久久综合狠狠躁篇怎么玩| 色偷偷888欧美精品久久久| 久久不射电影网| 久久国产视频网站| 久久精品国产精品亚洲| 日韩成人高清在线| 亚洲色图国产精品| 亚洲精品短视频| 国产一区二区av| 国产精品v日韩精品| 亚洲欧美日韩视频一区| 国产精品视频区1| 久久久国产精彩视频美女艺术照福利| 日本国产高清不卡| 亚洲欧美www| 欧美在线亚洲在线| 亚洲欧美www| 久久精品国产成人精品| 久久成人人人人精品欧| 国产综合在线看| 成人妇女免费播放久久久| 精品国产999| 亚洲精品v欧美精品v日韩精品| yellow中文字幕久久| 欧美激情一区二区三区高清视频| 日韩激情在线视频| 久久久久久久久91| 欧美丰满片xxx777| 国产精品第2页| 91久久精品视频| 精品美女永久免费视频| 亚洲欧洲高清在线| 日韩精品有码在线观看| 羞羞色国产精品| 一本大道香蕉久在线播放29| 69国产精品成人在线播放| 国产精品jvid在线观看蜜臀| 国产精品久久久久久搜索| 992tv在线成人免费观看| 欧美激情按摩在线| 久久噜噜噜精品国产亚洲综合| 久久精品中文字幕免费mv| 国产主播喷水一区二区| 久久久91精品国产| 欧美精品情趣视频| **欧美日韩vr在线| 播播国产欧美激情| 伊人久久精品视频| 亚洲人在线观看| 国产精品啪视频| 91福利视频网| 91高清在线免费观看| 国产亚洲精品一区二555| 久久久久这里只有精品| 欧美亚洲日本网站| 亚洲第一黄色网| 久久91亚洲人成电影网站| 精品久久久久久久久久久久久久| 亚洲精品aⅴ中文字幕乱码| 国产999精品久久久影片官网| 成人福利视频网| 亚洲精品日韩av| 亚洲成人黄色在线| 91po在线观看91精品国产性色| 欧美性视频网站| 亚洲社区在线观看| 日韩av电影在线播放| 国产欧美一区二区| 国产精品美女免费| 亚洲乱码av中文一区二区| 91免费国产网站| 欧美日韩国产精品一区二区三区四区| 日韩中文字幕在线看| 久久成人18免费网站| 91国产精品91| 久久精品视频网站| 美女少妇精品视频| 亚洲精品福利在线| 91夜夜未满十八勿入爽爽影院| 欧美日韩国产一区二区三区| www.日韩欧美| 久久久免费高清电视剧观看| 日韩在线免费视频| 亚洲国产成人精品久久久国产成人一区| 国产成人综合精品在线| 亚洲一区二区国产| 91sao在线观看国产| 欧美成人黑人xx视频免费观看| 欧美精品videossex性护士| 中文字幕免费国产精品| 欧美成人中文字幕| 中文字幕视频在线免费欧美日韩综合在线看| 久久久欧美一区二区| 九九久久国产精品| 欧美视频在线观看免费| 亚洲黄页网在线观看| www.欧美免费| xvideos成人免费中文版| 69影院欧美专区视频| 欧美性猛交xxxx富婆弯腰| 久久国产天堂福利天堂| 日韩国产在线播放| 久久久久久国产精品久久| 国产精品久久久久久亚洲影视| 亚洲性生活视频在线观看| 国外成人在线直播| 黑人精品xxx一区一二区| 国产一区二区三区在线观看网站| 国产精品直播网红| 国产综合福利在线| 中文字幕日韩欧美在线视频| 国内精品视频久久| 欧美成人黄色小视频| 91夜夜未满十八勿入爽爽影院| 欧美日韩精品在线| 日韩极品精品视频免费观看| 国产精品日韩专区| 欧美性xxxx在线播放| 欧美日韩国产中文精品字幕自在自线| 亚洲韩国日本中文字幕| 米奇精品一区二区三区在线观看| 成人亲热视频网站| 欧美成人免费网| 亚洲欧美国产精品久久久久久久| 亚洲国产天堂久久综合| 国产精品香蕉国产| 亚洲最新在线视频| 国产成+人+综合+亚洲欧美丁香花| 国产精品美女www爽爽爽视频| 欧美激情中文字幕乱码免费| 亚洲第一级黄色片| 亚洲精品欧美一区二区三区| 国产97在线亚洲| 成人情趣片在线观看免费| 成人黄色午夜影院|